Unverified Commit 222b931f authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

delete records in api index as well

parent 199ed678
Pipeline #46759 failed with stages
in 2 minutes and 49 seconds
......@@ -11,17 +11,17 @@ lazy val root = (project in file("."))
.enablePlugins(GitVersioning)
.settings(
name := "Record Deleter",
assemblyJarName in assembly := "app.jar",
test in assembly := {},
assemblyMergeStrategy in assembly := {
assembly / assemblyJarName := "app.jar",
assembly / test := {},
assembly / assemblyMergeStrategy := {
case "log4j.properties" => MergeStrategy.first
case "module-info.class" => MergeStrategy.discard
case "log4j2.xml" => MergeStrategy.first
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
mainClass in assembly := Some("ch.memobase.App"),
assembly / mainClass := Some("ch.memobase.App"),
resolvers ++= Seq(
"Memobase Utils" at "https://dl.bintray.com/memoriav/memobase"
),
......
......@@ -13,7 +13,8 @@ reportingTopicName: mb-dd-reporting-prod
groupId: prod-deleter
elasticHostConfigs: prod-elastic-configs
elasticIndexConfigs: prod-documents-index
elasticFrontendIndexConfigs: prod-documents-index
elasticApiIndexConfigs: prod-index-jsonld
mediaVolumeClaimName: "media-volume-claim"
mediaFolderRootPath: "/data"
......
......@@ -13,7 +13,8 @@ reportingTopicName: mb-dd-reporting-stage
groupId: stage-deleter
elasticHostConfigs: prod-elastic-configs
elasticIndexConfigs: stage-documents-index
elasticFrontendIndexConfigs: stage-documents-index
elasticApiIndexConfigs: stage-index-jsonld
mediaVolumeClaimName: "stage-media-volume-claim"
mediaFolderRootPath: "/data"
......
......@@ -13,7 +13,8 @@ reportingTopicName: mb-dd-reporting-prod
groupId: test-deleter
elasticHostConfigs: prod-elastic-configs
elasticIndexConfigs: test-documents-index
elasticFrontendIndexConfigs: test-documents-index
elasticApiIndexConfigs: test-index-jsonld
mediaVolumeClaimName: "test-media-volume-claim"
mediaFolderRootPath: "/data"
......
......@@ -38,8 +38,6 @@ spec:
name: "{{ .Values.kafkaConfigs }}"
- configMapRef:
name: "{{ .Values.elasticHostConfigs }}"
- configMapRef:
name: "{{ .Values.elasticIndexConfigs }}"
- configMapRef:
name: "{{ .Values.mariadbDatabaseConfigs }}"
- configMapRef:
......@@ -55,6 +53,16 @@ spec:
secretKeyRef:
name: {{ .Values.mariadbUserSecrets }}
key: MARIADB_USER
# Index names are read from two separate ConfigMaps which both expose the key
# ELASTIC_INDEX, so each one is mapped explicitly to its own variable name.
# The variable names must not end in a colon: a trailing ':' after the scalar
# makes the manifest invalid YAML.
- name: ELASTIC_FRONTEND_INDEX
  valueFrom:
    configMapKeyRef:
      name: {{ .Values.elasticFrontendIndexConfigs }}
      key: ELASTIC_INDEX
- name: ELASTIC_API_INDEX
  valueFrom:
    configMapKeyRef:
      name: {{ .Values.elasticApiIndexConfigs }}
      key: ELASTIC_INDEX
restartPolicy: Always
volumes:
- name: media-volume
......
......@@ -19,7 +19,8 @@ reportingTopicName: placeholder
clientId: placeholder
elasticHostConfigs: placeholder
elasticIndexConfigs: placeholder
elasticFrontendIndexConfigs: placeholder
elasticApiIndexConfigs: placeholder
mediaVolumeClaimName: placeholder
mediaFolderRootPath: placeholder
......
......@@ -15,4 +15,4 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
sbt.version=1.3.13
sbt.version=1.6.2
// DO NOT EDIT! This file is auto-generated.
// This file enables sbt-bloop to create bloop config files.
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.4.4-13-408f4d80")
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.4.13")
......@@ -38,7 +38,14 @@ object App
x
}
.map(rec => buildRecordProcessor(rec.key(), rec.headers()))
.map(rec => if (rec.dryRun) existsInES(rec) else deleteInES(rec))
.map(rec =>
if (rec.dryRun) existsInES(rec, esFrontendIndex)
else deleteInES(rec, esFrontendIndex)
)
.map(rec =>
if (rec.dryRun) existsInES(rec, esApiIndex)
else deleteInES(rec, esApiIndex)
)
.map(rec => deleteMariaDBEntries(rec))
.map(rec => deleteBinaries(rec))
.map(rec => deleteCachedBinaries(rec))
......
......@@ -59,12 +59,16 @@ trait AppSettings {
val reportingTopic: String = sys.env("TOPIC_PROCESS")
/** Base URL of the Elasticsearch node, assembled from the environment
  * variables `ELASTIC_HOST` and `ELASTIC_PORT`.
  *
  * If `ELASTIC_HOST` already starts with `http://` or `https://` it is used
  * as is; otherwise `http://` is prepended.
  *
  * Accessing a missing variable through `sys.env(...)` throws a
  * `NoSuchElementException`.
  */
val esHost: String = {
  val host = sys.env("ELASTIC_HOST")
  // Also accept an explicit https:// scheme; previously such a host was
  // turned into the malformed "http://https://...".
  val hasScheme = host.startsWith("http://") || host.startsWith("https://")
  val scheme = if (hasScheme) "" else "http://"
  s"$scheme$host:${sys.env("ELASTIC_PORT")}"
}
val esIndex: String = sys.env("ELASTIC_INDEX")
val mariaDBHost: String = s"jdbc:mariadb://${sys.env("MARIADB_HOST")}:${sys.env("MARIADB_PORT")}/${sys.env("MARIADB_DATABASE")}"
val mariaDBPassword: String = sys.env("MARIADB_PASSWORD").replaceAll("\\s+$", "")
val esFrontendIndex: String = sys.env("ELASTIC_FRONTEND_INDEX")
val esApiIndex: String = sys.env("ELASTIC_API_INDEX")
val mariaDBHost: String =
s"jdbc:mariadb://${sys.env("MARIADB_HOST")}:${sys.env("MARIADB_PORT")}/${sys.env("MARIADB_DATABASE")}"
val mariaDBPassword: String =
sys.env("MARIADB_PASSWORD").replaceAll("\\s+$", "")
val mariaDBTables: List[String] = sys
.env("MARIADB_TABLES")
.split(", *")
......
......@@ -18,12 +18,22 @@
package ch.memobase
import ch.memobase.models.{ProcessedRecord, ProcessingFatal, ProcessingIgnore, ProcessingSuccess}
import ch.memobase.models.{
ProcessedRecord,
ProcessingFatal,
ProcessingIgnore,
ProcessingSuccess
}
import com.sksamuel.elastic4s
import com.sksamuel.elastic4s.ElasticApi.deleteById
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.{
ElasticClient,
ElasticProperties,
RequestFailure,
RequestSuccess
}
import org.apache.logging.log4j.scala.Logging
import scala.concurrent.ExecutionContext.Implicits.global
......@@ -36,60 +46,81 @@ trait MemobaseElasticClient {
ElasticClient(JavaClient(props))
}
/** Checks whether a record is present in the given Elasticsearch index
  * without modifying the index.
  *
  * The outcome is written to the record via `setRecordStatus`:
  *  - `ProcessingSuccess` if the document exists,
  *  - `ProcessingIgnore` if the request succeeded but the document is absent,
  *  - `ProcessingFatal` if the request itself failed.
  *
  * The call waits for the response (`.await`) before returning.
  *
  * @param record  the record whose `recordId` is looked up
  * @param esIndex name of the Elasticsearch index to query
  * @return the value returned by `record.setRecordStatus` for the outcome
  */
def existsInES(record: ProcessedRecord, esIndex: String): ProcessedRecord = {
  client
    .execute {
      // This is a read-only lookup, so the log must not claim a deletion.
      logger.info(
        s"Checking existence of ${record.recordId} in Elasticsearch index $esIndex"
      )
      exists(record.recordId, esIndex)
    }
    .map {
      case success: RequestSuccess[Boolean] if success.result =>
        logger.debug(
          s"Record ${record.recordId} exists in Elasticsearch index $esIndex"
        )
        record.setRecordStatus(
          ProcessingSuccess,
          "Record exists in Elasticsearch"
        )
      case _: RequestSuccess[_] =>
        logger.debug(
          s"Record ${record.recordId} does not exist in Elasticsearch index $esIndex"
        )
        // The `s` prefix is required here, otherwise the literal text
        // "$esIndex" ends up in the status message.
        record.setRecordStatus(
          ProcessingIgnore,
          s"Record does not exist in Elasticsearch index $esIndex"
        )
      case failure: RequestFailure =>
        logger.warn(
          s"Check of existence for record ${record.recordId} in Elasticsearch index $esIndex failed: ${failure.error.reason}"
        )
        record.setRecordStatus(
          ProcessingFatal,
          s"Check for existence in Elasticsearch index $esIndex failed: ${failure.error.reason}"
        )
    }
    .await
}
def deleteInES(record: ProcessedRecord): ProcessedRecord = {
def deleteInES(record: ProcessedRecord, esIndex: String): ProcessedRecord = {
client
.execute {
logger.info(s"Deleting ${record.recordId} in index $esIndex${if (record.dryRun) " (dry-run mode)" else ""}")
logger.info(
s"Deleting ${record.recordId} in Elasticsearch index $esIndex${if (record.dryRun) " (dry-run mode)"
else ""}"
)
deleteById(esIndex, record.recordId)
}
.map {
case success: RequestSuccess[_] if success.status < 400 =>
logger.debug(s"Deletion of record ${record.recordId} successful${if (record.dryRun) " (dry-run mode)" else ""}")
logger.debug(
s"Deletion of record ${record.recordId} in Elasticsearch index $esIndex successful${if (record.dryRun) " (dry-run mode)"
else ""}"
)
record.setRecordStatus(
ProcessingSuccess,
"Record deletion in Elasticsearch successful"
"Record deletion in Elasticsearch index $esIndex successful"
)
case _: RequestSuccess[_] =>
logger.debug(s"Deletion of record ${record.recordId} failed since record does not exist${if (record.dryRun) " (dry-run mode)" else ""}")
logger.debug(
s"Deletion of record ${record.recordId} in Elasticsearch index $esIndex failed since record does not exist${if (record.dryRun) " (dry-run mode)" else ""}"
)
record.setRecordStatus(
ProcessingIgnore,
"Record does not exist in Elasticearch"
"Record does not exist in Elasticearch index $esIndex"
)
case failure: RequestFailure =>
logger.warn(s"Deletion of record ${record.recordId} failed${if (record.dryRun) " (dry-run mode)" else ""}: ${failure.error.reason}")
logger.warn(
s"Deletion of record ${record.recordId} in Elasticsearch index $esIndex failed${if (record.dryRun) " (dry-run mode)"
else ""}: ${failure.error.reason}"
)
record.setRecordStatus(
ProcessingFatal,
s"Record deletion in Elasticsearch failed${if (record.dryRun) " (dry-run mode)" else ""}: ${failure.error.reason}"
s"Record deletion in Elasticsearch index $esIndex failed${if (record.dryRun) " (dry-run mode)"
else ""}: ${failure.error.reason}"
)
}
.await
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment