Due to a scheduled upgrade to version 14.10, GitLab will be unavailabe on Monday 30.05., from 19:00 until 20:00.

Commit 527a3795 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

update reporting

will now report success per message and filtered errors.
parent 566ba3fb
Pipeline #7592 failed with stages
in 1 minute and 28 seconds
......@@ -20,11 +20,13 @@ package org.memobase
import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon
import com.beust.klaxon.json
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import com.github.doyaaaaaken.kotlincsv.dsl.csvWriter
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import java.io.ByteArrayOutputStream
import java.io.File
......@@ -34,10 +36,7 @@ import java.nio.charset.Charset
class KafkaTopology(private val settings: SettingsLoader
) {
private val log = LogManager.getLogger("KafkaTopology")
private val producer = Producer(settings.kafkaProducerSettings, "${settings.outputTopic}-reporting")
private val sftpClient = SftpClient(settings.sftpSettings)
private val csvHeaderCount = settings.appSettings.getProperty("app.csv.header.count").toInt()
private val csvUseHeaderProperties = settings.appSettings.getProperty("app.csv.header.line")!!.toBoolean()
private val csvUsedHeaderIndex = settings.appSettings.getProperty("app.csv.header.index").toInt()
......@@ -46,17 +45,35 @@ class KafkaTopology(private val settings: SettingsLoader
fun build(): Topology {
val builder = StreamsBuilder()
val source = builder
val branchedSource = builder
.stream<String, String>(settings.inputTopic)
.filter { _, value -> filer(value) }
.branch(
Predicate { _, value -> filer(value) },
Predicate { _, _ -> true }
)
val processedValue = branchedSource[0]
.mapValues { _, value -> mapValues(value) }
.flatMap { _, value -> parser(value) }
source.to(settings.outputTopic)
.flatMapValues { _, value -> parser(value) }
// report filtered error message from previous job.
branchedSource[1]
.mapValues { key, _ -> reportToJson(Report(key, "Failure", "Ignored message as file validator reported error.", 0)) }
.to("${settings.outputTopic}-reporting")
// data branch
processedValue
.map { _, value -> value.first }
.to(settings.outputTopic)
// reporting branch
processedValue
.mapValues { _, value -> reportToJson(value.second) }
.to("${settings.outputTopic}-reporting")
return builder.build()
}
private fun filer(value: String): Boolean {
return !value.contains("ERROR")
}
......@@ -65,8 +82,8 @@ class KafkaTopology(private val settings: SettingsLoader
return Klaxon().parseJsonObject(StringReader(value))
}
private fun parser(value: JsonObject): List<KeyValue<String, String>> {
val resultMessages = mutableListOf<KeyValue<String, String>>()
private fun parser(value: JsonObject): List<Pair<KeyValue<String, String>, Report>> {
val resultMessages = mutableListOf<Pair<KeyValue<String, String>, Report>>()
sftpClient.open(File(value["path"] as String)).use {
val reader = csvReader().readAll(it.RemoteFileInputStream())
var headerProperties = emptyList<String>()
......@@ -76,6 +93,7 @@ class KafkaTopology(private val settings: SettingsLoader
if (count <= csvHeaderCount) {
if (csvUseHeaderProperties) {
if (count == csvUsedHeaderIndex) {
// TODO: check if there is a property name for each column!
headerProperties = line
}
}
......@@ -94,11 +112,21 @@ class KafkaTopology(private val settings: SettingsLoader
writeRow(line)
}
// TODO: check if there is an identifier for each row and that each value is unique!
val identifier = line[csvIdentifierIndex]
val result = KeyValue(identifier, output.toString(Charset.defaultCharset()).trim())
val data = output.toString(Charset.defaultCharset()).trim()
val message = json { obj(Pair("format", "CSV"), Pair("data", data)) }
val result = KeyValue(identifier, message.toJsonString())
output.reset()
resultMessages.add(result)
val report = Report(identifier, "SUCCESS","Successfully created record with identifier $identifier with format CSV!", 1)
resultMessages.add(Pair(result, report))
} }
return resultMessages
}
private fun reportToJson(value: Report): String {
return Klaxon().toJsonString(value)
}
}
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon
import java.io.Closeable
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
class Producer(props: Properties, private val topic: String) : Closeable {
private val instance = KafkaProducer<String, String>(props)
private val reportingTopic = "$topic-reporting"
private fun send(topic: String, key: String, message: String) {
instance.send(ProducerRecord(topic, key, message))
}
fun sendMessage(key: String, message: String, format: String) {
send(topic, key, JsonObject(mapOf(Pair("path", message), Pair("format", format))).toJsonString())
}
fun sendReport(report: Report) {
send(reportingTopic, report.id, Klaxon().toJsonString(report))
}
override fun close() {
instance.flush()
// TODO: Add report for messages sent count.
instance.close()
}
}
......@@ -18,10 +18,29 @@
package org.memobase
import java.time.Instant
data class Report(
val id: String,
val status: String,
val path: String,
val format: String,
val error: String
)
val message: String,
val recordCount: Int,
val timeStamp: String = Instant.now().toString()
) {
override fun equals(other: Any?): Boolean {
return when (other) {
null -> false
!is Report -> false
else -> hashCode() == other.hashCode()
}
}
override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + status.hashCode()
result = 31 * result + message.hashCode()
result = 31 * result + recordCount.hashCode()
return result
}
}
......@@ -17,6 +17,7 @@
*/
package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
......@@ -61,7 +62,20 @@ class Tests {
StringDeserializer(),
StringDeserializer()
)
assertNull(producerRecord)
val reportingRecord = testDriver.readOutput(
"${settingsLoader.outputTopic}-reporting",
StringDeserializer(),
StringDeserializer())
val report = Klaxon().parse<Report>(reportingRecord.value())
val expectedReport = Klaxon().parse<Report>(readFile("error_filter_report_output.json"))
assertAll("compare report",
{ assertNull(producerRecord) },
{ assertEquals("brandt_metadaten.csv", reportingRecord.key()) },
{ assertEquals(expectedReport, report) }
)
}
@Test
......@@ -85,7 +99,7 @@ class Tests {
assertNotNull(record)
assertAll("test csv file import",
{ assertEquals("AVGR13716", record.key()) },
{ assertEquals(readFile("csv_file_import_1_output.csv"), record.value()) }
{ assertEquals(readFile("csv_file_import_with_header_line_output.csv"), record.value()) }
)
}
......@@ -110,7 +124,7 @@ class Tests {
assertNotNull(record)
assertAll("test csv file import",
{ assertEquals("AVGR13716", record.key()) },
{ assertEquals(readFile("csv_file_import_2_output.csv"), record.value()) }
{ assertEquals(readFile("csv_file_import_without_header_line_output.csv"), record.value()) }
)
}
}
ID Original ID,,Inhalt Haupttitel,Inhalt Beschreibung Bemerkung,Inhalt Beschreibung,Kontext Erstellung,Kontext AutorIn,AutorIn,Inhalt Genre,Inhalt Schlagworte,Kontext Aufnahmeort,Kontext Beschreibung Bemerkung,System Information Streaming,,System Information Dokumenttyp,Technische Informationen Trägerformat (des Originals),Technische Informationen Tonaufnahmeverfahren ,Technische Informationen Film Farbe,Technische Informationen Dauer,Technische Informationen Film Bemerkung
AVGR13716,https://www.gr.ch/Exemplare/13716,Wintersport in Arosa,,"Pferderennen am Obersee bei strahlendem Sonnenschein, viel Publikum, Gedränge vor Wettbüro, Reiter in Armeeuniform, Fotografen, Skijöring – Eisfest mit kostümierten Teilnehmer/innen vor Hotel Altein bei Nacht – Pferderennen am Obersee – Eiskunstlauf – Pferderennen, diesmal winterlicher – Schanzenspringen im Skigelände und viel Volk um die Alpgebäude Carmenna – Skifahrer im Aufstieg, Winterwanderer und nochmals Sprünge auf der Schneeschanze, Gruppe Skifahrer in wilder Schussfahrt, Wartende um die Hütten – Eishockey-Match – Impressionen von einem Abfahrtsrennen und Rundsicht über Arosa und Umgebung","1920, 1920-1929, genaues Datum nicht eruierbar","Brandt, Carl",Autor/in,Dokumentarfilm; Amateurfilm,Pferdesport; Ski alpin; Skispringen; Eishochey; Tourismus,Arosa,Eisfest: teilweise identische Aufnahmen in AVGR12097 „Ankunft David Zogg“ ; Schanzenspringen auf Carmenna: teilweise identische Aufnahmen in AVGR12115 „Touristen auf dem Tschuggen“,https://s3-eu-west-1.amazonaws.com/streaming.av-portal.gr.ch/13716/AVGR13716.mov,Intranet,Film,"35-mm-Film, Negativ und Positiv, Nitrat",stumm,s/w getönt,0:17:02,"Vorhandene Elemente: AVGR9942: Negativ, Nitrat (CS, Z 986-172.8); AVGR9943: Positiv Nitrat (CS, Z 986-172.7); AVGR12098: Interpositiv / Marron 2366, Kopie 2016 (KBG); AVGR13715: Internegativ 2234, Kopie 2016 (KBG); AVGR13716: Positivkopie Farbe 2383, Kopie 2016, eingefärbte Sequenzen (KBG)"
\ No newline at end of file
AVGR13716,https://www.gr.ch/Exemplare/13716,Wintersport in Arosa,,"Pferderennen am Obersee bei strahlendem Sonnenschein, viel Publikum, Gedränge vor Wettbüro, Reiter in Armeeuniform, Fotografen, Skijöring – Eisfest mit kostümierten Teilnehmer/innen vor Hotel Altein bei Nacht – Pferderennen am Obersee – Eiskunstlauf – Pferderennen, diesmal winterlicher – Schanzenspringen im Skigelände und viel Volk um die Alpgebäude Carmenna – Skifahrer im Aufstieg, Winterwanderer und nochmals Sprünge auf der Schneeschanze, Gruppe Skifahrer in wilder Schussfahrt, Wartende um die Hütten – Eishockey-Match – Impressionen von einem Abfahrtsrennen und Rundsicht über Arosa und Umgebung","1920, 1920-1929, genaues Datum nicht eruierbar","Brandt, Carl",Autor/in,Dokumentarfilm; Amateurfilm,Pferdesport; Ski alpin; Skispringen; Eishochey; Tourismus,Arosa,Eisfest: teilweise identische Aufnahmen in AVGR12097 „Ankunft David Zogg“ ; Schanzenspringen auf Carmenna: teilweise identische Aufnahmen in AVGR12115 „Touristen auf dem Tschuggen“,https://s3-eu-west-1.amazonaws.com/streaming.av-portal.gr.ch/13716/AVGR13716.mov,Intranet,Film,"35-mm-Film, Negativ und Positiv, Nitrat",stumm,s/w getönt,0:17:02,"Vorhandene Elemente: AVGR9942: Negativ, Nitrat (CS, Z 986-172.8); AVGR9943: Positiv Nitrat (CS, Z 986-172.7); AVGR12098: Interpositiv / Marron 2366, Kopie 2016 (KBG); AVGR13715: Internegativ 2234, Kopie 2016 (KBG); AVGR13716: Positivkopie Farbe 2383, Kopie 2016, eingefärbte Sequenzen (KBG)"
\ No newline at end of file
{"format":"CSV","data":"ID Original ID,,Inhalt Haupttitel,Inhalt Beschreibung Bemerkung,Inhalt Beschreibung,Kontext Erstellung,Kontext AutorIn,AutorIn,Inhalt Genre,Inhalt Schlagworte,Kontext Aufnahmeort,Kontext Beschreibung Bemerkung,System Information Streaming,,System Information Dokumenttyp,Technische Informationen Trägerformat (des Originals),Technische Informationen Tonaufnahmeverfahren ,Technische Informationen Film Farbe,Technische Informationen Dauer,Technische Informationen Film Bemerkung\nAVGR13716,https://www.gr.ch/Exemplare/13716,Wintersport in Arosa,,\"Pferderennen am Obersee bei strahlendem Sonnenschein, viel Publikum, Gedränge vor Wettbüro, Reiter in Armeeuniform, Fotografen, Skijöring \u2013 Eisfest mit kostümierten Teilnehmer/innen vor Hotel Altein bei Nacht \u2013 Pferderennen am Obersee \u2013 Eiskunstlauf \u2013 Pferderennen, diesmal winterlicher \u2013 Schanzenspringen im Skigelände und viel Volk um die Alpgebäude Carmenna \u2013 Skifahrer im Aufstieg, Winterwanderer und nochmals Sprünge auf der Schneeschanze, Gruppe Skifahrer in wilder Schussfahrt, Wartende um die Hütten \u2013 Eishockey-Match \u2013 Impressionen von einem Abfahrtsrennen und Rundsicht über Arosa und Umgebung\",\"1920, 1920-1929, genaues Datum nicht eruierbar\",\"Brandt, Carl\",Autor/in,Dokumentarfilm; Amateurfilm,Pferdesport; Ski alpin; Skispringen; Eishochey; Tourismus,Arosa,Eisfest: teilweise identische Aufnahmen in AVGR12097 \u201eAnkunft David Zogg\u201c ; Schanzenspringen auf Carmenna: teilweise identische Aufnahmen in AVGR12115 \u201eTouristen auf dem Tschuggen\u201c,https://s3-eu-west-1.amazonaws.com/streaming.av-portal.gr.ch/13716/AVGR13716.mov,Intranet,Film,\"35-mm-Film, Negativ und Positiv, Nitrat\",stumm,s/w getönt,0:17:02,\"Vorhandene Elemente: AVGR9942: Negativ, Nitrat (CS, Z 986-172.8); AVGR9943: Positiv Nitrat (CS, Z 986-172.7); AVGR12098: Interpositiv / Marron 2366, Kopie 2016 (KBG); AVGR13715: Internegativ 2234, Kopie 2016 (KBG); AVGR13716: Positivkopie Farbe 2383, Kopie 2016, eingefärbte Sequenzen (KBG)\""}
\ No newline at end of file
{"format":"CSV","data":"AVGR13716,https://www.gr.ch/Exemplare/13716,Wintersport in Arosa,,\"Pferderennen am Obersee bei strahlendem Sonnenschein, viel Publikum, Gedränge vor Wettbüro, Reiter in Armeeuniform, Fotografen, Skijöring \u2013 Eisfest mit kostümierten Teilnehmer/innen vor Hotel Altein bei Nacht \u2013 Pferderennen am Obersee \u2013 Eiskunstlauf \u2013 Pferderennen, diesmal winterlicher \u2013 Schanzenspringen im Skigelände und viel Volk um die Alpgebäude Carmenna \u2013 Skifahrer im Aufstieg, Winterwanderer und nochmals Sprünge auf der Schneeschanze, Gruppe Skifahrer in wilder Schussfahrt, Wartende um die Hütten \u2013 Eishockey-Match \u2013 Impressionen von einem Abfahrtsrennen und Rundsicht über Arosa und Umgebung\",\"1920, 1920-1929, genaues Datum nicht eruierbar\",\"Brandt, Carl\",Autor/in,Dokumentarfilm; Amateurfilm,Pferdesport; Ski alpin; Skispringen; Eishochey; Tourismus,Arosa,Eisfest: teilweise identische Aufnahmen in AVGR12097 \u201eAnkunft David Zogg\u201c ; Schanzenspringen auf Carmenna: teilweise identische Aufnahmen in AVGR12115 \u201eTouristen auf dem Tschuggen\u201c,https://s3-eu-west-1.amazonaws.com/streaming.av-portal.gr.ch/13716/AVGR13716.mov,Intranet,Film,\"35-mm-Film, Negativ und Positiv, Nitrat\",stumm,s/w getönt,0:17:02,\"Vorhandene Elemente: AVGR9942: Negativ, Nitrat (CS, Z 986-172.8); AVGR9943: Positiv Nitrat (CS, Z 986-172.7); AVGR12098: Interpositiv / Marron 2366, Kopie 2016 (KBG); AVGR13715: Internegativ 2234, Kopie 2016 (KBG); AVGR13716: Positivkopie Farbe 2383, Kopie 2016, eingefärbte Sequenzen (KBG)\""}
\ No newline at end of file
{"recordCount": 0, "id" : "brandt_metadaten.csv", "message" : "Ignored message as file validator reported error.", "status" : "Failure", "timeStamp" : "2020-04-14T06:54:47.414175Z"}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment