
Commit 5988502

Author: Nikolai Kolesnikov (committed)

Added enhanced monitoring for MemoryDB connectors, added the safe mode flag, and improved putStats.

1 parent ef68948 commit 5988502

File tree

1 file changed: +152 -38 lines changed

glue/sbin/memorydb/CQLReplicator.scala

Lines changed: 152 additions & 38 deletions
@@ -9,6 +9,7 @@ import com.amazonaws.services.glue.GlueContext
 import com.amazonaws.services.glue.log.GlueLogger
 import com.amazonaws.services.glue.util.GlueArgParser
 import com.amazonaws.services.glue.util.Job
+import com.amazonaws.services.glue.util.JsonOptions
 import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.Dataset
@@ -35,6 +36,8 @@ import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3, AmazonS3ClientBuilde
 import com.amazonaws.services.s3.model.{ObjectMetadata, DeleteObjectsRequest, ObjectListing, S3ObjectSummary}
 import com.amazonaws.services.s3.model.S3Object
 import com.amazonaws.services.s3.model.S3ObjectInputStream
+import com.amazonaws.ClientConfiguration
+import com.amazonaws.retry.RetryPolicy

 import com.datastax.spark.connector.cql._
 import com.datastax.spark.connector._
@@ -49,6 +52,8 @@ import scala.collection.mutable.StringBuilder

 import org.json4s._
 import org.json4s.jackson.JsonMethods._
+import org.json4s.jackson.Serialization
+import org.json4s.jackson.Serialization.write

 import java.util.Base64
 import java.nio.charset.StandardCharsets
@@ -75,11 +80,39 @@ class RedisConnectionException(s: String) extends RuntimeException {
   println(s)
 }

+class StatsS3Exception(s: String) extends RuntimeException {
+  println(s)
+}
+
+sealed trait Stats
+
+case class DiscoveryStats(tile: Int, primaryKeys: Long, updatedTimestamp: String) extends Stats
+
+case class ReplicationStats(tile: Int, primaryKeys: Long, updatedPrimaryKeys: Long, insertedPrimaryKeys: Long, deletedPrimaryKeys: Long, updatedTimestamp: String) extends Stats
+
 case class RedisConfig(pwd: String, usr: String, clusterDnsName: String, clusterPort: Int, sslEnabled: Boolean, maxAttempts: Int, useXXHash64Key: Boolean, connectionTimeout: Int, soTimeout: Int, xxHash64Seed: Long)

 object GlueApp {
   def main(sysArgs: Array[String]) {

+    def readReplicationStatsObject(s3Client: com.amazonaws.services.s3.AmazonS3, bucket: String, key: String): ReplicationStats = {
+      Try {
+        val s3Object = s3Client.getObject(bucket, key)
+        val src = Source.fromInputStream(s3Object.getObjectContent())
+        val json = src.getLines.mkString
+        src.close()
+        json
+      } match {
+        case Failure(_) => {
+          ReplicationStats(0, 0, 0, 0, 0, LocalDateTime.now().toString)
+        }
+        case Success(json) => {
+          implicit val formats = DefaultFormats
+          parse(json).extract[ReplicationStats]
+        }
+      }
+    }
+
     def readRedisConfigFile(s3Client: com.amazonaws.services.s3.AmazonS3, bucket: String, key: String): RedisConfig = {
       val s3Object = s3Client.getObject(bucket, key)
       val src = Source.fromInputStream(s3Object.getObjectContent())
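
Note: json4s serializes these case classes by field name, and readReplicationStatsObject deliberately degrades to an all-zero ReplicationStats when count.json is missing or unreadable, so the first write for a tile starts from a clean baseline. A minimal round-trip sketch (the values are illustrative):

    implicit val formats: Formats = DefaultFormats
    val sample = ReplicationStats(0, 1000L, 0L, 1000L, 0L, "2024-01-15T10:21:47")
    val json = write(sample)
    // {"tile":0,"primaryKeys":1000,"updatedPrimaryKeys":0,"insertedPrimaryKeys":1000,
    //  "deletedPrimaryKeys":0,"updatedTimestamp":"2024-01-15T10:21:47"}
    val restored = parse(json).extract[ReplicationStats]
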
@@ -190,19 +223,28 @@ object GlueApp {
       }
     }

-    val WAIT_TIME = 60000
     val sparkContext: SparkContext = new SparkContext()
     val glueContext: GlueContext = new GlueContext(sparkContext)
     val sparkSession: SparkSession = glueContext.getSparkSession
     val sparkConf: SparkConf = sparkContext.getConf
     val logger = new GlueLogger
     import sparkSession.implicits._

-    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "TILE", "TOTAL_TILES", "PROCESS_TYPE", "SOURCE_KS", "SOURCE_TBL", "TARGET_KS", "TARGET_TBL", "WRITETIME_COLUMN", "TTL_COLUMN", "S3_LANDING_ZONE", "OFFLOAD_LARGE_OBJECTS").toArray)
+    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "TILE", "TOTAL_TILES", "PROCESS_TYPE", "SOURCE_KS", "SOURCE_TBL", "TARGET_KS", "TARGET_TBL", "WRITETIME_COLUMN", "TTL_COLUMN", "S3_LANDING_ZONE", "OFFLOAD_LARGE_OBJECTS", "REPLICATION_POINT_IN_TIME", "SAFE_MODE").toArray)
     Job.init(args("JOB_NAME"), glueContext, args.asJava)
     val jobRunId = args("JOB_RUN_ID")
     val currentTile = args("TILE").toInt
     val totalTiles = args("TOTAL_TILES").toInt
+    val safeMode = args("SAFE_MODE")
+    // Internal configuration
+    val WAIT_TIME = safeMode match {
+      case "true" => 25000
+      case _ => 0
+    }
+    val cachingMode = safeMode match {
+      case "true" => StorageLevel.DISK_ONLY
+      case _ => StorageLevel.MEMORY_AND_DISK_SER
+    }
     // Internal configuration
     sparkSession.conf.set(s"spark.sql.catalog.ledgerCatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
     sparkSession.conf.set(s"spark.sql.catalog.sourceCluster", "com.datastax.spark.connector.datasource.CassandraCatalog")
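
A hedged summary of the new SAFE_MODE switch (the value arrives as the Glue job argument --SAFE_MODE; "true" below is a hypothetical example):

    import org.apache.spark.storage.StorageLevel

    val safeModeExample = "true"
    // Safe mode trades throughput for stability: a 25 s cooldown per loop
    // iteration, plus disk-only caching to minimize executor memory pressure.
    val waitTimeMs = if (safeModeExample == "true") 25000 else 0
    val cacheLevel =
      if (safeModeExample == "true") StorageLevel.DISK_ONLY
      else StorageLevel.MEMORY_AND_DISK_SER
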
@@ -230,9 +272,13 @@ object GlueApp {
     val source = s"sourceCluster.$srcKeyspaceName.$srcTableName"
     val ttlColumn = args("TTL_COLUMN")
     val olo = args("OFFLOAD_LARGE_OBJECTS")
+    val replicationPointInTime = args("REPLICATION_POINT_IN_TIME").toLong
+    val defaultPartitions = scala.math.max(2, (sparkContext.defaultParallelism / 2 - 2))

     // AmazonS3Client to check if a stop request was issued
-    val s3client = new AmazonS3Client()
+    val s3ClientConf = new ClientConfiguration().withRetryPolicy(RetryPolicy.builder().withMaxErrorRetry(5).build())
+    val s3client = AmazonS3ClientBuilder.standard().withClientConfiguration(s3ClientConf).build()
+
     val redisConfig = readRedisConfigFile(s3client, bcktName, "artifacts/RedisConnector.conf")
     preFlightCheck(cassandraConn, srcKeyspaceName, srcTableName, "source")
     logger.info("[Cassandra] Preflight check is completed")
@@ -316,8 +362,32 @@ object GlueApp {
       deleteObjects(s3client.listObjects(bucket, key))
     }

-    def putStats(bucket: String, key: String, metric: String, value: String): Unit = {
-      s3client.putObject(bucket, s"$key/$metric", value)
+    def putStats(bucket: String, key: String, objectName: String, stats: Stats): Unit = {
+      implicit val formats = DefaultFormats
+      val (newContent, message) = stats match {
+        case ds: DiscoveryStats =>
+          (write(ds), s"Flushing the discovery stats: $key/$objectName")
+        case rs: ReplicationStats =>
+          val content = readReplicationStatsObject(s3client, bucket, s"$key/$objectName")
+          val insertedAggr = content.insertedPrimaryKeys + rs.insertedPrimaryKeys
+          val updatedAggr = content.updatedPrimaryKeys + rs.updatedPrimaryKeys
+          val deletedAggr = content.deletedPrimaryKeys + rs.deletedPrimaryKeys
+          val historicallyInserted = content.primaryKeys + rs.primaryKeys
+          (write(ReplicationStats(currentTile,
+            historicallyInserted,
+            updatedAggr,
+            insertedAggr,
+            deletedAggr,
+            LocalDateTime.now().toString)),
+            s"Flushing the replication stats: $key/$objectName")
+        case _ => throw new StatsS3Exception("Unknown stats type")
+      }
+      Try {
+        s3client.putObject(bucket, s"$key/$objectName", newContent)
+      } match {
+        case Failure(_) => throw new StatsS3Exception(s"Can't persist the stats to the S3 bucket $bucket")
+        case Success(_) => logger.info(message)
+      }
     }

     def getTTLvalue(jvalue: org.json4s.JValue): BigInt = {
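
The reworked putStats is a read-modify-write: for ReplicationStats it reads the tile's existing count.json (all zeros if absent, via readReplicationStatsObject), sums the counters field by field, and rewrites the object with a fresh timestamp. A worked example with hypothetical numbers:

    // Existing count.json: primaryKeys=1000, updated=40, inserted=1000, deleted=10
    // Incoming batch (rs): primaryKeys=25,   updated=5,  inserted=25,   deleted=0
    // Object rewritten as: primaryKeys=1025, updated=45, inserted=1025, deleted=10,
    //                      updatedTimestamp = LocalDateTime.now().toString
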
@@ -432,12 +502,15 @@ object GlueApp {
       whereStmt.toString
     }

-    def persistToRedis(df: DataFrame, op: String, tile: Int): Unit = {
-      // Try !df.rdd.isEmpty
-      // Try df.take(1).isEmpty
-      if (!df.isEmpty) {
-        persistToTarget(shuffleDfV2(df.drop("ts", "group")), columns, columnsPos, tile, op)
+    def persistToRedis(df: DataFrame, op: String, tile: Int): Long = {
+      val cnt = df.isEmpty match {
+        case false => {
+          persistToTarget(shuffleDfV2(df.drop("ts", "group")), columns, columnsPos, tile, op)
+          df.count()
+        }
+        case true => 0
       }
+      cnt
     }

     def dataReplicationProcess() {
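
Design note on the new return type: persistToRedis now runs two actions on df (isEmpty, then count()), so callers that hand it a persisted DataFrame avoid recomputing the scan; the delta path below does exactly that via persist(cachingMode). A minimal usage sketch (someDf is hypothetical):

    val delta = someDf.persist(cachingMode) // cache so isEmpty + count() scan the source once
    val inserted: Long = persistToRedis(delta, "insert", currentTile)
    delta.unpersist()
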
@@ -457,41 +530,60 @@ object GlueApp {
       locations.foreach(location => {

         val loc = location._1
-        val sourceDf = sparkSession.read.option("inferSchema", "true").parquet(s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/$loc")
+        val sourcePath = s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/$loc"
+        val sourceDf = glueContext.getSourceWithFormat(
+          connectionType = "s3",
+          format = "parquet",
+          options = JsonOptions(s"""{"paths": ["$sourcePath"]}""")
+        ).getDynamicFrame().toDF()
         val tile = location._2
         val numPartitions = sourceDf.rdd.getNumPartitions
         logger.info(s"Number of partitions $numPartitions")
-        persistToRedis(sourceDf, "insert", tile)
+        val inserted = persistToRedis(sourceDf, "insert", tile)
+
         session.execute(s"INSERT INTO migration.ledger(ks,tbl,tile,ver,load_status,dt_load, offload_status) VALUES('$srcKeyspaceName','$srcTableName',$tile,'head','SUCCESS', toTimestamp(now()), '')")
-        val cnt = sourceDf.count()
-        val content = s"""{"tile":$tile, "primaryKeys":$cnt}"""
+
+        val content = ReplicationStats(tile, inserted, 0, 0, 0, LocalDateTime.now().toString)
         putStats(landingZone.replaceAll("s3://", ""), s"$srcKeyspaceName/$srcTableName/stats/replication/$tile", "count.json", content)
       }
       )
     }
-    // [Optimize] to ((tails > 0) && (heads > 0 || heads == 0))
     if ((heads > 0 && tails > 0) || (heads == 0 && tails > 0)) {
       logger.info("Processing delta...")
-      val dfTail = sparkSession.read.option("inferSchema", "true").parquet(s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/tile_$currentTile.tail").persist(StorageLevel.MEMORY_AND_DISK_SER)
-      val dfHead = sparkSession.read.option("inferSchema", "true").parquet(s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/tile_$currentTile.head").persist(StorageLevel.MEMORY_AND_DISK_SER)
-      val newInsertsDF = dfTail.as("tail").join(dfHead.as("head"), cond, "leftanti").persist(StorageLevel.MEMORY_AND_DISK_SER)
-      val newDeletesDF = dfHead.as("head").join(dfTail.as("tail"), cond, "leftanti").persist(StorageLevel.MEMORY_AND_DISK_SER)
+      val pathTail = s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/tile_$currentTile.tail"
+      val dfTail = glueContext.getSourceWithFormat(
+        connectionType = "s3",
+        format = "parquet",
+        options = JsonOptions(s"""{"paths": ["$pathTail"]}""")
+      ).getDynamicFrame().toDF().drop("group").persist(cachingMode)
+
+      val pathHead = s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/tile_$currentTile.head"
+      val dfHead = glueContext.getSourceWithFormat(
+        connectionType = "s3",
+        format = "parquet",
+        options = JsonOptions(s"""{"paths": ["$pathHead"]}""")
+      ).getDynamicFrame().toDF().drop("group").persist(cachingMode)
+      val newInsertsDF = dfTail.as("tail").join(dfHead.as("head"), cond, "leftanti").persist(cachingMode)
+      val newDeletesDF = dfHead.as("head").join(dfTail.as("tail"), cond, "leftanti").persist(cachingMode)
+
       columnTs match {
         case "None" => {
-          persistToRedis(newInsertsDF, "insert", currentTile)
-          persistToRedis(newDeletesDF, "delete", currentTile)
+          val inserted = persistToRedis(newInsertsDF, "insert", currentTile)
+          val deleted = persistToRedis(newDeletesDF, "delete", currentTile)
+          val content = ReplicationStats(currentTile, 0, 0, inserted, deleted, LocalDateTime.now().toString)
+          putStats(landingZone.replaceAll("s3://", ""), s"$srcKeyspaceName/$srcTableName/stats/replication/$currentTile", "count.json", content)
         }
         case _ => {
           val newUpdatesDF = dfTail.as("tail").
-            // Broadcast join
             join(broadcast(dfHead.as("head")), cond, "inner").
             filter($"tail.ts" > $"head.ts").
-            // [Optimize] selectExpr to select
             selectExpr(pks.map(x => s"tail.$x"): _*).
-            persist(StorageLevel.MEMORY_AND_DISK_SER)
-          persistToRedis(newInsertsDF, "insert", currentTile)
-          persistToRedis(newUpdatesDF, "update", currentTile)
-          persistToRedis(newDeletesDF, "delete", currentTile)
+            persist(cachingMode)
+          val inserted = persistToRedis(newInsertsDF, "insert", currentTile)
+          val updated = persistToRedis(newUpdatesDF, "update", currentTile)
+          val deleted = persistToRedis(newDeletesDF, "delete", currentTile)
+          val content = ReplicationStats(currentTile, 0, updated, inserted, deleted, LocalDateTime.now().toString)
+          putStats(landingZone.replaceAll("s3://", ""), s"$srcKeyspaceName/$srcTableName/stats/replication/$currentTile", "count.json", content)
           newUpdatesDF.unpersist()
         }
       }
@@ -511,9 +603,27 @@ object GlueApp {
     }

     def keysDiscoveryProcess() {
-      val primaryKeysDf = sparkSession.read.option("inferSchema", "true").table(source)
-      val primaryKeysDfwithTS = primaryKeysDf.selectExpr(pkFinal.map(c => c): _*)
-      val groupedPkDF = primaryKeysDfwithTS.withColumn("group", abs(xxhash64(pkFinalWithoutTs.map(c => col(c)): _*)) % totalTiles).repartition(col("group")).persist(StorageLevel.MEMORY_AND_DISK)
+      val primaryKeysDf = columnTs match {
+        case "None" =>
+          sparkSession.read.option("inferSchema", "true").
+            table(source).
+            selectExpr(pkFinal.map(c => c): _*).
+            persist(cachingMode)
+        case ts if ts != "None" && replicationPointInTime == 0 =>
+          sparkSession.read.option("inferSchema", "true").
+            table(source).
+            selectExpr(pkFinal.map(c => c): _*).
+            persist(cachingMode)
+        case ts if ts != "None" && replicationPointInTime > 0 =>
+          sparkSession.read.option("inferSchema", "true").
+            table(source).
+            selectExpr(pkFinal.map(c => c): _*).
+            filter(($"ts" > replicationPointInTime) && ($"ts".isNotNull)).
+            persist(cachingMode)
+      }
+
+      val groupedPkDF = primaryKeysDf.withColumn("group", abs(xxhash64(pkFinalWithoutTs.map(c => col(c)): _*)) % totalTiles).
+        repartition(col("group")).persist(cachingMode)
       val tiles = (0 to totalTiles - 1).toList.par
       tiles.foreach(tile => {
         keyspacesConn.withSessionDo {
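
For intuition, the group column above buckets rows deterministically by hashing the primary-key columns, so the same key always lands on the same tile across runs. A minimal sketch with a hypothetical two-column key and eight tiles:

    import org.apache.spark.sql.functions.{abs, col, xxhash64}

    // someKeysDf is hypothetical; with totalTiles = 8 every row gets a group in 0..7
    val grouped = someKeysDf.withColumn("group", abs(xxhash64(col("id"), col("region"))) % 8)
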
@@ -538,8 +648,13 @@ object GlueApp {
           if ((!tail.isEmpty && tailLoadStatus == "SUCCESS") && (!head.isEmpty && headLoadStatus == "SUCCESS")) {
             logger.info("Swapping the tail and the head")

-            val staged = groupedPkDF.where(col("group") === tile).repartition(pks.map(c => col(c)): _*)
-            val oldTail = sparkSession.read.option("inferSchema", "true").parquet(s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/tile_$tile.tail").repartition(pks.map(c => col(c)): _*)
+            val staged = groupedPkDF.where(col("group") === tile).repartition(defaultPartitions, pks.map(c => col(c)): _*)
+            val oldTailPath = s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/tile_$tile.tail"
+            val oldTail = glueContext.getSourceWithFormat(
+              connectionType = "s3",
+              format = "parquet",
+              options = JsonOptions(s"""{"paths": ["$oldTailPath"]}""")
+            ).getDynamicFrame().toDF().repartition(defaultPartitions, pks.map(c => col(c)): _*)

             oldTail.write.mode("overwrite").save(s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/tile_$tile.head")
             staged.write.mode("overwrite").save(s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/tile_$tile.tail")
@@ -555,27 +670,26 @@ object GlueApp {
           // The second round (tail and head)
           if (tail.isEmpty && (!head.isEmpty && headLoadStatus == "SUCCESS")) {
             logger.info("Loading a tail but keeping the head")
-            val staged = groupedPkDF.where(col("group") === tile).repartition(pks.map(c => col(c)): _*)
+            val staged = groupedPkDF.where(col("group") === tile).repartition(defaultPartitions, pks.map(c => col(c)): _*)
             staged.write.mode("overwrite").save(s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/tile_$tile.tail")
             session.execute(s"INSERT INTO migration.ledger(ks,tbl,tile,offload_status,dt_offload,location, ver, load_status, dt_load) VALUES('$srcKeyspaceName','$srcTableName',$tile, 'SUCCESS', toTimestamp(now()), 'tile_$tile.tail', 'tail','','')")
-
           }

           // Historical upload, the first round (head)
           if (tail.isEmpty && head.isEmpty) {
             logger.info("Loading a head")
-            val staged = groupedPkDF.where(col("group") === tile).repartition(pks.map(c => col(c)): _*)
+            val staged = groupedPkDF.where(col("group") === tile).repartition(defaultPartitions, pks.map(c => col(c)): _*)
             staged.write.mode("overwrite").save(s"$landingZone/$srcKeyspaceName/$srcTableName/primaryKeys/tile_$tile.head")
             session.execute(s"INSERT INTO migration.ledger(ks,tbl,tile,offload_status,dt_offload,location, ver, load_status, dt_load) VALUES('$srcKeyspaceName','$srcTableName',$tile, 'SUCCESS', toTimestamp(now()), 'tile_$tile.head', 'head','','')")
-            val cnt = staged.count()
-            val content = s"""{"tile":$tile, "primaryKeys":$cnt}"""
+            val content = DiscoveryStats(tile, staged.count(), LocalDateTime.now().toString)
             putStats(landingZone.replaceAll("s3://", ""), s"$srcKeyspaceName/$srcTableName/stats/discovery/$tile", "count.json", content)
           }
         }
         session.close()
       }
     })
     groupedPkDF.unpersist()
+    primaryKeysDf.unpersist()
   }

   Iterator.continually(stopRequested(bcktName)).takeWhile(_ == false).foreach {
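
The discovery path likewise swaps the hand-built JSON string for a typed DiscoveryStats; its serialized shape looks like this (illustrative values):

    // {"tile":4,"primaryKeys":987654,"updatedTimestamp":"2024-01-15T10:21:47"}
    val ds = DiscoveryStats(4, 987654L, "2024-01-15T10:21:47")
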
@@ -592,7 +706,7 @@ object GlueApp {
         sys.exit()
       }
     }
-    // [Optimize] remove it
+    logger.info(s"Cooldown period $WAIT_TIME ms")
     Thread.sleep(WAIT_TIME)
   }
 }
