Commit 3c1e2574 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Fixed text file validation

Now properly closing resources.
parent d6661bf5
Pipeline #12320 passed with stages
in 5 minutes and 45 seconds
......@@ -41,7 +41,10 @@ dependencies {
//implementation "org.apache.kafka:kafka-streams:${kafkaV}"
compile group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
implementation 'org.memobase:memobase-service-utilities:1.7.1'
implementation 'org.memobase:memobase-service-utilities:1.11.0'
// SFTP Client
// is needed because of a bug.
implementation 'com.hierynomus:sshj:0.27.0'
// JSON Parser
implementation 'com.beust:klaxon:5.2'
// CSV Reader
......
......@@ -53,11 +53,14 @@ class Service(fileName: String = "app.yml") {
log.info("Validate file $file.")
val format = validator.validateExtension(file)
try {
val validationResult = validator.validate(sftp.open(file), format, file)
log.info("Validated file at path ${validationResult.first}")
producer.sendMessage(validationResult.second.id, validationResult.first)
producer.sendReport(validationResult.second)
reports.add(validationResult.second)
val remoteFile = sftp.open(file)
remoteFile.use {
val validationResult = validator.validate(it.RemoteFileInputStream(), format, file)
log.info("Validated file at path ${validationResult.first}")
producer.sendMessage(validationResult.second.id, validationResult.first)
producer.sendReport(validationResult.second)
reports.add(validationResult.second)
}
} catch (ex: SftpClientException) {
log.error("SFTP Exception: ${ex.localizedMessage}")
val report = Report(
......@@ -93,6 +96,15 @@ class Service(fileName: String = "app.yml") {
settings.processReportTopic
)
}
} catch (ex: SftpClientException) {
ex.printStackTrace()
log.error("SFTP Exception: ${ex.localizedMessage}.")
val report = Report(
"text-file-validation",
status = ReportStatus.failure,
message = "SFTP Exception: ${ex.localizedMessage}."
)
producer.sendJobReport(report, settings.processReportTopic)
} catch (ex: Exception) {
ex.printStackTrace()
log.error(ex.javaClass.canonicalName + ": " + ex.localizedMessage)
......
/*
* text-file-validation
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith
import org.memobase.testing.EmbeddedKafkaExtension
@ExtendWith(EmbeddedKafkaExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestRealData {
@Disabled
@Test
fun `test adg files`() {
val service = Service("real_tests.yml")
service.run()
}
}
sftp:
host: mb-wf2.memobase.unibas.ch
port: 22
user: mb_sftp
password: ${SFTP_PASSWORD:?env}
app:
directory: ./ArchivioDonetta-Donetta
kafka:
producer:
bootstrap.servers: localhost:12345
client.id: test-client
topic:
out: test-out
process: test-process
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment