Schema Registry & Data Contracts

Kafka moves bytes fast—but without a data contract, a producer renaming one field can silently break every downstream consumer. Confluent Schema Registry is the gatekeeper: versioned schemas, compatibility checks, and a 4-byte schema ID embedded in every Avro/Protobuf/JSON Schema message.

Audience: developer, senior · Version: Schema Registry 7.x

Why schema management matters

Event streams outlive individual services. The schema is the contract between teams—without enforcement, “small” producer changes become production incidents weeks later in a warehouse or analytics job.

Schema-on-write vs schema-on-read

Approach | When schema is enforced | Typical stack
Schema-on-write | At produce time: invalid payloads rejected | Kafka + Schema Registry + Avro
Schema-on-read | At consume time: reader interprets raw bytes | JSON topics, data lakes, exploratory analytics

Schema-on-write pushes failures left: the producer cannot publish a breaking change if compatibility checks fail. Schema-on-read is flexible but shifts burden to every consumer—each must handle missing fields, type changes, and renames independently.

The silent breakage problem

Without a registry, this sequence is common:

  1. Team A ships v2 of OrderEvent, renaming customerId → customer_id
  2. Kafka accepts JSON bytes — no validation at broker
  3. Team B’s Flink job still reads customerId → null for all new records
  4. Revenue dashboard under-reports for three days until someone notices
⚠️ Pitfall

JSON on Kafka feels fast in week one. In month six, fifteen consumers parse slightly different shapes. Schema Registry + Avro costs upfront ceremony; it pays back the first time you block an incompatible deploy in CI.

Data contract

A data contract is the agreed structure of a topic’s key and value: field names, types, semantics, defaults, and evolution rules. It is owned by the producing team, reviewed by consumers, and enforced at registration time—not documented in a wiki and hoped for.

  • Producer obligation: register schema before writing; only publish compatible versions
  • Consumer obligation: deserialize with reader schema that resolves against writer schema
  • Platform obligation: run Schema Registry HA, set global compatibility, audit subjects

Schema Registry role

Central repository for Avro, Protobuf, and JSON Schema definitions. Each registered schema gets a globally unique, monotonically increasing schema ID, plus a version number within its subject. On write, serializers register (or look up) the schema and prepend its ID to the payload; on read, deserializers fetch the schema by that ID. Incompatible schemas are rejected before a single byte hits the topic.

📦 Real World

Mature orgs tie schema registration to CI: PR merges Avro .avsc changes → pipeline calls POST /compatibility/subjects/.../versions/latest against staging registry → only green builds deploy producers.

Confluent Schema Registry architecture

Schema Registry is a stateless-ish HTTP service backed by a compacted Kafka topic. Serializers are thin clients; the registry is source of truth for schema metadata and compatibility policy.

flowchart LR
  P[Producer\nAvro Serializer]
  SR[Schema Registry\nREST :8081]
  S[(_schemas topic\ncompacted)]
  K[(Kafka topic\norders)]
  C[Consumer\nAvro Deserializer]

  P -->|register / lookup ID| SR
  SR -->|persist| S
  P -->|0x00 + ID + bytes| K
  K --> C
  C -->|GET /schemas/ids/id| SR
  SR --> S

Storage: _schemas topic

All schema metadata lives in the internal compacted topic _schemas (configurable via kafkastore.topic). Keys are subject + version; values hold schema string, schema type, references. Compaction retains latest record per key— the registry rebuilds in-memory caches on startup by consuming this topic.
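
The rebuild-on-startup behaviour can be pictured as replaying a keyed log into a map. The following is a toy sketch, not the registry's actual code: the class name, the `rec` helper, and the simplified "subject/version" string keys are all assumptions made for illustration (the real records in _schemas are structured documents).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of rebuilding a cache from a compacted topic; not the registry's actual code. */
final class CompactedLogReplay {

    /** Hypothetical helper: one log record with a simplified "subject/version" key. */
    static Map.Entry<String, String> rec(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    /**
     * Replays records in offset order. The last value written for a key wins,
     * and a null value (a Kafka tombstone) removes the key.
     */
    static Map<String, String> rebuildCache(List<Map.Entry<String, String>> log) {
        Map<String, String> cache = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : log) {
            if (record.getValue() == null) {
                cache.remove(record.getKey());
            } else {
                cache.put(record.getKey(), record.getValue());
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = List.of(
            rec("orders-value/1", "{\"type\":\"string\"}"),
            rec("orders-value/1", "{\"type\":\"long\"}"),
            rec("payments-value/1", "{\"type\":\"int\"}"),
            rec("payments-value/1", null));
        // Only the latest surviving record per key remains in the cache
        System.out.println(rebuildCache(log));
    }
}
```

Compaction only bounds how much of the log has to be replayed; the fold itself gives the same result whether or not older records have already been cleaned up.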

REST API

Primary operations (Confluent REST v1):

Operation | Endpoint | Purpose
Register schema | POST /subjects/{subject}/versions | New version if compatible; returns schema ID
Get schema by ID | GET /schemas/ids/{id} | Deserializer lookup (cached aggressively)
Get latest | GET /subjects/{subject}/versions/latest | CI/CD and tooling
Check compatibility | POST /compatibility/subjects/{subject}/versions/{ver} | Pre-deploy gate without registering
List subjects | GET /subjects | Inventory, governance
Delete subject / version | DELETE /subjects/... | Soft delete by default (a hard delete needs ?permanent=true); use with care in prod
bash — register and check compatibility
# Register new schema version
curl -X POST http://schema-registry:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\",\"type\":\"long\"}]}"}'

# Check compatibility against latest (without registering)
curl -X POST "http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "..."}'

# Fetch schema used by on-wire ID 42
curl http://schema-registry:8081/schemas/ids/42
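
The same compatibility check can be issued from a JVM-based CI step. The sketch below builds the request with the JDK's own HTTP client and mirrors the curl call above; the class and method names are hypothetical, and the hand-rolled escaping assumes a minified, single-line schema (a real pipeline would more likely use a JSON library).

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Sketch: builds the compatibility-check request from the curl example with the JDK HTTP client. */
final class CompatibilityRequest {

    static final String CONTENT_TYPE = "application/vnd.schemaregistry.v1+json";

    /** Escapes backslashes and double quotes so the schema can sit inside a JSON string value. */
    static String jsonEscape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Wraps the schema text in the {"schema": "..."} envelope used by the registry's REST API. */
    static String body(String avroSchemaJson) {
        return "{\"schema\": \"" + jsonEscape(avroSchemaJson) + "\"}";
    }

    /** Builds the request only; nothing is sent until an HttpClient executes it. */
    static HttpRequest build(String baseUrl, String subject, String avroSchemaJson) {
        URI uri = URI.create(baseUrl + "/compatibility/subjects/" + subject + "/versions/latest");
        return HttpRequest.newBuilder(uri)
            .header("Content-Type", CONTENT_TYPE)
            .POST(HttpRequest.BodyPublishers.ofString(body(avroSchemaJson)))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest req = build("http://schema-registry:8081", "orders-value", "{\"type\":\"string\"}");
        System.out.println(req.method() + " " + req.uri());
        // To send: HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString())
    }
}
```

The registry answers with a small JSON document containing an is_compatible flag, so a CI gate only needs to fail the build when that flag is false or the call returns a non-2xx status.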

Schema ID and wire format

Confluent serializers prepend a 5-byte header to each message (key and value independently):

Byte layout (Confluent Platform wire format)
┌────────┬─────────────────────────────────┬────────────────────────┐
│ 0x00   │ Schema ID (4 bytes, big-endian) │ Serialized payload     │
│ magic  │ e.g. 0x00 0x00 0x00 0x2A (=42)  │ Avro / Protobuf / JSON │
└────────┴─────────────────────────────────┴────────────────────────┘
  • Magic byte 0x00: Confluent framing (distinct from raw Avro or Kafka’s own formats)
  • Schema ID: global integer assigned at registration—same ID forever for that exact schema text
  • Payload: binary Avro (default), Protobuf bytes, or JSON Schema serialized form
🔬 Under the Hood

Deserializer reads magic byte → if not 0x00, fails fast (wrong serializer). Fetches schema for ID from local cache; on miss, GET /schemas/ids/{id}. Schema text is parsed once and cached per ID—hot paths avoid registry round-trips.
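
The framing described above is small enough to sketch with a plain ByteBuffer. This is an illustration of the byte layout only, not the Confluent serializer itself; the class and method names are hypothetical, and the payload is treated as opaque bytes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Sketch of the 5-byte framing: magic byte 0x00, 4-byte big-endian schema ID, then the payload. */
final class ConfluentFraming {

    static final byte MAGIC = 0x0;
    static final int HEADER_SIZE = 5;

    /** Prepends the header; ByteBuffer writes ints big-endian by default. */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
            .put(MAGIC)
            .putInt(schemaId)
            .put(payload)
            .array();
    }

    /** Reads the schema ID, failing fast when the magic byte is not 0x00. */
    static int schemaId(byte[] message) {
        if (message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("message shorter than the 5-byte header");
        }
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();
        if (magic != MAGIC) {
            throw new IllegalArgumentException("unknown magic byte: " + magic);
        }
        return buf.getInt();
    }

    /** Returns the bytes after the header, after validating the header. */
    static byte[] payload(byte[] message) {
        schemaId(message);
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println(Arrays.toString(framed));
        System.out.println(schemaId(framed));
    }
}
```

A check like schemaId is also a quick way to diagnose "wrong serializer" incidents: a message written as raw JSON usually starts with '{' (0x7B), so the magic-byte test fails immediately.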

Subject naming strategies

A subject is the registry namespace for a schema lineage. Strategy determines how subject names are derived from topic + record:

Strategy | Subject pattern | When to use
TopicNameStrategy (default) | {topic}-key, {topic}-value | One event type per topic (most Kafka pipelines)
RecordNameStrategy | Avro full record name (namespace + name) | Topic carries multiple Avro record types (envelope)
TopicRecordNameStrategy | {topic}-{recordName} | Multi-type topic with per-topic isolation of same record name
Properties — subject strategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
# alternatives:
# io.confluent.kafka.serializers.subject.RecordNameStrategy
# io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
💡 Pro Tip

Topic-per-event-type + TopicNameStrategy keeps mental model simple: subject orders-value maps 1:1 to topic orders. RecordNameStrategy shines with Debezium topics where multiple tables share a connector namespace.
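
The three patterns in the table reduce to simple string derivations. The sketch below reproduces only the naming patterns as listed; it does not call the Confluent strategy classes, and the class and method names are hypothetical.

```java
/** Sketch of the subject-name patterns from the strategy table; not the Confluent classes. */
final class SubjectNames {

    /** TopicNameStrategy: {topic}-key or {topic}-value. */
    static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    /** RecordNameStrategy: the fully qualified record name, independent of the topic. */
    static String recordName(String recordFullName) {
        return recordFullName;
    }

    /** TopicRecordNameStrategy: {topic}-{recordName}. */
    static String topicRecordName(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        System.out.println(topicName("orders", false));
        System.out.println(recordName("com.acme.events.Order"));
        System.out.println(topicRecordName("orders", "com.acme.events.Order"));
    }
}
```

Because the record-based strategies ignore or extend the topic name, compatibility is tracked per record type rather than per topic, which is what makes multi-type topics workable.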

High availability

Run multiple Schema Registry instances behind a load balancer. All instances share the same _schemas topic, and Kafka acts as the coordination log. The deployment is single-primary: any instance can serve reads, while writes are handled by the elected primary (elected through ZooKeeper in older versions, through the Kafka group protocol in current ones). Platform teams monitor registry lag and kafkastore connectivity like any critical dependency.

Avro — the preferred format

Avro is schema-first binary serialization designed for data pipelines. Unlike JSON, you cannot write or read without a schema— which is exactly what makes evolution and registry integration work.

Why Avro for Kafka

  • Schema required for read and write — no ambiguous bytes
  • Compact binary — smaller than JSON; no field names on wire (uses schema)
  • Evolution built-in — resolution rules between writer and reader schemas
  • Ecosystem — Kafka, Connect, Flink, Spark, Hive native support

Schema definition

Avro schemas are JSON documents: types, fields, defaults, namespaces, doc strings.

JSON — Avro record schema
{
  "type": "record",
  "name": "Order",
  "namespace": "com.acme.events",
  "doc": "Order placed event",
  "fields": [
    { "name": "orderId", "type": "long" },
    { "name": "customerId", "type": "string" },
    { "name": "amount", "type": "double", "default": 0.0 },
    { "name": "status", "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["PENDING", "SHIPPED", "CANCELLED"]
      }, "default": "PENDING"
    },
    { "name": "tags", "type": { "type": "array", "items": "string" }, "default": [] }
  ]
}

Primitive types

Avro type | Java mapping | Notes
null | null | Only as union member
boolean | boolean | 1 byte on wire
int | int | 32-bit zigzag varint
long | long | 64-bit zigzag varint
float | float | 4-byte IEEE
double | double | 8-byte IEEE
bytes | byte[] / ByteBuffer | Length-prefixed
string | String / Utf8 | Length-prefixed UTF-8
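
To make "zigzag varint" concrete, here is a minimal sketch of how a long becomes bytes: zigzag maps small negative and positive numbers to small unsigned values, then the value is written 7 bits at a time with a continuation bit. It is an illustration written for this section, not the Avro library's encoder; the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;

/** Sketch of zigzag + variable-length encoding as used for Avro int and long values. */
final class ZigZagVarint {

    static byte[] encodeLong(long n) {
        long z = (n << 1) ^ (n >> 63);            // zigzag: 0, -1, 1, -2 become 0, 1, 2, 3
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((z & ~0x7FL) != 0) {               // more than 7 bits left
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits plus continuation bit
            z >>>= 7;
        }
        out.write((int) z);                       // final byte, continuation bit clear
        return out.toByteArray();
    }

    static long decodeLong(byte[] bytes) {
        long z = 0;
        int shift = 0;
        for (byte b : bytes) {
            z |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;                            // last byte of this value
            }
            shift += 7;
        }
        return (z >>> 1) ^ -(z & 1);              // undo zigzag
    }

    public static void main(String[] args) {
        for (long n : new long[] {0, -1, 42, 64}) {
            StringBuilder hex = new StringBuilder();
            for (byte b : encodeLong(n)) {
                hex.append(String.format("%02x ", b));
            }
            System.out.println(n + " -> " + hex.toString().trim());
        }
    }
}
```

Values between -64 and 63 fit in one byte, which is a large part of why Avro payloads with small IDs and counters stay compact.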

Complex types

Type | Purpose | Example
record | Structured object (like a class) | Order, Customer
enum | Closed set of symbols | OrderStatus
array | Ordered list | array<string>
map | String-keyed map | map<double>
union | One of several types | ["null","string"] for optional
fixed | Fixed-size bytes | 16-byte UUID

Writer schema + reader schema resolution

On deserialize, Avro uses the writer schema (embedded via registry ID in the message) and the reader schema (your application’s current model). Avro resolves differences field-by-field:

  • Field in writer, not in reader: skipped (if reader doesn’t need it)
  • Field in reader, not in writer: filled from default in reader schema
  • Same name, compatible type promotion: e.g. int → long
  • Field renamed: use aliases on reader schema to map old name
flowchart LR
  W[Writer schema v1\norderId, amount]
  B[Binary payload\non Kafka]
  R[Reader schema v2\norderId, amount, currency]
  O[Resolved Order object\ncurrency = default USD]

  W --> B
  B --> R
  R --> O
🎯 Interview Tip

“How does Avro evolution work?” — Writer schema comes from the message ID; reader schema is your code. New optional fields need defaults on the reader side for backward compatibility. Renames are not automatic—use aliases or treat as remove + add.
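
The resolution rules can be illustrated with a toy model that works on already-decoded values. This is a deliberate simplification and not the Avro library: it ignores types and promotion, and the ReaderField class and resolve method are hypothetical names introduced here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of Avro reader/writer resolution on already-decoded values; not the Avro library. */
final class ResolutionSketch {

    /** Hypothetical reader-side field description: name, aliases, optional default. */
    static final class ReaderField {
        final String name;
        final List<String> aliases;
        final boolean hasDefault;
        final Object defaultValue;

        ReaderField(String name, List<String> aliases, boolean hasDefault, Object defaultValue) {
            this.name = name;
            this.aliases = aliases;
            this.hasDefault = hasDefault;
            this.defaultValue = defaultValue;
        }
    }

    /** Builds the reader's view: match by name, then by alias, then fall back to the default. */
    static Map<String, Object> resolve(Map<String, Object> writerValues, List<ReaderField> readerFields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (ReaderField f : readerFields) {
            String source = writerValues.containsKey(f.name) ? f.name : null;
            if (source == null) {
                for (String alias : f.aliases) {
                    if (writerValues.containsKey(alias)) {
                        source = alias;
                        break;
                    }
                }
            }
            if (source != null) {
                out.put(f.name, writerValues.get(source));
            } else if (f.hasDefault) {
                out.put(f.name, f.defaultValue);
            } else {
                throw new IllegalStateException("no value or default for reader field " + f.name);
            }
        }
        return out; // writer-only fields are never copied, i.e. they are skipped
    }

    public static void main(String[] args) {
        Map<String, Object> written = Map.of("orderId", 9001L, "customerId", "cust-42", "legacy", 1);
        List<ReaderField> reader = List.of(
            new ReaderField("orderId", List.of(), false, null),
            new ReaderField("customer_id", List.of("customerId"), false, null),
            new ReaderField("currency", List.of(), true, "USD"));
        System.out.println(resolve(written, reader));
    }
}
```

The exception branch is the interesting one: a reader field with no matching writer field and no default is exactly the case a BACKWARD compatibility check is meant to reject before deployment.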

GenericRecord vs SpecificRecord

API | Usage | Trade-off
GenericRecord | record.get("orderId"): dynamic field access | Flexible; no compile-time safety; good for tooling
SpecificRecord (generated) | order.getOrderId(): Java POJO from avro-maven-plugin | Type-safe; refactor-friendly; standard in services
Java — SpecificRecord producer
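// Build the event with the generated SpecificRecord builder
Order order = Order.newBuilder()
    .setOrderId(9001L)
    .setCustomerId("cust-42")
    .setAmount(149.99)
    .setStatus(OrderStatus.PENDING)
    .build();

// String.valueOf compiles whether the generated getter returns long or Long
ProducerRecord<String, Order> record =
    new ProducerRecord<>("orders", String.valueOf(order.getOrderId()), order);
producer.send(record);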

Schema compatibility types

Compatibility defines which schema transitions are legal. Set globally (/config) or per subject. Registry runs Avro rules (or Protobuf/JSON rules) before accepting a new version.

Mode | Rule (intuition) | New schema can…
BACKWARD | New consumers read old data | Add fields with a default; remove any field
FORWARD | Old consumers read new data | Add any field; remove only fields that have a default
FULL | Backward + forward (vs previous version only) | Add or remove only fields that have a default
NONE | No checks | Anything; dangerous in shared topics

Transitive variants

Non-transitive modes compare new schema only against the latest version. TRANSITIVE modes compare against all historical versions in the subject—stricter, catches multi-hop breaks.

  • BACKWARD_TRANSITIVE — new schema must read data from every older version
  • FORWARD_TRANSITIVE — every older schema must read new data
  • FULL_TRANSITIVE — both directions against full history
⚙️ Config

Production recommendation: FULL_TRANSITIVE globally or per critical subject. Strictest safe default—prevents “A→B compatible, B→C compatible, but A vs C breaks” chains. Loosen to BACKWARD only if you control all consumers and deploy them before producers.

bash — set global compatibility
curl -X PUT http://schema-registry:8081/config \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL_TRANSITIVE"}'

# Per-subject override
curl -X PUT http://schema-registry:8081/config/orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'
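
To see why the transitive variants matter, the "A→B compatible, B→C compatible, but A vs C breaks" chain can be reproduced with a toy checker. The model is an assumption made for illustration: a schema is reduced to a map from field name to "has a default", types are ignored, and none of this is the registry's real compatibility code.

```java
import java.util.List;
import java.util.Map;

/** Toy BACKWARD checker: schemas are maps of field name to "has a default" (types ignored). */
final class CompatibilitySketch {

    /** A reader can decode a writer's data if every reader field missing from the writer has a default. */
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            if (!writer.containsKey(field.getKey()) && !field.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** BACKWARD: the candidate only has to read data written with the latest version. */
    static boolean backward(Map<String, Boolean> candidate, List<Map<String, Boolean>> history) {
        return history.isEmpty() || canRead(candidate, history.get(history.size() - 1));
    }

    /** BACKWARD_TRANSITIVE: the candidate has to read data written with every earlier version. */
    static boolean backwardTransitive(Map<String, Boolean> candidate, List<Map<String, Boolean>> history) {
        for (Map<String, Boolean> older : history) {
            if (!canRead(candidate, older)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Boolean> v1 = Map.of("orderId", false);
        Map<String, Boolean> v2 = Map.of("orderId", false, "currency", true);  // adds currency with default
        Map<String, Boolean> v3 = Map.of("orderId", false, "currency", false); // drops the default
        System.out.println("BACKWARD accepts v3: " + backward(v3, List.of(v1, v2)));
        System.out.println("BACKWARD_TRANSITIVE accepts v3: " + backwardTransitive(v3, List.of(v1, v2)));
    }
}
```

In this model v3 passes against v2, because v2 data always carries currency, yet it cannot read v1 records still sitting in the topic. That gap is what the transitive modes close.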

Compatibility examples

BACKWARD-safe change — add optional field with default:

JSON — v2 adds currency (BACKWARD OK)
{ "name": "currency", "type": "string", "default": "USD" }

BACKWARD-breaking: add a required field (no default). Old records never contained the field, so a reader on the new schema has nothing to fill it with and resolution fails.

FORWARD-breaking — remove field without default: new writers stop sending it; old readers still require it.

⚖️ Trade-off

FULL_TRANSITIVE blocks some pragmatic changes (tightening enums, removing deprecated fields) until you run a coordinated migration. That friction is the point—it forces explicit consumer upgrades or reserved-field patterns instead of Friday-night surprises.

Schema evolution patterns

Evolution is a design discipline, not a one-time schema dump. Plan additive changes, deprecate explicitly, and never rename silently.

Safe changes

  • Add field with default — BACKWARD and FULL friendly; old messages deserialize with default
  • Remove field that had a default — FORWARD friendly; old consumers still see default when reading new data
  • Add optional union ["null","string"] with default null
  • Widen numeric type: int → long (promotion rules; readers must be on the wider type before writers switch)

Unsafe changes

  • Rename field: Avro sees remove + add; use an alias instead
  • Change type: string → int breaks resolution; widen only
  • Remove required field: breaks FORWARD unless consumers are upgraded first
  • Reorder fields: resolution matches fields by name, so readers that have the writer schema are unaffected, but the change still registers a new schema version and confuses human diffs

Alias trick for renames

Add new canonical name; attach aliases with old name so reader schema resolves both:

JSON — rename customerId → customer_id via alias
{
  "name": "customer_id",
  "type": "string",
  "aliases": ["customerId"]
}

Deploy consumer with aliased reader schema first, then producer writing customer_id. Registry still sees a structural change—plan compatibility mode and version bump accordingly.

Reserved fields pattern

Before removing a field consumers might still read:

  1. Stop writing the field in producer (or write null/default)
  2. Mark field doc: "DEPRECATED — remove in v4"
  3. Confirm all consumers on reader schema without the field
  4. Remove in new major schema version under FULL_TRANSITIVE only when safe
JSON — reserved sentinel before removal
{
  "name": "legacyStatusCode",
  "type": ["null", "int"],
  "default": null,
  "doc": "RESERVED: do not use. Will be removed when all consumers >= v2.3."
}
⚠️ Pitfall

Changing enum symbols (removing SHIPPED) breaks compatibility—treat enums as closed contracts. Prefer adding new enum type or string field with validation in application code.

Versioning workflow

sequenceDiagram
  participant Dev as Developer
  participant CI as CI pipeline
  participant SR as Schema Registry
  participant Cons as Consumers
  participant Prod as Producer

  Dev->>CI: PR updates .avsc
  CI->>SR: compatibility check
  SR-->>CI: compatible ✓
  CI->>Cons: deploy consumers (new reader)
  Cons-->>Prod: ready
  Prod->>SR: register + publish vN

Protobuf & JSON Schema support

Schema Registry is not Avro-only. Protobuf and JSON Schema use the same subject/version/ID model with format-specific compatibility rules—pick per boundary, not per preference.

Format | Strengths | Best for
Avro | Compact; evolution via reader/writer; Hadoop/Kafka native | Internal Kafka pipelines, data lakes
Protobuf | Field numbers (not names) on wire; gRPC; strong cross-language | Microservices + gRPC + Kafka dual-write, polyglot teams
JSON Schema | Human-readable; flexible; web-friendly | External REST APIs, partner integrations, rapid prototyping

Protobuf

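Messages are identified by field numbers, so renaming a field does not break wire compatibility as long as its number stays the same. Never reuse field numbers; a reserved statement blocks deprecated numbers and names from being reused. Schema Registry stores the .proto text; the serializer uses the Confluent wire format (same magic byte + schema ID, followed by a message-index list that identifies the message type within the .proto file).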

protobuf — Order message
syntax = "proto3";
package com.acme.events;

message Order {
  int64 order_id = 1;
  string customer_id = 2;
  double amount = 3;
  OrderStatus status = 4;
  reserved 5;           // formerly legacy_code
  reserved "old_field";
}

enum OrderStatus {
  PENDING = 0;
  SHIPPED = 1;
  CANCELLED = 2;
}

Forward compatibility: old code ignores unknown fields (proto3). Adding fields is safe; removing without reservation is not. Excellent when same models serve gRPC and Kafka.
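
The "old code ignores unknown fields" behaviour follows from the wire format: every field is prefixed with a tag carrying its number and wire type, so a reader can skip what it does not recognise. The decoder below is a hand-written sketch to show that mechanism, not the protobuf runtime; it only surfaces varint and length-delimited fields, treats length-delimited data as UTF-8 text, and uses hypothetical names throughout.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Sketch of a reader that keeps known field numbers and skips the rest; not the protobuf runtime. */
final class UnknownFieldSkipper {

    private final byte[] buf;
    private int pos;

    private UnknownFieldSkipper(byte[] buf) {
        this.buf = buf;
    }

    /** Reads a base-128 varint starting at the current position. */
    private long readVarint() {
        long result = 0;
        int shift = 0;
        while (true) {
            byte b = buf[pos++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    /** Decodes fields listed in knownFields; anything else is consumed and dropped. */
    static Map<Integer, Object> decode(byte[] message, Set<Integer> knownFields) {
        UnknownFieldSkipper in = new UnknownFieldSkipper(message);
        Map<Integer, Object> out = new LinkedHashMap<>();
        while (in.pos < message.length) {
            long tag = in.readVarint();
            int fieldNumber = (int) (tag >>> 3);   // upper bits: field number
            int wireType = (int) (tag & 0x7);      // lower 3 bits: wire type
            Object value;
            switch (wireType) {
                case 0: value = in.readVarint(); break;             // varint
                case 1: in.pos += 8; value = null; break;           // 64-bit, skipped in this sketch
                case 2: {                                           // length-delimited
                    int len = (int) in.readVarint();
                    value = new String(message, in.pos, len, StandardCharsets.UTF_8);
                    in.pos += len;
                    break;
                }
                case 5: in.pos += 4; value = null; break;           // 32-bit, skipped in this sketch
                default: throw new IllegalArgumentException("unsupported wire type " + wireType);
            }
            if (value != null && knownFields.contains(fieldNumber)) {
                out.put(fieldNumber, value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // order_id = 42 (field 1), customer_id = "c1" (field 2), plus a field 6 the old reader never heard of
        byte[] msg = {0x08, 0x2A, 0x12, 0x02, 0x63, 0x31, 0x32, 0x03, 0x45, 0x55, 0x52};
        System.out.println(decode(msg, Set.of(1, 2)));
    }
}
```

The same mechanism explains the rule about never reusing numbers: if field 6 were later reassigned with a different meaning or wire type, old payloads would be misread rather than skipped.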

JSON Schema

Most flexible, least safe—schemas can be permissive (additionalProperties: true). Registry supports draft versions; compatibility checking compares JSON Schema structures. Useful when partners submit JSON and you want registry governance without Avro codegen.

JSON — JSON Schema for Order
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Order",
  "type": "object",
  "required": ["orderId", "customerId"],
  "properties": {
    "orderId": { "type": "integer" },
    "customerId": { "type": "string" },
    "amount": { "type": "number", "default": 0 }
  },
  "additionalProperties": false
}

Choosing a format

  • Internal Kafka, JVM + analytics: Avro + SpecificRecord
  • Cross-system with gRPC: Protobuf everywhere
  • External REST / webhook ingress: JSON Schema or schemaless JSON with validation service
  • Connect sink to BigQuery: Avro/Protobuf with registry often required for strongly typed tables
⚖️ Trade-off

Three formats on one cluster multiplies serializer config and subject governance. Standardize per topic tier (internal vs external), not per team preference—ops and on-call will thank you.

Spring Boot + Schema Registry

Spring Kafka abstracts producer/consumer factories—swap JSON for Avro by changing serializer classes and a handful of properties. Disable auto-registration in production; register schemas via CI instead.

Dependencies

xml — Maven coordinates
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.6.0</version>
</dependency>

KafkaAvroSerializer / KafkaAvroDeserializer

yaml — application.yml
spring:
  kafka:
    bootstrap-servers: kafka:9092
    properties:
      schema.registry.url: http://schema-registry:8081
      specific.avro.reader: true
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        auto.register.schemas: false
        use.latest.version: true
    consumer:
      group-id: order-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
Property | Production guidance
schema.registry.url | LB URL or K8s service; comma-separated list for failover URLs
auto.register.schemas | false: register in CI/CD only
use.latest.version | true for producers using the latest registered schema text
specific.avro.reader | true → deserialize to SpecificRecord, not GenericRecord
value.subject.name.strategy | TopicNameStrategy unless multi-type topic

@KafkaListener with Avro

Java — consumer
@KafkaListener(topics = "orders", groupId = "order-service")
public void onOrder(ConsumerRecord<String, Order> record) {
  Order order = record.value();
  log.info("orderId={} amount={}", order.getOrderId(), order.getAmount());
}

SchemaRegistryClient bean

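For admin tasks such as compatibility checks, explicit registration, and subject listing, inject Confluent’s client. @EnableSchemaRegistryClient (from the Spring Cloud schema registry starters) wires Spring Cloud’s own client abstraction, so for Confluent’s CachedSchemaRegistryClient declare the bean manually: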

Java — SchemaRegistryClient configuration
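import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SchemaRegistryConfig {

  @Bean
  public SchemaRegistryClient schemaRegistryClient(
      @Value("${spring.kafka.properties.schema.registry.url}") String url) {
    Map<String, Object> props = Map.of(
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url
    );
    // Arguments: base URLs, schema cache capacity, schema providers, client config
    return new CachedSchemaRegistryClient(
        List.of(url),
        1000,
        List.of(new AvroSchemaProvider()),
        props
    );
  }
}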

// Usage: preflight in admin endpoint or startup health
// schemaRegistryClient.testCompatibility("orders-value", new AvroSchema(schemaString));
Java — KafkaTemplate with Avro
@Service
public class OrderPublisher {
  private final KafkaTemplate<String, Order> kafkaTemplate;

  public OrderPublisher(KafkaTemplate<String, Order> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  public void publish(Order order) {
    // Key by order ID; String.valueOf compiles whether the getter returns long or Long
    kafkaTemplate.send("orders", String.valueOf(order.getOrderId()), order);
  }
}
💡 Pro Tip

Generate Avro classes in CI from the same .avsc committed to git—never hand-edit generated Order.java. Pair avro-maven-plugin with registry compatibility check in the same pipeline stage.

🎯 Interview Tip

Why auto.register.schemas=false? Prevents a buggy producer from polluting registry with accidental schemas in prod. Registration becomes a governed act—CI registers, prod serializers only lookup IDs.