Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
memoriav
Memobase 2020
libraries
Import Process Config Joint
Commits
a7fc1947
Unverified
Commit
a7fc1947
authored
Sep 29, 2020
by
Sebastian Schüpbach
Browse files
remove kafka-scala parts
Signed-off-by: Sebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent
bc27f792
Pipeline
#14952
passed with stages
in 6 minutes
Changes
4
Pipelines
2
Hide whitespace changes
Inline
Side-by-side
project/project/metals.sbt
deleted
100644 → 0
View file @
bc27f792
// DO NOT EDIT! This file is auto-generated.
// This file enables sbt-bloop to create bloop config files.
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.4.3-23-550c6c0a")
project/project/project/metals.sbt
deleted
100644 → 0
View file @
bc27f792
// DO NOT EDIT! This file is auto-generated.
// This file enables sbt-bloop to create bloop config files.
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.4.3-23-550c6c0a")
src/main/scala/ch/memobase/ConfigJoiner.scala
View file @
a7fc1947
...
...
@@ -17,18 +17,18 @@
package
ch.memobase
import
org.apache.kafka.common.serialization.Serde
import
org.apache.kafka.streams.
kstream.
{
KStream
=>
JKStream
}
import
org.apache.kafka.streams.
scala.
kstream.KStream
import
org.apache.kafka.streams.
KeyValue
import
org.apache.kafka.streams.kstream.
{
KStream
,
KTable
,
ValueJoiner
}
import
org.apache.logging.log4j.scala.Logging
/**
* @param service Service type (i.e. name of step in import process)
* @param dataSerDe SerDe instance used to (de-)serialise the data records
* @param configSerDe SerDe instance used to (de-)serialise config objects
* @param parseFunction Function used to parse raw config data into config object
* @tparam T Type of incoming records
* @tparam U Target type of config objects
*/
* @param service
Service type (i.e. name of step in import process)
* @param dataSerDe
SerDe instance used to (de-)serialise the data records
* @param configSerDe
SerDe instance used to (de-)serialise config objects
* @param parseFunction Function used to parse raw config data into config object
* @tparam T Type of incoming records
* @tparam U Target type of config objects
*/
class
ConfigJoiner
[
T
,
U
](
service
:
String
,
implicit
val
dataSerDe
:
Serde
[
T
],
...
...
@@ -36,8 +36,6 @@ class ConfigJoiner[T, U](
parseFunction
:
Array
[
Byte
]
=>
U
)
extends
Logging
{
import
org.apache.kafka.streams.scala.ImplicitConversions._
private
def
getCollectionName
(
key
:
String
)
=
key
.
split
(
"#"
)(
0
)
/**
...
...
@@ -50,34 +48,31 @@ class ConfigJoiner[T, U](
dataStream
:
KStream
[
String
,
T
],
configStream
:
KStream
[
String
,
Array
[
Byte
]]
)
:
KStream
[
String
,
(
T
,
U
)]
=
{
import
org.apache.kafka.streams.scala._
import
Serdes._
implicit
val
keyedValueSerDe
:
Serde
[
KeyedValue
[
T
]]
=
new
KeyedValueSerDe
val
configTable
=
configStream
.
filter
((
k
,
_
)
=>
k
.
split
(
"#"
)(
1
)
==
service
)
.
map
((
k
,
v
)
=>
(
getCollectionName
(
k
),
parseFunction
(
v
)))
.
map
[
String
,
U
]((
key
:
String
,
value
:
Array
[
Byte
])
=>
new
KeyValue
(
getCollectionName
(
key
),
parseFunction
(
value
))
)
.
groupByKey
.
reduce
((
_
,
newVal
)
=>
newVal
)
//noinspection ConvertExpressionToSAM
dataStream
.
transform
(
new
RecordIdSupplier
)
.
join
(
configTable
)((
record
,
config
)
=>
(
record
,
config
))
.
map
((
_
,
oldVal
)
=>
(
oldVal
.
_1
.
key
,
(
oldVal
.
_1
.
value
,
oldVal
.
_2
)))
.
transform
(
new
RecordIdSupplier
[
T
])
.
join
[
U
,
(
KeyedValue
[
T
]
,
U
)](
configTable
:
KTable
[
String
,
U
],
new
ValueJoiner
[
KeyedValue
[
T
]
,
U
,
(
KeyedValue
[
T
]
,
U
)]
{
override
def
apply
(
value1
:
KeyedValue
[
T
],
value2
:
U
)
:
(
KeyedValue
[
T
],
U
)
=
(
value1
,
value2
)
}
)
.
map
[
String
,
(
T
,
U
)]((
_
,
oldVal
)
=>
new
KeyValue
(
oldVal
.
_1
.
key
,
(
oldVal
.
_1
.
value
,
oldVal
.
_2
))
)
}
/**
 * Joins data records with collection-specific configuration.
 *
 * Java-API variant of `join`: accepts and returns Kafka's Java `KStream`
 * (aliased here as `JKStream`).
 *
 * @param dataStream   KStream of data records
 * @param configStream KStream with raw config objects
 * @return records joined with their respective configuration
 */
def join(
    dataStream: JKStream[String, T],
    configStream: JKStream[String, Array[Byte]]
): JKStream[String, (T, U)] = {
  // Bridge to the Scala-API overload: wrap both Java streams via the
  // implicit `wrapKStream` conversion, then unwrap the joined result back
  // to a Java KStream with `.inner`.
  val wrappedData = wrapKStream(dataStream)
  val wrappedConfig = wrapKStream(configStream)
  join(wrappedData, wrappedConfig).inner
}
}
object
ConfigJoiner
{
...
...
src/test/scala/ch/memobase/ConfigJoinerTest.scala
View file @
a7fc1947
...
...
@@ -32,7 +32,7 @@ class ConfigJoinerTest extends AnyFunSuite {
import
org.apache.kafka.streams.scala.ImplicitConversions.wrapKStream
test
(
""
)
{
ignore
(
""
)
{
val
streamsAppConfigs
=
new
Properties
()
streamsAppConfigs
.
put
(
StreamsConfig
.
APPLICATION_ID_CONFIG
,
"test"
)
streamsAppConfigs
.
put
(
StreamsConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
"dummy:1234"
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment