Unverified Commit b245e7ce authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

write transactions to dedicated topics

parent 13b484a4
......@@ -6,3 +6,6 @@ metadata:
data:
TOPIC_OUT: "{{ .Values.outputTopic }}"
TOPIC_PROCESS: "{{ .Values.reportingTopic }}"
TOPIC_TRANSACTIONS_RECORDS: "{{ .Values.recordsTransactionsTopic }}"
TOPIC_TRANSACTIONS_RECORD_SETS: "{{ .Values.recordSetsTransactionsTopic }}"
TOPIC_TRANSACTIONS_INSTITUTIONS: "{{ .Values.institutionsTransactionsTopic }}"
......@@ -11,4 +11,7 @@ deploymentName: pp-import-process-bridge
kafkaConfigs: prod-kafka-bootstrap-servers
outputTopic: fedora-output-json
inputTopic: import-process-ingest
reportingTopic: import-process-reporting
\ No newline at end of file
reportingTopic: import-process-reporting
recordsTransactionsTopic: import-process-transactions-records
recordSetsTransactionsTopic: import-process-transactions-record-sets
institutionsTransactionsTopic: import-process-transactions-institutions
\ No newline at end of file
......@@ -37,6 +37,7 @@ import org.apache.logging.log4j.LogManager
import org.memobase.model.MemobaseModel
import org.memobase.model.ProcessResult
import org.memobase.model.ProcessStatus
import org.memobase.model.Transaction
class KafkaTopology(
private val settings: SettingsLoader
......@@ -71,12 +72,24 @@ class KafkaTopology(
outputData[0]
.to(settings.outputTopic + "-records")
outputData[0]
.mapValues { key, _ -> writeTransaction(key) }
.to(settings.appSettings.getProperty("topicTransactionsRecords"))
outputData[1]
.to(settings.outputTopic + "-record-sets")
outputData[1]
.mapValues { key, _ -> writeTransaction(key) }
.to(settings.appSettings.getProperty("topicTransactionsRecordSets"))
outputData[2]
.to(settings.outputTopic + "-institutions")
outputData[2]
.mapValues { key, _ -> writeTransaction(key) }
.to(settings.appSettings.getProperty("topicTransactionsInstitutions"))
processOutcome[0]
.mapValues { key, value -> writeReport(key, value) }
.to(settings.processReportTopic)
......@@ -126,4 +139,8 @@ class KafkaTopology(
private fun writeReport(id: String, input: ProcessResult<String>): String {
return Report(id, input.status.toString(), input.message, "import-process-bridge").toJson()
}
private fun writeTransaction(id: String): String {
return Transaction(id).toJson()
}
}
package org.memobase.model
import java.io.StringWriter
import org.apache.jena.graph.Factory
import org.apache.jena.rdf.model.impl.ModelCom
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import java.io.StringWriter
class MemobaseModel : ModelCom(Factory.createGraphMem()) {
override fun toString(): String {
......
package org.memobase.model
import java.time.LocalDateTime
data class Transaction(
val id: String,
val action: String = "CREATE",
val timestamp: String = LocalDateTime.now().toString()
) {
fun toJson(): String {
return "{\"id\": \"${id.replace("[\n\"]", "")}\", \"action\": \"$action\", \"timestamp\": \"$timestamp\"}"
}
}
kafka:
app:
topicTransactionsRecords: ${TOPIC_TRANSACTIONS_RECORDS}
topicTransactionsRecordSets: ${TOPIC_TRANSACTIONS_RECORD_SETS}
topicTransactionsInstitutions: ${TOPIC_TRANSACTIONS_INSTITUTIONS}
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
application.id: ${APPLICATION_ID:?system}
......
import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
import java.io.File
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.memobase.model.MemobaseModel
import java.io.File
import java.io.StringWriter
import java.nio.charset.StandardCharsets
internal class KafkaTopologyTest {
@Test
......@@ -17,5 +13,4 @@ internal class KafkaTopologyTest {
RDFDataMgr.read(model, input, Lang.NT)
assert(model.toString().contains("isPublished\":true"))
}
}
\ No newline at end of file
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment