Reference Implementation

BSFG NATS/JetStream Reference

How BSFG is implemented using NATS and JetStream as the substrate

Audience: Implementation engineers, operators

Use: Understand how BSFG's abstract architecture maps to NATS and JetStream. This describes one possible implementation; the BSFG architecture itself is substrate-neutral.


Important Note

This document describes the NATS/JetStream reference implementation of BSFG. The architecture and guarantees defined in the Architecture Specification are independent of this implementation. Alternative implementations using PostgreSQL, Kafka, S3, or other substrates can fully satisfy the BSFG contract.


Overview

The NATS/JetStream reference implementation uses:

  • JetStream — A streaming and persistence layer built on NATS, providing durable streams with at-least-once delivery
  • Object Store — JetStream's object storage layer, which stores large binaries (artifacts) as chunks in a stream
  • Durable Consumers — Named consumers with explicit acknowledgment and cursor tracking
  • mTLS + NATS Accounts — For zone-level isolation and authorization
  • Connect RPC — For authenticated, encrypted cross-zone communication

How BSFG Roles Map to JetStream

Ingress Store Buffer (ISB) → JetStream Stream

The ISB is implemented as a JetStream stream in the external zone:

Subject: facts.<category>[.<prefix>]
Storage: File or Memory
Replicas: >= 1 (configurable)
MaxAge: 7 days (default; configurable per category)
Retention Policy: limits (age, size, or message-count limits) or interest (messages kept until every interested consumer has acknowledged them)

Producers call AppendFact, which publishes the fact to the ISB stream through the JetStream client (js.publish() in nats.js). JetStream returns a publish acknowledgment (PubAck) only after the message has been stored by a quorum of the stream's replicas: the single server for R1 (local persistence), or two of three for R3 (the usual HA setting).

Ingress Forward Buffer (IFB) → JetStream Durable Consumer

The IFB is implemented as a durable consumer in the internal zone:

Consumer Name: ingress_consumer
Stream: (points to internal ISB copy)
Mode: pull (explicit acknowledgment)
AckPolicy: explicit
FilterSubject: facts.>
MaxAckPending: 1 (at most one unacknowledged message in flight, enforcing strict ordering)

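Idempotency comes from the stream, not from the consumer. Publishes that carry a message ID (the Nats-Msg-Id header) are deduplicated by the stream within its duplicate window, and a repeated publish is acknowledged with the original sequence number. The consumer itself delivers at least once: a message that is not acknowledged within AckWait is redelivered, so the code behind the IFB must tolerate seeing the same stream sequence twice.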
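Because a JetStream consumer can redeliver a message whose acknowledgment was lost, the handler behind the IFB has to be redelivery-safe. A minimal sketch, assuming messages arrive in stream-sequence order (as MaxAckPending: 1 enforces); `DeliveredFact` and `makeIdempotentHandler` are hypothetical names, not part of any NATS client API:

```typescript
// Hypothetical shape of a delivered message: stream sequence plus fact payload.
interface DeliveredFact {
  streamSeq: number;
  payload: string;
}

// Wraps an "apply" function so that redeliveries are skipped.
// Assumes in-order delivery (MaxAckPending: 1), so remembering the highest
// applied sequence is enough; out-of-order delivery would need a set instead.
function makeIdempotentHandler(apply: (fact: DeliveredFact) => void) {
  let lastApplied = 0; // JetStream stream sequences start at 1
  return (msg: DeliveredFact): "applied" | "skipped" => {
    if (msg.streamSeq <= lastApplied) {
      return "skipped"; // already applied earlier; the caller just acks it again
    }
    apply(msg);
    lastApplied = msg.streamSeq;
    return "applied";
  };
}

// Usage: sequence 2 is redelivered (its ack was lost) and is applied only once.
const applied: number[] = [];
const handle = makeIdempotentHandler((m) => applied.push(m.streamSeq));
const outcomes = [1, 2, 2, 3].map((seq) => handle({ streamSeq: seq, payload: "{}" }));
// applied  → [1, 2, 3]
// outcomes → ["applied", "applied", "skipped", "applied"]
```

In a real node, lastApplied would have to be persisted together with the applied side effect; it is kept in memory here only to show the skip rule.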

Cursor Tracker → Durable Consumer Checkpoint

The cursor position (highest_contiguous_committed_offset) is the durable consumer's ack floor: the stream sequence at or below which every message has been acknowledged:

// nats.js, where jsm = await nc.jetstreamManager()
last_ack_offset = (await jsm.consumers.info(stream, consumer)).ack_floor.stream_seq

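When a consumer calls ConfirmReceipt(consumer_name, offset), the BSFG node acknowledges the pending message whose stream sequence equals offset and waits for the server to confirm the acknowledgment:

await msg.ackAck()  // nats.js: ack, then wait for the server's confirmation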

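Acknowledging advances the consumer's ack floor (ack_floor.stream_seq in consumer info) once every earlier message has also been acknowledged. The delivered sequence (delivered.consumer_seq / delivered.stream_seq) is a separate counter that tracks the last message handed to a client, not the committed cursor. Both are part of the durable consumer's state, which is persisted and survives node restarts.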
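The ack floor is the "highest contiguous committed offset": the largest sequence N such that every sequence up to N has been acknowledged. The sketch below recomputes it from a set of acknowledged sequences to show why an acknowledgment beyond a gap does not move the cursor. It models the semantics only, not how nats-server stores consumer state, and `advanceAckFloor` is a hypothetical name:

```typescript
// Starting from the current floor, advance while the next sequence is acknowledged.
// Acknowledged sequences above a gap do not count until the gap is filled.
function advanceAckFloor(currentFloor: number, acked: Set<number>): number {
  let floor = currentFloor;
  while (acked.has(floor + 1)) {
    floor += 1;
  }
  return floor;
}

// Usage: 1, 2 and 4 are acknowledged while 3 is still pending.
const acked = new Set<number>([1, 2, 4]);
const floorBefore = advanceAckFloor(0, acked); // → 2 (blocked by the gap at 3)
acked.add(3);
const floorAfter = advanceAckFloor(floorBefore, acked); // → 4
```

With MaxAckPending: 1 there is never more than one outstanding message, so in the IFB the floor always equals the last acknowledged sequence.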

Egress Store Buffer (ESB) → JetStream Stream (Mirror)

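Counterpart of the ISB for the outbound (internal-to-external) flow, configured the same way but hosted in the internal zone. "Mirror" here means architectural symmetry, not a JetStream mirror stream; streams are never mirrored across zones.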

Egress Forward Buffer (EFB) → JetStream Durable Consumer (Mirror)

Counterpart of the IFB for external pickup, configured the same way but hosted in the external zone.


JetStream Domain Setup

Each BSFG zone runs a zone-local JetStream domain (isolated cluster or single node):

# JetStream configuration per zone

# External Zone (enterprise-bsfg)
---
zone: external
nats_server:
  jetstream:
    store_dir: /var/lib/nats/js
    max_file_store: 30GB  # cap on file-based JetStream storage

streams:
  - name: facts_operational
    subjects:
      - facts.operational.>
    max_age: 10y
    storage: file
    replicas: 1  # or 3 for HA

  - name: facts_audit
    subjects:
      - facts.audit.>
    max_age: 7y
    storage: file

  - name: facts_telemetry
    subjects:
      - facts.telemetry.>
    max_age: 3y
    storage: file

# Object Store (for artifacts)
object_store:
  name: bsfg_artifacts
  bucket: bsfg
  replicas: 1

Each stream is scoped to its zone. No stream mirroring across zone boundaries. Cross-zone facts are transferred via the four RPC operations (AppendFact, FetchFacts, etc.), not by subscribing to remote streams.


NATS Accounts and Authorization

Zone-level authorization is enforced via NATS accounts:

flowchart TB
  subgraph E["enterprise-bsfg"]
    EA["enterprise_account<br/>(application access to facts.*)"]
    ES["system_account<br/>(BSFG node system calls to $JS.>)"]
  end
  subgraph P["plant-a-bsfg"]
    PA["plant_a_account<br/>(application access to facts.*)"]
    PS["system_account<br/>(BSFG node system calls to $JS.>)"]
  end

Each zone's accounts are zone-scoped and do not share credentials with other zones.

Authorization Rules (Subject-Level ACLs)

# Enterprise NATS Account
accounts:
  enterprise_account:
    limits:
      max_connections: 100
    permissions:
      publish:
        allow: ['facts.operational.>', 'facts.audit.>', 'facts.documents.>']
      subscribe:
        allow: ['facts.operational.>', 'facts.audit.>', 'facts.documents.>']

  system_account:
    permissions:
      publish:
        allow: ['$JS.>']
      subscribe:
        allow: ['$JS.>']

Each zone allows:

  • Application accounts to publish/subscribe facts within their zone
  • System accounts to manage JetStream internals ($JS.API.>, $JS.EVENT.>)
  • No cross-account access (all facts transferred via RPC, not via NATS subscriptions)
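Subject-level ACLs rely on NATS wildcard rules: `*` matches exactly one token and `>` matches one or more trailing tokens. A minimal sketch of that matching, useful for reasoning about which subjects an allow list admits; the real check happens inside nats-server, and `subjectMatches` / `isAllowed` are hypothetical helpers that only handle concrete (wildcard-free) subjects:

```typescript
// True if a concrete subject matches a NATS subject pattern.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // ">" must be the last pattern token and needs at least one more subject token
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false; // subject ran out of tokens
    if (p[i] !== "*" && p[i] !== s[i]) return false; // literal token mismatch
  }
  return p.length === s.length; // no ">" seen: token counts must be equal
}

// True if any pattern in the allow list matches the subject.
function isAllowed(allow: string[], subject: string): boolean {
  return allow.some((pattern) => subjectMatches(pattern, subject));
}

// Usage with the enterprise_account allow list from the ACL example above.
const enterpriseAllow = ["facts.operational.>", "facts.audit.>", "facts.documents.>"];
isAllowed(enterpriseAllow, "facts.operational.batch.42"); // → true
isAllowed(enterpriseAllow, "facts.operational");          // → false (">" needs one more token)
isAllowed(enterpriseAllow, "facts.telemetry.line1");      // → false (not in the allow list)
```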

Backend Adapters (Hexagonal Ports)

The StoreBuffer, ForwardBuffer, and CursorTracker ports can also be implemented by backends other than NATS/JetStream, for example:

ISB/ESB (Store Buffer) Adapters

  • Kafka: Append to topic partition; offset = Kafka offset
  • PostgreSQL: INSERT INTO store_buffer (payload, seq) VALUES (...) RETURNING seq
  • S3: Write one object per fact under a key that encodes the sequence (timestamp or index); readers list keys in order
  • Filesystem: Append to log file; offset = byte position or line number

IFB/EFB (Forward Buffer) Adapters

  • Redis: SET <key> <payload> NX (SET if Not eXists)
  • PostgreSQL: INSERT INTO forward_buffer (key, payload) VALUES (...) ON CONFLICT DO NOTHING
  • etcd: Transaction that puts the key only if it does not exist yet (compare its create revision to 0)
  • S3: Conditional PUT with the If-None-Match: * header

Cursor Tracker Adapters

  • etcd: Key-value store at /bsfg/<zone>/cursor/<direction>
  • PostgreSQL: UPDATE cursor_checkpoint SET committed_offset = ? WHERE direction = ?
  • S3: Metadata object at s3://bucket/bsfg-cursors/<zone>/<direction>.json

The NATS/JetStream implementation uses JetStream consumer offsets as the cursor tracker; no separate cursor storage is required.
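The ports themselves can be expressed as small interfaces. The sketch below uses hypothetical method names (`append`, `putIfAbsent`, `commit`, `committed`) and in-memory adapters; it only demonstrates the behavior each adapter listed above must provide: ordered append that returns an offset, insert-if-absent, and a cursor that (by assumption here) never moves backwards.

```typescript
// Hypothetical port interfaces; method names are illustrative, not BSFG's actual API.
interface StoreBuffer {
  append(payload: string): number; // returns the offset assigned to the fact
}
interface ForwardBuffer {
  putIfAbsent(key: string, payload: string): boolean; // false if the key already exists
}
interface CursorTracker {
  commit(direction: string, offset: number): number; // returns the effective cursor
  committed(direction: string): number;
}

class InMemoryStoreBuffer implements StoreBuffer {
  private readonly log: string[] = [];
  append(payload: string): number {
    this.log.push(payload);
    return this.log.length; // 1-based offset, like a JetStream stream sequence
  }
}

class InMemoryForwardBuffer implements ForwardBuffer {
  private readonly entries = new Map<string, string>();
  putIfAbsent(key: string, payload: string): boolean {
    if (this.entries.has(key)) return false; // same effect as SET NX / ON CONFLICT DO NOTHING
    this.entries.set(key, payload);
    return true;
  }
}

class InMemoryCursorTracker implements CursorTracker {
  private readonly cursors = new Map<string, number>();
  committed(direction: string): number {
    return this.cursors.get(direction) ?? 0;
  }
  commit(direction: string, offset: number): number {
    // Assumption: a stale or replayed confirmation must not move the cursor backwards.
    const next = Math.max(this.committed(direction), offset);
    this.cursors.set(direction, next);
    return next;
  }
}

// Usage
const store = new InMemoryStoreBuffer();
const firstOffset = store.append('{"fact":1}'); // → 1
const forward = new InMemoryForwardBuffer();
const inserted = forward.putIfAbsent("msg_abc123", '{"fact":1}');      // → true
const insertedAgain = forward.putIfAbsent("msg_abc123", '{"fact":1}'); // → false
const cursor = new InMemoryCursorTracker();
cursor.commit("ingress", 5);
const afterStale = cursor.commit("ingress", 3); // → 5 (stale confirmation ignored)
```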


Transport: Connect RPC + mTLS

All cross-zone communication uses Connect RPC over HTTP/2 with TLS 1.3+:

sequenceDiagram
  participant Client as Client (plant-a-bsfg)
  participant Server as enterprise-bsfg

  Client->>Server: ClientHello
  Server-->>Client: ServerHello + cert (CN=enterprise-bsfg) + CertificateRequest
  Note left of Client: verify CN = enterprise-bsfg
  Client->>Server: cert (CN=plant-a-bsfg) + CertificateVerify
  Note right of Server: verify CN = plant-a-bsfg
  Client->>Server: RPC over HTTP/2 (e.g. AppendFact)

Each BSFG node has an mTLS certificate:

Subject: CN=<zone>-bsfg
Issuer: Enterprise PKI CA
Extended Key Usage: TLS Server Auth, TLS Client Auth
SANs: [<node-hostname>, <node-ip>]
Validity: 1 year (recommend renewal 30 days before expiry)
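Two checks follow from the certificate profile above: the peer must present the expected CN=<zone>-bsfg identity, and renewal should start 30 days before expiry. This sketch expresses both as plain functions over already-parsed certificate fields; `PeerCert` is a hypothetical type, and parsing plus chain validation are left to the TLS library:

```typescript
// Hypothetical, already-parsed view of a peer certificate.
interface PeerCert {
  subjectCN: string;
  notAfter: Date;
}

const DAY_MS = 24 * 60 * 60 * 1000;

// Identity convention from this document: CN=<zone>-bsfg.
function expectedCN(zone: string): string {
  return `${zone}-bsfg`;
}

// Accept the peer only if its CN matches the intended zone and it has not expired.
function verifyPeer(cert: PeerCert, expectedZone: string, now: Date): boolean {
  return cert.subjectCN === expectedCN(expectedZone) && now.getTime() < cert.notAfter.getTime();
}

// True once the certificate is inside the renewal window (default 30 days).
function needsRenewal(cert: PeerCert, now: Date, windowDays = 30): boolean {
  return cert.notAfter.getTime() - now.getTime() <= windowDays * DAY_MS;
}

// Usage
const cert: PeerCert = { subjectCN: "plant-a-bsfg", notAfter: new Date("2026-12-31T00:00:00Z") };
const now = new Date("2026-12-15T00:00:00Z");
verifyPeer(cert, "plant-a", now);    // → true
verifyPeer(cert, "enterprise", now); // → false (CN mismatch)
needsRenewal(cert, now);             // → true (16 days left)
```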

JetStream Immutability and Atomicity

Append-Only Semantics

JetStream streams are append-only:

  • Once a message is published, it cannot be modified
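  • Purging with a subject filter (jsm.streams.purge(stream, { filter: subject }) in nats.js) removes all messages for that subject (rarely used)
  • jsm.streams.deleteMessage(stream, seq) removes a single message; its sequence number is never reused, so the deletion remains visible as a gap in the stream's sequence (audit-detectable)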

BSFG enforces immutability at the data model level: facts are never updated, only new facts are appended (e.g., corrections are new facts with a correction_predicate).

Atomic Idempotent Insert (Stream Message Deduplication)

JetStream streams enforce idempotent append via publish-side message deduplication:

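// Producer publishes with a message ID (sent as the Nats-Msg-Id header).
// nats.js client; nc is an open NATS connection.
const js = nc.jetstream();
const ack = await js.publish(subject, payload, {
  msgID: stable_message_id,  // e.g., hash of the canonical fact, or a UUID reused on retry
  timeout: 10000             // ms to wait for the PubAck
});

// If the same msgID arrives again within the stream's duplicate window, JetStream stores
// nothing new and returns the original sequence: same ack.seq, with ack.duplicate === true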

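The deduplication window is configured per stream (duplicate_window, default 2 minutes; it cannot be longer than MaxAge). Within the window, JetStream compares message IDs only:

  • Same message ID + same payload → nothing new is stored; the original sequence is returned with duplicate: true
  • Same message ID + different payload → handled exactly the same way, because JetStream never inspects payloads; rejecting this conflicting duplicate is a BSFG-level check (for example, comparing against the stored payload, or deriving the message ID from a hash of the canonical fact)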
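In JetStream, a publish whose ID is already in the duplicate window gets the original sequence back with duplicate: true, whatever its payload. The sketch below models that in memory and adds a separate BSFG-level conflict flag. `DedupStream` and `PubAckModel` are hypothetical (the `seq`/`duplicate` fields only mirror the shape of a JetStream PubAck), and the window defaults to JetStream's 2 minutes:

```typescript
interface PubAckModel {
  seq: number;
  duplicate: boolean;
  conflict: boolean; // BSFG-level flag: same ID, different payload (no JetStream equivalent)
}

interface SeenEntry {
  seq: number;
  payload: string;
  at: number; // publish time of the original message, in ms
}

class DedupStream {
  private lastSeq = 0;
  private readonly seen = new Map<string, SeenEntry>();
  private readonly windowMs: number;

  constructor(windowMs: number = 2 * 60 * 1000) {
    this.windowMs = windowMs;
  }

  publish(msgID: string, payload: string, nowMs: number): PubAckModel {
    const prior = this.seen.get(msgID);
    if (prior !== undefined && nowMs - prior.at < this.windowMs) {
      // JetStream compares only IDs; the payload comparison is the BSFG-level check.
      return { seq: prior.seq, duplicate: true, conflict: prior.payload !== payload };
    }
    this.lastSeq += 1;
    this.seen.set(msgID, { seq: this.lastSeq, payload, at: nowMs });
    return { seq: this.lastSeq, duplicate: false, conflict: false };
  }
}

// Usage
const stream = new DedupStream();
const a = stream.publish("msg_abc123", '{"v":1}', 0);          // seq 1, new
const b = stream.publish("msg_abc123", '{"v":1}', 30_000);     // seq 1, duplicate
const c = stream.publish("msg_abc123", '{"v":2}', 60_000);     // seq 1, duplicate + conflict
const d = stream.publish("msg_abc123", '{"v":1}', 3 * 60_000); // seq 2: window expired
```

The last call shows why the window length matters: once it expires, a retried publish is stored again as a new fact.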

Data Format: Envelope and Fact

Envelope (Transport Metadata)

BSFG-level metadata required for routing and idempotency:

{
  "message_id": "msg_abc123",
  "from_zone": "enterprise-bsfg",
  "to_zone": "plant-a-bsfg",
  "object_schema": "order_created_v1",
  "timestamp": "2026-03-06T14:30:00Z"
}

Fact (Business Payload)

Domain-specific data:

{
  "subject": "order:ENT-12345",
  "predicate": "order_created",
  "object_json": {
    "order_id": "ENT-12345",
    "customer": "CUST-999",
    "ship_to": "PlantA",
    "total_amount": 15000.00,
    "created_at": "2026-03-06T14:30:00Z"
  }
}

Full Message Published to JetStream

{
  "envelope": { ... },
  "fact": { ... }
}

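Canonical JSON encoding (sorted keys, no insignificant whitespace, one fixed number format, as in RFC 8785) makes the same fact serialize to the same bytes on every producer, so a hash of those bytes is stable enough to serve as the message ID for deduplication.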
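A minimal canonicalizer that sorts object keys recursively and emits no whitespace shows why two producers that build the same fact with different key order still produce identical bytes. It is a simplification of RFC 8785: it relies on JSON.stringify for strings and numbers and does not reject values JCS forbids (NaN, Infinity). `canonicalJson` is a hypothetical helper, and hashing the result (for example with SHA-256) to form message_id is left out:

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Serialize with recursively sorted object keys and no insignificant whitespace.
// Default sort() compares UTF-16 code units, the key order RFC 8785 specifies.
function canonicalJson(value: Json): string {
  if (Array.isArray(value)) {
    return "[" + value.map(canonicalJson).join(",") + "]";
  }
  if (value !== null && typeof value === "object") {
    const obj: { [key: string]: Json } = value;
    const keys = Object.keys(obj).sort();
    return "{" + keys.map((k) => JSON.stringify(k) + ":" + canonicalJson(obj[k])).join(",") + "}";
  }
  return JSON.stringify(value); // null, boolean, number, string
}

// Usage: same fact, different key order, identical canonical bytes.
const fromProducerA = canonicalJson({ predicate: "order_created", subject: "order:ENT-12345" });
const fromProducerB = canonicalJson({ subject: "order:ENT-12345", predicate: "order_created" });
// both → {"predicate":"order_created","subject":"order:ENT-12345"}
```

Note that numbers are normalized too: `total_amount: 15000.00` serializes as `15000`, so producers must not rely on trailing zeros surviving.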


Naming Conventions

For detailed NATS/JetStream naming rules, see NATS Naming Profile.

Quick summary:

  • Subjects: facts.<category>[.<prefix>] (e.g., facts.operational.batch.*); streams: facts_<category> (e.g., facts_operational), because stream names cannot contain dots
  • Accounts: <zone>_account (e.g., enterprise_account)
  • Zone identities: <zone>-bsfg (e.g., enterprise-bsfg)
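JetStream stream names cannot contain dots (nor whitespace, `*`, `>`, or path separators), so the stream for a category uses an underscore while its subjects keep the dotted form. Note also that a stream bound to facts.operational.> does not capture the bare subject facts.operational, because `>` needs at least one more token; the sketch therefore assumes at least one prefix token when publishing. Helper names are hypothetical:

```typescript
// Characters JetStream does not accept in stream names: whitespace . * > / \
const INVALID_STREAM_NAME = /[\s.*>\/\\]/;

// Stream name for a category, e.g. facts_operational.
function streamNameFor(category: string): string {
  const name = `facts_${category}`;
  if (INVALID_STREAM_NAME.test(name)) {
    throw new Error(`invalid stream name: ${name}`);
  }
  return name;
}

// Subject pattern the category's stream captures, e.g. facts.operational.>
function streamSubjectFor(category: string): string {
  return `facts.${category}.>`;
}

// Concrete publish subject; assumes at least one token after the category
// so that it falls inside the stream's facts.<category>.> pattern.
function factSubject(category: string, first: string, ...rest: string[]): string {
  return ["facts", category, first, ...rest].join(".");
}

// Usage
streamNameFor("operational");                // → "facts_operational"
streamSubjectFor("operational");             // → "facts.operational.>"
factSubject("operational", "batch", "42");   // → "facts.operational.batch.42"
// streamNameFor("operational.batch") throws: dots are not allowed in stream names
```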

Operational Notes

High Availability (HA)

For production:

  • Run JetStream in a 3-node cluster per zone (the minimum that keeps a quorum when one node fails)
  • Configure stream replication: replicas: 3
  • Use persistent file storage (not memory)
  • For multi-region deployments within a zone, connect clusters into a NATS super cluster via gateways
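The 3-node minimum follows from majority quorum in the Raft group behind each clustered stream: n replicas need floor(n/2)+1 members to agree, so they tolerate n minus that many failures. A quick check of the arithmetic (function names are illustrative):

```typescript
// Majority needed for a replicated stream's Raft group.
function quorum(replicas: number): number {
  return Math.floor(replicas / 2) + 1;
}

// Replicas that can fail while a quorum remains.
function toleratedFailures(replicas: number): number {
  return replicas - quorum(replicas);
}

// R1: quorum 1, tolerates 0. R2: quorum 2, tolerates 0.
// R3: quorum 2, tolerates 1. R5: quorum 3, tolerates 2.
const table = [1, 2, 3, 5].map((n) => ({
  replicas: n,
  quorum: quorum(n),
  tolerates: toleratedFailures(n),
}));
```

This is why R2 buys no failure tolerance over R1, and why replicas: 3 is the usual production setting.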

Patching and Updates

  • NATS Server: Apply security patches regularly; in an HA cluster, rolling upgrades (one server at a time) typically avoid downtime
  • Go Runtime: nats-server is built as a static Go binary, so Go security fixes reach it only through a new nats-server release
  • JetStream: Updated as part of NATS server version

Coordinate patches with the operational runbook: Operations Runbook.

Monitoring and Metrics

Key metrics to monitor (names are illustrative; exact names depend on the exporter, such as prometheus-nats-exporter):

  • js_num_consumers — Number of active durable consumers
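  • js_consumer_ack_pending — Delivered but unacknowledged messages per consumer; capped at MaxAckPending, so with MaxAckPending: 1 watch the consumer's num_pending (undelivered backlog) for lag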
  • js_num_streams — Total JetStream streams
  • js_store_bytes — Total storage used
  • js_replication_lag — Replication lag in HA cluster

See Observability & Operations for the full metrics guide.


Security Considerations

mTLS Certificate Management

  • Store private keys in restricted locations (/etc/bsfg/certs/ with directory mode 0700 and key file mode 0600)
  • Rotate certificates before expiry (alert at 30 days)
  • Use a private CA to sign BSFG certificates; do not mix with external CAs
  • Implement automated certificate renewal (e.g., cert-manager in Kubernetes)

NATS Account Credentials

  • Store credentials in environment variables or secrets manager, not in git
  • Rotate account credentials annually
  • Implement per-service accounts (one for producer, one for consumer, one for system)

JetStream Data Encryption

  • Enable encryption at rest if JetStream runs on untrusted infrastructure
  • Use TLS in transit (required for all cross-zone communication)
  • Consider PKCS#8 encrypted private keys for node keys

See Security Model for the full security model.


Troubleshooting

Consumer Lag Growing

Check:

nats consumer info <stream> <consumer>

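Compare the consumer's Acknowledgment Floor with the stream's Last Sequence (from nats stream info <stream>); if the gap keeps growing, the consumer is not keeping up. Unprocessed Messages shows the backlog not yet delivered.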

Fix:

  • Increase consumer throughput or scale out consumers
  • Check if remote zone is reachable (network partition)
  • Verify mTLS certificates are valid and unexpired (see mTLS Handshake Failure below)
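To turn the check above into an alert, compare successive readings of the gap between the stream's last sequence and the consumer's ack floor. The field names mirror the JetStream API (`state.last_seq` in stream info, `ack_floor.stream_seq` in consumer info), but the types are trimmed to what this sketch uses, and the "strictly increasing" alert rule is an assumption:

```typescript
// Trimmed views of JetStream stream and consumer info (only the fields used here).
interface StreamInfoLite {
  state: { last_seq: number };
}
interface ConsumerInfoLite {
  ack_floor: { stream_seq: number };
}

// Approximate lag: stream sequences above the ack floor. Deleted messages and
// subjects outside the consumer's filter inflate it, so treat it as an upper bound.
function ackLag(stream: StreamInfoLite, consumer: ConsumerInfoLite): number {
  return Math.max(0, stream.state.last_seq - consumer.ack_floor.stream_seq);
}

// Assumed alert rule: the lag increased between every pair of consecutive samples.
function lagIsGrowing(samples: number[]): boolean {
  if (samples.length < 2) return false;
  for (let i = 1; i < samples.length; i++) {
    if (samples[i] <= samples[i - 1]) return false;
  }
  return true;
}

// Usage
const lagNow = ackLag({ state: { last_seq: 1500 } }, { ack_floor: { stream_seq: 1200 } }); // → 300
lagIsGrowing([120, 180, 300]); // → true
lagIsGrowing([300, 280, 300]); // → false
```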

Stream Retention Exceeded

Check:

nats stream info <stream>

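Compare Bytes (current usage) with Maximum Bytes (the MaxBytes limit). Bytes never exceeds the limit: once the stream is full, JetStream either discards the oldest messages (discard policy old, the default) or rejects new ones (discard policy new). Bytes sitting at the limit under discard old means eviction is happening.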

Fix:

  • Increase MaxBytes for the stream
  • Reduce MaxAge if old data is no longer needed
  • For safety-critical streams, set the discard policy to new so the stream rejects new writes instead of evicting old facts (see Operations Runbook)
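The difference between the two discard policies can be modeled with a tiny bounded log. To keep it short, this sketch limits the message count rather than bytes; `BoundedLog` is a hypothetical class, while JetStream applies the same old/new choice to its MaxMsgs and MaxBytes limits:

```typescript
type DiscardPolicy = "old" | "new";

class BoundedLog {
  private readonly entries: string[] = [];
  private readonly maxMsgs: number;
  private readonly discard: DiscardPolicy;

  constructor(maxMsgs: number, discard: DiscardPolicy) {
    this.maxMsgs = maxMsgs;
    this.discard = discard;
  }

  append(fact: string): void {
    if (this.entries.length >= this.maxMsgs) {
      if (this.discard === "new") {
        // Safety-critical choice: refuse the write and keep every stored fact.
        throw new Error("stream full (discard new)");
      }
      this.entries.shift(); // "old": silently evict the oldest fact
    }
    this.entries.push(fact);
  }

  contents(): string[] {
    return this.entries.slice();
  }
}

// Usage
const evicting = new BoundedLog(2, "old");
["f1", "f2", "f3"].forEach((f) => evicting.append(f));
evicting.contents(); // → ["f2", "f3"] (f1 was evicted)

const rejecting = new BoundedLog(2, "new");
rejecting.append("f1");
rejecting.append("f2");
let rejected = false;
try {
  rejecting.append("f3");
} catch {
  rejected = true; // the producer sees an error instead of losing f1
}
```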

mTLS Handshake Failure

Check:

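openssl x509 -in cert.pem -noout -subject -enddate

Check the subject CN and the notAfter date for CN mismatches or expiry. The cross-zone link is Connect RPC rather than a NATS connection, so NATS tooling does not see this handshake.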

Fix:

  • Verify certificate CN matches zone identity (openssl x509 -in cert.pem -text -noout | grep CN)
  • Verify CA certificate is trusted by both peers
  • Regenerate certificate if CN or SAN is wrong