Kafka Connect & CDC
Kafka Connect is the integration layer between Kafka and everything else—PostgreSQL, S3, Elasticsearch, legacy JDBC. You deploy connector plugins on workers; the framework handles offset storage, config distribution, and REST management. Debezium turns database transaction logs into event streams without polling your tables to death.
Kafka Connect architecture
Connect separates what to replicate (connector plugin + config) from where it runs (workers). Connectors split work into tasks; workers execute tasks and expose a REST API for lifecycle management.
flowchart TB
    REST[REST API :8083]
    W1[Worker JVM]
    C[Connector plugin\nJdbcSource / Debezium]
    T1[Task 0]
    T2[Task 1]
    K[(Kafka cluster)]
    EXT[(External system\nDB / S3 / ES)]
    REST --> W1
    W1 --> C
    C --> T1
    C --> T2
    T1 --> K
    T2 --> K
    T1 <--> EXT
    T2 <--> EXT
Connector
A connector is a logical job definition—configuration + plugin class. Source connectors pull from external systems into Kafka topics. Sink connectors consume Kafka topics and write outward. The connector class (connector.class) implements SourceConnector or SinkConnector and splits into tasks.
Task
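Unit of parallelism within a connector. A JDBC source might assign one table per task; most Debezium connectors run a single task because a transaction log has to be read in order. Each task runs in a worker thread. Source tasks track their position independently in the connect-offsets topic, while sink tasks rely on ordinary Kafka consumer group offsets.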
Worker
JVM process loading connector plugins from plugin.path. Runs tasks, handles converters and transforms, produces/consumes with embedded Kafka clients. Distributed workers form a Connect cluster coordinated via internal topics: connect-configs, connect-offsets, connect-status.
Standalone vs distributed mode
| Mode | Use case | Limitation |
|---|---|---|
| Standalone | Local dev, single-file testing | No HA; offsets in local file |
| Distributed | Production | Requires Kafka cluster for config/status/offsets |
REST API
Default port 8083. Core operations:
# List connectors
curl -s http://connect:8083/connectors | jq
# Create connector
curl -X POST http://connect:8083/connectors \
-H "Content-Type: application/json" \
-d @debezium-postgres.json
# Status (connector + tasks)
curl -s http://connect:8083/connectors/inventory-cdc/status | jq
# Pause / resume / delete
curl -X PUT http://connect:8083/connectors/inventory-cdc/pause
curl -X PUT http://connect:8083/connectors/inventory-cdc/resume
curl -X DELETE http://connect:8083/connectors/inventory-cdc
# Restart failed task
curl -X POST http://connect:8083/connectors/inventory-cdc/tasks/0/restart
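The status and task-restart calls above are often combined: read the status, then restart only the tasks that failed. A minimal Python sketch of that step, assuming the status body has the name / connector / tasks shape returned by the status endpoint; the sample payload, worker addresses, and the helper name failed_task_restart_paths are illustrative, not part of Connect.

```python
import json
from typing import Dict, List

# Sample body in the shape returned by GET /connectors/{name}/status.
# Worker addresses and the trace text are invented for illustration.
SAMPLE_STATUS = json.loads("""
{
  "name": "inventory-cdc",
  "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.5:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "10.0.0.6:8083",
     "trace": "org.apache.kafka.connect.errors.ConnectException: ..."}
  ]
}
""")


def failed_task_restart_paths(status: Dict) -> List[str]:
    """Return the REST paths to POST to, one per FAILED task."""
    name = status["name"]
    return [
        f"/connectors/{name}/tasks/{task['id']}/restart"
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]
```

Each returned path can be appended to the worker base URL (http://connect:8083 in the examples above) and sent as a POST.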
Converter
Serialize/deserialize between Connect’s internal Schema + Struct representation and Kafka bytes. Set at worker or connector level: key.converter, value.converter.
| Converter | Format | When |
|---|---|---|
| JsonConverter | JSON + optional schema envelope | Dev, human-readable topics |
| AvroConverter | Avro + Schema Registry ID | Production internal pipelines |
| ProtobufConverter | Protobuf + schema metadata | Cross-language, gRPC interop |
| ByteArrayConverter | Raw bytes | Opaque payloads, pass-through |
{
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
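The "optional schema envelope" of JsonConverter is easiest to see from the consumer side. With schemas.enable=true each value is a JSON object with exactly two fields, schema and payload; with it disabled the value is the bare payload. A small Python sketch of a reader that handles both cases; decode_json_value is a hypothetical helper, not a Connect API.

```python
import json
from typing import Any, Optional, Tuple


def decode_json_value(raw: bytes, schemas_enable: bool) -> Tuple[Optional[dict], Any]:
    """Split a JsonConverter-encoded value into (schema, payload)."""
    doc = json.loads(raw.decode("utf-8"))
    if not schemas_enable:
        # Schemaless mode: the whole document is the payload.
        return None, doc
    # Envelope mode: exactly the two fields "schema" and "payload" are expected.
    if not isinstance(doc, dict) or set(doc) != {"schema", "payload"}:
        raise ValueError("expected an object with exactly 'schema' and 'payload' fields")
    return doc["schema"], doc["payload"]
```

A producer and consumer that disagree on schemas.enable is a common source of converter errors, which is why the sketch rejects a bare payload in envelope mode.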
Transforms (SMT) overview
Single Message Transforms mutate each record in-flight—field extraction, masking, routing— without a separate stream processor. Chain multiple SMTs per connector (see dedicated section below).
Worker stores connector config in connect-configs compact topic; status heartbeats in connect-status. On worker death, rebalance redistributes tasks—offsets in connect-offsets ensure at-least-once resume.
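Why resuming from stored offsets gives at-least-once rather than exactly-once can be shown with a toy model. The Python sketch below is a simulation, not Connect code: a source task emits rows, commits its offset every few records, crashes, and a replacement resumes from the last committed offset. The function name and the commit cadence are assumptions made for illustration.

```python
def run_source_task(rows, committed=-1, commit_every=3, crash_after=None):
    """Emit rows after the committed offset; return (emitted, committed_offset).

    committed is the index of the last row whose offset was durably stored
    (-1 means nothing committed yet). If crash_after is set, the task dies
    after emitting that many records and any uncommitted progress is lost.
    """
    emitted = []
    last_emitted = committed
    for offset in range(committed + 1, len(rows)):
        if crash_after is not None and len(emitted) == crash_after:
            return emitted, committed  # crash: only the committed offset survives
        emitted.append(rows[offset])
        last_emitted = offset
        if len(emitted) % commit_every == 0:
            committed = offset  # periodic offset commit
    # Clean run to the end: the final offset is flushed as well.
    return emitted, last_emitted
```

Running it once with crash_after=5 and again from the returned offset delivers every row, but the rows emitted between the last commit and the crash arrive twice. Downstream consumers therefore need to tolerate duplicates.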
Source connectors
Sources move data into Kafka. JDBC polling is simple but load-heavy; Debezium reads the database’s transaction log— the right tool for CDC and the transactional outbox pattern.
JDBC Source
Confluent JDBC connector modes:
| Mode | Mechanism | Trade-off |
|---|---|---|
| Bulk / table polling | SELECT * FROM table on interval | Simple; full table scan load; no change-only |
| Incrementing column | WHERE id > ? monotonic ID | Misses updates and deletes; rows committed out of ID order can be skipped |
| Timestamp | WHERE updated_at > ? | Misses deletes; clock skew issues |
| Timestamp + incrementing | Composite cursor | Better uniqueness on updates |
| Query | Custom SQL in query; the connector appends the incremental WHERE clause | Flexible joins; operator owns correctness |
{
"name": "jdbc-orders-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://db:5432/sales",
"connection.user": "connect_user",
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "order_id",
"topic.prefix": "jdbc-",
"poll.interval.ms": "5000",
"tasks.max": "4"
}
}
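The composite cursor behind timestamp+incrementing can be sketched against an in-memory SQLite table. This is a simplified model of the idea, not the connector's actual SQL: the table layout, integer timestamps, and the poll_changes helper are assumptions for illustration.

```python
import sqlite3


def poll_changes(conn, cursor):
    """Return rows changed since cursor=(last_ts, last_id) and the advanced cursor."""
    last_ts, last_id = cursor
    rows = conn.execute(
        """
        SELECT order_id, status, updated_at FROM orders
        WHERE updated_at > ? OR (updated_at = ? AND order_id > ?)
        ORDER BY updated_at, order_id
        """,
        (last_ts, last_ts, last_id),
    ).fetchall()
    if rows:
        # Advance to the (timestamp, id) of the last row seen.
        cursor = (rows[-1][2], rows[-1][0])
    return rows, cursor


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT, updated_at INTEGER)"
    )
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)", [(1, "PENDING", 100), (2, "PENDING", 100)]
    )
    first, cur = poll_changes(conn, (0, 0))      # picks up both inserts
    conn.execute("UPDATE orders SET status = 'SHIPPED', updated_at = 105 WHERE order_id = 1")
    second, cur = poll_changes(conn, cur)        # picks up the update
    conn.execute("DELETE FROM orders WHERE order_id = 2")
    third, cur = poll_changes(conn, cur)         # the delete is invisible to polling
    return first, second, third
```

The last poll returns nothing even though a row was deleted, which is the gap that log-based CDC closes.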
Debezium — Change Data Capture
Debezium connectors read the database transaction log (WAL, binlog, oplog)—not application tables—for every insert, update, delete. Low overhead on source DB; captures deletes; preserves commit order per table/partition.
How Debezium works
- Connector connects to DB replication stream
- Snapshot (optional) — consistent initial image of tables
- Streaming — tail log events, map to Kafka records
- Emit to topics: {topic.prefix}.{schema}.{table}
- Store offsets in Kafka Connect offset store
Database-specific capture
| Database | Log mechanism | Requirements |
|---|---|---|
| PostgreSQL | Logical replication (pgoutput plugin) | wal_level=logical, replication slot, publication |
| MySQL | Binlog (row format) | binlog_format=ROW, binlog_row_image=FULL |
| MongoDB | Change streams / oplog | Replica set required |
| Oracle | LogMiner | Supplemental logging, archivelog mode |
| SQL Server | CDC tables | CDC enabled per database/table |
Debezium envelope structure
Standard change event payload (value):
{
"before": { "id": 42, "status": "PENDING", "amount": 100.00 },
"after": { "id": 42, "status": "SHIPPED", "amount": 100.00 },
"source": {
"version": "2.5.0.Final",
"connector": "postgresql",
"name": "inventory-server",
"ts_ms": 1717200000123,
"snapshot": "false",
"db": "inventory",
"schema": "public",
"table": "orders",
"txId": 987654,
"lsn": 123456789
},
"op": "u",
"ts_ms": 1717200000456,
"transaction": null
}
| Field | Meaning |
|---|---|
| before | Row state before change (null on insert) |
| after | Row state after change (null on delete) |
| op | c create, u update, d delete, r read (snapshot) |
| source.ts_ms | Source DB event time |
| ts_ms | Connector processing time |
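A consumer typically uses op, before, and after to keep a local copy of the table in sync. A minimal Python sketch, assuming the row key is a field named id and that before carries at least the key on deletes; apply_change is an illustrative helper, and ops other than c, r, u, d are rejected rather than handled.

```python
def apply_change(table: dict, event: dict, key_field: str = "id") -> None:
    """Apply one Debezium-style change event to an in-memory table keyed by key_field."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # Create, snapshot read, and update all carry the new row state in "after".
        row = event["after"]
        table[row[key_field]] = row
    elif op == "d":
        # Deletes carry the old row state in "before"; "after" is null.
        table.pop(event["before"][key_field], None)
    else:
        raise ValueError(f"unsupported op: {op!r}")
```

Because upserting the same after image twice gives the same result, this style of consumer tolerates redelivered events.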
Snapshot modes
- initial — full snapshot then stream (default)
- never — streaming only; existing data not backfilled
- when_needed — snapshot if no offset or history lost
- initial_only — snapshot then stop (batch load)
- no_data — schema snapshot only, no row copy
{
"name": "inventory-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "replicator",
"database.password": "${secrets:db/password}",
"database.dbname": "inventory",
"topic.prefix": "inventory",
"table.include.list": "public.orders,public.order_lines",
"plugin.name": "pgoutput",
"slot.name": "debezium_inventory",
"publication.name": "dbz_publication",
"snapshot.mode": "initial",
"tombstones.on.delete": "true"
}
}
Debezium + Outbox Pattern
The application writes a business event to an outbox table in the same DB transaction as the domain write. Debezium captures the outbox rows, and the EventRouter SMT routes each one to a domain topic based on a routing column (route.by.field, which defaults to aggregatetype), so application code never dual-writes to Kafka.
{
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "event_type",
"transforms.outbox.route.topic.replacement": "domain.${routedByValue}",
"transforms.outbox.table.expand.json.payload": "true"
}
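What the router does to a single outbox row can be modeled in a few lines. The Python sketch below is a simplified illustration, not the EventRouter implementation: the column names follow the config above, route_by stands in for the routing column, and route_outbox_row is a hypothetical helper.

```python
import json


def route_outbox_row(row: dict, route_by: str = "event_type",
                     topic_template: str = "domain.{routed}") -> dict:
    """Turn one outbox row into a Kafka record description (topic, key, headers, value)."""
    payload = row["payload"]
    if isinstance(payload, str):
        # Expand a JSON string payload into a structured value.
        payload = json.loads(payload)
    return {
        "topic": topic_template.format(routed=row[route_by]),
        "key": row["aggregate_id"],      # keeps events of one aggregate in order
        "headers": {"id": row["id"]},    # event id, usable for consumer-side dedup
        "value": payload,
    }
```

Keying by the aggregate identifier means all events for one order land in the same partition and keep their relative order.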
FileStreamSource
Built-in FileStreamSourceConnector — reads stdin/file line-by-line. Testing and demos only—no partitioning, no production offset semantics, no fault tolerance worth trusting.
PostgreSQL replication slot lag: if Connect stops consuming WAL, the slot retains WAL segments until the disk fills. Monitor confirmed_flush_lsn in pg_replication_slots against the current WAL LSN.
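Slot lag is the byte distance between two LSNs. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash (high 32 bits, then low 32 bits), so the difference can be computed client-side. A small Python sketch; the function names are illustrative, and how the two LSN strings are fetched is left out.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '0/16B3748' to a 64-bit byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def slot_lag_bytes(current_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL the slot still retains because the consumer has not confirmed them."""
    return lsn_to_int(current_lsn) - lsn_to_int(confirmed_flush_lsn)
```

Alerting on this value crossing a fixed threshold (sized against free disk on the WAL volume) catches a stalled connector before the database runs out of space.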
Zalando open-sourced Debezium patterns for outbox; Shopify streams MySQL binlog changes to Kafka for search indexing. JDBC polling remains common for legacy batch extracts where CDC is not approved.
Sink connectors
Sinks consume Kafka topics and land data in systems optimized for query, search, or archival. Idempotent writes and upsert semantics matter because Connect delivers at-least-once.
JDBC Sink
Writes records to relational tables. insert.mode=upsert with pk.mode=record_key (or pk.mode=record_value plus pk.fields) generates INSERT ... ON CONFLICT ... DO UPDATE on PostgreSQL, or the dialect's equivalent, so retries do not duplicate rows.
{
"name": "jdbc-orders-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "inventory.public.orders",
"connection.url": "jdbc:postgresql://warehouse:5432/analytics",
"insert.mode": "upsert",
"pk.mode": "record_key",
"auto.create": "false",
"auto.evolve": "true",
"delete.enabled": "true"
}
}
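The effect of upsert under redelivery can be checked with SQLite, which supports the same ON CONFLICT ... DO UPDATE form. This Python sketch only illustrates the idempotency property; the table layout and write_batch helper are assumptions, not what the connector generates.

```python
import sqlite3

# Upsert keyed on the primary key: a redelivered record overwrites, never duplicates.
UPSERT_SQL = """
INSERT INTO orders (order_id, status, amount) VALUES (?, ?, ?)
ON CONFLICT (order_id) DO UPDATE SET status = excluded.status, amount = excluded.amount
"""


def write_batch(conn, records):
    """Write a batch of (order_id, status, amount) tuples with upsert semantics."""
    conn.executemany(UPSERT_SQL, records)
    conn.commit()


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT, amount REAL)"
    )
    batch = [(42, "SHIPPED", 100.0), (43, "PENDING", 25.5)]
    write_batch(conn, batch)
    write_batch(conn, batch)  # simulated at-least-once redelivery of the same batch
    return conn.execute("SELECT order_id, status, amount FROM orders ORDER BY order_id").fetchall()
```

With a plain INSERT the second write would fail on the primary key or, without a key, double the rows.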
Elasticsearch Sink
Indexes documents for full-text search. Key config: connection.url, type.name (legacy), write.method=UPSERT, key.ignore=false for idempotent updates by Kafka key. Use data stream / index templates for time-series indices.
S3 Sink
Archival and data lake ingestion with pluggable formats:
- Partitioning: partitioner.class, time-based (year=/month=/day=) or field-based (customer_id=)
- Formats: Parquet (analytics default), Avro (schema evolution), JSON (human-readable, but larger files)
- flush.size / rotate.interval.ms: file rollover policy
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "orders",
"s3.bucket.name": "data-lake-raw",
"s3.region": "us-east-1",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"partition.duration.ms": "86400000",
"locale": "en-US",
"timezone": "UTC",
"flush.size": "10000",
"rotate.interval.ms": "3600000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage"
}
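How a time-based partitioner turns a record into an object path can be sketched as follows. The layout used here (topics directory, topic, encoded partition, then topic+partition+zero-padded start offset) is an assumption modeled on the connector's default naming; the s3_object_key helper and the plain .parquet extension are illustrative only.

```python
from datetime import datetime, timezone


def s3_object_key(topic: str, kafka_partition: int, start_offset: int,
                  record_ts_ms: int, topics_dir: str = "topics",
                  extension: str = "parquet") -> str:
    """Build an object key for a file whose first record has the given offset and timestamp."""
    ts = datetime.fromtimestamp(record_ts_ms / 1000, tz=timezone.utc)
    # Matches the 'year'=YYYY/'month'=MM/'day'=dd path format, evaluated in UTC.
    encoded_partition = f"year={ts:%Y}/month={ts:%m}/day={ts:%d}"
    file_name = f"{topic}+{kafka_partition}+{start_offset:010d}.{extension}"
    return f"{topics_dir}/{topic}/{encoded_partition}/{file_name}"
```

Because the key is derived from topic, partition, and start offset, a task that rewrites the same offset range after a restart produces the same object name rather than a second copy, provided file boundaries are deterministic.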
BigQuery & Snowflake sinks
Confluent Cloud / partner connectors stream to warehouse tables—often with staged files + merge (Snowflake) or streaming inserts (BigQuery Storage Write API). Match topic schema via Avro/Protobuf + Schema Registry. Consider cost of frequent small batches vs staged bulk loads.
HTTP Sink
Delivers records to REST webhooks—use for lightweight integrations without maintaining a consumer service. Configure http.api.url, retry policy, and request.body.format. At-least-once means endpoints must be idempotent (Idempotency-Key header).
| Sink | Idempotency strategy |
|---|---|
| JDBC | Primary key upsert |
| Elasticsearch | Document ID from key |
| S3 | Object paths + dedup job downstream |
| HTTP | Idempotency-Key / If-Match ETag |
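For the HTTP row, one workable convention is to derive the idempotency key from the record's coordinates, since topic, partition, and offset are stable across redeliveries. The Python sketch below shows both sides under that assumption; idempotency_key and WebhookReceiver are hypothetical names, and whether a given HTTP sink can attach such a header depends on the connector in use.

```python
import hashlib


def idempotency_key(topic: str, partition: int, offset: int) -> str:
    """Deterministic key: the same Kafka record always maps to the same value."""
    return hashlib.sha256(f"{topic}:{partition}:{offset}".encode("utf-8")).hexdigest()


class WebhookReceiver:
    """Receiving side: applies each key at most once and ignores replays."""

    def __init__(self):
        self.seen = set()
        self.applied = []

    def handle(self, key: str, body: dict) -> bool:
        if key in self.seen:
            return False  # duplicate delivery, already applied
        self.seen.add(key)
        self.applied.append(body)
        return True
```

A real receiver would persist the seen keys with an expiry rather than hold them in memory.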
S3 sink favors throughput and cost; JDBC/ES favor serving latency. Landing raw Avro/Parquet in S3 + dbt/Spark transform is the modern lake pattern; direct JDBC sink suits smaller operational sync jobs.
SMT — Single Message Transforms
SMTs are lightweight record processors inside the connector—no Kafka Streams cluster. Chain them in config order: first transform receives raw converter output; last feeds the topic.
| Transform | Purpose | Example config key |
|---|---|---|
| ExtractField | Pull a single field out to become the whole key or value | transforms.extract.field |
| ReplaceField | Rename, include, or exclude fields | transforms.rename.renames |
| MaskField | Replace field values with a type-appropriate empty value or a fixed replacement | transforms.mask.fields |
| TimestampConverter | Convert timestamps between epoch, string, and Date/Timestamp types | transforms.ts.target.type |
| Flatten | Nested struct → dot-delimited flat fields | transforms.flat.delimiter |
| HoistField | Wrap entire value under named field | transforms.hoist.field |
| InsertField | Add static or metadata fields | transforms.insert.static.field |
| Filter | Drop records matching predicate | transforms.filter.predicate |
| RegexRouter | Reroute topic by regex | transforms.route.regex |
Chaining multiple SMTs
Define comma-separated alias list in transforms; each alias gets transforms.{alias}.type and parameters. Order matters: left-to-right in the list.
{
"transforms": "extractKey,maskPii,addSource,filterNulls",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "order_id",
"transforms.maskPii.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskPii.fields": "email,phone",
"transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSource.static.field": "pipeline",
"transforms.addSource.static.value": "connect-v2",
"transforms.filterNulls.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterNulls.predicate": "IsNull",
"predicates": "IsNull",
"predicates.IsNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}
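The left-to-right semantics of a chain can be modeled with plain functions, where returning None means the record was dropped. This Python sketch mirrors the four aliases in the config above but is only a model: records are dicts with key and value, and every helper name is made up for illustration.

```python
def extract_key(field):
    """Replace a structured key with one of its fields."""
    return lambda rec: {**rec, "key": rec["key"][field]}


def mask_fields(*fields):
    """Blank out the named string fields in the value; pass tombstones through."""
    def transform(rec):
        if rec["value"] is None:
            return rec
        masked = {k: ("" if k in fields else v) for k, v in rec["value"].items()}
        return {**rec, "value": masked}
    return transform


def insert_static(field, value):
    """Add a constant field to the value; pass tombstones through."""
    def transform(rec):
        if rec["value"] is None:
            return rec
        return {**rec, "value": {**rec["value"], field: value}}
    return transform


def drop_tombstones(rec):
    """Filter with a tombstone predicate: drop records whose value is null."""
    return None if rec["value"] is None else rec


def apply_chain(record, transforms):
    """Apply transforms in list order; stop as soon as one drops the record."""
    for transform in transforms:
        record = transform(record)
        if record is None:
            return None
    return record
```

Reordering the list changes the result: a filter placed first spares the later transforms from ever seeing a null value.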
{
"transforms": "ts",
"transforms.ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ts.field": "created_at",
"transforms.ts.target.type": "string",
"transforms.ts.format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
}
SMTs run per-record in the worker thread—heavy logic (JSON parsing huge blobs) blocks task throughput. Push complex enrichment to Kafka Streams or Flink; keep SMTs for field hygiene.
Error handling in Connect
Converter failures, schema mismatches, and sink rejections default to failing the task, which stops the pipeline. Production sink configs route poison pills to a dead letter topic while healthy records continue; the dead letter queue settings apply to sink connectors only.
errors.tolerance
| Value | Behavior | Use when |
|---|---|---|
| none | Fail task immediately (default) | Strict pipelines; staging |
| all | Skip bad records; continue processing | Production with DLT monitoring |
Dead letter queue
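errors.deadletterqueue.topic.name: failed records are published to this topic. With errors.deadletterqueue.context.headers.enable=true they carry headers such as __connect.errors.topic, __connect.errors.exception.message, and __connect.errors.stage (for example KEY_CONVERTER, VALUE_CONVERTER, TRANSFORMATION, TASK_PUT).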
{
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "connect-dlq.inventory-orders",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
errors.log.enable
Logs exception stack trace and optionally message payload to worker logs—essential for debugging without consuming DLT. Pair with log aggregation (ELK, Loki) and alert on error rate spikes.
errors.tolerance=all without a dead letter topic silently drops bad records, and a misconfigured DLT topic is no better. Always set errors.deadletterqueue.topic.name and monitor the DLT for new messages.
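The difference between the two tolerance settings can be modeled in a few lines. The Python sketch below simulates a sink task's conversion step: with tolerance none the first bad record raises and stops everything, with all it is diverted to a dead letter list along with context headers. run_sink is an illustrative stand-in, not Connect's implementation.

```python
import json


def run_sink(records, convert, tolerance="none"):
    """Convert (topic, raw_value) pairs; return (delivered, dead_letters)."""
    delivered, dead_letters = [], []
    for topic, raw in records:
        try:
            delivered.append(convert(raw))
        except Exception as exc:
            if tolerance != "all":
                raise  # errors.tolerance=none: the task fails on the first bad record
            # errors.tolerance=all: keep the original bytes plus context for replay.
            dead_letters.append({
                "value": raw,
                "headers": {
                    "__connect.errors.topic": topic,
                    "__connect.errors.stage": "VALUE_CONVERTER",
                    "__connect.errors.exception.message": str(exc),
                },
            })
    return delivered, dead_letters
```

Calling run_sink(records, json.loads, tolerance="all") on a batch with one malformed value delivers the healthy records and leaves the bad one, untouched, in the dead letter list.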
“How do you handle poison messages in Connect?” — errors.tolerance=all + DLT topic + headers for replay; fix schema/code; reset connector or consume DLT with repair pipeline. Contrast with Kafka consumer manual seek/DLT pattern.
Connect performance tuning
Connect throughput is bounded by task count, poll batch sizes, converter CPU, and sink acceptance rate. Scale tasks before scaling workers; scale workers before inflating heap without measurement.
tasks.max — parallelism
Upper bound on tasks the connector may spawn. Set to match:
- JDBC source — number of tables or partitions being polled
- Debezium — often 1 per connector for single DB; multi-partition for sharded capture
- Kafka sink — match consumed topic partition count for full parallelism
Actual tasks = min(tasks.max, connector’s reported max tasks).
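The min() rule is easy to see by spreading work units (tables or partitions) across tasks. A minimal Python sketch, assuming round-robin distribution; plan_tasks is a hypothetical helper and real connectors choose their own grouping strategy.

```python
def plan_tasks(tasks_max: int, work_units: list) -> list:
    """Group work units into at most tasks_max tasks, round-robin."""
    task_count = min(tasks_max, len(work_units))
    groups = [[] for _ in range(task_count)]
    for index, unit in enumerate(work_units):
        groups[index % task_count].append(unit)
    return groups
```

With tasks.max=4 and two tables the plan has two tasks, so raising tasks.max beyond the number of work units buys nothing.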
JDBC source batching
| Property | Effect |
|---|---|
| poll.interval.ms | Delay between table polls (default 5000) |
| batch.max.rows | Rows per query batch |
| query.timeout.ms | DB query timeout |
Consumer settings (sinks)
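Sink tasks embed Kafka consumers. Per-connector settings use the consumer.override.* prefix, which takes effect only when the worker's connector.client.config.override.policy permits overrides: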
{
"tasks.max": "12",
"consumer.override.fetch.min.bytes": "1048576",
"consumer.override.fetch.max.wait.ms": "500",
"consumer.override.max.poll.records": "1000"
}
Worker JVM and GC
- Heap — 4–8 GB typical; Avro/JSON conversion is CPU/GC heavy at high throughput
- G1GC — -XX:+UseG1GC -XX:MaxGCPauseMillis=20
- plugin isolation — separate connector plugins in isolated classloaders (default)—watch metaspace
- Horizontal scale — add workers to distributed cluster; rebalance tasks automatically
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
Monitor: source-record-poll-rate, sink-record-read-rate, task status RUNNING/FAILED, worker rebalance time. Failed task ≠ restart connector—use task-level restart API first.
Confluent Cloud managed Connect handles worker sizing; self-hosted teams run 3+ workers for HA and cap tasks.max per connector to avoid one JDBC monster starving others on shared workers.