
Data Contracts: The Missing Layer Between Teams

Ravinder · 10 min read

The analytics team ran their Monday pipeline. It failed. They traced it to a column rename in the orders table — customer_id had become buyer_id — that the backend team had shipped on Friday. The backend team had, in their view, done everything correctly: they had updated the ORM, migrated the column, updated the API documentation, and sent a Slack message to the general engineering channel. The analytics team had, in their view, done everything correctly: they had built their pipeline against the documented schema. Nobody had lied. There was simply no layer between them that would have made the conflict visible before it broke.

This is the canonical data contract failure. Not malice. Not negligence. A missing agreement.

Data contracts are that missing layer. They are not a schema registry, though a schema registry is part of the implementation. They are not documentation, though they read like documentation. They are the enforceable, versioned, owned agreement between a data producer and its consumers — covering schema, semantics, freshness, and quality expectations.

The Anatomy of a Data Contract

A data contract has five components. Teams that implement only the schema component wonder why contracts don't solve their coordination problems.

graph TD
    DC[Data Contract] --> O[Ownership]
    DC --> S[Schema]
    DC --> SLA[SLAs]
    DC --> Q[Quality Rules]
    DC --> MC[Migration Conventions]
    O --> O1["Named producer team\nNamed consumer teams\nEscalation path"]
    S --> S1["Field names and types\nRequired vs optional\nSemantics and domain meaning"]
    SLA --> SLA1["Freshness SLA\nAvailability SLA\nBreaking change notice period"]
    Q --> Q1["Null rate bounds\nValue distribution invariants\nReferential integrity checks"]
    MC --> MC1["Deprecation timeline\nBackward compatibility rules\nConsumer migration support"]
    style DC fill:#1e3a5f,color:#fff
    style O fill:#2c5282,color:#fff
    style S fill:#2c5282,color:#fff
    style SLA fill:#2c5282,color:#fff
    style Q fill:#2c5282,color:#fff
    style MC fill:#2c5282,color:#fff

A concrete contract in YAML:

# contracts/orders/order-created-event.yaml
apiVersion: data-contracts/v1
kind: DataContract
metadata:
  name: order-created-event
  version: "2.1.0"
  owner: payments-team
  contact: payments-oncall@company.com
  consumers:
    - team: analytics
      use: revenue-reporting pipeline
    - team: fulfillment
      use: order fulfillment trigger
    - team: notifications
      use: order confirmation email
  created: "2024-03-15"
  last_modified: "2025-10-02"
 
sla:
  freshness_sla_minutes: 5
  availability_target_percent: 99.9
  breaking_change_notice_days: 14
  deprecation_notice_days: 30
 
schema:
  format: json-schema
  path: schemas/order-created-event-v2.1.json
 
quality_rules:
  - field: order_id
    rule: not_null
    severity: critical
  - field: buyer_id
    rule: not_null
    severity: critical
  - field: total_cents
    rule: "value > 0"
    severity: critical
  - field: currency
    rule: "value in ['USD', 'EUR', 'GBP', 'JPY', 'CAD', 'AUD']"
    severity: error
  - field: items
    rule: "array_length >= 1"
    severity: critical
  - field: created_at
    rule: "value <= now() + interval '1 minute'"
    severity: warning
    description: "Timestamps should not be in the future"

And the associated JSON Schema:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "order-created-event-v2.1.json",
  "title": "OrderCreatedEvent",
  "description": "Emitted when an order is successfully created and payment confirmed",
  "type": "object",
  "required": ["order_id", "buyer_id", "total_cents", "currency", "items", "created_at"],
  "properties": {
    "order_id": {
      "type": "string",
      "description": "Stable, globally unique order identifier. Format: ord_{ulid}",
      "pattern": "^ord_[0-9A-Z]{26}$"
    },
    "buyer_id": {
      "type": "string",
      "description": "Identifier of the purchasing user. Renamed from customer_id in v2.0.0",
      "pattern": "^usr_[0-9A-Z]{26}$"
    },
    "total_cents": {
      "type": "integer",
      "minimum": 1,
      "description": "Total order value in the smallest currency unit (cents for USD)"
    },
    "currency": {
      "type": "string",
      "enum": ["USD", "EUR", "GBP", "JPY", "CAD", "AUD"]
    },
    "items": {
      "type": "array",
      "minItems": 1,
      "items": {
        "$ref": "#/definitions/OrderItem"
      }
    },
    "created_at": {
      "type": "string",
      "format": "date-time"
    }
  }
}
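
Consumers and CI jobs can validate events against this schema directly. A minimal sketch using the jsonschema package (one option among several; it assumes the full schema file on disk includes the OrderItem definition elided from the excerpt above):

import json

import jsonschema  # pip install jsonschema

with open("schemas/order-created-event-v2.1.json") as f:
    schema = json.load(f)

event = {
    "order_id": "ord_01ARZ3NDEKTSV4RRFFQ69G5FAV",
    "buyer_id": "usr_01ARZ3NDEKTSV4RRFFQ69G5FAV",
    "total_cents": 4999,
    "currency": "USD",
    "items": [{"sku": "SKU-123", "quantity": 1}],  # shape depends on the OrderItem definition
    "created_at": "2025-10-02T14:30:00Z",
}

try:
    jsonschema.validate(instance=event, schema=schema)
except jsonschema.ValidationError as e:
    print(f"Contract violation: {e.message}")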

Schema Registry: Enforcement at Publish Time

A contract written in a YAML file is documentation. A contract enforced at publish time is infrastructure.

The schema registry — Confluent Schema Registry is the most common for Kafka, but AWS Glue Schema Registry and Apicurio are also widely used — stores canonical schemas and enforces compatibility rules when a producer attempts to register a new version.

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
 
schema_registry_conf = {'url': 'https://schema-registry.internal'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
 
# Schema evolution with backward compatibility
# Consumers on v1 can read v2 events — new optional fields only
order_event_schema_v2 = """
{
  "type": "record",
  "name": "OrderCreatedEvent",
  "namespace": "com.company.orders",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "buyer_id", "type": "string"},
    {"name": "total_cents", "type": "long"},
    {"name": "currency", "type": "string"},
    {"name": "created_at", "type": "string"},
    {
      "name": "fulfillment_type",
      "type": ["null", "string"],
      "default": null,
      "doc": "Added in v2: null for standard fulfillment"
    }
  ]
}
"""
 
# This call will FAIL if the new schema breaks backward compatibility
# e.g., if we renamed buyer_id back to customer_id
schema = Schema(order_event_schema_v2, schema_type="AVRO")
schema_registry_client.register_schema(
    "order-created-event-value",
    schema
)

Configure your subject-level compatibility mode deliberately:

  • BACKWARD: New schema can read data written by old schema. Consumers can be upgraded before producers. This is the default and usually the right choice.
  • FORWARD: Old schema can read data written by new schema. Producers can be upgraded before consumers.
  • FULL: Both backward and forward. Required when you have long-lived consumers you can't coordinate upgrades with.
  • NONE: No enforcement. Only acceptable for development environments.
# Set compatibility mode for a specific subject
curl -X PUT \
  https://schema-registry.internal/config/order-created-event-value \
  -H 'Content-Type: application/json' \
  -d '{"compatibility": "BACKWARD"}'

Ownership: The Part Nobody Builds First

Schema enforcement without ownership is incomplete. When a contract is violated — a freshness SLA breach, a schema incompatibility, a quality rule failure — who is responsible and who needs to be notified?

We maintain a contract registry: a Git repository of contract YAML files, where the owner field is a team name resolvable to a PagerDuty schedule and a Slack channel. CI validates that every contract has a non-empty, valid owner before merging.

# scripts/validate_contracts.py
import yaml
from pathlib import Path
 
VALID_TEAMS = load_team_registry()  # From HR system or Backstage catalog
 
errors = []
for contract_file in Path("contracts/").rglob("*.yaml"):
    with open(contract_file) as f:
        contract = yaml.safe_load(f)
 
    owner = contract.get("metadata", {}).get("owner")
    if not owner:
        errors.append(f"{contract_file}: missing owner")
    elif owner not in VALID_TEAMS:
        errors.append(f"{contract_file}: owner '{owner}' is not a registered team")
 
    consumers = contract.get("metadata", {}).get("consumers", [])
    for consumer in consumers:
        team = consumer.get("team")  # .get avoids a KeyError on malformed entries
        if team not in VALID_TEAMS:
            errors.append(f"{contract_file}: consumer team '{team}' is not registered")
 
if errors:
    print("Contract validation failed:")
    for e in errors:
        print(f"  {e}")
    raise SystemExit(1)

SLAs: Making the Implicit Explicit

The orders column rename broke the analytics pipeline because nobody had agreed on a notice period for breaking changes. The backend team's Friday Slack message was not the problem — the missing SLA was.

A data SLA has three components:

Freshness SLA: The maximum acceptable lag between a domain event occurring and the data being available in the agreed delivery mechanism. For a Kafka topic, this is the end-to-end latency from write to consumer offset. For a data warehouse table, this is the pipeline lag.

Availability SLA: The percentage of time the data delivery mechanism is available and producing non-stale data. This is different from the producer service's availability — a healthy service can still produce stale data if its pipeline is stuck.

Breaking change notice period: The minimum number of calendar days between the producer announcing a breaking schema change and the change taking effect in production. We standardize on 14 days for internal consumers and 30 days for external consumers.

Monitor SLA compliance with pipeline-level freshness checks:

-- Detect freshness SLA breaches in a data warehouse
-- Runs every 5 minutes; fires alert if any contract is breached
 
SELECT
    contract_name,
    max(event_created_at) AS last_event_time,
    current_timestamp - max(event_created_at) AS lag,
    freshness_sla_minutes,
    CASE
        WHEN extract(epoch from (current_timestamp - max(event_created_at))) / 60
             > freshness_sla_minutes THEN 'BREACHED'
        ELSE 'OK'
    END AS sla_status
FROM contract_registry cr
JOIN data_events de ON cr.event_type = de.event_type
WHERE cr.monitoring_enabled = true
GROUP BY contract_name, freshness_sla_minutes
-- Column aliases can't be referenced in HAVING, so the expression is repeated
HAVING extract(epoch from (current_timestamp - max(event_created_at))) / 60
       > freshness_sla_minutes;
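
The warehouse query covers batch delivery. For a Kafka-delivered contract, the same comparison runs against the newest created_at observed on the topic. A minimal sketch of the check itself (the consumer wiring that tracks the newest timestamp per topic is omitted):

from datetime import datetime, timezone

def freshness_status(newest_created_at_iso: str, freshness_sla_minutes: int) -> str:
    """Compare the newest event's timestamp against the contract's freshness SLA."""
    # fromisoformat on Python < 3.11 doesn't accept a trailing 'Z', hence the replace
    newest = datetime.fromisoformat(newest_created_at_iso.replace("Z", "+00:00"))
    lag_minutes = (datetime.now(timezone.utc) - newest).total_seconds() / 60
    return "BREACHED" if lag_minutes > freshness_sla_minutes else "OK"

# freshness_status("2025-10-02T14:30:00Z", 5) -> "BREACHED" once lag exceeds 5 minutes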

Migrations Through Contracts

The customer_id → buyer_id rename that broke the analytics team was a migration. Done through a data contract, it looks like this:

sequenceDiagram
    participant Prod as Producer (Payments)
    participant Reg as Schema Registry
    participant Cons as Consumer (Analytics)
    Prod->>Cons: Breaking change notice: buyer_id replaces customer_id in 14 days
    Prod->>Reg: Register v2 schema with both customer_id (deprecated) and buyer_id
    Note over Reg: Compatibility: BACKWARD — both fields present
    Prod->>Prod: Deploy: emit both fields for 14-day transition period
    Cons->>Cons: Migrate pipeline to use buyer_id
    Cons->>Prod: Migration complete, confirmed
    Prod->>Reg: Register v2.1 schema — customer_id removed
    Prod->>Prod: Deploy: emit buyer_id only

The dual-emit period is the contract's migration path. During those 14 days, the producer emits both customer_id (deprecated, same value as buyer_id) and buyer_id. Consumers can migrate at their own pace within the window. After the window closes, the deprecated field is removed.

class OrderEventProducer:
    def __init__(self, transition_period_active: bool):
        self.transition_period_active = transition_period_active
 
    def build_event(self, order: Order) -> dict:
        event = {
            "order_id": order.id,
            "buyer_id": order.buyer_id,  # New canonical field
            "total_cents": order.total_cents,
            "currency": order.currency,
            "created_at": order.created_at.isoformat(),
        }
        # Emit deprecated field during transition window only
        if self.transition_period_active:
            event["customer_id"] = order.buyer_id  # Deprecated alias
        return event

The transition_period_active flag is controlled by a feature flag with a hard expiry set to the end of the 14-day window. When the flag expires, the deprecated field disappears automatically — no manual cleanup required.
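
A minimal sketch of that expiry mechanism, assuming a plain date check rather than any particular feature-flag service (the expiry date is illustrative):

from datetime import date

# End of the 14-day dual-emit window, set when the breaking-change notice goes out
DUAL_EMIT_EXPIRES = date(2025, 10, 16)  # illustrative date

def transition_period_active(today: date | None = None) -> bool:
    """True while the dual-emit window is open; flips to False automatically."""
    return (today or date.today()) < DUAL_EMIT_EXPIRES

producer = OrderEventProducer(transition_period_active=transition_period_active())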

Quality Rules: The Contract's Truth Guarantee

A contract that guarantees schema compatibility but not data quality is half a contract. Quality rules encode the invariants that make the data useful.
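
The contract YAML shown earlier can be materialized into a small in-memory type for the validator below to consume. A sketch, where DataContract and QualityRule are hypothetical dataclasses rather than any library API:

import yaml
from dataclasses import dataclass

@dataclass
class QualityRule:
    field: str
    rule: str
    severity: str

@dataclass
class DataContract:
    name: str
    owner: str
    quality_rules: list[QualityRule]

def load_contract(path: str) -> DataContract:
    """Parse a contract YAML file into the shape the validator expects."""
    with open(path) as f:
        doc = yaml.safe_load(f)
    return DataContract(
        name=doc["metadata"]["name"],
        owner=doc["metadata"]["owner"],
        quality_rules=[
            QualityRule(r["field"], r["rule"], r["severity"])
            for r in doc["quality_rules"]
        ],
    )

contract = load_contract("contracts/orders/order-created-event.yaml")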

Run quality checks in your pipeline and route failures by severity:

class ContractQualityValidator:
    def validate(self, event: dict, contract: DataContract) -> ValidationResult:
        violations = []
        for rule in contract.quality_rules:
            field_value = event.get(rule.field)
            if not self._evaluate(field_value, rule.rule):
                violations.append(QualityViolation(
                    field=rule.field,
                    rule=rule.rule,
                    value=field_value,
                    severity=rule.severity,
                ))
 
        critical = [v for v in violations if v.severity == "critical"]
        errors = [v for v in violations if v.severity == "error"]
        warnings = [v for v in violations if v.severity == "warning"]
 
        return ValidationResult(
            passed=len(critical) == 0,  # Critical violations fail the event
            violations=violations,
            should_dead_letter=len(critical) > 0,
            should_alert=len(critical) > 0 or len(errors) > 5,
        )

Critical violations go to a dead-letter queue and page the producer team. Error-level violations are collected and reported in a daily digest. Warnings are tracked in dashboards but don't trigger alerts. This tiered severity model keeps the producer team's alert fatigue low while ensuring critical data integrity issues get immediate attention.
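
In code, that routing might look like the sketch below, where dead_letter_queue, pager, daily_digest, and metrics are hypothetical stand-ins for your own infrastructure:

def route_result(result: ValidationResult, event: dict, contract: DataContract) -> None:
    # Hypothetical sinks; swap in your DLQ topic, PagerDuty client, etc.
    if result.should_dead_letter:
        dead_letter_queue.publish(topic=f"dlq.{contract.name}", event=event)
    if result.should_alert:
        pager.page(team=contract.owner, summary=f"Contract violations on {contract.name}")
    for v in result.violations:
        if v.severity == "error":
            daily_digest.record(contract.name, v)  # surfaced once a day
        elif v.severity == "warning":
            metrics.increment(f"contract.warning.{contract.name}.{v.field}")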

Key Takeaways

  • A data contract has five components: ownership, schema, SLAs, quality rules, and migration conventions. Teams that implement only the schema component find that contracts don't solve their coordination problems.
  • Enforce contracts at publish time through a schema registry, not just at review time through documentation. A BACKWARD-compatible schema registry blocks breaking changes before they reach consumers.
  • The notice period SLA — minimum days between announcing a breaking change and shipping it — is the component most teams forget to define. Fourteen days for internal consumers is a reasonable starting point.
  • Dual-emit during migration windows allows producers and consumers to decouple their migration timelines. Emit both the old and new field for the duration of the notice period; remove the old field when the window closes.
  • Quality rules encode data invariants that schema alone cannot express: null rate bounds, value set membership, referential integrity. Run them in the pipeline and route violations by severity — critical to dead-letter and page, warnings to dashboards.
  • Ownership is the load-bearing pillar. Without a named team and escalation path on every contract, enforcement has no one to escalate to when SLAs are breached.