KafkaTopology.scala 5.6 KB
Newer Older
/*
 * rico2edm
 * Copyright (C) 2021  UB Basel
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 */

package ch.memobase

22
import ch.memobase.edm.EDM
23
24
25
26
27
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.{StreamsBuilder, _}
import org.apache.logging.log4j.scala.Logging
28
import ch.memobase.edm.ExtractionResult
29
30
31
32
33
34

import scala.util.Try

/**
 * Assembles the Kafka Streams topology that turns incoming records into EDM documents.
 *
 * Records read from the input topic are first split into records without a digital
 * object, records without a locator, and everything else. Only the last group is passed
 * to [[EDM]]; its results are written to the output topic when the conversion succeeded
 * (with or without warnings). Every record, whatever its outcome, produces one entry on
 * the reporting topic.
 */
class KafkaTopology extends Logging {

  import KafkaTopologyUtils._
  // `org.apache.kafka.streams.scala.serialization.Serdes` is the replacement for this
  // import as of Kafka client 2.7.0, but with it the implicits required by the builder
  // streams are not resolved correctly. Switching still has to be evaluated.
  import Serdes._

  /** Stream of EDM creation outcomes, keyed by the incoming record key. */
  private type EdmStream = KStream[String, Try[ExtractionResult[(String, String)]]]

  /**
   * Builds the complete topology.
   *
   * @param topicIn        topic the source records are consumed from
   * @param topicOut       topic the created EDM documents are written to
   * @param reportingTopic topic that receives one report per processed record
   * @return the assembled topology
   */
  def build(
      topicIn: String,
      topicOut: String,
      reportingTopic: String
  ): Topology = {
    val builder = new StreamsBuilder
    val edmBuilder = new EDM

    val source = builder.stream[String, String](topicIn)

    // Which documents should be delivered to Europeana is still under discussion. An
    // earlier variant additionally split off non-photo resources and reported them as
    // ignored with "Resource is not an image"; that branch is currently disabled.
    val Array(noDigitalObject, noLocator, isEDMDeliverable) = source
      .branch(
        (_, v) => hasNoDigitalObject(v),
        (_, v) => hasNoLocator(v),
        (_, _) => true // catch-all: everything that passed the two checks above
      )

    // Branch predicates are evaluated in order, so the second predicate only sees
    // successes without warnings.
    //noinspection ConvertibleToMethodValue
    val Array(hasWarnings, isDeliverable, noEDM) = isEDMDeliverable
      .mapValues(edmBuilder.create(_))
      .branch(
        (_, v) => v.toOption.exists(_.warnings.nonEmpty),
        (_, v) => v.isSuccess,
        (_, v) => v.isFailure
      )

    sendRecord(hasWarnings, topicOut)
    sendRecord(isDeliverable, topicOut)

    reportManifestCreationWarnings(hasWarnings, reportingTopic)
    reportSuccessfulEDMCreation(isDeliverable, reportingTopic)
    reportEDMCreationFailure(noEDM, reportingTopic)

    reportIgnoredRecord(
      noLocator,
      reportingTopic,
      "Digital object has no locator"
    )

    reportIgnoredRecord(
      noDigitalObject,
      reportingTopic,
      "Record has no digital object"
    )

    builder.build()
  }

  /**
   * Writes successfully created documents to the output topic.
   *
   * The stream must only carry successes (as guaranteed by the branching in `build`);
   * the first element of the extracted pair is turned into the outgoing key via
   * `extractId`, the second element is the outgoing value.
   *
   * @param stream   successful EDM creation results
   * @param topicOut topic the documents are written to
   */
  private def sendRecord(stream: EdmStream, topicOut: String): Unit =
    stream
      .map { (_, v) =>
        val (id, document) = v.get.obj
        (extractId(id), document)
      }
      .to(topicOut)

  /**
   * Shared reporting pipeline: keeps the record key and replaces the value with the
   * serialized report produced by `render`, then writes the pair to the reporting topic.
   *
   * todo: define OAI ID; for now the plain record key is used as the report key
   * (a candidate was s"https://memobase.ch/record/$k").
   *
   * @param stream      records to report on
   * @param topicReport topic the reports are written to
   * @param render      builds the serialized report from record key and value
   */
  private def publishReport[V](stream: KStream[String, V], topicReport: String)(
      render: (String, V) => String
  ): Unit =
    stream
      .map((k, v) => (k, render(k, v)))
      .to(topicReport)

  /**
   * Reports records whose EDM document was created with warnings. Despite the method
   * name, the warnings reported are those attached to the EDM creation result; they are
   * joined with newlines into a single message.
   *
   * @param stream      successful results that carry at least one warning
   * @param topicReport topic the reports are written to
   */
  private def reportManifestCreationWarnings(
      stream: EdmStream,
      topicReport: String
  ): Unit =
    publishReport(stream, topicReport) { (k, v) =>
      ReportingObject(k, ProcessingWarning, v.get.warnings.mkString("\n")).toString
    }

  /**
   * Reports records whose EDM document was created without warnings.
   *
   * @param stream      successful results
   * @param topicReport topic the reports are written to
   */
  private def reportSuccessfulEDMCreation(
      stream: EdmStream,
      topicReport: String
  ): Unit =
    publishReport(stream, topicReport) { (k, _) =>
      ReportingObject(
        k,
        ProcessingSuccess,
        "EDM document for Europeana successfully created"
      ).toString
    }

  /**
   * Reports records for which EDM creation failed.
   *
   * @param stream      failed results
   * @param topicReport topic the reports are written to
   */
  private def reportEDMCreationFailure(
      stream: EdmStream,
      topicReport: String
  ): Unit =
    publishReport(stream, topicReport) { (k, v) =>
      val cause = v.failed.get
      // Throwable.getMessage may return null; fall back to toString so the report
      // names the exception instead of ending in "null".
      val detail = Option(cause.getMessage).getOrElse(cause.toString)
      ReportingObject(
        k,
        ProcessingFatal,
        s"Error creating EDM document: $detail"
      ).toString
    }

  /**
   * Reports records that were skipped before EDM creation.
   *
   * @param stream      skipped records
   * @param topicReport topic the reports are written to
   * @param message     reason the records were skipped, used verbatim in the report
   */
  private def reportIgnoredRecord(
      stream: KStream[String, String],
      topicReport: String,
      message: String
  ): Unit =
    publishReport(stream, topicReport) { (k, _) =>
      ReportingObject(k, ProcessingIgnore, message).toString
    }

}