@@ -10,10 +10,10 @@ JOB_NAME=CQLReplicator
10
10
TILES=2
11
11
PROCESS_TYPE_DISCOVERY=discovery
12
12
PROCESS_TYPE_REPLICATION=replication
13
- SOURCE_KS=ks_test_cql_replicator
14
- SOURCE_TBL=test_cql_replicator
15
- TARGET_KS=ks_test_cql_replicator
16
- TARGET_TBL=test_cql_replicator
13
+ SOURCE_KS=" "
14
+ SOURCE_TBL=" "
15
+ TARGET_KS=" "
16
+ TARGET_TBL=" "
17
17
WRITETIME_COLUMN=" None"
18
18
TTL_COLUMN=" None"
19
19
S3_LANDING_ZONE=" "
@@ -41,6 +41,7 @@ SKIP_GLUE_CONNECTOR=false
41
41
SKIP_KEYSPACES_LEDGER=false
42
42
JSON_MAPPING=" "
43
43
REPLICATION_POINT_IN_TIME=0
44
+ REPLICATION_STATS_ENABLED=false
44
45
OS=$( uname -a | awk ' {print $1}' )
45
46
46
47
# Progress bar configuration
@@ -105,6 +106,25 @@ function check_input() {
105
106
return 0
106
107
}
107
108
109
+ print_stat_table () {
110
+ # Assign the arguments to variables
111
+ local tile=$1
112
+ local inserts=$2
113
+ local updates=$3
114
+ local deletes=$4
115
+ local timestamp=$5
116
+ local head=$6
117
+ if [[ $head == true ]]; then
118
+ echo " +------------------------------------------------------------------------+"
119
+ # Print the table header with a border
120
+ printf " | %-8s | %-8s | %-8s | %-8s | %-20s |\n" " Tile" " Inserts" " Updates" " Deletes" " Timestamp"
121
+ echo " +------------------------------------------------------------------------+"
122
+ fi
123
+ # Print the table data with a border
124
+ printf " | %-8d | %-8d | %-8d | %-8d | %-20s |\n" $tile $inserts $updates $deletes " $timestamp "
125
+ echo " +------------------------------------------------------------------------+"
126
+ }
127
+
108
128
function check_discovery_runs() {
109
129
local rs
110
130
local mode
@@ -176,7 +196,7 @@ function uploader_helper() {
176
196
progress $curr_pos $final_pos " Uploading $artifact_name "
177
197
if ls " $path_to_conf /$artifact_name " > /dev/null
178
198
then
179
- progress $next_pos $final_pos " Uploading $artifact_name "
199
+ progress $next_pos $final_pos " Uploading $artifact_name "
180
200
aws s3 cp " $path_to_conf " /" $artifact_name " " $S3_LANDING_ZONE " /artifacts/" $artifact_name " > /dev/null
181
201
else
182
202
log " ERROR: $path_to_conf /$artifact_name not found"
@@ -305,7 +325,7 @@ function Init {
305
325
sleep 25
306
326
if ls " $path_to_scala " /CQLReplicator.scala
307
327
then
308
- progress 3 5 " Uploading CQLReplicator.scala "
328
+ progress 3 5 " Uploading CQLReplicator.scala "
309
329
aws s3 cp " $path_to_scala " /CQLReplicator.scala " $glue_bucket_artifacts " /scripts/CQLReplicator.scala > /dev/null
310
330
else
311
331
log " ERROR: $path_to_scala /CQLReplicator.scala not found"
@@ -316,7 +336,7 @@ function Init {
316
336
# Create Glue Connector
317
337
local glue_conn_name
318
338
if [[ $SKIP_GLUE_CONNECTOR == false ]]; then
319
- progress 3 5 " Creating Glue connector and CQLReplicator job "
339
+ progress 3 5 " Creating Glue connector and CQLReplicator job "
320
340
glue_conn_name=$( echo cql-replicator-" $( uuidgen) " | tr ' [:upper:]' ' [:lower:]' )
321
341
aws glue create-connection --connection-input ' {
322
342
"Name":"' $glue_conn_name ' ",
@@ -395,76 +415,30 @@ function Init {
395
415
fi
396
416
397
417
if [[ $SKIP_KEYSPACES_LEDGER == true ]]; then
398
- progress 4 5 " Skipping CQLReplicator's internal keyspace"
399
- progress 5 5 " Skipping CQLReplicator's internal table"
418
+ progress 4 5 " Skipping CQLReplicator's internal keyspace "
419
+ progress 5 5 " Skipping CQLReplicator's internal table "
400
420
fi
401
421
402
422
if [[ $SKIP_KEYSPACES_LEDGER == false ]]; then
403
- progress 4 5 " Creating CQLReplicator's internal keyspace and table "
423
+ progress 4 5 " Creating CQLReplicator's internal resources "
404
424
# Create a keyspace - migration
405
425
aws keyspaces create-keyspace --keyspace-name migration --region " $AWS_REGION " > /dev/null
406
426
sleep 20
407
427
408
428
# Create a table - ledger
409
429
aws keyspaces create-table --keyspace-name migration --table-name ledger --schema-definition ' {
410
- "allColumns": [
411
- {
412
- "name": "ks",
413
- "type": "text"
414
- },
415
- {
416
- "name": "tbl",
417
- "type": "text"
418
- },
419
- {
420
- "name": "tile",
421
- "type": "int"
422
- },
423
- {
424
- "name": "ver",
425
- "type": "text"
426
- },
427
- {
428
- "name": "dt_load",
429
- "type": "timestamp"
430
- },
431
- {
432
- "name": "dt_offload",
433
- "type": "timestamp"
434
- },
435
- {
436
- "name": "load_status",
437
- "type": "text"
438
- },
439
- {
440
- "name": "location",
441
- "type": "text"
442
- },
443
- {
444
- "name": "offload_status",
445
- "type": "text"
446
- }
447
- ],
448
- "partitionKeys": [
449
- {
450
- "name": "ks"
451
- },
452
- {
453
- "name": "tbl"
454
- }
455
- ],
456
- "clusteringKeys": [
457
- {
458
- "name": "tile",
459
- "orderBy": "ASC"
460
- },
461
- {
462
- "name": "ver",
463
- "orderBy": "ASC"
464
- }
465
- ]
466
- }' --region " $AWS_REGION " > /dev/null
467
- progress 5 5 " Creating CQLReplicator's internal keyspace and table"
430
+ "allColumns": [ { "name": "ks", "type": "text" },
431
+ { "name": "tbl", "type": "text" },
432
+ { "name": "tile", "type": "int" },
433
+ { "name": "ver", "type": "text" },
434
+ { "name": "dt_load", "type": "timestamp" },
435
+ { "name": "dt_offload", "type": "timestamp" },
436
+ { "name": "load_status", "type": "text" },
437
+ { "name": "location", "type": "text" },
438
+ { "name": "offload_status", "type": "text" } ],
439
+ "partitionKeys": [ { "name": "ks" }, { "name": "tbl" } ],
440
+ "clusteringKeys": [ { "name": "tile", "orderBy": "ASC" }, { "name": "ver", "orderBy": "ASC" } ] }' --region " $AWS_REGION " > /dev/null
441
+ progress 5 5 " Creating CQLReplicator's internal resources "
468
442
fi
469
443
470
444
log " Deploy is completed"
@@ -514,7 +488,7 @@ function Start_Discovery {
514
488
function Start_Replication {
515
489
cnt=0
516
490
KEYS_PER_TILE=$( aws s3 cp " $S3_LANDING_ZONE " /" $SOURCE_KS " /" $SOURCE_TBL " /stats/discovery/" $cnt " /count.json - | head | jq ' .primaryKeys' )
517
- log " Average primary keys per tile is $KEYS_PER_TILE "
491
+ log " Sampled primary keys per tile is $KEYS_PER_TILE "
518
492
local workers=$(( 2 + KEYS_PER_TILE/ ROWS_PER_WORKER ))
519
493
while [ $cnt -lt $TILES ]
520
494
do
@@ -727,6 +701,10 @@ while (( "$#" )); do
727
701
SKIP_KEYSPACES_LEDGER=true
728
702
shift 1
729
703
;;
704
+ --replication-stats-enabled)
705
+ REPLICATION_STATS_ENABLED=true
706
+ shift 1
707
+ ;;
730
708
--)
731
709
shift
732
710
break
@@ -768,6 +746,21 @@ function Gather_Stats() {
768
746
then
769
747
total_per_tile=$( aws s3 cp " $S3_LANDING_ZONE " /" $SOURCE_KS " /" $SOURCE_TBL " /stats/" $process_type " /" $tile " /count.json - | head | jq ' .primaryKeys' ) && REPLICATED_TOTAL=$(( REPLICATED_TOTAL + total_per_tile ))
770
748
fi
749
+ if [[ $REPLICATION_STATS_ENABLED == true ]]; then
750
+ local inserted=0
751
+ inserted=$( aws s3 cp " $S3_LANDING_ZONE " /" $SOURCE_KS " /" $SOURCE_TBL " /stats/" $process_type " /" $tile " /count.json - | head | jq ' .insertedPrimaryKeys' )
752
+ local updated=0
753
+ updated=$( aws s3 cp " $S3_LANDING_ZONE " /" $SOURCE_KS " /" $SOURCE_TBL " /stats/" $process_type " /" $tile " /count.json - | head | jq ' .updatedPrimaryKeys' )
754
+ local deleted=0
755
+ deleted=$( aws s3 cp " $S3_LANDING_ZONE " /" $SOURCE_KS " /" $SOURCE_TBL " /stats/" $process_type " /" $tile " /count.json - | head | jq ' .deletedPrimaryKeys' )
756
+ local timestamp=" "
757
+ timestamp=$( aws s3 cp " $S3_LANDING_ZONE " /" $SOURCE_KS " /" $SOURCE_TBL " /stats/" $process_type " /" $tile " /count.json - | head | jq ' .updatedTimestamp' )
758
+ local header=true
759
+ if [[ $tile != 0 ]]; then
760
+ header=false
761
+ fi
762
+ print_stat_table " $tile " " $inserted " " $updated " " $deleted " " $timestamp " " $header "
763
+ fi
771
764
fi
772
765
fi
773
766
}
@@ -786,6 +779,10 @@ if [[ $STATE == stats ]]; then
786
779
check_input " $SOURCE_TBL " " ERROR: source table name is empty, must be provided"
787
780
check_input " $S3_LANDING_ZONE " " ERROR: landing zone must be provided"
788
781
check_input " $AWS_REGION " " ERROR: landing zone must be provided"
782
+ check_input " $SOURCE_KS " " ERROR: source keyspace name is empty, must be provided"
783
+ check_input " $SOURCE_TBL " " ERROR: source table name is empty, must be provided"
784
+ check_input " $TARGET_TBL " " ERROR: target table name is empty, must be provided"
785
+ check_input " $TARGET_KS " " ERROR: target keyspace name is empty, must be provided"
789
786
# the barrier without checking if the discovery job is running
790
787
barrier " false"
791
788
tile=0
@@ -795,7 +792,14 @@ if [[ $STATE == stats ]]; then
795
792
Gather_Stats $tile " replication"
796
793
(( tile++ ))
797
794
done
798
-
799
795
log " Discovered rows in" " $SOURCE_KS " ." $SOURCE_TBL " is " $DISCOVERED_TOTAL "
800
796
log " Replicated rows in" " $TARGET_KS " ." $TARGET_TBL " is " $REPLICATED_TOTAL "
797
+ if [[ $REPLICATION_STATS_ENABLED == true ]]; then
798
+ t=0
799
+ while [ $t -lt " $TILES " ]
800
+ do
801
+ Gather_Stats $t " detailed-replication"
802
+ (( t++ ))
803
+ done
804
+ fi
801
805
fi
0 commit comments