Unverified Commit e34b041d authored by Sebastian Schüpbach
Browse files

commit messages to kafka; ignore none vals


Signed-off-by: Sebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent db6bc073
Pipeline #19202 passed with stages
in 1 minute and 50 seconds
......@@ -317,11 +317,15 @@ class MediametadataToDB:
record_processor.thumbnail_ok(record_id, enriched_data)
else:
record_processor.thumbnail_fail(record_id)
record_processor.index()
ok = record_processor.index()
if ok:
consumer.commit()
if counter % 1000 == 0:
logging.info('{} messages read till now'.format(counter))
# arriving here means there are no new messages to poll from
record_processor.index()
ok = record_processor.index()
if ok:
consumer.commit()
except Exception as ex:
status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
logging.error(status)
......
......@@ -41,11 +41,12 @@ class Indexer:
"""
Create SQL statement
"""
db_values = [record.get(f) for f in fields]
db_values.extend([record.get(f) for f in fields if f != 'sig'])
db_fields = ','.join(fields)
db_value_placeholders = ', '.join(['?' for _ in fields])
key_value = ", ".join([f"{f}=?" for f in fields if f != 'sig'])
db_values = [record[f] for f in fields if f in record and record[f]]
db_values.extend([record[f] for f in fields if f != 'sig' and f in record and record[f]])
db_fields = ','.join([f for f in fields if f in record and record[f]])
db_value_placeholders = ', '.join(['?' for f in fields if f in record and record[f]])
key_value = ", ".join([f"{f}=?" for f in fields
if f != 'sig' and f in record and record[f]])
# noinspection SqlNoDataSourceInspection
return 'INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {}'.format(
table_name, db_fields, db_value_placeholders, key_value), tuple(db_values)
......@@ -64,7 +65,7 @@ class Indexer:
self.mariadb_cursor.execute(metadata_stmt, metadata_values)
return True, ""
except mariadb.Error as ex:
logging.error("Problems in sql statement: {}".format(entities_stmt))
logging.error(f"Problems in sql statement ({entities_stmt}): {ex}")
return False, str(ex)
def commit(self):
......
from mediametadatatodb_app.resources.reporter import Reporter
from mediametadatatodb_app.resources.indexer import Indexer
import logging
from mediametadatatodb_app.resources.indexer import Indexer
from mediametadatatodb_app.resources.reporter import Reporter
class RecordProcessor:
def __init__(self):
......@@ -84,7 +85,8 @@ class RecordProcessor:
for key in self.processed_records.keys():
self.reporter.send_message(key, 'FATAL', f'Indexing failed: {ex}')
def index(self):
def index(self) -> bool:
ok_all = True
for key in self.processed_records.keys():
record = self.processed_records[key]
dig_obj_msg = \
......@@ -119,7 +121,9 @@ class RecordProcessor:
dig_obj_msg, thumbnail_msg, audio_snip_msg
))
else:
ok_all = False
self.indexer.rollback()
self.reporter.send_message(key, 'FATAL',
f'Indexing failed: {err_msg}')
self.processed_records.clear()
return ok_all
......@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from mediametadatatodb_app.resources import indexer
......@@ -32,9 +33,9 @@ class Test(unittest.TestCase):
indexer.Indexer._create_sql_stmt('metadata', record,
['sig', 'mimetype', 'height',
'width', 'duration', 'type'])
self.assertEqual(("INSERT INTO metadata (sig,mimetype,height,width,duration,type) VALUES"
" (?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE mimetype=?, height=?,"
" width=?, duration=?, type=?",
('test-001', 'image/jpeg', 20, 100, None, 'image',
'image/jpeg', 20, 100, None, 'image')),
self.assertEqual(("INSERT INTO metadata (sig,mimetype,height,width,type) VALUES"
" (?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE mimetype=?, height=?,"
" width=?, type=?",
('test-001', 'image/jpeg', 20, 100, 'image',
'image/jpeg', 20, 100, 'image')),
(metadata_stmt, metadata_values))
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment