Unverified Commit 0982b7c7 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

add a 'dry run' mode

parent c00e14c1
Pipeline #22166 passed with stages
in 4 minutes and 58 seconds
...@@ -5,6 +5,7 @@ collection id, institution id, or record id. ...@@ -5,6 +5,7 @@ collection id, institution id, or record id.
## Usage ## Usage
``` ```sh
--from <yyyy-mm-dd --from <yyyy-mm-dd
--session-id --session-id
```
\ No newline at end of file
...@@ -34,7 +34,7 @@ object App ...@@ -34,7 +34,7 @@ object App
with Logging { with Logging {
parse(args) match { parse(args) match {
case Some((sessionId, filters)) => case Some((sessionId, filters, dryRun)) =>
try { try {
Stream Stream
.continually(poll) .continually(poll)
...@@ -56,7 +56,7 @@ object App ...@@ -56,7 +56,7 @@ object App
.filter(record => filters.forall(f => f(record))) .filter(record => filters.forall(f => f(record)))
.foldLeft(HashSet[DeleteMessage]())((agg, record) => agg + DeleteMessage(record, sessionId)) .foldLeft(HashSet[DeleteMessage]())((agg, record) => agg + DeleteMessage(record, sessionId))
) )
.foreach(id => sendDelete(id)) .foreach(id => sendDelete(id, dryRun))
} catch { } catch {
case e: Exception => case e: Exception =>
logger.error(e) logger.error(e)
......
...@@ -18,11 +18,11 @@ ...@@ -18,11 +18,11 @@
package ch.memobase package ch.memobase
import java.util.Calendar
import ch.memobase.models._ import ch.memobase.models._
import scopt.{OParser, OParserBuilder} import scopt.{OParser, OParserBuilder}
import java.util.Calendar
trait ArgParser { trait ArgParser {
self: MsgFilter => self: MsgFilter =>
...@@ -65,6 +65,9 @@ trait ArgParser { ...@@ -65,6 +65,9 @@ trait ArgParser {
.text("retains only records processed before timestamp") .text("retains only records processed before timestamp")
.maxOccurs(1) .maxOccurs(1)
.optional(), .optional(),
opt[Unit]('d', "dry-run")
.action((_, c) => c.copy(dryRun = true))
.text("dry-run delete"),
arg[String]("<session-id>") arg[String]("<session-id>")
.action((v, c) => c.copy(sessionId = v)) .action((v, c) => c.copy(sessionId = v))
.text("session id assigned to delete message") .text("session id assigned to delete message")
...@@ -73,7 +76,7 @@ trait ArgParser { ...@@ -73,7 +76,7 @@ trait ArgParser {
) )
} }
def parse(args: Array[String]): Option[(String, Seq[FilterFun])] = { def parse(args: Array[String]): Option[(String, Seq[FilterFun], Boolean)] = {
OParser.parse(parser, args, Args()) match { OParser.parse(parser, args, Args()) match {
case Some(config) => Some( case Some(config) => Some(
config.sessionId, config.sessionId,
...@@ -82,7 +85,9 @@ trait ArgParser { ...@@ -82,7 +85,9 @@ trait ArgParser {
config.institutionFilters, config.institutionFilters,
config.recordSetFilters, config.recordSetFilters,
config.recordFilters, config.recordFilters,
config.sessionFilters)) config.sessionFilters),
config.dryRun
)
case None => None case None => None
} }
} }
......
...@@ -28,13 +28,14 @@ abstract class KProducer { ...@@ -28,13 +28,14 @@ abstract class KProducer {
private lazy val producer = new KafkaProducer[String, String](producerProps) private lazy val producer = new KafkaProducer[String, String](producerProps)
def sendDelete(id: DeleteMessage): Unit = { def sendDelete(id: DeleteMessage, dryRun: Boolean): Unit = {
logger.info(s"Sending delete command for $id") logger.info(s"Sending delete command for $id")
val producerRecord = new ProducerRecord[String, String](outputTopic, id.recordId, null) val producerRecord = new ProducerRecord[String, String](outputTopic, id.recordId, null)
producerRecord.headers() producerRecord.headers()
.add("recordSetId", id.recordSetId.getBytes()) .add("recordSetId", id.recordSetId.getBytes())
.add("institutionId", id.institutionId.getBytes()) .add("institutionId", id.institutionId.getBytes())
.add("sessionId", id.sessionId.getBytes()) .add("sessionId", id.sessionId.getBytes())
.add("dryRun", List((if (dryRun) 1 else 0).toByte).toArray)
producer.send(producerRecord) producer.send(producerRecord)
} }
......
...@@ -30,4 +30,5 @@ case class Args(sessionId: String = "", ...@@ -30,4 +30,5 @@ case class Args(sessionId: String = "",
cal.setTimeInMillis(0) cal.setTimeInMillis(0)
cal cal
}, },
createdBeforeFilter: Calendar = Calendar.getInstance()) createdBeforeFilter: Calendar = Calendar.getInstance(),
dryRun: Boolean = false)
/*
* Import Process Delete
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package ch.memobase
import java.text.SimpleDateFormat
import ch.memobase.models.Report
import org.apache.logging.log4j.scala.Logging
import org.scalatest.funsuite.AnyFunSuite
class ArgParserTest extends AnyFunSuite {
test("should parse correctly") {
val argParser = new {} with MsgFilter with ArgParser with Logging {}
val parsedArgs = argParser.parse(Array("--record-set-filter", "LS-film", "asession"))
val date = new SimpleDateFormat("YYYY-MM-dd").parse("2020-01-02");
val report = Report("aKey", "recId", "fedora-ingest", date, "SUCCESS", "blabla", "LS-film", "aInstitution", "aSession")
assert(parsedArgs.get._2.forall(_ (report)))
}
}
/*
* Import Process Delete
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package ch.memobase.models
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.scalatest.funsuite.AnyFunSuite
class ReportTest extends AnyFunSuite {
test("awefwefwefwef wef") {
val json =
"""{
| "id": "id",
| "step": "text-file-validation",
| "timestamp": "2020-11-26T09:14:20.324",
| "status": "SUCCESS",
| "message": "successfully validated"
|}""".stripMargin
val consumerRecord = new ConsumerRecord("atopic", 0, 0, "id", json)
consumerRecord.headers().add("recordSetId", "recordSet".getBytes())
consumerRecord.headers().add("institutionId", "institution".getBytes())
consumerRecord.headers().add("sessionId", "session".getBytes())
val report = Report(consumerRecord)
report
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment