Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
memoriav
Memobase 2020
services
Import Process
Media-Linker
Commits
e85da650
Commit
e85da650
authored
Oct 20, 2020
by
Matthias
Browse files
make tests run
parent
86b718b3
Pipeline
#15791
failed with stages
in 1 minute and 59 seconds
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
src/main/kotlin/KafkaTopology.kt
View file @
e85da650
...
...
@@ -77,7 +77,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
.
transformValues
(
HeaderExtractionTransformSupplier
<
String
>())
.
mapValues
{
value
->
createModel
(
value
)
}
.
mapValues
{
value
->
extractSubjects
(
value
)
}
.
mapValues
{
value
->
createThumbnail
(
value
)
}
//
.mapValues { value -> createThumbnail(value) }
.
branch
(
Predicate
{
_
,
value
->
containsDigitalObjectWithoutLocator
(
value
.
second
)
},
Predicate
{
_
,
_
->
true
}
...
...
@@ -107,7 +107,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
val
unchangedWrittenResources
=
instantiationBranch
[
1
]
.
mapValues
{
value
->
val
out
=
StringWriter
()
value
.
first
.
write
(
out
,
Constant
.
rdfParserLang
)
value
.
first
.
first
.
write
(
out
,
Constant
.
rdfParserLang
)
out
.
toString
().
trim
()
}
...
...
@@ -156,15 +156,14 @@ class KafkaTopology(private val settings: SettingsLoader) {
}
val
value
=
originalIdentifier
.
getProperty
(
RICO
.
identifier
).
string
val
files
=
createMediaFileList
(
data
.
first
.
second
.
recordSetId
)
// TODO: instead of looping 'files', loop only given uri below with a fixed set of extensions
sftpClient
.
exists
(
data
.
first
.
second
.
recordSetId
+
"/"
+
value
+
".jpg"
)
for
(
file
in
files
)
{
if
(
file
.
contains
(
value
))
{
link
=
"${Constant.sftpPathPrefix}$file"
val
fileExtensions
=
arrayOf
(
"jpg"
,
"jpeg"
,
"png"
,
"mp3"
,
"mp4"
)
for
(
extension
in
fileExtensions
)
{
val
filePath
=
data
.
first
.
second
.
recordSetId
+
"/"
+
value
+
"."
+
extension
if
(
sftpClient
.
exists
(
filePath
))
{
link
=
"${Constant.sftpPathPrefix}$filePath"
val
literal
=
ResourceFactory
.
createPlainLiteral
(
link
)
digitalObject
.
addLiteral
(
EBUCORE
.
locator
,
literal
)
data
.
first
.
first
.
createLiteral
(
digitalObject
.
toString
(),
true
)
return
Pair
(
data
.
first
.
first
,
Report
(
...
...
@@ -173,6 +172,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
ReportMessages
.
reportSuccess
(
digitalObject
.
uri
,
link
)
)
)
break
}
}
return
Pair
(
...
...
@@ -185,14 +185,17 @@ class KafkaTopology(private val settings: SettingsLoader) {
)
}
private
fun
createThumbnail
(
key
:
String
,
data
:
Pair
<
Pair
<
Model
,
HeaderMetadata
>,
List
<
Resource
>>):
Pair
<
Model
,
Report
>
{
/*
private fun createThumbnail(key: data: Pair<Pair<Model, HeaderMetadata>, List<Resource>>): Pair<Model, Report> {
// TODO: same here as in enrichSftpLocator but for thum
val thumbnail = data.first.first.createResource()
thumbnail.addProperty(RDF.type, RICO.Instantiation)
thumbnail.addProperty(RICO.type, "thumbnail")
thumbnail
.
addProperty
(
RICO
.
type
,
"thumbna
il"
)
thumbnail.addProperty(
EBUCORE.locator, "locator of f
il
e
")
return Pair(data.first.first, new Report("id00", "undefined","just a sample report"))
}
*/
}
src/main/kotlin/Service.kt
View file @
e85da650
...
...
@@ -17,14 +17,9 @@
*/
package
org.memobase
import
ch.memobase.Effect
import
ch.memobase.EffectsRegistry
import
ch.memobase.ShutdownMessage
import
org.apache.kafka.streams.KafkaStreams
import
org.apache.logging.log4j.LogManager
import
org.memobase.settings.SettingsLoader
import
scala.Some
import
scala.runtime.BoxedUnit
import
kotlin.system.exitProcess
class
Service
(
file
:
String
=
"app.yml"
)
{
...
...
@@ -44,18 +39,7 @@ class Service(file: String = "app.yml") {
private
val
appId
=
settings
.
kafkaStreamsSettings
.
getProperty
(
"application.id"
)
val
builder
=
KafkaTopology
(
settings
).
prepare
()
private
val
registry
=
EffectsRegistry
()
private
val
shutdownEffect
=
Effect
(
"shutdown"
,
this
::
exit
,
Some
(
"Shutting down application"
))
fun
run
()
{
registry
.
register
(
ShutdownMessage
(
appId
.
replace
(
"-normalization-service"
,
""
),
"normalization-service"
,
"termination"
),
shutdownEffect
)
registry
.
run
(
builder
,
"import-process-admin"
)
val
stream
=
KafkaStreams
(
builder
.
build
(),
settings
.
kafkaStreamsSettings
)
stream
.
use
{
it
.
start
()
...
...
@@ -63,10 +47,7 @@ class Service(file: String = "app.yml") {
log
.
info
(
"Service is running."
)
Thread
.
sleep
(
10_000L
)
}
exitProcess
(
0
)
}
}
private
fun
exit
():
BoxedUnit
{
exitProcess
(
0
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment