Commit 9d238661 authored by Thomas Bernhart's avatar Thomas Bernhart
Browse files

WIP: Add service implementation

parent b71de6b9
......@@ -44,7 +44,7 @@ dependencies {
implementation 'org.fcrepo.client:fcrepo-java-client:0.4.0'
implementation 'org.memobase:memobase-service-utilities:1.2.1'
implementation 'org.memobase:memobase-service-utilities:1.4.1'
implementation 'org.memobase:fedora-client:0.2.1'
// KOTLIN IMPORTS
......@@ -52,9 +52,10 @@ dependencies {
implementation "org.jetbrains.kotlin:kotlin-script-runtime:1.3.71"
implementation "org.jetbrains.kotlin:kotlin-reflect:1.3.71"
// JUNIT
testCompile("org.junit.jupiter:junit-jupiter:5.4.2")
// TESTS
testImplementation 'org.assertj:assertj-core:3.15.0'
testImplementation 'org.mockito:mockito-core:2.7.22'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.6.0'
}
compileKotlin {
......
......@@ -26,7 +26,7 @@ class App {
private val log = LogManager.getLogger("FedorIngestApp")
@JvmStatic fun main(args: Array<String>) {
try {
val service = Service()
val service = Service.create()
service.run()
} catch (ex: Exception) {
ex.printStackTrace()
......
package org.memobase
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient
import org.memobase.sftp.SftpClient
import java.net.URI
/**
 * Ingests single records into the Fedora repository and reports the outcome.
 *
 * The Fedora call itself is not implemented yet: [ingest] currently always ends in the
 * failure branch because the placeholder [createOrUpdateResource] raises a
 * [FcrepoOperationFailedException] with status 501.
 *
 * @param sftpClient client for the SFTP server; not used by the code in this class yet.
 * @param fedoraClient client for the Fedora repository; not used by the code in this class yet.
 */
class Ingest(
    private val sftpClient: SftpClient,
    private val fedoraClient: FedoraClient
) {
    private val log = LogManager.getLogger("IngestService")

    /**
     * Tries to ingest the record identified by [id].
     *
     * @param id identifier of the record; it is used as the id of the returned report.
     * @return a [Report] with status `ReportStatus.success` when the Fedora operation
     * completes, or `ReportStatus.failure` when it throws a [FcrepoOperationFailedException].
     */
    fun ingest(id: String): Report {
        log.info("Ingest record $id.")
        return try {
            createOrUpdateResource(id)
            Report(
                id = id,
                status = ReportStatus.success,
                message = ReportMessages.ingestedRecord(id)
            )
        } catch (e: FcrepoOperationFailedException) {
            // Keep the cause in the log; the report only carries a generic message.
            log.error("Ingest of record $id failed: ${e.message}", e)
            Report(
                id = id,
                status = ReportStatus.failure,
                message = ReportMessages.ingestFailed(id)
            )
        }
    }

    /**
     * Placeholder for the Fedora write of record [id].
     *
     * TODO: replace with fedoraClient.createOrUpdateRdfResource() once it is available.
     *
     * @throws FcrepoOperationFailedException always, with status 501 ("Not Implemented.").
     */
    private fun createOrUpdateResource(id: String) {
        log.debug("Fedora ingest for record $id is not implemented yet.")
        throw FcrepoOperationFailedException(
            URI("http://mb-fed1.memobase.unibas.ch:8080/fcrepo/rest"), 501, "Not Implemented."
        )
    }
}
\ No newline at end of file
/*
* text-file-validation
* fedora-ingest
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
......@@ -18,31 +18,112 @@
package org.memobase
import java.io.File
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClientImpl
// import org.bouncycastle.cms.RecipientId.password
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
import org.memobase.fedora.*
import java.io.File
import java.net.URI
import java.util.*
// import java.util.*
class Service(fileName: String = "app.yml") {
private val log: Logger = LogManager.getLogger("FedoraIngestService")
val settings =
SettingsLoader(
// TODO: Add app configs
listOf("directory"),
listOf(
"fedora.internalBaseUrl",
"fedora.externalBaseUrl",
"fedora.username",
"fedora.password"),
fileName,
useProducerConfig = true,
useConsumerConfig = true,
readSftpSettings = true
)
private val consumer = Consumer(settings.kafkaConsumerSettings, settings.inputTopic)
private val producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
private val ingester = Ingester(
SftpClient(settings.sftpSettings),
createFedoraClient(settings.appSettings))
companion object {
    /** Id of the job-level reports that `run` sends to the process report topic. */
    const val JOB_REPORT_ID = "fedora-ingest"

    /**
     * Builds a [FedoraClient] from the `fedora.*` entries of [appSettings].
     *
     * Reads `fedora.username`, `fedora.password`, `fedora.internalBaseUrl` and
     * `fedora.externalBaseUrl`.
     *
     * @param appSettings application settings containing the four `fedora.*` keys.
     * @return the client produced by `FedoraClientImpl.builder()`.
     */
    fun createFedoraClient(appSettings: Properties): FedoraClient {
        val settingsPrefix = "fedora."
        return FedoraClientImpl.builder()
            .credentials(
                appSettings.getProperty(settingsPrefix + "username"),
                appSettings.getProperty(settingsPrefix + "password")
            )
            // The second URL used to be read from "fedora.password" (copy-paste slip);
            // "fedora.externalBaseUrl" is the configured key that was never consumed.
            // NOTE(review): assumes urls() takes (internal, external) in this order - confirm.
            .urls(
                appSettings.getProperty(settingsPrefix + "internalBaseUrl"),
                appSettings.getProperty(settingsPrefix + "externalBaseUrl")
            )
            .build()
    }
}
private val sftpClient = SftpClient(settings.sftpSettings)
/**
 * Consumes records from Kafka and ingests them until an exception ends the loop.
 *
 * Every fetched record is passed to [processRecord] and its report is sent with
 * `producer.sendReport`. After each non-empty batch one job-level report with id
 * [JOB_REPORT_ID] is sent to `settings.processReportTopic`, summarising that batch only.
 * Consumer and producer are closed by `use` when the loop is left.
 */
fun run() {
    consumer.use { consumer ->
        producer.use { producer ->
            log.info("Connected to Kafka.")
            while (true) {
                // Reports are collected per batch: a list shared across iterations would
                // grow without bound and re-count old failures in every job report.
                val reports = mutableListOf<Report>()
                for (record in consumer.fetchRecords()) {
                    val ingestReport = processRecord(record)
                    producer.sendReport(ingestReport)
                    reports.add(ingestReport)
                }
                // An empty poll is not a finished job; do not emit a job report for it.
                if (reports.isEmpty()) continue

                val failures = reports.count { report -> report.status == ReportStatus.failure }
                val jobReport = if (failures > 0) {
                    log.warn("Ingest ended with $failures failures!")
                    Report(
                        JOB_REPORT_ID,
                        status = ReportStatus.failure,
                        message = ReportMessages.processFailure(failures, reports.size)
                    )
                } else {
                    log.info("Ingest was successful!")
                    Report(
                        JOB_REPORT_ID,
                        status = ReportStatus.success,
                        message = ReportMessages.processSuccess(reports.size)
                    )
                }
                producer.sendJobReport(jobReport, settings.processReportTopic)
            }
        }
    }
}
// TODO: Implement logic
/**
 * Ingests one Kafka record and turns the outcome into a [Report].
 *
 * The record key is used as the report id; key and value are handed to `ingester.ingest`.
 *
 * @param record the consumed record to ingest.
 * @return a success report when `ingester.ingest` returns normally, or a failure report
 * when it throws a [FcrepoOperationFailedException]. Other exceptions propagate.
 */
private fun processRecord(record: ConsumerRecord<String, String>): Report {
    val id = record.key()
    return try {
        ingester.ingest(id, record.value())
        Report(
            id = id,
            status = ReportStatus.success,
            message = ReportMessages.ingestedRecord(id)
        )
    } catch (e: FcrepoOperationFailedException) {
        // The report only carries a generic message, so keep the cause in the log.
        log.error("Failed to ingest record $id: ${e.message}", e)
        Report(
            id = id,
            status = ReportStatus.failure,
            message = ReportMessages.ingestFailed(id)
        )
    }
}
}
/*
* text-file-validation
* fedora-ingest
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
......@@ -23,4 +23,22 @@ object ReportStatus {
const val failure = "FAILURE"
}
/** Factory for the human-readable messages placed into ingest reports. */
object ReportMessages {
    /** Message for a job in which [failures] out of [total] files could not be ingested. */
    fun processFailure(failures: Int, total: Int): String =
        "Failed to ingest $failures of $total files."

    /** Message for a job in which all [total] files were ingested. */
    fun processSuccess(total: Int): String =
        "Successfully ingested $total files."

    /** Message for a single record with identifier [id] that was ingested. */
    fun ingestedRecord(id: String): String =
        "Ingested record $id."

    /** Message for a single record with identifier [id] whose ingest failed. */
    fun ingestFailed(id: String): String =
        "Failed to ingest record $id."
}
/*
* fedora-ingest-service
* Copyright (C) 2019 Memobase
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.fedora.FedoraClientImpl
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
import org.memobase.testing.EmbeddedKafkaExtension
import org.memobase.testing.EmbeddedSftpServer
import java.io.FileInputStream
import java.time.Duration
import java.util.*
import java.util.stream.Stream
/**
 * Integration test that runs [Service] against the Kafka broker provided by
 * [EmbeddedKafkaExtension] (expected at `localhost:12345`) and an [EmbeddedSftpServer]
 * on port 22000.
 */
@ExtendWith(EmbeddedKafkaExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ServiceTest {
    private val log = LogManager.getLogger("LocalTestsLogger")
    private val sftpServer = EmbeddedSftpServer(22000, "user", "password")
    private val adminClient =
        AdminClient.create(mapOf(Pair(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")))

    init {
        // TODO: setup embedded sftpServer with test data
        sftpServer.createDirectories(
            "/memobase/test_institution_1/test_record_set_1/"
        )
        // Close the local file handle once it has been uploaded.
        // NOTE(review): assumes putFile has consumed the stream when it returns - confirm.
        FileInputStream("src/test/resources/data/binary.txt").use { testFile ->
            sftpServer.putFile(
                "/memobase/test_institution_1/test_record_set_1/BAZ-MEI_77252-1.txt",
                testFile
            )
        }
    }

    private val inputProducer: KafkaProducer<String, String>
    private val reportConsumer: KafkaConsumer<String, String>

    init {
        val producerProps = Properties()
        producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")
        producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "test-group-1")
        // FIXME:
        // producerProps.setProperty(ProducerConfig.GROUP_ID_CONFIG, "test-group-1")
        producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.qualifiedName)
        producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.qualifiedName)
        // FIXME:
        // producerProps.setProperty(ProducerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        inputProducer = KafkaProducer(producerProps)

        val consumerProps = Properties()
        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")
        consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "test-group-1")
        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group-1")
        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        reportConsumer = KafkaConsumer(consumerProps)
        reportConsumer.subscribe(listOf("ingest-p1-j1", "ingest-p1-j1-reporting", "p1-reporting"))
    }

    /**
     * Starts the service for the given configuration and checks the records it writes to
     * the output, reporting and process reporting topics.
     *
     * @param params configuration file name and the expected keys/values per topic.
     */
    // TODO: Change to correct test
    @ParameterizedTest
    @MethodSource("kafkaTests")
    fun testRun(params: TestParams) {
        val service = Service(params.configFile)
        // Service.run() polls in an endless loop and would block this thread forever,
        // so it runs on a daemon thread while the test thread collects the output.
        Thread { service.run() }.apply {
            isDaemon = true
            start()
        }

        val topic = service.settings.outputTopic
        val reportingTopic = service.settings.outputTopic + "-reporting"
        val processReportingTopic = service.settings.processReportTopic

        val expectedRecordCount = 3
        val pollDeadline = System.currentTimeMillis() + 30_000L
        val totalConsumerRecords = mutableListOf<ConsumerRecord<String, String>>()
        // Bounded wait: stop as soon as enough records arrived (a batch may overshoot the
        // expected count) or the deadline passed, instead of spinning forever.
        while (totalConsumerRecords.size < expectedRecordCount && System.currentTimeMillis() < pollDeadline) {
            val result = reportConsumer.poll(Duration.ofMillis(10))
            log.debug("Polled ${result.count()} records.")
            totalConsumerRecords.addAll(result)
        }
        check(totalConsumerRecords.size >= expectedRecordCount) {
            "Expected at least $expectedRecordCount records but received ${totalConsumerRecords.size} before the timeout."
        }

        assertThat(totalConsumerRecords.find { value -> value.topic() == topic })
            .describedAs("Message Test")
            .hasFieldOrPropertyWithValue("key", params.expectedKey)
            .hasFieldOrPropertyWithValue("value", params.expectedValue)
        assertThat(totalConsumerRecords.find { value -> value.topic() == reportingTopic })
            .describedAs("Report Test")
            .hasFieldOrPropertyWithValue("key", params.expectedKey)
            .hasFieldOrPropertyWithValue("value", params.expectedReportValue)
        assertThat(totalConsumerRecords.find { value -> value.topic() == processReportingTopic })
            .describedAs("Process Report Test")
            .hasFieldOrPropertyWithValue("key", params.expectedProcessReport.id)
            .hasFieldOrPropertyWithValue("value", Klaxon().toJsonString(params.expectedProcessReport))
    }

    /** Parameter source for [testRun]. */
    private fun kafkaTests() = Stream.of(
        TestParams(
            "test1.yml",
            expectedKey = "MEI_49884",
            expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_institution_1/test_record_set_1/brandt.csv\"}",
            expectedReportValue = Klaxon().toJsonString(
                Report(
                    id = "brandt.csv",
                    status = "SUCCESS",
                    message = "Validated file at path /memobase/test_institution_1/test_record_set_1/brandt.csv with format CSV."
                )
            ),
            expectedProcessReport = Report(
                id = "jobXYZ",
                status = "SUCCESS",
                message = "Successfully validated 1 files."
            )
        )
    )
}
......@@ -94,63 +94,63 @@ class Tests {
// TODO: Change to correct test
/**
 * Runs [Service] with `params.configFile`, pipes one input record through a
 * `TopologyTestDriver`, then polls `consumer` and asserts on the records found for the
 * output, reporting and process reporting topics.
 *
 * NOTE(review): `service.run()` is called on the test thread before anything else; if it
 * does not return, the rest of this test is never reached - confirm.
 */
@ParameterizedTest
@MethodSource("directoryReaderTests")
fun `read valid csv from sftp`(params: TestParams) {
val service = Service(params.configFile)
service.run()
// Topic names are derived from the service settings; the reporting topic is the output
// topic with a "-reporting" suffix.
val topic = service.settings.outputTopic
val reportingTopic = service.settings.outputTopic + "-reporting"
val processReportingTopic = service.settings.processReportTopic
// A second, streams-based configuration is loaded from a per-case file.
val settings = SettingsLoader(
listOf(
"institutionId",
"recordSetId",
"configs"
),
fileName = "kafkaTest${params.count}.yml",
useStreamsConfig = true
)
val testDriver = TopologyTestDriver(KafkaTopology(settings).build(), settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
// TODO: Iterate over messages from test resources and send them to kafka
testDriver.pipeInput(
factory.create(
settings.inputTopic, params.key, readFile("kafkaTests/${params.count}/input.json")
)
)
val totalConsumerRecords = mutableListOf<ConsumerRecord<String, String>>()
var result = consumer.poll(Duration.ofMillis(10))
// NOTE(review): this loop has no timeout and only exits when exactly 3 records have been
// collected; fewer records, or a batch that overshoots 3, keeps it polling forever.
while (totalConsumerRecords.size != 3) {
if (result.count() > 0) {
totalConsumerRecords.addAll(result.asIterable())
}
// Diagnostic output of the batch size, logged at error level.
log.error(result.count())
result = consumer.poll(Duration.ofMillis(10))
}
// One record per topic is looked up; `find` yields null when a topic has no record.
assertThat(totalConsumerRecords.find { value -> value.topic() == topic })
.describedAs("Message Test")
.hasFieldOrPropertyWithValue("key", params.expectedKey)
.hasFieldOrPropertyWithValue("value", params.expectedValue)
assertThat(totalConsumerRecords.find { value -> value.topic() == reportingTopic })
.describedAs("Report Test")
.hasFieldOrPropertyWithValue("key", params.expectedKey)
.hasFieldOrPropertyWithValue("value", params.expectedReportValue)
assertThat(totalConsumerRecords.find { value -> value.topic() == processReportingTopic })
.describedAs("Process Report Test")
.hasFieldOrPropertyWithValue("key", params.expectedProcessReport.id)
.hasFieldOrPropertyWithValue("value", Klaxon().toJsonString(params.expectedProcessReport))
}
// @ParameterizedTest
// @MethodSource("directoryReaderTests")
// fun `read valid csv from sftp`(params: TestParams) {
// val service = Service(params.configFile)
// service.run()
//
// val topic = service.settings.outputTopic
// val reportingTopic = service.settings.outputTopic + "-reporting"
// val processReportingTopic = service.settings.processReportTopic
//
// val settings = SettingsLoader(
// listOf(
// "institutionId",
// "recordSetId",
// "configs"
// ),
// fileName = "kafkaTest${params.count}.yml",
// useStreamsConfig = true
// )
// val testDriver = TopologyTestDriver(KafkaTopology(settings).build(), settings.kafkaStreamsSettings)
// val factory = ConsumerRecordFactory(
// StringSerializer(), StringSerializer()
// )
//
// // TODO: Iterate over messages from test resources and send them to kafka
// testDriver.pipeInput(
// factory.create(
// settings.inputTopic, params.key, readFile("kafkaTests/${params.count}/input.json")
// )
// )
//
// val totalConsumerRecords = mutableListOf<ConsumerRecord<String, String>>()
//
// var result = consumer.poll(Duration.ofMillis(10))
// while (totalConsumerRecords.size != 3) {
// if (result.count() > 0) {
// totalConsumerRecords.addAll(result.asIterable())
// }
// log.error(result.count())
// result = consumer.poll(Duration.ofMillis(10))
// }
//
// assertThat(totalConsumerRecords.find { value -> value.topic() == topic })
// .describedAs("Message Test")
// .hasFieldOrPropertyWithValue("key", params.expectedKey)
// .hasFieldOrPropertyWithValue("value", params.expectedValue)
//
// assertThat(totalConsumerRecords.find { value -> value.topic() == reportingTopic })
// .describedAs("Report Test")
// .hasFieldOrPropertyWithValue("key", params.expectedKey)
// .hasFieldOrPropertyWithValue("value", params.expectedReportValue)
//
// assertThat(totalConsumerRecords.find { value -> value.topic() == processReportingTopic })
// .describedAs("Process Report Test")
// .hasFieldOrPropertyWithValue("key", params.expectedProcessReport.id)
// .hasFieldOrPropertyWithValue("value", Klaxon().toJsonString(params.expectedProcessReport))
// }
private fun kafkaTests() = Stream.of(
TestParams(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment