Kafka Connect & CDC

Kafka Connect is the integration layer between Kafka and everything else—PostgreSQL, S3, Elasticsearch, legacy JDBC. You deploy connector plugins on workers; the framework handles offset storage, config distribution, and REST management. Debezium turns database transaction logs into event streams without polling your tables to death.

Audience: senior developers. Version: Kafka Connect 3.x

Kafka Connect architecture

Connect separates what to replicate (connector plugin + config) from where it runs (workers). Connectors split work into tasks; workers execute tasks and expose a REST API for lifecycle management.

flowchart TB
  REST[REST API :8083]
  W1[Worker JVM]
  C[Connector plugin\nJdbcSource / Debezium]
  T1[Task 0]
  T2[Task 1]
  K[(Kafka cluster)]
  EXT[(External system\nDB / S3 / ES)]
  REST --> W1
  W1 --> C
  C --> T1
  C --> T2
  T1 --> K
  T2 --> K
  T1 <--> EXT
  T2 <--> EXT

Connector

A connector is a logical job definition: configuration plus a plugin class. Source connectors pull from external systems into Kafka topics. Sink connectors consume Kafka topics and write outward. The connector class (connector.class) extends SourceConnector or SinkConnector and divides the work into task configurations.

Task

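Unit of parallelism within a connector. A JDBC source might assign one table per task; most Debezium connectors run a single task per connector because a transaction log is read sequentially. Each task runs in its own worker thread. Source tasks track their position in the connect-offsets topic; sink tasks track theirs through ordinary Kafka consumer group offsets.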

Worker

JVM process loading connector plugins from plugin.path. Runs tasks, handles converters and transforms, produces/consumes with embedded Kafka clients. Distributed workers form a Connect cluster coordinated via internal topics: connect-configs, connect-offsets, connect-status.

Standalone vs distributed mode

Mode | Use case | Limitation
Standalone | Local dev, single-file testing | No HA; offsets in local file
Distributed | Production | Requires Kafka cluster for config/status/offsets

REST API

Default port 8083. Core operations:

Shell — Connect REST
# List connectors
curl -s http://connect:8083/connectors | jq

# Create connector
curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @debezium-postgres.json

# Status (connector + tasks)
curl -s http://connect:8083/connectors/inventory-cdc/status | jq

# Pause / resume / delete
curl -X PUT http://connect:8083/connectors/inventory-cdc/pause
curl -X PUT http://connect:8083/connectors/inventory-cdc/resume
curl -X DELETE http://connect:8083/connectors/inventory-cdc

# Restart failed task
curl -X POST http://connect:8083/connectors/inventory-cdc/tasks/0/restart
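The same lifecycle calls can be scripted. The sketch below is a minimal illustration using only the Python standard library. build_request and build_create_request are hypothetical helper names, and the host connect:8083 is carried over from the curl examples. The helpers only construct requests; passing one to urllib.request.urlopen would send it.

```python
import json
import urllib.request

BASE_URL = "http://connect:8083"  # assumption: same host as the curl examples

# (HTTP method, path template) for the lifecycle calls shown above
ACTIONS = {
    "status": ("GET", "/connectors/{name}/status"),
    "pause": ("PUT", "/connectors/{name}/pause"),
    "resume": ("PUT", "/connectors/{name}/resume"),
    "delete": ("DELETE", "/connectors/{name}"),
    "restart_task": ("POST", "/connectors/{name}/tasks/{task}/restart"),
}


def build_request(action, name, task=0, base_url=BASE_URL):
    """Build (but do not send) a request for one connector lifecycle action."""
    method, template = ACTIONS[action]
    url = base_url + template.format(name=name, task=task)
    return urllib.request.Request(url, method=method)


def build_create_request(definition, base_url=BASE_URL):
    """Build the POST that registers a connector from a {"name", "config"} dict."""
    body = json.dumps(definition).encode("utf-8")
    return urllib.request.Request(
        base_url + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Keeping the method and path in one table makes it harder to mix up PUT (pause, resume) with POST (restart), which is an easy mistake when typing curl commands by hand.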

Converter

Serialize/deserialize between Connect’s internal Schema + Struct representation and Kafka bytes. Set at worker or connector level: key.converter, value.converter.

Converter | Format | When
JsonConverter | JSON + optional schema envelope | Dev, human-readable topics
AvroConverter | Avro + Schema Registry ID | Production internal pipelines
ProtobufConverter | Protobuf + schema metadata | Cross-language, gRPC interop
ByteArrayConverter | Raw bytes | Opaque payloads, pass-through
connector config — converters
{
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
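To make the converter formats concrete, here is a small sketch of what a downstream consumer sees on the wire. It assumes JsonConverter's schema envelope (a top-level object with schema and payload keys) and the Confluent wire format used by AvroConverter (a zero magic byte followed by a 4-byte big-endian schema ID). The function names are hypothetical.

```python
import json
import struct


def unwrap_json_envelope(raw):
    """Return the payload of a JsonConverter message, with or without the schema envelope."""
    doc = json.loads(raw)
    if isinstance(doc, dict) and set(doc) == {"schema", "payload"}:
        return doc["payload"]
    return doc


def avro_schema_id(raw):
    """Read the Schema Registry ID from the first five bytes of a message."""
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError("not Confluent wire format")
    return schema_id
```

The Avro body that follows the five-byte header can only be decoded after fetching that schema ID from Schema Registry, which is why the registry URL is part of the converter config.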

Transforms (SMT) overview

Single Message Transforms mutate each record in flight (field extraction, masking, routing) without a separate stream processor. Multiple SMTs can be chained per connector (see the dedicated section below).

🔬 Under the Hood

Workers store connector configs in the compacted connect-configs topic and publish connector and task state changes to connect-status. When a worker dies, a rebalance redistributes its tasks; source offsets in connect-offsets let them resume with at-least-once delivery.

Source connectors

Sources move data into Kafka. JDBC polling is simple but load-heavy; Debezium reads the database's transaction log, which makes it the right tool for CDC and the transactional outbox pattern.

JDBC Source

Confluent JDBC connector modes:

Mode | Mechanism | Trade-off
bulk | SELECT * FROM table on every poll | Simple; full table scan each time; no change detection
incrementing | WHERE id > ? on a strictly increasing ID | Captures new rows only; misses updates and deletes
timestamp | WHERE updated_at > ? | Misses deletes; sensitive to clock skew and rows sharing a timestamp
timestamp+incrementing | Composite (timestamp, ID) cursor | Disambiguates rows that share a timestamp; still misses deletes
query (option combined with a mode) | Custom SQL supplied in the query property | Flexible joins; operator owns correctness
JSON — JDBC source timestamp+incrementing
{
  "name": "jdbc-orders-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://db:5432/sales",
    "connection.user": "connect_user",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "order_id",
    "topic.prefix": "jdbc-",
    "poll.interval.ms": "5000",
    "tasks.max": "4"
  }
}
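The composite cursor is easier to reason about with a runnable model. The sketch below uses an in-memory SQLite table as a stand-in for the source database and a simplified version of the predicate shape a timestamp+incrementing poll relies on; the real connector's generated SQL differs in detail. poll_changes and the integer timestamps are illustrative assumptions.

```python
import sqlite3


def poll_changes(conn, last_ts, last_id):
    """Return rows changed since the (last_ts, last_id) cursor, oldest first."""
    return conn.execute(
        """
        SELECT order_id, status, updated_at FROM orders
        WHERE updated_at > ? OR (updated_at = ? AND order_id > ?)
        ORDER BY updated_at, order_id
        """,
        (last_ts, last_ts, last_id),
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT, updated_at INTEGER)"
)
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "PENDING", 100), (2, "PENDING", 100)],
)

# First poll: both rows share a timestamp, so the ID breaks the tie.
first = poll_changes(conn, 0, 0)
cursor = (first[-1][2], first[-1][0])  # advance to (updated_at, order_id) of the last row

# An update bumps updated_at; a delete removes the row entirely.
conn.execute("UPDATE orders SET status = 'SHIPPED', updated_at = 105 WHERE order_id = 1")
conn.execute("DELETE FROM orders WHERE order_id = 2")

# Second poll sees the update but has no way to observe the delete.
second = poll_changes(conn, *cursor)
```

The second poll returns only the updated row. Nothing in a polling query can report that order 2 disappeared, which is the gap log-based CDC closes.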

Debezium — Change Data Capture

Debezium connectors read the database transaction log (WAL, binlog, oplog)—not application tables—for every insert, update, delete. Low overhead on source DB; captures deletes; preserves commit order per table/partition.

How Debezium works

  1. Connector connects to DB replication stream
  2. Snapshot (optional) — consistent initial image of tables
  3. Streaming — tail log events, map to Kafka records
  4. Emit to topics named {topic.prefix}.{schema}.{table}
  5. Store offsets in Kafka Connect offset store

Database-specific capture

Database | Log mechanism | Requirements
PostgreSQL | Logical replication (pgoutput plugin) | wal_level=logical, replication slot, publication
MySQL | Binlog (row format) | binlog_format=ROW, binlog_row_image=FULL
MongoDB | Change streams / oplog | Replica set required
Oracle | LogMiner | Supplemental logging, archivelog mode
SQL Server | CDC tables | CDC enabled per database/table

Debezium envelope structure

Standard change event payload (value):

JSON — Debezium change event
{
  "before": { "id": 42, "status": "PENDING", "amount": 100.00 },
  "after":  { "id": 42, "status": "SHIPPED", "amount": 100.00 },
  "source": {
    "version": "2.5.0.Final",
    "connector": "postgresql",
    "name": "inventory-server",
    "ts_ms": 1717200000123,
    "snapshot": "false",
    "db": "inventory",
    "schema": "public",
    "table": "orders",
    "txId": 987654,
    "lsn": 123456789
  },
  "op": "u",
  "ts_ms": 1717200000456,
  "transaction": null
}
Field | Meaning
before | Row state before change (null on insert)
after | Row state after change (null on delete)
op | c = create, u = update, d = delete, r = read (snapshot)
source.ts_ms | Source DB event time
ts_ms | Connector processing time
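A consumer typically folds these events into its own copy of the table. The sketch below is one minimal way to do that, assuming each event is the envelope itself (not wrapped in a JsonConverter schema/payload pair) and that rows are keyed by an id field. apply_change is a hypothetical helper.

```python
def apply_change(table, event, key_field="id"):
    """Apply one Debezium-style change event to an in-memory table (dict keyed by row id)."""
    op = event["op"]
    if op in ("c", "u", "r"):
        # create, update, and snapshot read all carry the new row image in "after"
        row = event["after"]
        table[row[key_field]] = row
    elif op == "d":
        # deletes carry the old row image in "before"; "after" is null
        row = event["before"]
        table.pop(row[key_field], None)
    else:
        raise ValueError(f"unsupported op: {op!r}")
    return table


events = [
    {"op": "c", "before": None, "after": {"id": 42, "status": "PENDING"}},
    {"op": "u", "before": {"id": 42, "status": "PENDING"},
     "after": {"id": 42, "status": "SHIPPED"}},
    {"op": "d", "before": {"id": 42, "status": "SHIPPED"}, "after": None},
]

state = {}
snapshots = []
for event in events:
    apply_change(state, event)
    snapshots.append(dict(state))  # copy so each step can be inspected later
```

Because the update branch overwrites by key and the delete branch tolerates a missing key, replaying the same event twice leaves the table unchanged, which matters under at-least-once delivery.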

Snapshot modes

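  • initial: full snapshot, then stream (default)
  • never: streaming only; existing data is not backfilled
  • when_needed: snapshot if no offset exists or the stored position is no longer available
  • initial_only: snapshot, then stop (batch load)
  • no_data: capture table structure only, no row copy
Available modes and their names vary by connector and Debezium version; check the documentation for the release in use.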
JSON — Debezium PostgreSQL connector
{
  "name": "inventory-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "${secrets:db/password}",
    "database.dbname": "inventory",
    "topic.prefix": "inventory",
    "table.include.list": "public.orders,public.order_lines",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_inventory",
    "publication.name": "dbz_publication",
    "snapshot.mode": "initial",
    "tombstones.on.delete": "true"
  }
}

Debezium + Outbox Pattern

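The application writes a business event to an outbox table in the same database transaction as the domain write. Debezium captures the outbox rows, and the EventRouter SMT routes each one to a domain topic based on a routing column (aggregatetype by default, configurable with route.by.field), so application code never dual-writes to Kafka.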

JSON — outbox EventRouter SMT
{
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.table.field.event.id": "id",
  "transforms.outbox.table.field.event.key": "aggregate_id",
  "transforms.outbox.table.field.event.type": "event_type",
  "transforms.outbox.table.field.event.payload": "payload",
  "transforms.outbox.route.topic.replacement": "domain.${routedByValue}",
  "transforms.outbox.table.expand.json.payload": "true"
}
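On the application side, the pattern comes down to one transaction covering two inserts. The sketch below models that with in-memory SQLite as a stand-in for the real database. Column names follow the EventRouter config above, plus an aggregatetype column, which is an assumption here (it is the SMT's default routing column). place_order and the OrderPlaced event type are hypothetical.

```python
import json
import sqlite3
import uuid


def place_order(conn, order_id, amount, event_id=None):
    """Write the domain row and its outbox event atomically."""
    event_id = event_id or str(uuid.uuid4())
    payload = json.dumps({"order_id": order_id, "amount": amount})
    with conn:  # commits on success, rolls back if either insert raises
        conn.execute("INSERT INTO orders (id, amount) VALUES (?, ?)", (order_id, amount))
        conn.execute(
            "INSERT INTO outbox (id, aggregatetype, aggregate_id, event_type, payload) "
            "VALUES (?, ?, ?, ?, ?)",
            (event_id, "order", str(order_id), "OrderPlaced", payload),
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL)")
conn.execute(
    "CREATE TABLE outbox (id TEXT PRIMARY KEY, aggregatetype TEXT, "
    "aggregate_id TEXT, event_type TEXT, payload TEXT)"
)

place_order(conn, 42, 100.0, event_id="evt-1")
try:
    # Reusing the event id makes the outbox insert fail after the order insert
    # succeeded, so the rollback must remove order 43 as well.
    place_order(conn, 43, 5.0, event_id="evt-1")
except sqlite3.IntegrityError:
    pass

order_ids = [row[0] for row in conn.execute("SELECT id FROM orders ORDER BY id")]
outbox_rows = conn.execute("SELECT aggregate_id, event_type FROM outbox").fetchall()
```

After the failed second call, neither the order nor an event exists for id 43. That all-or-nothing behavior is what a direct produce to Kafka from application code cannot guarantee.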

FileStreamSource

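The FileStreamSourceConnector that ships with Apache Kafka reads a file or stdin line by line. Use it for testing and demos only: it has no partitioning, no production-grade offset semantics, and no fault tolerance worth trusting. Recent Kafka 3.x releases no longer put it on the default classpath, so it has to be added to plugin.path explicitly.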

⚠️ Pitfall

PostgreSQL replication slot lag: if Connect stops consuming WAL, the slot retains WAL segments until the disk fills. Monitor confirmed_flush_lsn in pg_replication_slots against the current WAL LSN.
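In SQL the lag is pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn). The sketch below does the same arithmetic client-side, which can be handy inside an alerting script that already has the LSN strings. It assumes the textual LSN form PostgreSQL prints (two hexadecimal numbers separated by a slash). The function names and the threshold are illustrative.

```python
def lsn_to_int(lsn):
    """Convert a textual LSN such as '16/B374D848' to a byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) + int(low, 16)


def slot_lag_bytes(current_lsn, confirmed_flush_lsn):
    """Bytes of WAL the slot is still holding back."""
    return lsn_to_int(current_lsn) - lsn_to_int(confirmed_flush_lsn)


def slots_over_threshold(slots, current_lsn, max_lag_bytes):
    """slots: iterable of (slot_name, confirmed_flush_lsn) pairs read from pg_replication_slots."""
    return [
        name
        for name, flushed in slots
        if slot_lag_bytes(current_lsn, flushed) > max_lag_bytes
    ]
```

Alerting on bytes retained rather than on time keeps the check meaningful on quiet databases, where a slot can be minutes behind while holding almost no WAL.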

📦 Real World

Zalando open-sourced Debezium patterns for outbox; Shopify streams MySQL binlog changes to Kafka for search indexing. JDBC polling remains common for legacy batch extracts where CDC is not approved.

Sink connectors

Sinks consume Kafka topics and land data in systems optimized for query, search, or archival. Idempotent writes and upsert semantics matter because Connect delivers at-least-once.

JDBC Sink

Writes records to relational tables. insert.mode=upsert with pk.mode=record_key (or pk.mode=record_value plus pk.fields) generates INSERT ... ON CONFLICT ... DO UPDATE on PostgreSQL, or the equivalent on other databases, so retries do not duplicate rows.

JSON — JDBC sink upsert
{
  "name": "jdbc-orders-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "inventory.public.orders",
    "connection.url": "jdbc:postgresql://warehouse:5432/analytics",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "auto.create": "false",
    "auto.evolve": "true",
    "delete.enabled": "true"
  }
}
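Why upsert plus delete handling makes redelivery harmless can be shown with a small model. The sketch below uses in-memory SQLite (which supports ON CONFLICT ... DO UPDATE from version 3.24) as a stand-in for the warehouse. Records are (key, value) pairs where a None value represents a tombstone. write_batch is a hypothetical helper, not the connector's code.

```python
import sqlite3

UPSERT = """
INSERT INTO orders (id, status) VALUES (?, ?)
ON CONFLICT (id) DO UPDATE SET status = excluded.status
"""


def write_batch(conn, records):
    """Apply (key, value) records; a None value is treated as a delete."""
    with conn:
        for key, value in records:
            if value is None:
                conn.execute("DELETE FROM orders WHERE id = ?", (key,))
            else:
                conn.execute(UPSERT, (key, value["status"]))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")

batch = [
    (42, {"status": "PENDING"}),
    (42, {"status": "SHIPPED"}),
    (7, {"status": "PENDING"}),
    (7, None),
]

write_batch(conn, batch)
write_batch(conn, batch)  # simulate redelivery after a task restart

rows = conn.execute("SELECT id, status FROM orders ORDER BY id").fetchall()
```

Replaying the whole batch converges to the same final table. With plain INSERT the second delivery would fail on the primary key or, without one, duplicate the rows.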

Elasticsearch Sink

Indexes documents for full-text search. Key settings: connection.url, type.name (legacy), write.method=UPSERT, and key.ignore=false so that the Kafka key becomes the document ID and updates are idempotent. Use data streams or index templates for time-series indices.

S3 Sink

Archival and data lake ingestion with pluggable formats:

  • Partitioning (partitioner.class): time-based (year=/month=/day=) or field-based (customer_id=)
  • Formats: Parquet (analytics default), Avro (schema evolution), JSON (human-readable, but larger to store and slower to scan)
  • File rollover (flush.size, rotate.interval.ms): how many records or how much time accumulates before a file is closed and uploaded
JSON — S3 sink Parquet time partition
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "orders",
  "s3.bucket.name": "data-lake-raw",
  "s3.region": "us-east-1",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
  "partition.duration.ms": "86400000",
  "locale": "en-US",
  "timezone": "UTC",
  "flush.size": "10000",
  "rotate.interval.ms": "3600000",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage"
}
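The effect of a time-based path format is easiest to see as a function from record timestamp to object prefix. The sketch below assumes UTC, the year=/month=/day= layout from the config above, and a top-level topics/ directory (the connector's usual default, treated here as an assumption). partition_prefix is a hypothetical helper, not connector code.

```python
from datetime import datetime, timezone


def partition_prefix(topic, record_ts_ms, topics_dir="topics"):
    """Map a record timestamp (epoch milliseconds) to its daily S3 prefix."""
    ts = datetime.fromtimestamp(record_ts_ms / 1000, tz=timezone.utc)
    return f"{topics_dir}/{topic}/year={ts:%Y}/month={ts:%m}/day={ts:%d}"
```

For example, partition_prefix("orders", 1717200000123) yields topics/orders/year=2024/month=06/day=01. Query engines that understand key=value directories can then prune whole days without listing their files.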

BigQuery & Snowflake sinks

Confluent Cloud / partner connectors stream to warehouse tables—often with staged files + merge (Snowflake) or streaming inserts (BigQuery Storage Write API). Match topic schema via Avro/Protobuf + Schema Registry. Consider cost of frequent small batches vs staged bulk loads.

HTTP Sink

Delivers records to REST webhooks; useful for lightweight integrations where maintaining a consumer service is not worthwhile. Configure http.api.url, the retry policy, and request.body.format. Because delivery is at-least-once, endpoints must be idempotent, for example by deduplicating on an Idempotency-Key header.

Sink | Idempotency strategy
JDBC | Primary key upsert
Elasticsearch | Document ID from key
S3 | Object paths + dedup job downstream
HTTP | Idempotency-Key / If-Match ETag
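For the HTTP case, idempotency lives in the receiving service. The sketch below shows one minimal receiver that deduplicates on an Idempotency-Key header. Deriving the key from topic, partition, and offset is an assumption for illustration, and whether a given HTTP sink can emit such a header is connector-specific. A real service would keep the seen keys in durable storage rather than in memory.

```python
class WebhookReceiver:
    """Toy endpoint that applies each idempotency key at most once."""

    def __init__(self):
        self.seen = set()
        self.applied = []

    def handle(self, headers, body):
        key = headers.get("Idempotency-Key")
        if key is None:
            return 400  # reject requests that cannot be deduplicated
        if key in self.seen:
            return 200  # duplicate delivery: acknowledge without side effects
        self.seen.add(key)
        self.applied.append(body)
        return 201


def idempotency_key(topic, partition, offset):
    """A record's coordinates are stable across retries, so they make a usable key."""
    return f"{topic}-{partition}-{offset}"


receiver = WebhookReceiver()
key = idempotency_key("orders", 3, 1042)
first = receiver.handle({"Idempotency-Key": key}, {"id": 42})
retry = receiver.handle({"Idempotency-Key": key}, {"id": 42})
```

The retry is acknowledged with a success status so the sender stops retrying, while the side effect runs only once.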
⚖️ Trade-off

S3 sink favors throughput and cost; JDBC/ES favor serving latency. Landing raw Avro/Parquet in S3 + dbt/Spark transform is the modern lake pattern; direct JDBC sink suits smaller operational sync jobs.

SMT — Single Message Transforms

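SMTs are lightweight record processors that run inside the connector's tasks, with no separate Kafka Streams cluster. They are applied in the order listed in the config. In a source connector the chain runs on records from the task before the converter serializes them for Kafka; in a sink connector it runs after the converter has deserialized them and before the task writes them out.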

Transform | Purpose | Example config key
ExtractField | Replace the whole key or value with one of its fields | transforms.extract.field
ReplaceField | Rename, include, or exclude fields | transforms.rename.renames
MaskField | Replace field values with a type-appropriate empty value or a fixed replacement | transforms.mask.fields
TimestampConverter | Convert timestamps between Unix epoch, string, and Connect date/time types | transforms.ts.target.type
Flatten | Nested struct → delimiter-joined flat fields | transforms.flat.delimiter
HoistField | Wrap entire value under named field | transforms.hoist.field
InsertField | Add static or metadata fields | transforms.insert.static.field
Filter | Drop records matching a predicate | transforms.filter.predicate
RegexRouter | Reroute topic by regex | transforms.route.regex

Chaining multiple SMTs

Define comma-separated alias list in transforms; each alias gets transforms.{alias}.type and parameters. Order matters: left-to-right in the list.

JSON — chained SMT pipeline
{
  "transforms": "extractKey,maskPii,addSource,filterNulls",

  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "order_id",

  "transforms.maskPii.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskPii.fields": "email,phone",

  "transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addSource.static.field": "pipeline",
  "transforms.addSource.static.value": "connect-v2",

  "transforms.filterNulls.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.filterNulls.predicate": "isTombstone",

  "predicates": "isTombstone",
  "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}
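Why order matters shows up as soon as one transform changes what a later one looks for. The sketch below models a chain as plain functions over a record dict. It is an illustration of the ordering rule only; the helper names are hypothetical and the masking behavior is simplified to an empty string.

```python
def rename_field(old, new):
    """Return a transform that renames one value field."""
    def smt(record):
        value = {(new if k == old else k): v for k, v in record["value"].items()}
        return {**record, "value": value}
    return smt


def mask_fields(*fields):
    """Return a transform that blanks the named value fields."""
    def smt(record):
        value = {k: ("" if k in fields else v) for k, v in record["value"].items()}
        return {**record, "value": value}
    return smt


def apply_chain(chain, record):
    """Apply transforms left to right, as listed in the transforms property."""
    for smt in chain:
        record = smt(record)
    return record


record = {"key": 1, "value": {"email": "a@example.com", "status": "PENDING"}}

# Mask first, then rename: the address is blanked before the field moves.
masked_then_renamed = apply_chain(
    [mask_fields("email"), rename_field("email", "contact")], record
)

# Rename first, then mask "email": the mask no longer finds the field.
renamed_then_masked = apply_chain(
    [rename_field("email", "contact"), mask_fields("email")], record
)
```

In the second ordering the address survives under its new name, which is exactly the kind of silent PII leak a reordered transforms list can introduce.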
JSON — TimestampConverter
{
  "transforms": "ts",
  "transforms.ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.ts.field": "created_at",
  "transforms.ts.target.type": "string",
  "transforms.ts.format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
}
💡 Pro Tip

SMTs run per-record in the worker thread—heavy logic (JSON parsing huge blobs) blocks task throughput. Push complex enrichment to Kafka Streams or Flink; keep SMTs for field hygiene.

Error handling in Connect

By default, converter failures, schema mismatches, and sink rejections fail the task and stop the pipeline. Production configs usually tolerate bad records instead and, for sink connectors, route them to a dead letter topic while healthy records continue.

errors.tolerance

Value | Behavior | Use when
none | Fail task immediately (default) | Strict pipelines; staging
all | Skip bad records; continue processing | Production with DLT monitoring

Dead letter queue

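errors.deadletterqueue.topic.name (sink connectors only): failed records are published to this topic. With errors.deadletterqueue.context.headers.enable=true they carry headers such as __connect.errors.topic, __connect.errors.exception.message, and __connect.errors.stage (for example KEY_CONVERTER, VALUE_CONVERTER, TRANSFORMATION, TASK_PUT).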

JSON — error tolerance + DLT
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "connect-dlq.inventory-orders",
  "errors.deadletterqueue.topic.replication.factor": "3",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true"
}
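Once context headers are enabled, a small triage script can summarize what is landing in the dead letter topic. The sketch below assumes headers arrive as (name, bytes) pairs, the shape most Kafka client libraries expose, and works on already-fetched header lists so it stays independent of any particular client. The function names and sample data are illustrative.

```python
from collections import Counter

PREFIX = "__connect.errors."


def error_context(headers):
    """Collect the Connect error headers of one record, with the prefix stripped."""
    return {
        name[len(PREFIX):]: (value.decode("utf-8") if value is not None else None)
        for name, value in headers
        if name.startswith(PREFIX)
    }


def failures_by_stage(dlq_headers):
    """Count dead-lettered records per failing stage."""
    return Counter(error_context(h).get("stage", "UNKNOWN") for h in dlq_headers)


sample = [
    [("__connect.errors.topic", b"inventory.public.orders"),
     ("__connect.errors.stage", b"VALUE_CONVERTER")],
    [("__connect.errors.stage", b"VALUE_CONVERTER"), ("trace-id", b"abc")],
    [("__connect.errors.stage", b"TASK_PUT")],
]
counts = failures_by_stage(sample)
```

Grouping by stage separates serialization problems, which usually point at a schema change upstream, from failures in the sink write itself.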

errors.log.enable

Logs exception stack trace and optionally message payload to worker logs—essential for debugging without consuming DLT. Pair with log aggregation (ELK, Loki) and alert on error rate spikes.

⚠️ Pitfall

With errors.tolerance=all and no working dead letter topic, bad records are dropped silently. Always set errors.deadletterqueue.topic.name, verify that the topic exists with the expected replication factor, and alert when new records arrive in it.

🎯 Interview Tip

“How do you handle poison messages in Connect?” — errors.tolerance=all + DLT topic + headers for replay; fix schema/code; reset connector or consume DLT with repair pipeline. Contrast with Kafka consumer manual seek/DLT pattern.

Connect performance tuning

Connect throughput is bounded by task count, poll batch sizes, converter CPU, and sink acceptance rate. Scale tasks before scaling workers; scale workers before inflating heap without measurement.

tasks.max — parallelism

Upper bound on tasks the connector may spawn. Set to match:

  • JDBC source — number of tables or partitions being polled
  • Debezium — often 1 per connector for single DB; multi-partition for sharded capture
  • Kafka sink — match consumed topic partition count for full parallelism

Actual tasks = min(tasks.max, connector’s reported max tasks).
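A quick calculation helps when picking tasks.max for a sink. The sketch below applies the formula above and then spreads topic partitions over tasks round-robin. The round-robin split is purely illustrative: real sink tasks receive partitions through the consumer group protocol. Both function names are hypothetical.

```python
def effective_tasks(tasks_max, connector_max):
    """Number of tasks that actually run for a connector."""
    return min(tasks_max, connector_max)


def assign_partitions(num_partitions, task_count):
    """Spread partitions 0..num_partitions-1 over tasks, round-robin."""
    assignment = [[] for _ in range(task_count)]
    for partition in range(num_partitions):
        assignment[partition % task_count].append(partition)
    return assignment


# 12 sink tasks reading an 8-partition topic: the extra tasks get nothing to do.
oversized = assign_partitions(8, 12)
idle_tasks = sum(1 for partitions in oversized if not partitions)

# 4 tasks reading 12 partitions: every task handles 3 partitions.
balanced = assign_partitions(12, 4)
```

Tasks beyond the partition count sit idle while still occupying a worker thread, so raising tasks.max past that point adds cost without adding throughput.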

JDBC source batching

Property | Effect
poll.interval.ms | Delay between table polls (default 5000)
batch.max.rows | Rows per query batch
query.timeout.ms | DB query timeout

Consumer settings (sinks)

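Each sink task embeds a Kafka consumer. Per-connector consumer settings use the consumer.override. prefix, provided the worker's connector.client.config.override.policy permits them: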

JSON — sink consumer overrides
{
  "tasks.max": "12",
  "consumer.override.fetch.min.bytes": "1048576",
  "consumer.override.fetch.max.wait.ms": "500",
  "consumer.override.max.poll.records": "1000"
}

Worker JVM and GC

  • Heap: 4–8 GB typical; Avro/JSON conversion is CPU/GC heavy at high throughput
  • G1GC: -XX:+UseG1GC -XX:MaxGCPauseMillis=20
  • Plugin isolation: each connector plugin loads in its own isolated classloader (default), so watch metaspace
  • Horizontal scale: add workers to the distributed cluster; tasks rebalance automatically
connect-distributed.properties — worker tuning
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
⚙️ Config

Monitor: source-record-poll-rate, sink-record-read-rate, task status RUNNING/FAILED, worker rebalance time. A failed task does not require restarting the whole connector; try the task-level restart API first.

📦 Real World

Confluent Cloud managed Connect handles worker sizing; self-hosted teams run 3+ workers for HA and cap tasks.max per connector to avoid one JDBC monster starving others on shared workers.