Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
memoriav
Memobase 2020
services
Import Process
Media-Linker
Commits
86b718b3
Commit
86b718b3
authored
Oct 13, 2020
by
Matthias
Browse files
some steps for adding thumbnails (code breaks!)
parent
379962dc
Pipeline
#15571
failed with stages
in 1 minute and 33 seconds
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
build.gradle
View file @
86b718b3
...
...
@@ -47,8 +47,7 @@ dependencies {
// JSON Parser
implementation
'com.beust:klaxon:5.2'
implementation
'org.memobase:memobase-service-utilities:0.14.2'
implementation
'ch.memobase:import-process-effects-registry_2.12:0.2.1'
implementation
'org.memobase:memobase-service-utilities:0.16.0'
implementation
'org.apache.jena:apache-jena:3.14.0'
...
...
src/main/kotlin/KafkaTopology.kt
View file @
86b718b3
...
...
@@ -21,6 +21,7 @@ package org.memobase
import
org.apache.jena.rdf.model.Model
import
org.apache.jena.rdf.model.ModelFactory
import
org.apache.jena.rdf.model.Resource
import
org.apache.jena.rdf.model.ResourceFactory
import
org.apache.jena.rdf.model.impl.StatementImpl
import
org.apache.kafka.streams.StreamsBuilder
import
org.apache.kafka.streams.kstream.Predicate
...
...
@@ -32,6 +33,8 @@ import org.memobase.reports.ReportMessages
import
org.memobase.reports.ReportStatus
import
org.memobase.settings.SettingsLoader
import
org.memobase.sftp.SftpClient
import
settings.HeaderExtractionTransformSupplier
import
settings.HeaderMetadata
import
java.io.StringReader
import
java.io.StringWriter
import
kotlin.system.exitProcess
...
...
@@ -50,21 +53,20 @@ class KafkaTopology(private val settings: SettingsLoader) {
private
val
sftpClient
=
SftpClient
(
settings
.
sftpSettings
)
private
val
sftpBasePath
=
appSettings
.
getProperty
(
Constant
.
sftpBasePathPropertyName
)
private
val
files
=
createMediaFileList
(
settings
.
appSettings
.
getProperty
(
Constant
.
recordSetIdPropertyName
))
private
fun
createMediaFileList
(
setting
:
String
):
List
<
String
>
{
return
try
{
val
list
=
sftpClient
.
listFiles
(
"$sftpBasePath/$setting/${Constant.mediaFolderName}"
)
log
.
info
(
"Files found on sftp server: $list."
)
list
}
catch
(
ex
:
Exception
)
{
//TODO: Report erstellen
ex
.
printStackTrace
()
log
.
error
(
ex
.
localizedMessage
+
": $sftpBasePath/$setting/${Constant.mediaFolderName}"
)
exitProcess
(
1
)
}
}
private
val
reportingTopic
=
"${settings.processReportTopic}
-${Constant.topicReportingSuffix}
"
private
val
reportingTopic
=
"${settings.processReportTopic}"
fun
prepare
():
StreamsBuilder
{
val
builder
=
StreamsBuilder
()
...
...
@@ -72,8 +74,10 @@ class KafkaTopology(private val settings: SettingsLoader) {
val
stream
=
builder
.
stream
<
String
,
String
>(
settings
.
inputTopic
)
val
instantiationBranch
=
stream
.
transformValues
(
HeaderExtractionTransformSupplier
<
String
>())
.
mapValues
{
value
->
createModel
(
value
)
}
.
mapValues
{
value
->
extractSubjects
(
value
)
}
.
mapValues
{
value
->
createThumbnail
(
value
)
}
.
branch
(
Predicate
{
_
,
value
->
containsDigitalObjectWithoutLocator
(
value
.
second
)
},
Predicate
{
_
,
_
->
true
}
...
...
@@ -97,7 +101,6 @@ class KafkaTopology(private val settings: SettingsLoader) {
ReportStatus
.
success
,
"Create sftp file path to instantiation."
).
toJson
()
}
.
to
(
reportingTopic
)
...
...
@@ -118,8 +121,8 @@ class KafkaTopology(private val settings: SettingsLoader) {
return
builder
}
private
fun
extractSubjects
(
model
:
Model
):
Pair
<
Model
,
List
<
Resource
>>
{
return
Pair
(
model
,
model
.
listSubjects
().
toList
())
private
fun
extractSubjects
(
input
:
Pair
<
Model
,
HeaderMetadata
>):
Pair
<
Pair
<
Model
,
HeaderMetadata
>
,
List
<
Resource
>>
{
return
Pair
(
input
,
input
.
first
.
listSubjects
().
toList
())
}
private
fun
containsDigitalObjectWithoutLocator
(
res
:
List
<
Resource
>):
Boolean
{
...
...
@@ -135,13 +138,13 @@ class KafkaTopology(private val settings: SettingsLoader) {
}
}
private
fun
createModel
(
data
:
String
):
Model
{
private
fun
createModel
(
data
:
Pair
<
String
,
HeaderMetadata
>):
Pair
<
Model
,
HeaderMetadata
>
{
val
model
=
ModelFactory
.
createDefaultModel
()
model
.
read
(
StringReader
(
data
),
""
,
Constant
.
rdfParserLang
)
return
model
model
.
read
(
StringReader
(
data
.
first
),
""
,
Constant
.
rdfParserLang
)
return
Pair
(
model
,
data
.
second
)
}
private
fun
enrichSftpLocator
(
key
:
String
,
data
:
Pair
<
Model
,
List
<
Resource
>>):
Pair
<
Model
,
Report
>
{
private
fun
enrichSftpLocator
(
key
:
String
,
data
:
Pair
<
Pair
<
Model
,
HeaderMetadata
>
,
List
<
Resource
>>):
Pair
<
Model
,
Report
>
{
var
link
=
""
val
digitalObject
=
data
.
second
.
first
{
it
.
hasProperty
(
RICO
.
type
,
Constant
.
digitalObject
)
&&
!
it
.
hasProperty
(
EBUCORE
.
locator
)
}
...
...
@@ -149,16 +152,21 @@ class KafkaTopology(private val settings: SettingsLoader) {
val
originalIdentifier
=
try
{
getOriginalIdentifiers
(
data
.
second
)[
0
]
}
catch
(
ex
:
IndexOutOfBoundsException
)
{
return
Pair
(
data
.
first
,
Report
(
key
,
ReportStatus
.
failure
,
ReportMessages
.
noOriginalIdentifier
(
key
)))
return
Pair
(
data
.
first
.
first
,
Report
(
key
,
ReportStatus
.
failure
,
ReportMessages
.
noOriginalIdentifier
(
key
)))
}
val
value
=
originalIdentifier
.
getProperty
(
RICO
.
identifier
)
val
value
=
originalIdentifier
.
getProperty
(
RICO
.
identifier
).
string
val
files
=
createMediaFileList
(
data
.
first
.
second
.
recordSetId
)
// TODO: instead of looping 'files', loop only given uri below with a fixed set of extensions
sftpClient
.
exists
(
data
.
first
.
second
.
recordSetId
+
"/"
+
value
+
".jpg"
)
for
(
file
in
files
)
{
if
(
file
.
contains
(
value
.
string
))
{
if
(
file
.
contains
(
value
))
{
link
=
"${Constant.sftpPathPrefix}$file"
val
literal
=
data
.
first
.
createLiteral
(
link
)
d
ata
.
first
.
add
(
StatementImpl
(
digitalObject
,
EBUCORE
.
locator
,
literal
)
)
val
literal
=
ResourceFactory
.
create
Plain
Literal
(
link
)
d
igitalObject
.
addLiteral
(
EBUCORE
.
locator
,
literal
)
return
Pair
(
data
.
first
,
data
.
first
.
first
,
Report
(
digitalObject
.
uri
,
ReportStatus
.
success
,
...
...
@@ -168,7 +176,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
}
}
return
Pair
(
data
.
first
,
data
.
first
.
first
,
Report
(
digitalObject
.
uri
,
ReportStatus
.
failure
,
...
...
@@ -176,4 +184,15 @@ class KafkaTopology(private val settings: SettingsLoader) {
)
)
}
private
fun
createThumbnail
(
key
:
String
,
data
:
Pair
<
Pair
<
Model
,
HeaderMetadata
>,
List
<
Resource
>>):
Pair
<
Model
,
Report
>
{
// TODO: same here as in enrichSftpLocator but for thum
val
thumbnail
=
data
.
first
.
first
.
createResource
()
thumbnail
.
addProperty
(
RDF
.
type
,
RICO
.
Instantiation
)
thumbnail
.
addProperty
(
RICO
.
type
,
"thumbnail"
)
thumbnail
.
addProperty
(
RICO
.
type
,
"thumbnail"
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment