9 changes: 8 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -6,7 +6,14 @@ __pycache__
*.egg-info
.DS_Store
*.bak

debezium-source-custom-plugin.json
debezium-connector-mysql-v2.4.0.zip
debezium-connector-mysql-2.4.0.Final-plugin.tar.gz
jcustenborder-kafka-config-provider-aws-0.1.2.zip
/debezium-connector-mysql
/.kiro
msk-connector-worker-config.b64
msk-connector-worker-config.txt
# CDK asset staging directory
.cdk.staging
cdk.out
33 changes: 22 additions & 11 deletions README.md
@@ -53,7 +53,9 @@ Before synthesizing the CloudFormation, make sure to get a Debezium source conn

1. Create a custom plugin

(a) Download the MySQL connector plugin for the latest stable release from the [Debezium](https://debezium.io/releases/) site.<br/>
(a) Download the MySQL connector plugin for the latest stable release from the [Debezium](https://debezium.io/releases/2.4/) site.<br/>
View the supported MSK Connect versions [here](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-debeziumsource-connector-example-steps.html#:~:text=Create%20a%20custom%20plugin,1%20folder)

(b) Download and extract the [AWS Secrets Manager Config Provider](https://www.confluent.io/hub/jcustenborder/kafka-config-provider-aws).<br/>
(c) After completing steps (a), (b) above, you may have the following archives:
   - `debezium-connector-mysql-2.4.0.Final-plugin.tar.gz`: Debezium MySQL Connector
@@ -102,9 +104,13 @@ Before synthesizing the CloudFormation, make sure to get a Debezium source conn
value.converter.schemas.enable=<i>false</i>
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
config.providers=secretManager
config.providers.secretManager.param.aws.region=<i>us-east-1</i>
config.providers.secretManager.param.aws.region=<i>us-west-2</i>
</pre>
(b) Run the following AWS CLI command to create your custom worker configuration.<br/>
(b) Run the following command to base64-encode the worker configuration properties file.<br/>
<pre>
openssl base64 -A -in msk-connector-worker-config.txt > msk-connector-worker-config.b64
</pre>
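The same encoding step can also be done without `openssl`. Below is a minimal Python sketch using `boto3` that encodes the properties file and creates the worker configuration in one go; the file path and the `AuroraMySQLSource` name are illustrative, and `create_worker_configuration` assumes `boto3` is installed and AWS credentials are configured:

```python
import base64


def encode_worker_config(path: str) -> str:
    """Return the base64-encoded contents of a worker configuration
    properties file, as the kafkaconnect API expects."""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")


def create_worker_configuration(name: str, properties_path: str,
                                region_name: str = "us-west-2") -> str:
    """Create an MSK Connect worker configuration and return its ARN."""
    import boto3  # requires: pip install boto3
    client = boto3.client("kafkaconnect", region_name=region_name)
    response = client.create_worker_configuration(
        name=name,  # e.g. "AuroraMySQLSource"
        propertiesFileContent=encode_worker_config(properties_path),
    )
    return response["workerConfigurationArn"]
```
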
(c) Run the following AWS CLI command to create your custom worker configuration.<br/>
Replace the following values:

- `my-worker-config-name` - a descriptive name for your custom worker configuration (e.g., `AuroraMySQLSource` )
@@ -113,7 +119,7 @@ Before synthesizing the CloudFormation, make sure to get a Debezium source conn
<pre>
aws kafkaconnect create-worker-configuration \
--name <i>&lt;my-worker-config-name&gt;</i> \
--properties-file-content <i>&lt;encoded-properties-file-content-string&gt;</i>
--properties-file-content file://<i>&lt;encoded-properties-file-content-string&gt;</i>
</pre>
You should see output similar to the following example in the AWS Web console.
![msk-connect-worker-configurations](assets/msk-connect-worker-configurations.png)
@@ -126,7 +132,7 @@ Before synthesizing the CloudFormation, make sure to get a Debezium source conn
value.converter.schemas.enable=false
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
config.providers=secretManager
config.providers.secretManager.param.aws.region=us-east-1
config.providers.secretManager.param.aws.region=us-west-2
</pre>

:information_source: To learn more about how to create a Debezium source connector, see [Debezium source connector with configuration provider](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-debeziumsource-connector-example.html)
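With the `secretManager` config provider registered in the worker configuration above, connector properties can reference secret values instead of embedding credentials. A hypothetical fragment — `<my-secret-name>` and its `username`/`password` keys are placeholders for your own Secrets Manager secret:

```properties
database.user=${secretManager:<my-secret-name>:username}
database.password=${secretManager:<my-secret-name>:password}
```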
@@ -164,6 +170,11 @@ Now you can synthesize the CloudFormation template for this code.

## (Step 1) Creating Aurora MySQL cluster

Run CDK Bootstrap
<pre>
(.venv) $ cdk bootstrap
</pre>

Create an Aurora MySQL Cluster
<pre>
(.venv) $ cdk deploy MSKServerlessToS3VpcStack \
@@ -180,17 +191,18 @@ Create an MSK Serverless Cluster

## (Step 3) Confirm that binary logging is enabled

Create a bastion host to access the Aurora MySQL cluster
Create a bastion host to access the Aurora MySQL cluster. The bastion host is used to connect to the writer instance of the Aurora MySQL cluster.
<pre>
(.venv) $ cdk deploy BastionHost
</pre>

<b><em>In order to set up Aurora MySQL, you need to connect to the Aurora MySQL cluster from an EC2 bastion host.</em></b>

1. Connect to the Aurora cluster writer node.
1. Connect to the Aurora cluster writer node. NOTE: The database password is stored in AWS Secrets Manager.
<pre>
$ sudo pip install ec2instanceconnectcli
$ BASTION_HOST_ID=$(aws cloudformation describe-stacks --stack-name <i>BastionHost</i> | jq -r '.Stacks[0].Outputs | .[] | select(.OutputKey | endswith("EC2InstanceId")) | .OutputValue')
$ mssh -r <i>us-east-1</i> ec2-user@${BASTION_HOST_ID}
$ mssh -r <i>us-west-2</i> ec2-user@${BASTION_HOST_ID}
[ec2-user@ip-172-31-7-186 ~]$ mysql -h<i>db-cluster-name</i>.cluster-<i>xxxxxxxxxxxx</i>.<i>region-name</i>.rds.amazonaws.com -uadmin -p
Enter password:
Welcome to the MariaDB monitor. Commands end with ; or \g.
@@ -210,7 +222,6 @@ Create a bastion host to access the Aurora MySQL cluster
For more information, see [Connect using the EC2 Instance Connect CLI](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-connect-methods.html#ec2-instance-connect-connecting-ec2-cli).
For example,
<pre>
$ sudo pip install ec2instanceconnectcli
$ mssh ec2-user@i-001234a4bf70dec41EXAMPLE # ec2-instance-id
</pre>
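Since the database password lives in AWS Secrets Manager (see the NOTE above), it can also be fetched programmatically instead of pasted at the `Enter password:` prompt. A minimal sketch with `boto3`; the secret name is deployment-specific, so treat it as a placeholder:

```python
import json


def extract_password(secret_string: str) -> str:
    """Pull the password field out of an RDS-style secret JSON payload."""
    return json.loads(secret_string)["password"]


def get_db_password(secret_name: str, region_name: str = "us-west-2") -> str:
    """Fetch the Aurora admin password from AWS Secrets Manager."""
    import boto3  # requires: pip install boto3
    client = boto3.client("secretsmanager", region_name=region_name)
    response = client.get_secret_value(SecretId=secret_name)
    return extract_password(response["SecretString"])
```
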

@@ -330,7 +341,7 @@ Create a Kinesis Data Firehose to deliver CDC coming from MSK Serverless to S3
1. Generate test data.
<pre>
$ BASTION_HOST_ID=$(aws cloudformation describe-stacks --stack-name <i>BastionHost</i> | jq -r '.Stacks[0].Outputs | .[] | select(.OutputKey | endswith("EC2InstanceId")) |.OutputValue')
$ mssh -r <i>us-east-1</i> ec2-user@${BASTION_HOST_ID}
$ mssh -r <i>us-west-2</i> ec2-user@${BASTION_HOST_ID}
[ec2-user@ip-172-31-7-186 ~]$ cat <<EOF >requirements-dev.txt
> boto3
> dataset==1.5.2
@@ -599,7 +610,7 @@ Enjoy!
* Amazon MSK Serverless does not allow `auto.create.topics.enable` to be set to `true`.

```
$ aws kafka update-cluster-configuration --cluster-arn arn:aws:kafka:us-east-1:123456789012:cluster/msk/39bb8562-e1b9-42a5-ba82-703ac0dee7ea-s1 --configuration-info file://msk-cluster-config.json --current-version K2EUQ1WTGCTBG2
$ aws kafka update-cluster-configuration --cluster-arn arn:aws:kafka:us-west-2:123456789012:cluster/msk/39bb8562-e1b9-42a5-ba82-703ac0dee7ea-s1 --configuration-info file://msk-cluster-config.json --current-version K2EUQ1WTGCTBG2

An error occurred (BadRequestException) when calling the UpdateClusterConfiguration operation: This operation cannot be performed on serverless clusters.
```
3 changes: 1 addition & 2 deletions app.py
@@ -40,8 +40,7 @@
msk_stack.add_dependency(aurora_mysql_stack)

msk_policy_stack = MSKClusterPolicyStack(app, 'MSKClusterPolicy',
vpc_stack.vpc,
msk_stack.msk_cluster_name,
msk_stack.msk_cluster_arn,
env=AWS_ENV
)
msk_policy_stack.add_dependency(msk_stack)
10 changes: 8 additions & 2 deletions cdk.context.json
@@ -2,7 +2,7 @@
"vpc_name": "default",
"db_cluster_name": "retail",
"msk_cluster_name": "retail-trans",
"msk_connector_worker_configuration_name": "AuroraMySQLSource",
"msk_connector_worker_configuration_name": "debezium-worker",
"msk_connector_custom_plugin_name": "debezium-connector-mysql-v2-4-0",
"msk_connector_name": "retail-changes",
"msk_connector_configuration": {
@@ -20,5 +20,11 @@
"sizeInMBs": 100
},
"topic_name": "retail-server.testdb.retail_trans"
}
},
"availability-zones:account=664418978161:region=us-west-2": [
"us-west-2a",
"us-west-2b",
"us-west-2c",
"us-west-2d"
]
}
13 changes: 4 additions & 9 deletions cdk_stacks/kafka_connector.py
@@ -157,18 +157,13 @@ def __init__(self, scope: Construct, construct_id: str,

#XXX: For more information about Debezium connector, see the following url:
# https://docs.aws.amazon.com/msk/latest/developerguide/mkc-debeziumsource-connector-example.html
# NOTE: Debezium MySQL connector only supports tasks.max=1, so we use provisioned capacity
# with a single worker instead of auto-scaling to avoid "Only a single connector task may be started" error
msk_connector = aws_kafkaconnect.CfnConnector(self, "KafkaCfnConnector",
capacity=aws_kafkaconnect.CfnConnector.CapacityProperty(
auto_scaling=aws_kafkaconnect.CfnConnector.AutoScalingProperty(
max_worker_count=2,
provisioned_capacity=aws_kafkaconnect.CfnConnector.ProvisionedCapacityProperty(
mcu_count=1,
min_worker_count=1,
scale_in_policy=aws_kafkaconnect.CfnConnector.ScaleInPolicyProperty(
cpu_utilization_percentage=20
),
scale_out_policy=aws_kafkaconnect.CfnConnector.ScaleOutPolicyProperty(
cpu_utilization_percentage=80
)
worker_count=1
)
),
connector_configuration={
18 changes: 1 addition & 17 deletions cdk_stacks/msk_cluster_policy.py
@@ -2,8 +2,6 @@
# -*- encoding: utf-8 -*-
# vim: tabstop=2 shiftwidth=2 softtabstop=2 expandtab

import boto3

import aws_cdk as cdk

from aws_cdk import (
@@ -13,24 +11,11 @@
from constructs import Construct


def get_msk_cluster_arn(msk_cluster_name, region_name):
client = boto3.client('kafka', region_name=region_name)
response = client.list_clusters_v2(ClusterNameFilter=msk_cluster_name)
cluster_info_list = [e for e in response['ClusterInfoList'] if e['ClusterName'] == msk_cluster_name]
if not cluster_info_list:
cluster_arn = f"arn:aws:kafka:{cdk.Aws.REGION}:{cdk.Aws.ACCOUNT_ID}:cluster/{msk_cluster_name}/*"
else:
cluster_arn = cluster_info_list[0]['ClusterArn']
return cluster_arn


class MSKClusterPolicyStack(Stack):

def __init__(self, scope: Construct, construct_id: str, vpc, msk_cluster_name, **kwargs) -> None:
def __init__(self, scope: Construct, construct_id: str, msk_cluster_arn: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)

msk_cluster_arn = get_msk_cluster_arn(msk_cluster_name, vpc.env.region)

msk_cluster_policy = {
"Version": "2012-10-17",
"Statement": [
@@ -50,7 +35,6 @@ def __init__(self, scope: Construct, construct_id: str, vpc, msk_cluster_name, *
policy=msk_cluster_policy
)


cdk.CfnOutput(self, 'MSKClusterPolicyCurrentVersion', value=cfn_cluster_policy.attr_current_version,
export_name=f'{self.stack_name}-ClusterPolicyCurrentVersion')
cdk.CfnOutput(self, 'KafkaClusterArn', value=cfn_cluster_policy.cluster_arn,
4 changes: 2 additions & 2 deletions utils/gen_fake_mysql_data.py
@@ -39,8 +39,8 @@
def main():
parser = argparse.ArgumentParser()

parser.add_argument('--region-name', action='store', default='us-east-1',
help='aws region name (default: us-east-1)')
parser.add_argument('--region-name', action='store', default='us-west-2',
help='aws region name (default: us-west-2)')
parser.add_argument('--host', action='store', help='database host')
parser.add_argument('-u', '--user', action='store', help='user name')
parser.add_argument('-p', '--password', action='store', help='password')