Schema Registry & Data Contracts
Kafka moves bytes fast—but without a data contract, a producer renaming one field can silently break every downstream consumer. Confluent Schema Registry is the gatekeeper: versioned schemas, compatibility checks, and a 4-byte schema ID embedded in every Avro/Protobuf/JSON Schema message.
Why schema management matters
Event streams outlive individual services. The schema is the contract between teams—without enforcement, “small” producer changes become production incidents weeks later in a warehouse or analytics job.
Schema-on-write vs schema-on-read
| Approach | When schema is enforced | Typical stack |
|---|---|---|
| Schema-on-write | At produce time — invalid payloads rejected | Kafka + Schema Registry + Avro |
| Schema-on-read | At consume time — reader interprets raw bytes | JSON topics, data lakes, exploratory analytics |
Schema-on-write pushes failures left: the producer cannot publish a breaking change if compatibility checks fail. Schema-on-read is flexible but shifts burden to every consumer—each must handle missing fields, type changes, and renames independently.
The silent breakage problem
Without a registry, this sequence is common:
- Team A ships v2 of OrderEvent — renames customerId → customer_id
- Kafka accepts JSON bytes — no validation at broker
- Team B’s Flink job still reads customerId → null for all new records
- Revenue dashboard under-reports for three days until someone notices
JSON on Kafka feels fast in week one. In month six, fifteen consumers parse slightly different shapes. Schema Registry + Avro costs upfront ceremony; it pays back the first time you block an incompatible deploy in CI.
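To make the failure mode concrete, here is a minimal sketch in plain Java, with maps standing in for parsed JSON payloads. The class and method names are hypothetical; only the customerId → customer_id rename comes from the sequence above.

```java
import java.util.Map;

// Hypothetical stand-in for a consumer that parses schemaless JSON into a map.
final class SilentBreakageDemo {

    // Team B's consumer logic: it still looks up the old field name.
    static Object readCustomer(Map<String, Object> event) {
        return event.get("customerId"); // null when the key is absent, no exception
    }

    // Payload shape produced by OrderEvent v1.
    static Map<String, Object> v1Event() {
        return Map.of("orderId", 9001L, "customerId", "cust-42");
    }

    // Payload shape produced by OrderEvent v2, after the rename.
    static Map<String, Object> v2Event() {
        return Map.of("orderId", 9002L, "customer_id", "cust-42");
    }
}
```

Nothing throws when the v2 payload is read: the lookup simply yields null, which is why the breakage stays silent until someone inspects the downstream numbers.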
Data contract
A data contract is the agreed structure of a topic’s key and value: field names, types, semantics, defaults, and evolution rules. It is owned by the producing team, reviewed by consumers, and enforced at registration time—not documented in a wiki and hoped for.
- Producer obligation: register schema before writing; only publish compatible versions
- Consumer obligation: deserialize with reader schema that resolves against writer schema
- Platform obligation: run Schema Registry HA, set global compatibility, audit subjects
Schema Registry role
Central repository for Avro, Protobuf, and JSON Schema definitions. Each registered schema gets a globally unique, monotonically increasing schema ID, plus a version number within its subject. On write, serializers register (or look up) the schema and prepend its ID to the payload; on read, deserializers fetch the schema by ID. Incompatible schemas are rejected before a single byte hits the topic.
Mature orgs tie schema registration to CI: PR merges Avro .avsc changes → pipeline calls POST /compatibility/subjects/.../versions/latest against staging registry → only green builds deploy producers.
Confluent Schema Registry architecture
Schema Registry is a stateless-ish HTTP service backed by a compacted Kafka topic. Serializers are thin clients; the registry is source of truth for schema metadata and compatibility policy.
flowchart LR
    P[Producer\nAvro Serializer]
    SR[Schema Registry\nREST :8081]
    S[(_schemas topic\ncompacted)]
    K[(Kafka topic\norders)]
    C[Consumer\nAvro Deserializer]
    P -->|register / lookup ID| SR
    SR -->|persist| S
    P -->|0x00 + ID + bytes| K
    K --> C
    C -->|GET /schemas/ids/id| SR
    SR --> S
Storage: _schemas topic
All schema metadata lives in the internal compacted topic _schemas (configurable via kafkastore.topic). Keys are subject + version; values hold schema string, schema type, references. Compaction retains latest record per key— the registry rebuilds in-memory caches on startup by consuming this topic.
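The startup rebuild can be pictured as replaying a log into a map. The sketch below is a simplified model and not the registry's code: the Entry type and the string keys are hypothetical, and a null value plays the role of a tombstone.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of replaying a compacted topic into an in-memory cache.
final class CompactedLogReplay {

    // One log entry; a null value stands for a tombstone.
    record Entry(String key, String value) {}

    // Replays entries in offset order: the last value per key wins,
    // and a tombstone removes the key.
    static Map<String, String> rebuild(List<Entry> log) {
        Map<String, String> cache = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value() == null) {
                cache.remove(e.key());
            } else {
                cache.put(e.key(), e.value());
            }
        }
        return cache;
    }
}
```

Because only the last record per key matters, compaction can discard older records without changing what a restarting instance ends up with.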
REST API
Primary operations (Confluent REST v1):
| Operation | Endpoint | Purpose |
|---|---|---|
| Register schema | POST /subjects/{subject}/versions | New version if compatible; returns schema ID |
| Get schema by ID | GET /schemas/ids/{id} | Deserializer lookup (cached aggressively) |
| Get latest | GET /subjects/{subject}/versions/latest | CI/CD and tooling |
| Check compatibility | POST /compatibility/subjects/{subject}/versions/{ver} | Pre-deploy gate without registering |
| List subjects | GET /subjects | Inventory, governance |
| Delete subject / version | DELETE /subjects/... | Soft delete (tombstone); use with care in prod |
# Register new schema version
curl -X POST http://schema-registry:8081/subjects/orders-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\",\"type\":\"long\"}]}"}'
# Check compatibility against latest (without registering)
curl -X POST "http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest" \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "..."}'
# Fetch schema used by on-wire ID 42
curl http://schema-registry:8081/schemas/ids/42
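For a JVM-based CI step, the compatibility check can be issued with the JDK's built-in HTTP client instead of curl. This is a sketch under assumptions: the class and method names are hypothetical, the schema is assumed to be compact single-line JSON (only backslashes and double quotes are escaped), and the request is built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch of the compatibility-check call using the JDK HTTP client.
final class CompatibilityRequest {

    // Wraps a schema definition in the {"schema": "..."} envelope.
    // Assumes single-line JSON: only backslashes and double quotes are escaped.
    static String envelope(String schemaJson) {
        String escaped = schemaJson.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"schema\": \"" + escaped + "\"}";
    }

    // Builds (but does not send) the POST against the subject's latest version.
    static HttpRequest build(String registryUrl, String subject, String schemaJson) {
        URI uri = URI.create(registryUrl + "/compatibility/subjects/" + subject + "/versions/latest");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(envelope(schemaJson)))
                .build();
    }
}
```

Sending it would look like HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); the registry answers with a small JSON document containing an is_compatible flag, which the pipeline can use to pass or fail the build.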
Schema ID and wire format
Confluent serializers prepend a 5-byte header to each message (key and value independently):
Byte layout (Confluent Platform wire format)
┌────────┬─────────────────────────────────┬────────────────────────┐
│ 0x00   │ Schema ID (4 bytes, big-endian) │ Serialized payload     │
│ magic  │ e.g. 0x00 0x00 0x00 0x2A (=42)  │ Avro / Protobuf / JSON │
└────────┴─────────────────────────────────┴────────────────────────┘
- Magic byte 0x00: Confluent framing (distinct from raw Avro or Kafka’s own formats)
- Schema ID: global integer assigned at registration—same ID forever for that exact schema text
- Payload: binary Avro (default), Protobuf bytes, or JSON Schema serialized form
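The framing above is small enough to reproduce with java.nio.ByteBuffer. The following is a minimal sketch, not Confluent's serializer; the class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;

// Sketch of the 5-byte framing described above, using only the JDK.
final class WireFormat {
    static final byte MAGIC_BYTE = 0x00;

    // Prepends the magic byte and the 4-byte schema ID to a payload.
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(5 + payload.length) // ByteBuffer is big-endian by default
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    // Reads the schema ID, failing fast if the header is missing or the magic byte is wrong.
    static int schemaId(byte[] message) {
        if (message.length < 5) {
            throw new IllegalArgumentException("message shorter than 5-byte header");
        }
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buf.getInt();
    }
}
```

Framing ID 42 with a two-byte payload yields seven bytes whose fifth byte is 0x2A, matching the layout shown above.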
Deserializer reads magic byte → if not 0x00, fails fast (wrong serializer). Fetches schema for ID from local cache; on miss, GET /schemas/ids/{id}. Schema text is parsed once and cached per ID—hot paths avoid registry round-trips.
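The caching behaviour can be sketched with a ConcurrentHashMap. This is an illustration only: SchemaCache is a hypothetical class, and the fetcher function stands in for the GET /schemas/ids/{id} call plus schema parsing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

// Sketch of a per-ID schema cache; the fetcher stands in for the registry lookup.
final class SchemaCache {
    private final Map<Integer, String> byId = new ConcurrentHashMap<>();
    private final IntFunction<String> fetcher;

    SchemaCache(IntFunction<String> fetcher) {
        this.fetcher = fetcher;
    }

    // Returns the cached schema text, calling the fetcher only on a cache miss.
    String get(int schemaId) {
        return byId.computeIfAbsent(schemaId, fetcher::apply);
    }
}
```

Since a schema ID always refers to the same schema, entries never need invalidation; a consumer pays one registry round-trip per distinct ID it encounters.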
Subject naming strategies
A subject is the registry namespace for a schema lineage. Strategy determines how subject names are derived from topic + record:
| Strategy | Subject pattern | When to use |
|---|---|---|
| TopicNameStrategy (default) | {topic}-key, {topic}-value | One event type per topic; most Kafka pipelines |
| RecordNameStrategy | Avro full record name (namespace + name) | Topic carries multiple Avro record types (envelope) |
| TopicRecordNameStrategy | {topic}-{recordName} | Multi-type topic with per-topic isolation of same record name |
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
# alternatives:
# io.confluent.kafka.serializers.subject.RecordNameStrategy
# io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Topic-per-event-type + TopicNameStrategy keeps mental model simple: subject orders-value maps 1:1 to topic orders. RecordNameStrategy shines with Debezium topics where multiple tables share a connector namespace.
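The three patterns from the table can be written as one small function. This is a sketch of the naming rules only, not Confluent's strategy classes; the enum and method names are hypothetical.

```java
// Sketch of how each strategy derives a subject name.
final class SubjectNames {
    enum Strategy { TOPIC_NAME, RECORD_NAME, TOPIC_RECORD_NAME }

    // recordFullName is the Avro full name, e.g. com.acme.events.Order.
    static String subject(Strategy strategy, String topic, String recordFullName, boolean isKey) {
        switch (strategy) {
            case TOPIC_NAME:
                return topic + (isKey ? "-key" : "-value");
            case RECORD_NAME:
                return recordFullName;              // topic plays no part
            case TOPIC_RECORD_NAME:
                return topic + "-" + recordFullName;
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }
}
```

With RECORD_NAME the same record type shares one subject, and therefore one compatibility history, across every topic it appears on; TOPIC_RECORD_NAME keeps those histories separate per topic.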
High availability
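Run multiple Schema Registry instances behind a load balancer. All instances read the same _schemas topic, and Kafka acts as the coordination log. The registry uses a single-primary design: one elected instance (chosen through Kafka's group protocol, or ZooKeeper in older versions) performs writes, while the others serve reads and forward registration requests to the primary. Platform teams should monitor registry lag and kafkastore connectivity like any critical dependency.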
Avro — the preferred format
Avro is schema-first binary serialization designed for data pipelines. Unlike JSON, you cannot write or read without a schema— which is exactly what makes evolution and registry integration work.
Why Avro for Kafka
- Schema required for read and write — no ambiguous bytes
- Compact binary — smaller than JSON; no field names on wire (uses schema)
- Evolution built-in — resolution rules between writer and reader schemas
- Ecosystem — Kafka, Connect, Flink, Spark, Hive native support
Schema definition
Avro schemas are JSON documents: types, fields, defaults, namespaces, doc strings.
{
  "type": "record",
  "name": "Order",
  "namespace": "com.acme.events",
  "doc": "Order placed event",
  "fields": [
    { "name": "orderId", "type": "long" },
    { "name": "customerId", "type": "string" },
    { "name": "amount", "type": "double", "default": 0.0 },
    { "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["PENDING", "SHIPPED", "CANCELLED"]
      },
      "default": "PENDING"
    },
    { "name": "tags", "type": { "type": "array", "items": "string" }, "default": [] }
  ]
}
Primitive types
| Avro type | Java mapping | Notes |
|---|---|---|
| null | null | Typically used as a union member |
| boolean | boolean | 1 byte on wire |
| int | int | 32-bit zigzag varint |
| long | long | 64-bit zigzag varint |
| float | float | 4-byte IEEE |
| double | double | 8-byte IEEE |
| bytes | byte[] / ByteBuffer | Length-prefixed |
| string | String / Utf8 | Length-prefixed UTF-8 |
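The "zigzag varint" entries in the table refer to a two-step encoding: zigzag maps signed values to unsigned ones so that small magnitudes stay small, then the result is written seven bits at a time. Below is a minimal JDK-only sketch for long; the class name is hypothetical, and this is not the Avro library's encoder.

```java
import java.io.ByteArrayOutputStream;

// Sketch of zigzag + variable-length encoding for a long value.
final class ZigZagVarint {
    static byte[] encodeLong(long n) {
        long z = (n << 1) ^ (n >> 63); // zigzag: 0, -1, 1, -2 become 0, 1, 2, 3
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            z >>>= 7;
        }
        out.write((int) z); // final byte, continuation bit clear
        return out.toByteArray();
    }
}
```

Values between -64 and 63 fit in a single byte, which is a large part of why Avro payloads full of small counters and IDs stay compact.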
Complex types
| Type | Purpose | Example |
|---|---|---|
| record | Structured object (like a class) | Order, Customer |
| enum | Closed set of symbols | OrderStatus |
| array | Ordered list | array<string> |
| map | String-keyed map | map<double> |
| union | One of several types | ["null","string"] for optional |
| fixed | Fixed-size bytes | 16-byte UUID |
Writer schema + reader schema resolution
On deserialize, Avro uses the writer schema (embedded via registry ID in the message) and the reader schema (your application’s current model). Avro resolves differences field-by-field:
- Field in writer, not in reader: skipped (if reader doesn’t need it)
- Field in reader, not in writer: filled from default in reader schema
- Same name, compatible type promotion: e.g. int → long
- Field renamed: use aliases on reader schema to map old name
flowchart LR
    W[Writer schema v1\norderId, amount]
    B[Binary payload\non Kafka]
    R[Reader schema v2\norderId, amount, currency]
    O[Resolved Order object\ncurrency = default USD]
    W --> B
    B --> R
    R --> O
“How does Avro evolution work?” — Writer schema comes from the message ID; reader schema is your code. New optional fields need defaults on the reader side for backward compatibility. Renames are not automatic—use aliases or treat as remove + add.
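The resolution rules can be modeled in a few lines of plain Java. This is a deliberately simplified sketch, not the Avro library: records are maps, type promotion is left out, null defaults cannot be expressed, and the ReaderField type is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified model of writer/reader schema resolution.
final class Resolution {

    // A reader-side field: its name, old names it answers to, and an optional default.
    record ReaderField(String name, List<String> aliases, Optional<Object> defaultValue) {}

    static Map<String, Object> resolve(Map<String, Object> written, List<ReaderField> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (ReaderField f : readerFields) {
            if (written.containsKey(f.name())) {
                result.put(f.name(), written.get(f.name()));    // same name on both sides
                continue;
            }
            Optional<String> alias = f.aliases().stream().filter(written::containsKey).findFirst();
            if (alias.isPresent()) {
                result.put(f.name(), written.get(alias.get())); // writer used an old name
            } else if (f.defaultValue().isPresent()) {
                result.put(f.name(), f.defaultValue().get());   // missing in writer: use default
            } else {
                throw new IllegalStateException("no value or default for field " + f.name());
            }
        }
        return result; // writer-only fields are never copied, which is the "skipped" rule
    }
}
```

The exception branch is the case compatibility checks exist to prevent: a reader field with neither a written value nor a default.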
GenericRecord vs SpecificRecord
| API | Usage | Trade-off |
|---|---|---|
| GenericRecord | record.get("orderId"): dynamic field access | Flexible; no compile-time safety; good for tooling |
| SpecificRecord (generated) | order.getOrderId(): Java POJO from avro-maven-plugin | Type-safe; refactor-friendly; standard in services |
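// Build the event with the generated SpecificRecord builder.
Order order = Order.newBuilder()
    .setOrderId(9001L)
    .setCustomerId("cust-42")
    .setAmount(149.99)
    .setStatus(OrderStatus.PENDING)
    .build();

// String.valueOf works whether the generated getter returns long or Long.
ProducerRecord<String, Order> record =
    new ProducerRecord<>("orders", String.valueOf(order.getOrderId()), order);

// producer is assumed to be a KafkaProducer<String, Order> configured with KafkaAvroSerializer.
producer.send(record);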
Schema compatibility types
Compatibility defines which schema transitions are legal. Set globally (/config) or per subject. Registry runs Avro rules (or Protobuf/JSON rules) before accepting a new version.
| Mode | Rule (intuition) | New schema can… |
|---|---|---|
| BACKWARD | New consumers read old data | Add fields with a default; remove any field |
| FORWARD | Old consumers read new data | Add any field; remove only fields that have a default |
| FULL | Backward + forward (vs latest version only) | Add or remove only fields that have a default |
| NONE | No checks | Anything—dangerous in shared topics |
Transitive variants
Non-transitive modes compare new schema only against the latest version. TRANSITIVE modes compare against all historical versions in the subject—stricter, catches multi-hop breaks.
- BACKWARD_TRANSITIVE — new schema must read data from every older version
- FORWARD_TRANSITIVE — every older schema must read new data
- FULL_TRANSITIVE — both directions against full history
Production recommendation: FULL_TRANSITIVE globally or per critical subject. Strictest safe default—prevents “A→B compatible, B→C compatible, but A vs C breaks” chains. Loosen to BACKWARD only if you control all consumers and deploy them before producers.
curl -X PUT http://schema-registry:8081/config \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL_TRANSITIVE"}'
# Per-subject override
curl -X PUT http://schema-registry:8081/config/orders-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
Compatibility examples
BACKWARD-safe change — add optional field with default:
{ "name": "currency", "type": "string", "default": "USD" }
BACKWARD-breaking: add a required field (no default). A consumer on the new schema cannot read old records, because old writers never populated the field and the reader has no default to fill in.
FORWARD-breaking — remove field without default: new writers stop sending it; old readers still require it.
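The add/remove rules and the transitive variants can be captured in a toy checker. This is a sketch under a strong simplification: a schema is reduced to a map from field name to "has a default", whereas the registry's real checks also cover types, unions, enums, and aliases. All names are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Toy compatibility checker: a schema is modeled as field name -> "has a default".
final class Compat {

    // True if a reader using `reader` can decode data written with `writer`:
    // every reader field must exist in the writer or carry a default.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        return reader.entrySet().stream()
                .allMatch(f -> writer.containsKey(f.getKey()) || f.getValue());
    }

    // BACKWARD: consumers on the new schema read data written with the previous one.
    static boolean backward(Map<String, Boolean> next, Map<String, Boolean> previous) {
        return canRead(next, previous);
    }

    // FORWARD: consumers on the previous schema read data written with the new one.
    static boolean forward(Map<String, Boolean> next, Map<String, Boolean> previous) {
        return canRead(previous, next);
    }

    // Transitive variant: check against every earlier version, not just the latest.
    static boolean backwardTransitive(Map<String, Boolean> next, List<Map<String, Boolean>> history) {
        return history.stream().allMatch(old -> backward(next, old));
    }
}
```

Take v1 = {orderId}, v2 = v1 plus currency with a default, and v3 = v2 with the default on currency dropped. Then v3 passes a plain BACKWARD check against v2 but fails against v1, which is exactly the multi-hop break that only the transitive modes catch.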
FULL_TRANSITIVE blocks some pragmatic changes (tightening enums, removing deprecated fields) until you run a coordinated migration. That friction is the point—it forces explicit consumer upgrades or reserved-field patterns instead of Friday-night surprises.
Schema evolution patterns
Evolution is a design discipline, not a one-time schema dump. Plan additive changes, deprecate explicitly, and never rename silently.
Safe changes
- Add field with default — BACKWARD and FULL friendly; old messages deserialize with default
- Remove field that had a default — FORWARD friendly; old consumers still see default when reading new data
- Add optional union — ["null","string"] with default null
- Widen numeric type — int → long (promotion rules)
Unsafe changes
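- Rename field: Avro sees remove + add; use an alias on the reader schema instead
- Change type: string → int breaks resolution; only promotions such as int → long resolve
- Remove required field: breaks FORWARD unless consumers are upgraded first
- Reorder fields: safe with the registry because resolution matches fields by name, but the binary layout follows schema field order, so a reader decoding without the writer schema misreads the data; it also clutters human diffs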
Alias trick for renames
Add new canonical name; attach aliases with old name so reader schema resolves both:
{
"name": "customer_id",
"type": "string",
"aliases": ["customerId"]
}
Deploy consumer with aliased reader schema first, then producer writing customer_id. Registry still sees a structural change—plan compatibility mode and version bump accordingly.
Reserved fields pattern
Before removing a field consumers might still read:
- Stop writing the field in producer (or write null/default)
- Mark field doc: "DEPRECATED — remove in v4"
- Confirm all consumers on reader schema without the field
- Remove in new major schema version under FULL_TRANSITIVE only when safe
{
"name": "legacyStatusCode",
"type": ["null", "int"],
"default": null,
"doc": "RESERVED: do not use. Will be removed when all consumers >= v2.3."
}
Changing enum symbols (removing SHIPPED) breaks compatibility—treat enums as closed contracts. Prefer adding new enum type or string field with validation in application code.
Versioning workflow
sequenceDiagram
    participant Dev as Developer
    participant CI as CI pipeline
    participant SR as Schema Registry
    participant Cons as Consumers
    participant Prod as Producer
    Dev->>CI: PR updates .avsc
    CI->>SR: compatibility check
    SR-->>CI: compatible ✓
    CI->>Cons: deploy consumers (new reader)
    Cons-->>Prod: ready
    Prod->>SR: register + publish vN
Protobuf & JSON Schema support
Schema Registry is not Avro-only. Protobuf and JSON Schema use the same subject/version/ID model with format-specific compatibility rules—pick per boundary, not per preference.
| Format | Strengths | Best for |
|---|---|---|
| Avro | Compact; evolution via reader/writer; Hadoop/Kafka native | Internal Kafka pipelines, data lakes |
| Protobuf | Field numbers (not names) on wire; gRPC; strong cross-language | Microservices + gRPC + Kafka dual-write, polyglot teams |
| JSON Schema | Human-readable; flexible; web-friendly | External REST APIs, partner integrations, rapid prototyping |
Protobuf
Messages identified by field numbers—renaming a field does not break wire compatibility if number stays same. Never reuse field numbers. reserved blocks deprecated numbers. Schema Registry stores .proto text; serializer uses Confluent wire format (same magic byte + schema ID).
syntax = "proto3";
package com.acme.events;

message Order {
  int64 order_id = 1;
  string customer_id = 2;
  double amount = 3;
  OrderStatus status = 4;
  reserved 5; // formerly legacy_code
  reserved "old_field";
}

enum OrderStatus {
  PENDING = 0;
  SHIPPED = 1;
  CANCELLED = 2;
}
Forward compatibility: old code ignores unknown fields (proto3). Adding fields is safe; removing without reservation is not. Excellent when same models serve gRPC and Kafka.
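Why renames are harmless and number reuse is not becomes clearer from how Protobuf prefixes each field on the wire: a key built from the field number and a wire type, with no name anywhere. A minimal sketch of that calculation follows; the class and constant names are hypothetical.

```java
// Sketch of Protobuf's per-field key ("tag") calculation.
final class ProtoTag {
    static final int WIRE_VARINT = 0;           // int64, enum, bool, ...
    static final int WIRE_64BIT = 1;            // double, fixed64
    static final int WIRE_LENGTH_DELIMITED = 2; // string, bytes, nested messages

    // The key written before each field value: field number plus wire type, no name.
    static int tag(int fieldNumber, int wireType) {
        return (fieldNumber << 3) | wireType;
    }
}
```

For the Order message above, order_id = 1 is keyed as 0x08 and customer_id = 2 as 0x12. Renaming customer_id leaves 0x12 untouched, while reusing number 2 for a different field would make old bytes decode as the new field.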
JSON Schema
Most flexible, least safe—schemas can be permissive (additionalProperties: true). Registry supports draft versions; compatibility checking compares JSON Schema structures. Useful when partners submit JSON and you want registry governance without Avro codegen.
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Order",
  "type": "object",
  "required": ["orderId", "customerId"],
  "properties": {
    "orderId": { "type": "integer" },
    "customerId": { "type": "string" },
    "amount": { "type": "number", "default": 0 }
  },
  "additionalProperties": false
}
Choosing a format
- Internal Kafka, JVM + analytics: Avro + SpecificRecord
- Cross-system with gRPC: Protobuf everywhere
- External REST / webhook ingress: JSON Schema or schemaless JSON with validation service
- Connect sink to BigQuery: Avro/Protobuf with registry often required for strongly typed tables
Three formats on one cluster multiplies serializer config and subject governance. Standardize per topic tier (internal vs external), not per team preference—ops and on-call will thank you.
Spring Boot + Schema Registry
Spring Kafka abstracts producer/consumer factories—swap JSON for Avro by changing serializer classes and a handful of properties. Disable auto-registration in production; register schemas via CI instead.
Dependencies
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version>
</dependency>
KafkaAvroSerializer / KafkaAvroDeserializer
spring:
  kafka:
    bootstrap-servers: kafka:9092
    properties:
      schema.registry.url: http://schema-registry:8081
      specific.avro.reader: true
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        auto.register.schemas: false
        use.latest.version: true
    consumer:
      group-id: order-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
| Property | Production guidance |
|---|---|
| schema.registry.url | LB URL or K8s service; comma-separated for failover URLs |
| auto.register.schemas | false; register in CI/CD only |
| use.latest.version | true for producers using latest registered schema text |
| specific.avro.reader | true → deserialize to SpecificRecord, not GenericRecord |
| value.subject.name.strategy | TopicNameStrategy unless multi-type topic |
@KafkaListener with Avro
@KafkaListener(topics = "orders", groupId = "order-service")
public void onOrder(ConsumerRecord<String, Order> record) {
    Order order = record.value();
    log.info("orderId={} amount={}", order.getOrderId(), order.getAmount());
}
SchemaRegistryClient bean
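For admin tasks (compatibility checks, explicit registration, subject listing), inject Confluent's client as a manually defined bean. Note that @EnableSchemaRegistryClient from the Spring Cloud schema registry starters wires Spring Cloud's own SchemaRegistryClient abstraction, which is a different type from Confluent's io.confluent.kafka.schemaregistry.client.SchemaRegistryClient used here: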
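import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SchemaRegistryConfig {

    @Bean
    public SchemaRegistryClient schemaRegistryClient(
            @Value("${spring.kafka.properties.schema.registry.url}") String url) {
        Map<String, Object> props = Map.of(
            AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url
        );
        // 1000 = maximum number of schemas cached per subject by the client
        return new CachedSchemaRegistryClient(
            List.of(url),
            1000,
            List.of(new AvroSchemaProvider()),
            props
        );
    }
}

// Usage: preflight in admin endpoint or startup health
// schemaRegistryClient.testCompatibility("orders-value", new AvroSchema(schemaString));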
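import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderPublisher {

    private final KafkaTemplate<String, Order> kafkaTemplate;

    public OrderPublisher(KafkaTemplate<String, Order> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(Order order) {
        // String.valueOf works whether the generated getter returns long or Long.
        kafkaTemplate.send("orders", String.valueOf(order.getOrderId()), order);
    }
}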
Generate Avro classes in CI from the same .avsc committed to git—never hand-edit generated Order.java. Pair avro-maven-plugin with registry compatibility check in the same pipeline stage.
Why auto.register.schemas=false? Prevents a buggy producer from polluting registry with accidental schemas in prod. Registration becomes a governed act—CI registers, prod serializers only lookup IDs.