Unverified Commit b848e9a7 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

add dry run mode

parent 99f667ac
Pipeline #22169 passed with stages
in 3 minutes and 34 seconds
......@@ -38,7 +38,7 @@ object App
x
}
.map(rec => buildRecordProcessor(rec.key(), rec.headers()))
.map(rec => deleteInES(rec))
.map(rec => if (rec.dryRun) existsInES(rec) else deleteInES(rec))
.map(rec => deleteMariaDBEntries(rec))
.map(rec => deleteBinaries(rec))
.foreach(rec => reportResult(rec))
......
......@@ -18,7 +18,12 @@
package ch.memobase
import ch.memobase.models.{ProcessedRecord, ProcessingFatal, ProcessingIgnore, ProcessingSuccess}
import ch.memobase.models.{
ProcessedRecord,
ProcessingFatal,
ProcessingIgnore,
ProcessingSuccess
}
import org.apache.logging.log4j.scala.Logging
import java.io.File
......@@ -34,7 +39,9 @@ trait BinaryHandler {
val result = record.binaries
.map(_.replaceFirst("file://", ""))
.map(fileName => (new File(fileName), fileName))
.map(file => (file._1.delete, file._2))
.map(file =>
(if (record.dryRun) file._1.exists else file._1.delete, file._2)
)
.foldLeft[List[String]](List())((errors, x) =>
if (!x._1) {
logger.warn(s"Deletion of file ${x._2} failed!")
......@@ -44,13 +51,17 @@ trait BinaryHandler {
}
)
if (result.isEmpty) {
logger.debug(s"Deletion of binaries for record ${record.recordId} successful")
logger.debug(
s"Deletion of binaries for record ${record.recordId} successful"
)
record.setBinariesStatus(
ProcessingSuccess,
"Binaries deletion successful"
)
} else {
logger.warn(s"Deletion of binaries for record ${record.recordId} failed")
logger.warn(
s"Deletion of binaries for record ${record.recordId} failed"
)
record.setBinariesStatus(ProcessingFatal, result.mkString("; "))
}
} else {
......
......@@ -183,7 +183,11 @@ trait MariadbClient {
deleteRows(s"$digitalObjectId-poster")
val introDeletionResult =
deleteRows(s"$digitalObjectId-intro")
Try(connection.commit()) match {
(if (record.dryRun) {
Try(connection.rollback())
} else {
Try(connection.commit())
}) match {
case Success(_) =>
val updatedRecord = updateDigitalObjectResult(
record,
......
......@@ -36,6 +36,35 @@ trait MemobaseElasticClient {
ElasticClient(JavaClient(props))
}
def existsInES(record: ProcessedRecord): ProcessedRecord = {
client
.execute {
logger.info(s"Deleting ${record.recordId} in index $esIndex")
exists(record.recordId, esIndex)
}
.map {
case success: RequestSuccess[Boolean] if success.result =>
logger.debug(s"Record ${record.recordId} exists in Elasticsearch")
record.setRecordStatus(
ProcessingSuccess,
"Record exists in Elasticsearch"
)
case _: RequestSuccess[_] =>
logger.debug(s"Record ${record.recordId} does not exist in Elasticsearch")
record.setRecordStatus(
ProcessingIgnore,
"Record does not exist in Elasticearch"
)
case failure: RequestFailure =>
logger.warn(s"Check of existence for record ${record.recordId} failed: ${failure.error.reason}")
record.setRecordStatus(
ProcessingFatal,
s"Check for existence in Elasticsearch failed: ${failure.error.reason}"
)
}.await
}
def deleteInES(record: ProcessedRecord): ProcessedRecord = {
client
.execute {
......@@ -50,7 +79,7 @@ trait MemobaseElasticClient {
"Record deletion in Elasticsearch successful"
)
case _: RequestSuccess[_] =>
logger.debug(s"Deletion of record ${record.recordId} failed since record does not exists")
logger.debug(s"Deletion of record ${record.recordId} failed since record does not exist")
record.setRecordStatus(
ProcessingIgnore,
"Record does not exist in Elasticearch"
......
......@@ -44,13 +44,17 @@ trait MessageHandler {
records
}
def buildRecordProcessor(recordId: String, headers: Headers): ProcessedRecord =
def buildRecordProcessor(
recordId: String,
headers: Headers
): ProcessedRecord =
ProcessedRecord(
recordId.replace("https://memobase.ch/record/", ""),
None,
List(),
ProcessResults(),
headers
headers,
headers.headers("dryRun").asScala.last.value()(0).toInt == 1
)
def consumerClose(): Unit = {
......
......@@ -25,7 +25,8 @@ case class ProcessedRecord(
thumbnailId: Option[String],
binaries: List[String],
processResults: ProcessResults,
headers: Headers
headers: Headers,
dryRun: Boolean
) {
def setRecordStatus(status: ProcessingStatus, msg: String): ProcessedRecord =
this.copy(processResults =
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment