Data Contracts: The Missing Layer Between Teams
The analytics team ran their Monday pipeline. It failed. They traced it to a column rename in the orders table — customer_id had become buyer_id — that the backend team had shipped on Friday. The backend team had, in their view, done everything correctly: they had updated the ORM, migrated the column, updated the API documentation, and sent a Slack message to the general engineering channel. The analytics team had, in their view, done everything correctly: they had built their pipeline against the documented schema. Nobody had lied. There was simply no layer between them that would have made the conflict visible before it broke.
This is the canonical data contract failure. Not malice. Not negligence. A missing agreement.
Data contracts are that missing layer. They are not a schema registry, though a schema registry is part of the implementation. They are not documentation, though they read like documentation. They are the enforceable, versioned, owned agreement between a data producer and its consumers — covering schema, semantics, freshness, and quality expectations.
The Anatomy of a Data Contract
A data contract has five components. Teams that implement only the schema component wonder why contracts don't solve their coordination problems.
A concrete contract in YAML:
# contracts/orders/order-created-event.yaml
apiVersion: data-contracts/v1
kind: DataContract
metadata:
  name: order-created-event
  version: "2.1.0"
  owner: payments-team
  contact: payments-oncall@company.com
  consumers:
    - team: analytics
      use: revenue-reporting pipeline
    - team: fulfillment
      use: order fulfillment trigger
    - team: notifications
      use: order confirmation email
  created: "2024-03-15"
  last_modified: "2025-10-02"
sla:
  freshness_sla_minutes: 5
  availability_target_percent: 99.9
  breaking_change_notice_days: 14
  deprecation_notice_days: 30
schema:
  format: json-schema
  path: schemas/order-created-event-v2.1.json
quality_rules:
  - field: order_id
    rule: not_null
    severity: critical
  - field: buyer_id
    rule: not_null
    severity: critical
  - field: total_cents
    rule: "value > 0"
    severity: critical
  - field: currency
    rule: "value in ['USD', 'EUR', 'GBP', 'JPY', 'CAD', 'AUD']"
    severity: error
  - field: items
    rule: "array_length >= 1"
    severity: critical
  - field: created_at
    rule: "value <= now() + interval '1 minute'"
    severity: warning
    description: "Timestamps should not be in the future"

And the associated JSON Schema:
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "order-created-event-v2.1.json",
  "title": "OrderCreatedEvent",
  "description": "Emitted when an order is successfully created and payment confirmed",
  "type": "object",
  "required": ["order_id", "buyer_id", "total_cents", "currency", "items", "created_at"],
  "properties": {
    "order_id": {
      "type": "string",
      "description": "Stable, globally unique order identifier. Format: ord_{ulid}",
      "pattern": "^ord_[0-9A-Z]{26}$"
    },
    "buyer_id": {
      "type": "string",
      "description": "Identifier of the purchasing user. Renamed from customer_id in v2.0.0",
      "pattern": "^usr_[0-9A-Z]{26}$"
    },
    "total_cents": {
      "type": "integer",
      "minimum": 1,
      "description": "Total order value in the smallest currency unit (cents for USD)"
    },
    "currency": {
      "type": "string",
      "enum": ["USD", "EUR", "GBP", "JPY", "CAD", "AUD"]
    },
    "items": {
      "type": "array",
      "minItems": 1,
      "items": {
        "$ref": "#/definitions/OrderItem"
      }
    },
    "created_at": {
      "type": "string",
      "format": "date-time"
    }
  }
}

Schema Registry: Enforcement at Publish Time
A contract written in a YAML file is documentation. A contract enforced at publish time is infrastructure.
The schema registry — Confluent Schema Registry is the most common for Kafka, but AWS Glue Schema Registry and Apicurio are also widely used — stores canonical schemas and enforces compatibility rules when a producer attempts to register a new version.
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

schema_registry_conf = {'url': 'https://schema-registry.internal'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Schema evolution with backward compatibility
# Consumers on v1 can read v2 events — new optional fields only
order_event_schema_v2 = """
{
  "type": "record",
  "name": "OrderCreatedEvent",
  "namespace": "com.company.orders",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "buyer_id", "type": "string"},
    {"name": "total_cents", "type": "long"},
    {"name": "currency", "type": "string"},
    {"name": "created_at", "type": "string"},
    {
      "name": "fulfillment_type",
      "type": ["null", "string"],
      "default": null,
      "doc": "Added in v2: null for standard fulfillment"
    }
  ]
}
"""

# This call will FAIL if the new schema breaks backward compatibility
# e.g., if we renamed buyer_id back to customer_id
schema = Schema(order_event_schema_v2, schema_type="AVRO")
schema_registry_client.register_schema(
    "order-created-event-value",
    schema
)

Configure your subject-level compatibility mode deliberately:
- BACKWARD: New schema can read data written by old schema. Consumers can be upgraded before producers. This is the default and usually the right choice.
- FORWARD: Old schema can read data written by new schema. Producers can be upgraded before consumers.
- FULL: Both backward and forward. Required when you have long-lived consumers you can't coordinate upgrades with.
- NONE: No enforcement. Only acceptable for development environments.
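To see why adding an optional field is BACKWARD-compatible while a rename is not, here is a small, registry-free sketch. The `project` helper and `REQUIRED` sentinel are illustrative stand-ins for how a registry's compatibility check reasons about reader and writer schemas, not part of any real registry API.

```python
REQUIRED = object()  # sentinel: field has no default value

# The v1 reader's view of the event (illustrative subset of fields)
V1_READER = {"order_id": REQUIRED, "buyer_id": REQUIRED, "total_cents": REQUIRED}

def project(event: dict, reader_schema: dict) -> dict:
    """Read an event through a reader schema, filling defaults for absent optional fields."""
    out = {}
    for field, default in reader_schema.items():
        if field in event:
            out[field] = event[field]
        elif default is not REQUIRED:
            out[field] = default  # optional field absent: use its default
        else:
            raise KeyError(f"missing required field: {field}")
    return out

# v2 adds an optional field; the v1 reader simply ignores it: BACKWARD-compatible
v2_event = {"order_id": "ord_A", "buyer_id": "usr_B", "total_cents": 100,
            "fulfillment_type": "express"}

# A rename is a break: the v1 reader can no longer find buyer_id
renamed_event = {"order_id": "ord_A", "customer_id": "usr_B", "total_cents": 100}
```

Reading `v2_event` through `V1_READER` succeeds; reading `renamed_event` raises, which is exactly the situation a BACKWARD-mode registry rejects at registration time.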
# Set compatibility mode for a specific subject
curl -X PUT \
  https://schema-registry.internal/config/order-created-event-value \
  -H 'Content-Type: application/json' \
  -d '{"compatibility": "BACKWARD"}'

Ownership: The Part Nobody Builds First
Schema enforcement without ownership is incomplete. When a contract is violated — a freshness SLA breach, a schema incompatibility, a quality rule failure — who is responsible and who needs to be notified?
We maintain a contract registry: a Git repository of contract YAML files, where the owner field is a team name resolvable to a PagerDuty schedule and a Slack channel. CI validates that every contract has a non-empty, valid owner before merging.
# scripts/validate_contracts.py
import yaml
from pathlib import Path

VALID_TEAMS = load_team_registry()  # From HR system or Backstage catalog

errors = []
for contract_file in Path("contracts/").rglob("*.yaml"):
    with open(contract_file) as f:
        contract = yaml.safe_load(f)

    owner = contract.get("metadata", {}).get("owner")
    if not owner:
        errors.append(f"{contract_file}: missing owner")
    elif owner not in VALID_TEAMS:
        errors.append(f"{contract_file}: owner '{owner}' is not a registered team")

    consumers = contract.get("metadata", {}).get("consumers", [])
    for consumer in consumers:
        if consumer.get("team") not in VALID_TEAMS:
            errors.append(f"{contract_file}: consumer team '{consumer['team']}' is not registered")

if errors:
    print("Contract validation failed:")
    for e in errors:
        print(f"  {e}")
    raise SystemExit(1)

SLAs: Making the Implicit Explicit
The orders column rename broke the analytics pipeline because nobody had agreed on a notice period for breaking changes. The backend team's Friday Slack message was not the problem — the missing SLA was.
A data SLA has three components:
Freshness SLA: The maximum acceptable lag between a domain event occurring and the data being available in the agreed delivery mechanism. For a Kafka topic, this is the end-to-end latency from write to consumer offset. For a data warehouse table, this is the pipeline lag.
Availability SLA: The percentage of time the data delivery mechanism is available and producing non-stale data. This is different from the producer service's availability — a healthy service can still produce stale data if its pipeline is stuck.
Breaking change notice period: The minimum number of calendar days between the producer announcing a breaking schema change and the change taking effect in production. We standardize on 14 days for internal consumers and 30 days for external consumers.
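The notice period is also the easiest of the three to check mechanically. A minimal sketch, assuming the contract records an announcement date and an effective date for each breaking change (those field names are hypothetical, not from the contract schema above):

```python
from datetime import date, timedelta

def notice_period_ok(announced_on: date, effective_on: date, notice_days: int) -> bool:
    """A breaking change may only take effect after the full notice window has elapsed."""
    return effective_on >= announced_on + timedelta(days=notice_days)

# The opening story's Friday rename: announced and shipped the same weekend
# would fail a 14-day internal notice SLA.
same_weekend = notice_period_ok(date(2025, 10, 3), date(2025, 10, 6), 14)   # False
full_window = notice_period_ok(date(2025, 10, 3), date(2025, 10, 17), 14)  # True
```

Run in CI against the contract diff, a check like this turns "we posted in Slack" into a merge-blocking rule.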
Monitor SLA compliance with pipeline-level freshness checks:
-- Detect freshness SLA breaches in a data warehouse
-- Runs every 5 minutes; fires alert if any contract is breached
SELECT
    contract_name,
    max(event_created_at) AS last_event_time,
    current_timestamp - max(event_created_at) AS lag,
    freshness_sla_minutes,
    CASE
        WHEN extract(epoch FROM (current_timestamp - max(event_created_at))) / 60
             > freshness_sla_minutes THEN 'BREACHED'
        ELSE 'OK'
    END AS sla_status
FROM contract_registry cr
JOIN data_events de ON cr.event_type = de.event_type
WHERE cr.monitoring_enabled = true
GROUP BY contract_name, freshness_sla_minutes
-- Column aliases are not visible in HAVING, so repeat the expression
HAVING extract(epoch FROM (current_timestamp - max(event_created_at))) / 60
       > freshness_sla_minutes;

Migrations Through Contracts
The customer_id → buyer_id rename that broke the analytics team was a migration. Done through a data contract, it looks like this:
The dual-emit period is the contract's migration path. During those 14 days, the producer emits both customer_id (deprecated, same value as buyer_id) and buyer_id. Consumers can migrate at their own pace within the window. After the window closes, the deprecated field is removed.
class OrderEventProducer:
    def __init__(self, transition_period_active: bool):
        self.transition_period_active = transition_period_active

    def build_event(self, order: Order) -> dict:
        event = {
            "order_id": order.id,
            "buyer_id": order.buyer_id,  # New canonical field
            "total_cents": order.total_cents,
            "currency": order.currency,
            "created_at": order.created_at.isoformat(),
        }
        # Emit deprecated field during transition window only
        if self.transition_period_active:
            event["customer_id"] = order.buyer_id  # Deprecated alias
        return event

The transition_period_active flag is controlled by a feature flag with a hard expiry set to the end of the 14-day window. When the flag expires, the deprecated field disappears automatically — no manual cleanup required.
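A sketch of such a flag, assuming a hand-rolled helper rather than a specific flag service (ExpiringFlag is hypothetical): past `expires_at`, the flag reads as inactive no matter what it is set to, so a forgotten cleanup cannot silently extend the transition window.

```python
from datetime import datetime, timezone

class ExpiringFlag:
    """Feature flag with a hard expiry: always reads False after expires_at."""

    def __init__(self, enabled: bool, expires_at: datetime):
        self.enabled = enabled
        self.expires_at = expires_at

    def is_active(self, now=None) -> bool:
        now = now or datetime.now(timezone.utc)
        return self.enabled and now < self.expires_at

# Expiry set to the end of the 14-day dual-emit window
flag = ExpiringFlag(enabled=True,
                    expires_at=datetime(2025, 10, 16, tzinfo=timezone.utc))
```

Passing `flag.is_active()` as `transition_period_active` means the deprecated alias stops being emitted the moment the window closes, with no deploy required.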
Quality Rules: The Contract's Truth Guarantee
A contract that guarantees schema compatibility but not data quality is half a contract. Quality rules encode the invariants that make the data useful.
Run quality checks in your pipeline and route failures by severity:
class ContractQualityValidator:
    def validate(self, event: dict, contract: DataContract) -> ValidationResult:
        violations = []
        for rule in contract.quality_rules:
            field_value = event.get(rule.field)
            if not self._evaluate(field_value, rule.rule):
                violations.append(QualityViolation(
                    field=rule.field,
                    rule=rule.rule,
                    value=field_value,
                    severity=rule.severity,
                ))

        critical = [v for v in violations if v.severity == "critical"]
        errors = [v for v in violations if v.severity == "error"]
        warnings = [v for v in violations if v.severity == "warning"]

        return ValidationResult(
            passed=len(critical) == 0,  # Critical violations fail the event
            violations=violations,
            should_dead_letter=len(critical) > 0,
            should_alert=len(critical) > 0 or len(errors) > 5,
        )

Critical violations go to a dead-letter queue and page the producer team. Error-level violations are collected and reported in a daily digest. Warnings are tracked in dashboards but don't trigger alerts. This tiered severity model keeps the producer team's alert fatigue low while ensuring critical data integrity issues get immediate attention.
Key Takeaways
- A data contract has five components: ownership, schema, SLAs, quality rules, and migration conventions. Teams that implement only the schema component find that contracts don't solve their coordination problems.
- Enforce contracts at publish time through a schema registry, not just at review time through documentation. A BACKWARD-compatible schema registry blocks breaking changes before they reach consumers.
- The notice period SLA — minimum days between announcing a breaking change and shipping it — is the component most teams forget to define. Fourteen days for internal consumers is a reasonable starting point.
- Dual-emit during migration windows allows producers and consumers to decouple their migration timelines. Emit both the old and new field for the duration of the notice period; remove the old field when the window closes.
- Quality rules encode data invariants that schema alone cannot express: null rate bounds, value set membership, referential integrity. Run them in the pipeline and route violations by severity — critical to dead-letter and page, warnings to dashboards.
- Ownership is the load-bearing pillar. Without a named team and escalation path on every contract, enforcement has no one to escalate to when SLAs are breached.