Due to a scheduled upgrade to version 14.10, GitLab will be unavailable on Monday 30.05., from 19:00 until 20:00.

Commit 046526b9 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Implement embedded sftp server

parent b999a7fd
Pipeline #8138 failed with stages
in 1 minute and 46 seconds
......@@ -68,6 +68,18 @@ dependencies {
testImplementation "org.apache.kafka:kafka-clients:$kafkaV:test"
testImplementation "org.apache.kafka:kafka_2.11:$kafkaV"
testImplementation "org.apache.kafka:kafka_2.11:$kafkaV:test"
// https://mvnrepository.com/artifact/com.github.marschall/memoryfilesystem
testCompile group: 'com.github.marschall', name: 'memoryfilesystem', version: '2.1.0'
// https://mvnrepository.com/artifact/org.apache.sshd/sshd-core
testCompile group: 'org.apache.sshd', name: 'sshd-core', version: '2.4.0'
// https://mvnrepository.com/artifact/org.apache.sshd/sshd-sftp
testCompile group: 'org.apache.sshd', name: 'sshd-sftp', version: '2.4.0'
}
compileKotlin {
......
......@@ -33,10 +33,10 @@ import java.io.File
import java.io.StringReader
import java.nio.charset.Charset
class KafkaTopology(private val settings: SettingsLoader
) {
class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("KafkaTopology")
private val sftpClient = SftpClient(settings.sftpSettings)
private val sftpClient: SftpClient = SftpClient(settings.sftpSettings)
private val csvHeaderCount = settings.appSettings.getProperty("app.csv.header.count").toInt()
private val csvUseHeaderProperties = settings.appSettings.getProperty("app.csv.header.line")!!.toBoolean()
private val csvUsedHeaderIndex = settings.appSettings.getProperty("app.csv.header.index").toInt()
......
/*
* record-parser
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.github.marschall.memoryfilesystem.MemoryFileSystemBuilder.newLinux
import org.apache.sshd.server.SshServer
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory
import java.io.Closeable
import java.io.IOException
import java.io.InputStream
import java.nio.charset.Charset
import java.nio.file.FileSystem
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.PathMatcher
import java.nio.file.WatchService
import java.nio.file.FileStore
import java.nio.file.attribute.UserPrincipalLookupService
import java.nio.file.spi.FileSystemProvider
/**
 * An SFTP server for tests that is backed by an in-memory file system.
 *
 * The server is configured and started from the constructor, so a successfully
 * constructed instance is already listening on [port]. Password authentication
 * accepts exactly the given user/password combination. Call [close] to stop
 * the server and release the in-memory file system.
 *
 * @param port the TCP port the server listens on.
 * @param user the only user name that is accepted.
 * @param password the password expected for [user].
 */
class EmbeddedSftpServer(port: Int, user: String, password: String) : Closeable {
    private val credentials = mapOf(user to password)
    private val fileSystem: FileSystem = newLinux().build("EmbeddedSftpServerFileSystem@${hashCode()}")
    private val server: SshServer = SshServer.setUpDefaultServer()

    init {
        server.port = port
        server.keyPairProvider = SimpleGeneratorHostKeyProvider()
        server.setPasswordAuthenticator { authUser, authPassword, _ -> authenticate(authUser, authPassword) }
        server.subsystemFactories = listOf(SftpSubsystemFactory())
        /* When a channel is closed SshServer calls close() on the file system.
         * In order to use the file system for multiple channels/sessions we
         * have to use a file system wrapper whose close() does nothing.
         */
        server.setFileSystemFactory { DoNotClose(fileSystem) }
        server.start()
    }

    /**
     * Put a text file on the SFTP folder. The file is available by the
     * specified path. Missing parent directories are created.
     * @param path the path to the file.
     * @param content the file's content.
     * @param encoding the encoding used to turn [content] into bytes.
     * @throws IOException if the file cannot be written.
     */
    @Throws(IOException::class)
    fun putFile(
        path: String,
        content: String,
        encoding: Charset = Charset.defaultCharset()
    ) {
        putFile(path, content.toByteArray(encoding))
    }

    /**
     * Put a file on the SFTP folder. The file is available by the specified
     * path. Missing parent directories are created.
     * @param path the path to the file.
     * @param content the file's content.
     * @throws IOException if the file cannot be written.
     */
    @Throws(IOException::class)
    fun putFile(
        path: String,
        content: ByteArray
    ) {
        val target = fileSystem.getPath(path)
        ensureDirectoryOfPathExists(target)
        Files.write(target, content)
    }

    /**
     * Put a file on the SFTP folder. The file is available by the specified
     * path. The file content is read from an `InputStream`. Missing parent
     * directories are created.
     *
     * The stream is not closed by this method; closing it is the caller's
     * responsibility. `Files.copy` is called without copy options, so the
     * call fails if the target file already exists.
     * @param path the path to the file.
     * @param inputStream an `InputStream` that provides the file's content.
     * @throws IOException if the file cannot be written or the input stream
     * cannot be read.
     */
    @Throws(IOException::class)
    fun putFile(
        path: String,
        inputStream: InputStream
    ) {
        val target = fileSystem.getPath(path)
        ensureDirectoryOfPathExists(target)
        Files.copy(inputStream, target)
    }

    /**
     * Create a directory on the SFTP server, including any missing parents.
     * @param path the directory's path.
     * @throws IOException if the directory cannot be created.
     */
    @Throws(IOException::class)
    fun createDirectory(
        path: String
    ) {
        Files.createDirectories(fileSystem.getPath(path))
    }

    /**
     * Create multiple directories on the SFTP server.
     * @param paths the directories' paths.
     * @throws IOException if at least one directory cannot be created.
     */
    @Throws(IOException::class)
    fun createDirectories(
        vararg paths: String
    ) {
        paths.forEach { createDirectory(it) }
    }

    /**
     * Get a text file from the SFTP server.
     *
     * NOTE(review): a call with only `path` resolves to the `ByteArray`
     * overload, so `encoding` has to be passed explicitly to get a `String`.
     * @param path the path to the file.
     * @param encoding the encoding used to decode the file's bytes.
     * @return the content of the text file.
     * @throws IOException if the file cannot be read.
     */
    @Throws(IOException::class)
    fun getFileContent(
        path: String,
        encoding: Charset = Charset.defaultCharset()
    ): String {
        // The single-argument call below picks the ByteArray overload.
        return getFileContent(path).toString(encoding)
    }

    /**
     * Get a file from the SFTP server.
     * @param path the path to the file.
     * @return the content of the file.
     * @throws IOException if the file cannot be read.
     */
    @Throws(IOException::class)
    fun getFileContent(
        path: String
    ): ByteArray {
        return Files.readAllBytes(fileSystem.getPath(path))
    }

    /**
     * Checks the existence of a file. Returns `true` iff the file exists
     * and it is not a directory.
     * @param path the path to the file.
     * @return `true` iff the file exists and it is not a directory.
     */
    fun existsFile(
        path: String
    ): Boolean {
        val candidate = fileSystem.getPath(path)
        return Files.exists(candidate) && !Files.isDirectory(candidate)
    }

    /**
     * Stops the SSH server and then closes the in-memory file system.
     *
     * The server is stopped first so that no session can still be using the
     * file system when it is closed. The file system is closed in a `finally`
     * block so it is released even if stopping the server fails.
     */
    override fun close() {
        try {
            server.close()
        } finally {
            fileSystem.close()
        }
    }

    /**
     * Password check used by the SSH server: the supplied pair must match
     * the credentials this server was created with.
     */
    private fun authenticate(
        username: String,
        password: String
    ): Boolean {
        // The credentials map always holds exactly the constructor's entry,
        // so a lookup is sufficient.
        return credentials[username] == password
    }

    /**
     * Creates the parent directory of [path] (and its ancestors) unless the
     * path has no parent or the parent is the root directory.
     */
    @Throws(IOException::class)
    private fun ensureDirectoryOfPathExists(
        path: Path
    ) {
        val directory = path.parent
        if (directory != null && directory != path.root) {
            Files.createDirectories(directory)
        }
    }

    /**
     * File system wrapper that forwards every call to [delegate] except
     * [close], which is a no-op. This lets several channels/sessions share
     * one underlying file system.
     */
    private class DoNotClose(
        private val delegate: FileSystem
    ) : FileSystem() {
        override fun provider(): FileSystemProvider = delegate.provider()

        @Throws(IOException::class)
        override fun close() {
            // Intentionally empty: the shared file system must stay open.
        }

        override fun isOpen(): Boolean = delegate.isOpen

        override fun isReadOnly(): Boolean = delegate.isReadOnly

        override fun getSeparator(): String = delegate.separator

        override fun getRootDirectories(): Iterable<Path> = delegate.rootDirectories

        override fun getFileStores(): Iterable<FileStore> = delegate.fileStores

        override fun supportedFileAttributeViews(): Set<String> = delegate.supportedFileAttributeViews()

        override fun getPath(
            first: String,
            vararg more: String
        ): Path = delegate.getPath(first, *more)

        override fun getPathMatcher(
            syntaxAndPattern: String
        ): PathMatcher = delegate.getPathMatcher(syntaxAndPattern)

        override fun getUserPrincipalLookupService(): UserPrincipalLookupService =
            delegate.userPrincipalLookupService

        @Throws(IOException::class)
        override fun newWatchService(): WatchService = delegate.newWatchService()
    }
}
\ No newline at end of file
/*
* record-parser
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.junit.jupiter.api.extension.BeforeAllCallback
import org.junit.jupiter.api.extension.ExtensionContext
import java.io.File
import java.io.FileInputStream
/**
 * JUnit 5 extension that starts an [EmbeddedSftpServer] before all tests of
 * a class and seeds it with the CSV fixture used by the tests.
 *
 * The server listens on port 22000 and accepts the credentials
 * `user` / `password`.
 */
class EmbeddedSftpServerExtension : BeforeAllCallback {
    lateinit var sftpServer: EmbeddedSftpServer
    private val port: Int = 22000
    private val user: String = "user"
    private val password: String = "password"

    /**
     * Starts the server, uploads `src/test/resources/data/test.csv` to
     * `/memobase/test_institution_1/test_record_set_1/test.csv` and registers
     * the server for clean-up.
     *
     * @param context the extension context supplied by JUnit; when it is
     * `null` no clean-up is registered.
     */
    override fun beforeAll(context: ExtensionContext?) {
        sftpServer = EmbeddedSftpServer(port, user, password)
        sftpServer.createDirectories("/memobase/test_institution_1/test_record_set_1")
        // Close the fixture stream once it has been copied to the server;
        // putFile does not close it.
        FileInputStream(File("src/test/resources/data/test.csv")).use { fixture ->
            sftpServer.putFile("/memobase/test_institution_1/test_record_set_1/test.csv", fixture)
        }
        val wrapper = SftpWrapper(sftpServer)
        if (context != null) {
            // Storing a CloseableResource hands the clean-up to JUnit.
            // NOTE(review): presumably closed when this context's store is closed - confirm.
            context.getStore(ExtensionContext.Namespace.create(EmbeddedSftpServerExtension::class.java))
                .put("sftp", wrapper)
            // Additional clean-up on JVM shutdown.
            Runtime.getRuntime().addShutdownHook(Thread { wrapper.close() })
        }
    }

    /**
     * Adapter that exposes the embedded server as a JUnit store resource so
     * that closing the resource closes the server.
     */
    class SftpWrapper(private val embedded: EmbeddedSftpServer) : ExtensionContext.Store.CloseableResource {
        override fun close() {
            embedded.close()
        }
    }
}
\ No newline at end of file
......@@ -28,14 +28,13 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.api.extension.ExtendWith
import java.io.File
import java.lang.IllegalStateException
import java.nio.charset.Charset
import kotlin.test.assertNotNull
import kotlin.test.assertNull
//@ExtendWith(EmbeddedKafkaExtension::class)
@ExtendWith(EmbeddedSftpServerExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class Tests {
private val log = LogManager.getLogger("TestLogger")
......
{
"path": "./test_institution_1/test_record_set_1/brandt_metadaten.csv",
"path": "/memobase/test_institution_1/test_record_set_1/test.csv",
"format": "CSV"
}
\ No newline at end of file
{"format":"CSV","data":"ID Original ID,,Inhalt Haupttitel,Inhalt Beschreibung Bemerkung,Inhalt Beschreibung,Kontext Erstellung,Kontext AutorIn,AutorIn,Inhalt Genre,Inhalt Schlagworte,Kontext Aufnahmeort,Kontext Beschreibung Bemerkung,System Information Streaming,,System Information Dokumenttyp,Technische Informationen Trägerformat (des Originals),Technische Informationen Tonaufnahmeverfahren ,Technische Informationen Film Farbe,Technische Informationen Dauer,Technische Informationen Film Bemerkung\nAVGR13716,https://www.gr.ch/Exemplare/13716,Wintersport in Arosa,,\"Pferderennen am Obersee bei strahlendem Sonnenschein, viel Publikum, Gedränge vor Wettbüro, Reiter in Armeeuniform, Fotografen, Skijöring \u2013 Eisfest mit kostümierten Teilnehmer/innen vor Hotel Altein bei Nacht \u2013 Pferderennen am Obersee \u2013 Eiskunstlauf \u2013 Pferderennen, diesmal winterlicher \u2013 Schanzenspringen im Skigelände und viel Volk um die Alpgebäude Carmenna \u2013 Skifahrer im Aufstieg, Winterwanderer und nochmals Sprünge auf der Schneeschanze, Gruppe Skifahrer in wilder Schussfahrt, Wartende um die Hütten \u2013 Eishockey-Match \u2013 Impressionen von einem Abfahrtsrennen und Rundsicht über Arosa und Umgebung\",\"1920, 1920-1929, genaues Datum nicht eruierbar\",\"Brandt, Carl\",Autor/in,Dokumentarfilm; Amateurfilm,Pferdesport; Ski alpin; Skispringen; Eishochey; Tourismus,Arosa,Eisfest: teilweise identische Aufnahmen in AVGR12097 \u201eAnkunft David Zogg\u201c ; Schanzenspringen auf Carmenna: teilweise identische Aufnahmen in AVGR12115 \u201eTouristen auf dem Tschuggen\u201c,https://s3-eu-west-1.amazonaws.com/streaming.av-portal.gr.ch/13716/AVGR13716.mov,Intranet,Film,\"35-mm-Film, Negativ und Positiv, Nitrat\",stumm,s/w getönt,0:17:02,\"Vorhandene Elemente: AVGR9942: Negativ, Nitrat (CS, Z 986-172.8); AVGR9943: Positiv Nitrat (CS, Z 986-172.7); AVGR12098: Interpositiv / Marron 2366, Kopie 2016 (KBG); AVGR13715: Internegativ 2234, Kopie 2016 (KBG); AVGR13716: Positivkopie Farbe 
2383, Kopie 2016, eingefärbte Sequenzen (KBG)\""}
\ No newline at end of file
{"format":"CSV","data":"Exemplar-AVGRNr,Permalink,Titel-Title,Titel-ZusatzTitel,Titel-Beschreibung,Titel-ProduktionsjahrdesOriginals,Titel-FilmPersonen,Titel-Funktionen,Titel-Genre,\"Titel-Genres, Titel-Beschreibung (Intel.)\",Titel-Drehort,Titel-Weiteres,Titel-Stream-Url,Titel-Benutzerzugang,Medium-Materialbezeichnung,Medium-MedienFormat,Medium-Ton,Medium-Farbe,Medium-Dauer,Medium-Bandlaenge\nAVGR13716,https://www.gr.ch/Exemplare/13716,Wintersport in Arosa,,\"Pferderennen am Obersee bei strahlendem Sonnenschein, viel Publikum, Gedränge vor Wettbüro, Reiter in Armeeuniform, Fotografen, Skijöring \u2013 Eisfest mit kostümierten Teilnehmer/innen vor Hotel Altein bei Nacht \u2013 Pferderennen am Obersee \u2013 Eiskunstlauf \u2013 Pferderennen, diesmal winterlicher \u2013 Schanzenspringen im Skigelände und viel Volk um die Alpgebäude Carmenna \u2013 Skifahrer im Aufstieg, Winterwanderer und nochmals Sprünge auf der Schneeschanze, Gruppe Skifahrer in wilder Schussfahrt, Wartende um die Hütten \u2013 Eishockey-Match \u2013 Impressionen von einem Abfahrtsrennen und Rundsicht über Arosa und Umgebung\",\"1920, 1920-1929, genaues Datum nicht eruierbar\",\"Brandt, Carl\",Autor/in,Dokumentarfilm; Amateurfilm,Pferdesport; Ski alpin; Skispringen; Eishochey; Tourismus,Arosa,Eisfest: teilweise identische Aufnahmen in AVGR12097 \u201eAnkunft David Zogg\u201c ; Schanzenspringen auf Carmenna: teilweise identische Aufnahmen in AVGR12115 \u201eTouristen auf dem Tschuggen\u201c,https://s3-eu-west-1.amazonaws.com/streaming.av-portal.gr.ch/13716/AVGR13716.mov,Intranet,Film,\"35-mm-Film, Negativ und Positiv, Nitrat\",stumm,s/w getönt,0:17:02,\"Vorhandene Elemente: AVGR9942: Negativ, Nitrat (CS, Z 986-172.8); AVGR9943: Positiv Nitrat (CS, Z 986-172.7); AVGR12098: Interpositiv / Marron 2366, Kopie 2016 (KBG); AVGR13715: Internegativ 2234, Kopie 2016 (KBG); AVGR13716: Positivkopie Farbe 2383, Kopie 2016, eingefärbte Sequenzen (KBG)\""}
\ No newline at end of file
This diff is collapsed.
sftp:
host: sb-uingest1.swissbib.unibas.ch
port: 22
host: localhost
port: 22000
user: user
password: password
app:
......@@ -8,7 +8,7 @@ app:
header:
count: 3
line: true
index: 2
index: 3
identifier: 0
kafka:
producer:
......
sftp:
host: sb-uingest1.swissbib.unibas.ch
port: 22
host: localhost
port: 22000
user: user
password: password
app:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment