Kafka replication using MirrorMaker 2.0 (MM2) to AWS MSK with Monitoring

Harish Nandimandalam
Jan 19, 2021


This story describes how to replicate data between an active-passive pair of Kafka clusters, covering both a standalone Kafka cluster and an AWS MSK cluster. We will use JMX metrics, Prometheus, and Grafana to visualize the stats.

Architecture

MM2 differs from MM1 in that MM2 is built on the Kafka Connect framework. The mirror cluster consumes messages from the source cluster and pushes them to the target cluster.

More details about the architecture can be found here

Strategies

Write Strategy

  • Active-Active
  1. Distribute writes onto Kafka among c1 and c2 cluster
  2. Enable c1 → c2 replication
  3. Enable c2 → c1 replication
  • Active-Passive
  1. All writes to Kafka cluster in c1 or c2 cluster
  2. Enable c1 → c2 replication or c2→ c1 replication

Read Strategy

  1. Consume from both the c1 and c2 clusters
  2. Consume from either the c1 or the c2 cluster

Execution

We adopted the Active-Passive write strategy and consume from either c1 or c2 as the read strategy. In our setup, c1 is the active cluster and c2 is the passive cluster, where c1 is a standalone Kafka cluster and c2 is an AWS MSK provisioned cluster.

  1. We set up 2 nodes/instances (2 VMs) for multithreading and parallel processing.
  2. Choose any flavor of Linux and install Kafka on the nodes where we will be running the MM2 script.
  3. Update the mm2.properties file on each node.

Here we need to set the replication policy class to a custom jar, because when replication to the destination is enabled, the topic created on the destination carries the source cluster alias as a prefix (e.g. c1.test_topic). To avoid the prefix we will use the jar provided by AWS Labs. Below are the steps to be performed.

  1. Clone the repo from the AWS Labs GitHub:
  2. git clone https://github.com/aws-samples/mirrormaker2-msk-migration.git
  3. Once it is downloaded, go to the path “/mirrormaker2-msk-migration/CustomMM2ReplicationPolicy” and run “mvn clean install”.
  4. Under the path “/mirrormaker2-msk-migration/CustomMM2ReplicationPolicy/target” you will find a jar named “CustomMM2ReplicationPolicy-1.0-SNAPSHOT.jar”.
  5. Place the jar under the libs folder of the Kafka installation; when we run the script it will pick up this jar.
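With the jar in place, an mm2.properties along the following lines enables c1 → c2 replication with the custom policy. This is only a sketch: the cluster aliases, bootstrap servers, topic pattern, and counts are placeholders for your environment, and the fully-qualified class name should be verified against the cloned repo.

```properties
# Sketch only -- hosts, aliases, and counts are placeholders.
clusters = c1, c2
c1.bootstrap.servers = c1-broker1:9092,c1-broker2:9092
c2.bootstrap.servers = b-1.msk-cluster.example.amazonaws.com:9092

# Active-passive: replicate c1 -> c2 only.
c1->c2.enabled = true
c1->c2.topics = .*
c2->c1.enabled = false

# Custom policy from the AWS Labs jar, so destination topics keep their
# original names instead of getting a "c1." prefix (verify the class name
# in the cloned repo).
replication.policy.class = com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy

tasks.max = 4
replication.factor = 3
```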

Monitoring

To monitor the number of events processed and consumed, and to view the metrics, we will make use of JMX, Prometheus, and Grafana.

Following steps illustrate how Prometheus can be used to monitor MM2:

  1. Download the Prometheus JMX exporter java agent jar file.
    wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.13.0/jmx_prometheus_javaagent-0.13.0.jar

2. Create a kafka-connect.yml with the following contents.

lowercaseOutputName: true
rules:
  # kafka.connect:type=app-info,client-id="{clientid}"
  # kafka.consumer:type=app-info,client-id="{clientid}"
  # kafka.producer:type=app-info,client-id="{clientid}"
  - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>start-time-ms'
    name: kafka_$1_start_time_seconds
    labels:
      clientId: "$2"
    help: "Kafka $1 JMX metric start time seconds"
    type: GAUGE
    valueFactor: 0.001
  - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>(commit-id|version): (.+)'
    name: kafka_$1_$3_info
    value: 1
    labels:
      clientId: "$2"
      $3: "$4"
    help: "Kafka $1 JMX metric info version and commit-id"
    type: GAUGE
  # kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}",partition="{partition}"
  # kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}",partition="{partition}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+), partition=(.+)><>(.+-total|compression-rate|.+-avg|.+-replica|.+-lag|.+-lead)
    name: kafka_$2_$6
    labels:
      clientId: "$3"
      topic: "$4"
      partition: "$5"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  # kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"
  # kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}",partition="{partition}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+)><>(.+-total|compression-rate|.+-avg)
    name: kafka_$2_$5
    labels:
      clientId: "$3"
      topic: "$4"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  # kafka.connect:type=connect-node-metrics,client-id="{clientid}",node-id="{nodeid}"
  # kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id="{nodeid}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), node-id=(.+)><>(.+-total|.+-avg)
    name: kafka_$2_$5
    labels:
      clientId: "$3"
      nodeId: "$4"
    help: "Kafka $1 JMX metric type $2"
    type: UNTYPED
  # kafka.connect:type=kafka-metrics-count,client-id="{clientid}"
  # kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}"
  # kafka.consumer:type=consumer-coordinator-metrics,client-id="{clientid}"
  # kafka.consumer:type=consumer-metrics,client-id="{clientid}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.*)><>(.+-total|.+-avg|.+-bytes|.+-count|.+-ratio|.+-age|.+-flight|.+-threads|.+-connectors|.+-tasks|.+-ago)
    name: kafka_$2_$4
    labels:
      clientId: "$3"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  # kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"<>status
  - pattern: 'kafka.connect<type=connector-task-metrics, connector=(.+), task=(.+)><>status: ([a-z-]+)'
    name: kafka_connect_connector_status
    value: 1
    labels:
      connector: "$1"
      task: "$2"
      status: "$3"
    help: "Kafka Connect JMX Connector status"
    type: GAUGE
  # kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
  # kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
  # kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
  # kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
  - pattern: kafka.connect<type=(.+)-metrics, connector=(.+), task=(.+)><>(.+-total|.+-count|.+-ms|.+-ratio|.+-avg|.+-failures|.+-requests|.+-timestamp|.+-logged|.+-errors|.+-retries|.+-skipped)
    name: kafka_connect_$1_$4
    labels:
      connector: "$2"
      task: "$3"
    help: "Kafka Connect JMX metric type $1"
    type: GAUGE
  # kafka.connect:type=connector-metrics,connector="{connector}"
  # kafka.connect:type=connect-worker-metrics,connector="{connector}"
  - pattern: kafka.connect<type=connect-worker-metrics, connector=(.+)><>([a-z-]+)
    name: kafka_connect_worker_$2
    labels:
      connector: "$1"
    help: "Kafka Connect JMX metric $1"
    type: GAUGE
  # kafka.connect:type=connect-worker-metrics
  - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+)
    name: kafka_connect_worker_$1
    help: "Kafka Connect JMX metric worker"
    type: GAUGE
  # kafka.connect:type=connect-worker-rebalance-metrics
  - pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+)
    name: kafka_connect_worker_rebalance_$1
    help: "Kafka Connect JMX metric rebalance information"
    type: GAUGE
  # kafka.connect.mirror:type=MirrorSourceConnector
  - pattern: kafka.connect.mirror<type=MirrorSourceConnector, target=(.+), topic=(.+), partition=([0-9]+)><>([a-z-]+)
    name: kafka_connect_mirror_source_connector_$4
    help: Kafka Connect MM2 Source Connector Information
    labels:
      destination: "$1"
      topic: "$2"
      partition: "$3"
    type: GAUGE
  # kafka.connect.mirror:type=MirrorCheckpointConnector
  - pattern: kafka.connect.mirror<type=MirrorCheckpointConnector, source=(.+), target=(.+)><>([a-z-]+)
    name: kafka_connect_mirror_checkpoint_connector_$3
    help: Kafka Connect MM2 Checkpoint Connector Information
    labels:
      source: "$1"
      target: "$2"
    type: GAUGE

3. Set the KAFKA_OPTS environment variable to point to the downloaded jar file and the kafka-connect.yml above, and put the MM2 launch command in a .sh shell script as below.

export KAFKA_OPTS='-Dcom.sun.management.jmxremote -javaagent:path_to_downloaded_jmx_prometheus_javaagent-0.13.0.jar=3600:path_to_downloaded_kafka-connect.yml'

/path_to_kafka/connect-mirror-maker.sh /path_to_created/mm2.properties >>/var/log/kafka/mm2.log 2>&1 &

4. Start the MM2 .sh script in the same terminal

sh mm2_script_filename.sh

5. You can now verify the latency via

curl localhost:3600 | grep -i kafka_connect_mirror_source_connector_replication_latency_ms_max

When MM2 is in sync the output will be as follows:

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  230k  100  230k    0     0   197k      0  0:00:01  0:00:01 --:--:--  197k
# HELP kafka_connect_mirror_source_connector_replication_latency_ms_max Kafka Connect MM2 Source Connector Information
# TYPE kafka_connect_mirror_source_connector_replication_latency_ms_max gauge
kafka_connect_mirror_source_connector_replication_latency_ms_max{destination="destination-name",partition="7",topic="test-topic",} NaN

There will be a row for every partition of each topic MM2 is replicating; when a row ends with NaN, that partition is in sync. MM2 is fully in sync when all of these rows end with NaN.
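The NaN check above can be turned into a quick one-liner that counts the partitions still catching up. The snippet below runs against a hard-coded sample of the exporter output so it is self-contained; in practice you would capture it with `curl -s localhost:3600`.

```shell
# Sample of the exporter output; in practice: metrics=$(curl -s localhost:3600)
metrics='kafka_connect_mirror_source_connector_replication_latency_ms_max{partition="0",topic="t",} NaN
kafka_connect_mirror_source_connector_replication_latency_ms_max{partition="1",topic="t",} 5234.0'

# Count partitions that are still catching up (latency is a number, not NaN).
echo "$metrics" | grep replication_latency_ms_max | grep -vc 'NaN$'
```

A result of 0 means every replicated partition is in sync.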

We can also view the above metrics in a browser at hostname:jmx_port_number (3600 in this setup).

Visualization

Prometheus

To view the metrics in real time and in graphical form we need a time-series database (TSDB) like Prometheus, plus Grafana to view them.

  1. Download Prometheus from the releases repo

wget https://github.com/prometheus/prometheus/releases/download/v2.23.0/prometheus-2.23.0.linux-amd64.tar.gz

2. Unzip it and configure the JMX metrics endpoint under the scrape configs in prometheus.yml. All the details and configs can be seen in the Prometheus docs.
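The scrape config might look like the following sketch. The hostnames are placeholders for the two MM2 nodes, and 3600 is the port the JMX exporter agent was configured to listen on in KAFKA_OPTS.

```yaml
# Sketch: target hostnames are placeholders for your MM2 nodes.
scrape_configs:
  - job_name: 'mm2'
    scrape_interval: 15s
    static_configs:
      - targets: ['mm2-node1:3600', 'mm2-node2:3600']
```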

3. Once everything is up and running with no errors, we can verify our queries using PromQL from the Prometheus UI.
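For example, a query like the following (using the MM2 source-connector metric exposed by the kafka-connect.yml rules) shows the worst replication latency per topic:

```promql
max by (topic) (kafka_connect_mirror_source_connector_replication_latency_ms_max)
```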

Grafana

  1. To view the graphs and values we can use Grafana: download the installation file and start the grafana service. Download_link
  2. Once Grafana is set up and started with no errors, we can log in, add Prometheus as a data source, and visualize the graphs with real-time data.
