Due to a scheduled upgrade to version 14.10, GitLab will be unavailable on Monday 30.05., from 19:00 until 20:00.

Commit eb10d165 authored by Lionel Walter's avatar Lionel Walter
Browse files

Refactor KafkaTopics

parent 6f032471
from kafka import KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
from flask_restful import current_app
from kafka.errors import KafkaError
from import_api_app.models.Error import ImportApiError
import time
def list_topics():
    """Return the set of topic names currently known to the Kafka cluster.

    Returns:
        The set of topic-name strings reported by the broker.

    Raises:
        ImportApiError: if the cluster cannot be reached.
    """
    consumer = KafkaConsumer(
        bootstrap_servers=current_app.config['kafka-broker-url'],
    )
    try:
        topics = consumer.topics()
    except KafkaError:
        message = "Problem connecting to Kafka cluster"
        current_app.logger.error(message)
        raise ImportApiError(message)
    finally:
        # Release the broker connection on every path; the original leaked it.
        consumer.close()
    return topics
# It also deletes the topics if it already exists
def create_topics(topics):
    """Create the given Kafka topics, first deleting any that already exist.

    Args:
        topics: iterable of topic-name strings to (re)create, each with a
            single partition and replication factor 1.

    Returns:
        A dict with a 'success' message listing the created topics.

    Raises:
        ImportApiError: if the cluster is unreachable, or if deleting the
            pre-existing topics or creating the new ones fails.
    """
    admin_client = KafkaAdminClient(
        bootstrap_servers=current_app.config['kafka-broker-url'],
        client_id='import-api'
    )
    consumer = KafkaConsumer(
        bootstrap_servers=current_app.config['kafka-broker-url'],
    )
    try:
        try:
            current_topics = consumer.topics()
        except KafkaError:
            message = "Problem connecting to Kafka cluster"
            current_app.logger.error(message)
            raise ImportApiError(message)
        # Only ask the broker to delete topics that actually exist.
        topic_to_delete = [topic for topic in topics if topic in current_topics]
        try:
            admin_client.delete_topics(topic_to_delete)
        except KafkaError:
            message = "Problem when removing already existing topics :" + ", ".join(topic_to_delete)
            current_app.logger.error(message)
            raise ImportApiError(message)
        # wait that topics are deleted
        # todo Does that make sense ? Does it take longer when the topic is full of messages ?
        # todo : do a loop until the exception is notopics created is not raised any more
        time.sleep(0.1)
        # todo use multiple partitions
        new_topic_list = [
            NewTopic(name=topic, num_partitions=1, replication_factor=1)
            for topic in topics
        ]
        try:
            admin_client.create_topics(new_topic_list)
        except KafkaError:
            message = "Problem when creating new topics :" + ", ".join(topics)
            current_app.logger.error(message)
            # Raise instead of returning a Flask (body, 500) tuple so this
            # module has a uniform error contract — the Resource layer
            # catches ImportApiError and maps it to a 500 response.
            raise ImportApiError(message)
        return {
            'success': "Topics created : " + ", ".join(topics)
        }
    finally:
        # Release both broker connections on every path; the original leaked them.
        consumer.close()
        admin_client.close()
def delete_topics(topics):
    """Delete the given Kafka topics from the cluster.

    Args:
        topics: iterable of topic-name strings to delete.

    Returns:
        A dict with a 'success' message listing the deleted topics.

    Raises:
        ImportApiError: if the deletion fails (e.g. a topic does not exist
            or the cluster is unreachable).
    """
    admin_client = KafkaAdminClient(
        bootstrap_servers=current_app.config['kafka-broker-url'],
        client_id='import-api'
    )
    try:
        admin_client.delete_topics(topics)
    except KafkaError:
        message = "Problem when deleting topics (probably they don't exist) :" + ", ".join(topics)
        current_app.logger.error(message)
        raise ImportApiError(message)
    finally:
        # Release the admin connection on every path; the original leaked it.
        admin_client.close()
    return {
        'success': "Topics deleted : " + ", ".join(topics)
    }
from flask_restful import Resource, current_app, reqparse
from kafka import KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
import time
from kafka.errors import KafkaError
from flask_restful import Resource, reqparse
from import_api_app.models.Error import ImportApiError
from import_api_app.models.KafkaTopics import create_topics, list_topics, delete_topics
class KafkaTopics(Resource):
......@@ -16,16 +15,10 @@ class KafkaTopics(Resource):
200:
description: Topic list
"""
consumer = KafkaConsumer(
bootstrap_servers=current_app.config['kafka-broker-url'],
)
try:
topics = consumer.topics()
except KafkaError:
message = "Problem connecting to Kafka cluster"
current_app.logger.error(message)
return {'error': message}, 500
topics = list_topics()
except ImportApiError as e:
return {'error': e.message}, 500
return {
"topics": list(topics)
......@@ -56,53 +49,10 @@ class KafkaTopics(Resource):
args = parser.parse_args()
topics = args['topics']
admin_client = KafkaAdminClient(
bootstrap_servers=current_app.config['kafka-broker-url'],
client_id='import-api'
)
consumer = KafkaConsumer(
bootstrap_servers=current_app.config['kafka-broker-url'],
)
try:
current_topics = consumer.topics()
except KafkaError:
message = "Problem connecting to Kafka cluster"
current_app.logger.error(message)
return {'error': message}, 500
topic_to_delete = []
for topic in topics:
if topic in current_topics:
topic_to_delete.append(topic)
try:
admin_client.delete_topics(topic_to_delete)
except KafkaError:
message = "Problem when removing already existing topics :"+", ".join(topic_to_delete)
current_app.logger.error(message)
return {'error': message}, 500
# wait that topics are deleted
# todo Does that make sense ? Does it take longer when the topic is full of messages ?
# todo : do a loop until the exception is notopics created is not raised any more
time.sleep(0.1)
new_topic_list = []
for topic in topics:
# todo use multiple partitions
new_topic_list.append(NewTopic(name=topic, num_partitions=1, replication_factor=1))
try:
admin_client.create_topics(new_topic_list)
except KafkaError:
message = "Problem when creating new topics :"+", ".join(topics)
current_app.logger.error(message)
return {'error': message}, 500
return {
'success': "Topics created : "+", ".join(topics)
}
return create_topics(topics)
except ImportApiError as e:
return {'error': e.message}, 500
def delete(self):
"""
......@@ -122,24 +72,12 @@ class KafkaTopics(Resource):
200:
description: Topic list
"""
parser = reqparse.RequestParser()
parser.add_argument('topics', action='append')
args = parser.parse_args()
topics = args['topics']
admin_client = KafkaAdminClient(
bootstrap_servers=current_app.config['kafka-broker-url'],
client_id='import-api'
)
try:
admin_client.delete_topics(topics)
except KafkaError:
message = "Problem when deleting topics (probably they don't exist) :"+", ".join(topics)
current_app.logger.error(message)
return {'error': message}, 500
return {
'success': "Topics deleted : "+", ".join(topics)
}
return delete_topics(topics)
except ImportApiError as e:
return {'error': e.message}, 500
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment