Commit a71ae647 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Improves logging

parent 1e5f7363
Pipeline #9933 passed with stages
in 5 minutes and 29 seconds
......@@ -25,6 +25,7 @@ import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager
import org.memobase.builder.ResourceBuilder
import org.memobase.mapping.MappingConfig
import org.memobase.mapping.exceptions.InvalidMappingException
import org.memobase.settings.SettingsLoader
class KafkaTopology(private val settings: SettingsLoader
......@@ -38,7 +39,7 @@ class KafkaTopology(private val settings: SettingsLoader
val stream = builder.stream<String, String>(settings.inputTopic)
stream.flatMapValues { value -> parseJsonFile(value) }
.mapValues { value -> buildResources(value) }
.flatMapValues { value -> buildResources(value) }
.flatMap { _, value -> writeResource(value) }
.to(settings.outputTopic)
return builder.build()
......@@ -55,17 +56,22 @@ class KafkaTopology(private val settings: SettingsLoader
}
}
private fun buildResources(value: Map<String, String>): ResourceBuilder {
return ResourceBuilder(
config.uriField,
config.recordType,
config.recordFieldMappers,
config.physicalObjectFieldMappers,
config.digitalObjectFieldMappers,
value,
settings.appSettings.getProperty("institutionId"),
settings.appSettings.getProperty("recordSetId")
)
private fun buildResources(value: Map<String, String>): List<ResourceBuilder> {
return try {
listOf(ResourceBuilder(
config.uriField,
config.recordType,
config.recordFieldMappers,
config.physicalObjectFieldMappers,
config.digitalObjectFieldMappers,
value,
settings.appSettings.getProperty("institutionId"),
settings.appSettings.getProperty("recordSetId")
))
} catch (ex: InvalidMappingException) {
log.error(ex.localizedMessage)
emptyList()
}
}
private fun writeResource(value: ResourceBuilder): List<KeyValue<String, String>> {
......
......@@ -22,7 +22,6 @@ import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.memobase.mapping.exceptions.InvalidMappingException
import org.memobase.mapping.mappers.IFieldMapper
import org.memobase.rdf.EBUCORE
import java.io.StringWriter
class ResourceBuilder(
......@@ -40,7 +39,7 @@ class ResourceBuilder(
val recordId = if (source.containsKey(recordIdField)) {
source[recordIdField] as String
} else {
throw InvalidMappingException("No id for record found in field '$recordIdField'.")
throw InvalidMappingException("No id for record found in field '$recordIdField' for source $source.")
}
val record =
Record(recordId, recordType, recordSetId, institutionId)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment