Due to a scheduled upgrade to version 14.10, GitLab will be unavailabe on Monday 30.05., from 19:00 until 20:00.

Commit ed49e425 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Remove kafka stuff as it is no longer necessary.

parent 193b087f
from kafka import KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
from flask_restful import current_app
from kafka.errors import KafkaError
from import_api_app.helpers.Error import ImportApiError
import time
def list_topics():
consumer = KafkaConsumer(
bootstrap_servers=current_app.config['kafka-broker-url'],
)
try:
topics = consumer.topics()
except KafkaError:
message = "Problem connecting to Kafka cluster"
current_app.logger.error(message)
raise ImportApiError(message)
return topics
# It also deletes the topics if it already exists if the
# delete_if_already_exists flag is set to true
def create_topics(topics):
admin_client = KafkaAdminClient(
bootstrap_servers=current_app.config['kafka-broker-url'],
client_id='import-api'
)
consumer = KafkaConsumer(
bootstrap_servers=current_app.config['kafka-broker-url'],
)
try:
current_topics = consumer.topics()
except KafkaError:
message = "Problem connecting to Kafka cluster"
current_app.logger.error(message)
raise ImportApiError(message)
topic_to_delete = []
for topic in topics:
if topic in current_topics:
topic_to_delete.append(topic)
try:
admin_client.delete_topics(topic_to_delete)
except KafkaError:
message = "Problem when removing already existing topics :" + ", ".join(topic_to_delete)
current_app.logger.error(message)
raise ImportApiError(message)
# wait that topics are deleted
# todo Does that make sense ? Does it take longer when the topic is full of messages ?
# todo : do a loop until the exception is notopics created is not raised any more
time.sleep(0.1)
new_topic_list = []
for topic in topics:
# todo use multiple partitions
new_topic_list.append(NewTopic(name=topic, num_partitions=3, replication_factor=1))
try:
admin_client.create_topics(new_topic_list)
except KafkaError:
message = "Problem when creating new topics :" + ", ".join(topics)+" e.message()"
current_app.logger.error(message)
raise ImportApiError(message)
return {
'success': "Topics created : " + ", ".join(topics)
}
def delete_topics(topics):
admin_client = KafkaAdminClient(
bootstrap_servers=current_app.config['kafka-broker-url'],
client_id='import-api'
)
try:
admin_client.delete_topics(topics)
except KafkaError:
message = "Problem when deleting topics (probably they don't exist) :" + ", ".join(topics)
current_app.logger.error(message)
raise ImportApiError(message)
return {
'success': "Topics deleted : " + ", ".join(topics)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment