Commit ebd4cfb7 authored by Jonas Waeber's avatar Jonas Waeber

Refactor input

parent dbf8fa11
apiVersion: v1 apiVersion: v1
kind: ConfigMap kind: ConfigMap
metadata: metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-config" name: "{{ .Values.recordSetId }}-{{ .Values.shortSessionId }}-config"
namespace: memobase namespace: memobase
data: data:
APP_DIRECTORY: "{{ .Values.appDirectory }}" SESSION_ID: "{{ .Values.sessionId }}"
CLIENT_ID: "{{ .Values.processId }}-{{ .Values.jobName }}" RECORD_SET_ID: "{{ .Values.recordSetId }}"
TOPIC_IN: "{{ .Values.processId }}-{{ .Values.jobName }}" INSTITUTION_ID: "{{ .Values.institutionId }}"
TOPIC_PROCESS: "{{ .Values.processId }}-reporting" IS_PUBLISHED: "{{ .Values.isPublished }}"
\ No newline at end of file XML_RECORD_TAG: "{{.Values.xmlRecordTag }}"
XML_IDENTIIFER_FIELD_NAME: "{{ .Values.xmlIdentifierFieldName }}"
TABLE_SHEET_INDEX: "{{.Values.tableSheetIndex }}"
TABLE_HEADER_COUNT: "{{ .Values.tableHeaderCount }}"
TABLE_HEADER_INDEX: "{{ .Values.tableHeaderIndex }}"
TABLE_IDENTIFIER_INDEX: "{{ .Values.tableIdentifierIndex }}"
CLIENT_ID: "{{ .Values.recordSetId }}-{{ .Values.sessionId }}"
TOPIC_OUT: "{{.Values.topicName }}"
TOPIC_REPORTING: "{{ .Values.reportingTopicName }}"
\ No newline at end of file
apiVersion: batch/v1 apiVersion: batch/v1
kind: Job kind: Job
metadata: metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}" name: "{{ .Values.reportingTopicName }}-{{ .Values.shortSessionId }}"
namespace: memobase namespace: memobase
labels: labels:
institutionId: "{{ .Values.institutionId }}" institutionId: "{{ .Values.institutionId }}"
recordSetId: "{{ .Values.recordSetId }}" recordSetId: "{{ .Values.recordSetId }}"
jobType: "import-job" jobType: "text-file-validation"
spec: spec:
template: template:
spec: spec:
containers: containers:
- name: "{{ .Values.processId }}-{{ .Values.jobName }}" - name: "{{ .Values.recordSetId }}-{{ .Values.sessionId }}"
image: "{{ .Values.registry }}/{{ .Values.image }}:{{ .Values.tag }}" image: "{{ .Values.registry }}/{{ .Values.image }}:{{ .Values.tag }}"
envFrom: envFrom:
- secretRef: - secretRef:
...@@ -19,6 +19,6 @@ spec: ...@@ -19,6 +19,6 @@ spec:
- configMapRef: - configMapRef:
name: "{{ .Values.kafkaConfigs }}" name: "{{ .Values.kafkaConfigs }}"
- configMapRef: - configMapRef:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-config" name: "{{ .Values.recordSetId }}-{{ .Values.shortSessionId }}-config"
restartPolicy: Never restartPolicy: Never
backoffLimit: 0 backoffLimit: 0
\ No newline at end of file
...@@ -9,20 +9,27 @@ tag: "latest" ...@@ -9,20 +9,27 @@ tag: "latest"
kafkaConfigs: prod-kafka-bootstrap-servers kafkaConfigs: prod-kafka-bootstrap-servers
sftpConfigs: internal-sftp-config sftpConfigs: internal-sftp-config
topicName: import-process-data-transform
reportingTopicName: import-process-reporting
###
############################################ # API Configs (Mandatory!)
## Values below should be defined via the # ###
## User Interface (Drupal) # sessionId: placeholder
############################################ shortSessionId: placeholder
jobName: text-file-validation
processId: p0001
institutionId: placeholder institutionId: placeholder
recordSetId: placeholder recordSetId: placeholder
## Needs to be set to the directory on the sftp server. isPublished: false
## this is a relative path built like this:
## "./{RECORD_SET_ID}" # Step 2
## The exact structure will be defined in task MEMO-196
appDirectory: placeholderValue # xml-data-transform
\ No newline at end of file xmlRecordTag: record
xmlIdentifierFieldName: id
# table-data-transform
tableSheetIndex: 1
tableHeaderCount: 1
tableHeaderIndex: 1
tableIdentifierIndex: 1
...@@ -28,6 +28,7 @@ class App { ...@@ -28,6 +28,7 @@ class App {
try { try {
val service = Service() val service = Service()
service.run() service.run()
exitProcess(0)
} catch (ex: Exception) { } catch (ex: Exception) {
ex.printStackTrace() ex.printStackTrace()
log.error("Stopping application due to error: " + ex.message) log.error("Stopping application due to error: " + ex.message)
......
/*
* xml-data-transform
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.Klaxon
/**
 * Aggregate report for one job run: an identifier, an overall status and the
 * total / successful / failed item counts.
 */
data class ProcessReport(
    val id: String,
    val status: String,
    val total: Int,
    val successes: Int,
    val failures: Int
) {
    /** Renders this report as a JSON string via Klaxon's `toJsonString`. */
    fun toJson(): String = Klaxon().toJsonString(this)
}
...@@ -21,23 +21,31 @@ import java.io.Closeable ...@@ -21,23 +21,31 @@ import java.io.Closeable
import java.util.Properties import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader
class Producer(props: Properties, private val topic: String) : Closeable { class Producer(
props: Properties,
headers: Properties,
private val outputTopic: String,
private val reportingTopic: String
) : Closeable {
private val instance = KafkaProducer<String, String>(props) private val instance = KafkaProducer<String, String>(props)
private val reportingTopic = "$topic-reporting" private val headers = transformHeaders(headers)
/**
 * Converts every entry of [headers] into a Kafka [RecordHeader].
 *
 * Both key and value are cast to [String], so a non-string entry fails the cast.
 * Each value is turned into bytes with `String.toByteArray()` and its default charset.
 */
private fun transformHeaders(headers: Properties): List<Header> =
    headers.entries.map { (name, value) ->
        RecordHeader(name as String, (value as String).toByteArray())
    }
fun sendMessage(key: String, message: Message) { fun sendMessage(key: String, message: Message) {
instance.send(ProducerRecord(topic, key, message.toJson())) instance.send(ProducerRecord(outputTopic, null, key, message.toJson(), headers))
} }
fun sendReport(report: Report) { fun sendReport(report: Report) {
instance.send(ProducerRecord(reportingTopic, report.id, report.toJson())) instance.send(ProducerRecord(reportingTopic, report.id, report.toJson()))
} }
/**
 * Sends [report] as JSON to the given [topic], keyed by the report's id.
 *
 * The result of the underlying producer `send` call is not inspected here.
 */
fun sendJobReport(report: ProcessReport, topic: String) {
    val record = ProducerRecord<String, String>(topic, report.id, report.toJson())
    instance.send(record)
}
override fun close() { override fun close() {
instance.close() instance.close()
} }
......
...@@ -19,13 +19,38 @@ ...@@ -19,13 +19,38 @@
package org.memobase package org.memobase
import com.beust.klaxon.Klaxon import com.beust.klaxon.Klaxon
import java.time.LocalDateTime
data class Report( data class Report(
val id: String, val id: String,
val step: String = "text-file-validation",
val timestamp: String = LocalDateTime.now().toString(),
val status: String, val status: String,
val message: String val message: String
) { ) {
fun toJson(): String { fun toJson(): String {
return Klaxon().toJsonString(this) return Klaxon().toJsonString(this)
} }
/**
 * Two reports are equal when they are the same runtime class and agree on
 * `id`, `step`, `status` and `message`. The `timestamp` is not compared.
 */
override fun equals(other: Any?): Boolean {
    if (this === other) return true
    if (other == null || javaClass != other.javaClass) return false
    other as Report
    return id == other.id &&
        step == other.step &&
        status == other.status &&
        message == other.message
}
/**
 * Hash built from `id`, `step`, `status` and `message` (the same fields that
 * `equals` compares), combined with the usual 31 multiplier.
 */
override fun hashCode(): Int =
    // Seeding with 0 makes the first step yield id.hashCode(), matching a manual chain.
    listOf(id, step, status, message).fold(0) { acc, field -> 31 * acc + field.hashCode() }
} }
...@@ -31,14 +31,26 @@ class Service(fileName: String = "app.yml") { ...@@ -31,14 +31,26 @@ class Service(fileName: String = "app.yml") {
val settings = val settings =
SettingsLoader( SettingsLoader(
listOf("directory"), listOf(
"sessionId",
"recordSetId",
"institutionId",
"isPublished",
"xmlRecordTag",
"xmlIdentifierFieldName",
"tableSheetIndex",
"tableHeaderCount",
"tableHeaderIndex",
"tableIdentifierIndex"
),
fileName, fileName,
useProducerConfig = true, useProducerConfig = true,
readSftpSettings = true readSftpSettings = true
) )
private val producer = Producer(settings.kafkaProducerSettings, settings.outputTopic) private val producer = Producer(settings.kafkaProducerSettings, settings.appSettings, settings.outputTopic, settings.processReportTopic)
private val directory = settings.appSettings.getProperty("directory") private val recordSetId = settings.appSettings.getProperty("recordSetId")
private val sessionId = settings.appSettings.getProperty("sessionId")
private val validator = FileValidation() private val validator = FileValidation()
private val sftpClient = SftpClient(settings.sftpSettings) private val sftpClient = SftpClient(settings.sftpSettings)
...@@ -50,34 +62,32 @@ class Service(fileName: String = "app.yml") { ...@@ -50,34 +62,32 @@ class Service(fileName: String = "app.yml") {
sftpClient.use { sftp -> sftpClient.use { sftp ->
log.info("Connected to SFTP & Kafka.") log.info("Connected to SFTP & Kafka.")
val files = try { val files = try {
val fileList = sftp.listFiles(directory).map { File(it) } val fileList = sftp.listFiles(recordSetId).map { File(it) }
totalCount = fileList.size totalCount = fileList.size
fileList fileList
} catch (ex: SftpClientException) { } catch (ex: SftpClientException) {
ex.printStackTrace() ex.printStackTrace()
log.error("SFTP Exception: Could not compile the file list on sftp server in directory: $directory.") log.error("SFTP Exception: Could not compile the file list on sftp server in directory: './$recordSetId/'.")
val report = ProcessReport( val report = Report(
"text-file-validation", "$recordSetId#$sessionId",
status = ReportStatus.failure, status = ReportStatus.failure,
total = 0, message = "SFTP Exception: ${ex.localizedMessage}"
successes = 0,
failures = 0
) )
producer.sendJobReport(report, settings.processReportTopic) producer.sendReport(report)
exitProcess(1) exitProcess(1)
} }
log.info("Retrieved file list from sftp server at path: $directory") log.info("Retrieved file list from sftp server from folder: $recordSetId")
val reports = mutableListOf<Report>() val reports = mutableListOf<Report>()
try { try {
log.info("There are a total of ${files.size} files to validate.") log.info("Total files: ${files.size}.")
for (file in files) { for (file in files) {
log.info("Validate file $file.") log.info("Begin Validation: $file.")
val format = validator.validateExtension(file) val format = validator.validateExtension(file)
try { try {
val remoteFile = sftp.open(file) val remoteFile = sftp.open(file)
remoteFile.use { remoteFile.use {
val validationResult = validator.validate(it.RemoteFileInputStream(), format, file) val validationResult = validator.validate(it.RemoteFileInputStream(), format, file)
log.info("Validated file at path ${validationResult.first}") log.info("Validation ${validationResult.second.status}.")
producer.sendMessage(validationResult.second.id, validationResult.first) producer.sendMessage(validationResult.second.id, validationResult.first)
producer.sendReport(validationResult.second) producer.sendReport(validationResult.second)
reports.add(validationResult.second) reports.add(validationResult.second)
...@@ -95,56 +105,28 @@ class Service(fileName: String = "app.yml") { ...@@ -95,56 +105,28 @@ class Service(fileName: String = "app.yml") {
} }
} }
log.info("Collected a total of ${reports.size} reports.") log.info("Collected a total of ${reports.size} reports.")
val failures = reports.count { report -> report.status == ReportStatus.failure }
if (failures > 0) {
log.warn("Validation ended with $failures failures!")
producer.sendJobReport(
ProcessReport(
"text-file-validation",
status = ReportStatus.failure,
total = totalCount,
failures = failures,
successes = totalCount - failures
),
settings.processReportTopic
)
} else {
log.info("Validation was successful!")
producer.sendJobReport(
ProcessReport(
"text-file-validation",
status = ReportStatus.success,
total = totalCount,
successes = reports.size,
failures = 0
),
settings.processReportTopic
)
}
} catch (ex: SftpClientException) { } catch (ex: SftpClientException) {
ex.printStackTrace() ex.printStackTrace()
log.error("SFTP Exception: ${ex.localizedMessage}.") log.error("SFTP Exception: ${ex.localizedMessage}.")
val report = ProcessReport( producer.sendReport(
"text-file-validation", Report(
status = ReportStatus.failure, "$recordSetId#$sessionId",
total = totalCount, status = ReportStatus.failure,
failures = reports.count { report -> report.status == ReportStatus.failure }, message = "SFTP Exception: ${ex.localizedMessage}."
successes = reports.count { report -> report.status == ReportStatus.success } )
) )
producer.sendJobReport(report, settings.processReportTopic) exitProcess(1)
} catch (ex: Exception) { } catch (ex: Exception) {
ex.printStackTrace() ex.printStackTrace()
log.error(ex.javaClass.canonicalName + ": " + ex.localizedMessage) log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}.")
producer.sendJobReport( producer.sendReport(
ProcessReport( Report(
"text-file-validation", "$recordSetId#$sessionId",
status = ReportStatus.failure, status = ReportStatus.failure,
total = totalCount, message = "Unknown Exception: ${ex.localizedMessage}."
failures = reports.count { report -> report.status == ReportStatus.failure }, )
successes = reports.count { report -> report.status == ReportStatus.success }
),
settings.processReportTopic
) )
exitProcess(1)
} }
} }
} }
......
...@@ -4,11 +4,20 @@ sftp: ...@@ -4,11 +4,20 @@ sftp:
user: ${SFTP_USER:?env} user: ${SFTP_USER:?env}
password: ${SFTP_PASSWORD:?env} password: ${SFTP_PASSWORD:?env}
app: app:
directory: ${APP_DIRECTORY:?env} sessionId: ${SESSION_ID:?env}
recordSetId: ${RECORD_SET_ID:?env}
institutionId: ${INSTITUTION_ID:?env}
isPublished: ${IS_PUBLISHED:?env}
xmlRecordTag: ${XML_RECORD_TAG:?env}
xmlIdentifierFieldName: ${XML_IDENTIIFER_FIELD_NAME:?env}
tableSheetIndex: ${TABLE_SHEET_INDEX:?env}
tableHeaderCount: ${TABLE_HEADER_COUNT:?env}
tableHeaderIndex: ${TABLE_HEADER_INDEX:?env}
tableIdentifierIndex: ${TABLE_IDENTIFIER_INDEX:?env}
kafka: kafka:
producer: producer:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?env} bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?env}
client.id: ${CLIENT_ID:?env} client.id: ${CLIENT_ID:?env}
topic: topic:
out: ${TOPIC_IN:?env} out: ${TOPIC_OUT:?env}
process: ${TOPIC_PROCESS:?env} process: ${TOPIC_REPORTING:?env}
\ No newline at end of file \ No newline at end of file
...@@ -22,6 +22,5 @@ data class TestParams( ...@@ -22,6 +22,5 @@ data class TestParams(
val configFile: String, val configFile: String,
val expectedKey: String, val expectedKey: String,
val expectedValue: String, val expectedValue: String,
val expectedReportValue: Report, val expectedReportValue: Report
val expectedProcessReport: ProcessReport
) )
This diff is collapsed.
...@@ -4,11 +4,20 @@ sftp: ...@@ -4,11 +4,20 @@ sftp:
user: user user: user
password: password password: password
app: app:
directory: /memobase/test_record_set_1 sessionId: session1
recordSetId: /testset1
institutionId: mrv
isPublished: false
xmlRecordTag: record
xmlIdentifierFieldName: id
tableSheetIndex: 1
tableHeaderCount: 1
tableHeaderIndex: 1
tableIdentifierIndex: 1
kafka: kafka:
producer: producer:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
client.id: sftp-reader-p1-j1 client.id: test-file-validation-client
topic: topic:
out: sftp-reader-p1-j1 out: import-process-data-transform
process: p1-reporting process: import-process-reporting
\ No newline at end of file \ No newline at end of file
...@@ -4,11 +4,20 @@ sftp: ...@@ -4,11 +4,20 @@ sftp:
user: user user: user
password: password password: password
app: app:
directory: /memobase/test_record_set_2 sessionId: session1
recordSetId: /testset2
institutionId: mrv
isPublished: false
xmlRecordTag: record
xmlIdentifierFieldName: id
tableSheetIndex: 1
tableHeaderCount: 1
tableHeaderIndex: 1
tableIdentifierIndex: 1
kafka: kafka:
producer: producer:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
client.id: sftp-reader-p1-j1 client.id: test-file-validation-client
topic: topic:
out: sftp-reader-p1-j1 out: import-process-data-transform
process: p1-reporting process: import-process-reporting
\ No newline at end of file \ No newline at end of file
...@@ -4,11 +4,20 @@ sftp: ...@@ -4,11 +4,20 @@ sftp:
user: user user: user
password: password password: password
app: app:
directory: /memobase/test_record_set_3 sessionId: session1
recordSetId: /testset3
institutionId: mrv
isPublished: false
xmlRecordTag: record
xmlIdentifierFieldName: id
tableSheetIndex: 1
tableHeaderCount: 1
tableHeaderIndex: 1
tableIdentifierIndex: 1
kafka: kafka:
producer: producer:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
client.id: sftp-reader-p1-j1 client.id: test-file-validation-client
topic: topic:
out: sftp-reader-p1-j1 out: import-process-data-transform
process: p1-reporting process: import-process-reporting
\ No newline at end of file \ No newline at end of file
...@@ -4,11 +4,20 @@ sftp: ...@@ -4,11 +4,20 @@ sftp:
user: user user: user
password: password password: password
app: app:
directory: /memobase/test_record_set_4 sessionId: session1
recordSetId: /testset4
institutionId: mrv
isPublished: false
xmlRecordTag: record
xmlIdentifierFieldName: id
tableSheetIndex: 1
tableHeaderCount: 1
tableHeaderIndex: 1
tableIdentifierIndex: 1
kafka: kafka:
producer: producer:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
client.id: sftp-reader-p1-j1 client.id: test-file-validation-client
topic: topic:
out: sftp-reader-p1-j1 out: import-process-data-transform
process: p1-reporting process: import-process-reporting
\ No newline at end of file \ No newline at end of file
...@@ -4,11 +4,20 @@ sftp: ...@@ -4,11 +4,20 @@ sftp:
user: user user: user
password: password password: password
app: app:
directory: /memobase/test_record_set_5 sessionId: session1
recordSetId: /testset5
institutionId: mrv
isPublished: false
xmlRecordTag: record
xmlIdentifierFieldName: id
tableSheetIndex: 1
tableHeaderCount: 1
tableHeaderIndex: 1
tableIdentifierIndex: 1