Unverified Commit 6411b2b5 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

more logs

parent 6b811c95
Pipeline #22260 passed with stages
in 5 minutes and 13 seconds
...@@ -39,7 +39,8 @@ object App ...@@ -39,7 +39,8 @@ object App
Stream Stream
.continually(poll) .continually(poll)
.takeWhile(records => records.nonEmpty) .takeWhile(records => records.nonEmpty)
.flatMap(records => .flatMap(records => {
logger.info("Processing records batch with size ${records.size}")
records records
.flatMap { record => .flatMap { record =>
Report(record) match { Report(record) match {
...@@ -55,12 +56,11 @@ object App ...@@ -55,12 +56,11 @@ object App
} }
.filter(record => filters.forall(f => f(record))) .filter(record => filters.forall(f => f(record)))
.foldLeft(HashSet[DeleteMessage]())((agg, record) => { .foldLeft(HashSet[DeleteMessage]())((agg, record) => {
logger.info(s"Length of delete messages: ${agg.size}") logger.info(s"Size of delete messages: ${agg.size}")
agg + DeleteMessage(record, sessionId) agg + DeleteMessage(record, sessionId)
} }
) )
}
) )
.foreach(id => sendDelete(id, dryRun)) .foreach(id => sendDelete(id, dryRun))
} catch { } catch {
......
...@@ -44,6 +44,7 @@ trait AppSettings { ...@@ -44,6 +44,7 @@ trait AppSettings {
props.put("group.id", clientId) props.put("group.id", clientId)
props.put("allow.auto.create.topics", "false") props.put("allow.auto.create.topics", "false")
props.put("auto.offset.reset", "earliest") props.put("auto.offset.reset", "earliest")
props.put("enable.auto.commit", "false")
props.put("max.poll.records", "5000") props.put("max.poll.records", "5000")
props props
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment