Commit 15b2ca4d authored by Lionel Walter's avatar Lionel Walter
Browse files

Add delete kafka topics and tests

parent 5853e9f1
Pipeline #10006 failed with stages
in 5 minutes and 21 seconds
......@@ -18,7 +18,14 @@ class KafkaTopics(Resource):
consumer = KafkaConsumer(
bootstrap_servers=current_app.config['kafka-broker-url'],
)
topics = consumer.topics()
try:
topics = consumer.topics()
except BaseException:
message = "Problem connecting to Kafka cluster"
current_app.logger.error(message)
return {'error': message}, 500
return {
"topics": list(topics)
}
......@@ -93,3 +100,43 @@ class KafkaTopics(Resource):
return {
'success': "Topics created : "+", ".join(topics)
}
def delete(self):
"""
Delete the Topics from the topics parameter.
---
tags:
- Kafka Topics
parameters:
- in: body
name: body
schema:
properties:
topics:
type: array
example: [test1, test2]
responses:
200:
description: Topic list
"""
parser = reqparse.RequestParser()
parser.add_argument('topics', action='append')
args = parser.parse_args()
topics = args['topics']
admin_client = KafkaAdminClient(
bootstrap_servers=current_app.config['kafka-broker-url'],
client_id='import-api'
)
try:
admin_client.delete_topics(topics)
except BaseException:
message = "Problem when deleting topics (probably they don't exist) :"+", ".join(topics)
current_app.logger.error(message)
return {'error': message}, 500
return {
'success': "Topics deleted : "+", ".join(topics)
}
......@@ -67,3 +67,59 @@ def test_job_stop(client):
response = client.get('/v1/job/789/text-file-validation')
assert response.status_code == 404
def test_kafka_topics(client):
response = client.get('/v1/kafka/topics')
assert response.status_code == 200
topics = response.get_json()["topics"]
number_of_topics_before = len(topics)
response = client.post('/v1/kafka/topics', json={
"topics": [
"py-test1",
"py-test2"
]
})
assert response.status_code == 200
response = client.get('/v1/kafka/topics')
assert response.status_code == 200
topics = response.get_json()["topics"]
assert len(topics) == number_of_topics_before + 2
# recreate the topics
response = client.post('/v1/kafka/topics', json={
"topics": [
"py-test1",
"py-test2",
"py-test3"
]
})
assert response.status_code == 200
response = client.get('/v1/kafka/topics')
assert response.status_code == 200
topics = response.get_json()["topics"]
# only one new topic created
assert len(topics) == number_of_topics_before + 3
# remove topics
response = client.delete('/v1/kafka/topics', json={
"topics": [
"py-test1",
"py-test2",
"py-test3"
]
})
assert response.status_code == 200
assert "py-test1" in topics
assert "py-test2" in topics
assert "py-test3" in topics
response = client.get('/v1/kafka/topics')
assert response.status_code == 200
topics = response.get_json()["topics"]
assert len(topics) == number_of_topics_before
assert "py-test1" not in topics
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment