Integration

BSFG Consumer Guide

Implementing the receiver role

Audience: Integrators, application engineers

Use: Implement receiver-side fetching, confirmation, and idempotent processing correctly

Consumer Role

A consumer is any system that retrieves and processes facts from BSFG by calling FetchFacts and ConfirmReceipt.

Consumers are responsible for:

  • Pull-driven fact retrieval using durable named consumers
  • Idempotent processing (facts may arrive more than once)
  • Persisting business outcomes before confirmation
  • Confirming receipt only after durable handling

Required Call Sequence

The canonical consumer sequence is:

1. FetchFacts(consumer_name, limit)
   ↓
   [Receive batch of facts with offsets]
   ↓
2. FOR each fact in batch:
     a) apply_business_logic(fact)  [idempotent]
     b) persist_outcome_to_storage()
     c) if error: DO NOT CONFIRM, wait for next fetch
   ↓
3. ConfirmReceipt(consumer_name, highest_offset)
   ↓
   [Cursor advances; next fetch starts from confirmed offset + 1]

Step 1: Fetch Facts

Pull the next batch of facts from the boundary:

FetchFacts(
  consumer_name: "plant-a-receiver",
  limit: 100
) → batch of facts with offsets

The consumer name is durable. The boundary maintains a cursor for each named consumer that tracks the last confirmed offset. Repeated calls to FetchFacts with the same consumer name resume at the offset after the last confirmed one; fetching alone never advances the cursor.
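The cursor behaviour can be illustrated with a small in-memory stand-in for the boundary. This is a sketch only: `FakeBoundary`, its method names, the fact dictionaries, and the 1-based offsets are assumptions made for illustration, not the BSFG client API, and a real boundary persists the cursor durably rather than in memory.

```python
from collections import defaultdict


class FakeBoundary:
    """In-memory stand-in for the boundary (hypothetical, illustration only)."""

    def __init__(self, payloads):
        # Assumption: offsets are 1-based and assigned in arrival order.
        self._log = [{"offset": i, "payload": p} for i, p in enumerate(payloads, start=1)]
        # Last confirmed offset per consumer name; 0 means nothing confirmed yet.
        self._cursors = defaultdict(int)

    def fetch_facts(self, consumer_name, limit):
        # Fetching only reads past the confirmed offset; it never moves the cursor.
        start = self._cursors[consumer_name]
        return self._log[start:start + limit]

    def confirm_receipt(self, consumer_name, offset):
        # Only confirmation advances the cursor, and it never moves backwards.
        self._cursors[consumer_name] = max(self._cursors[consumer_name], offset)
        return {"cursor_advanced_to": self._cursors[consumer_name]}


boundary = FakeBoundary(["a", "b", "c", "d", "e"])

first = boundary.fetch_facts("plant-a-receiver", limit=2)
again = boundary.fetch_facts("plant-a-receiver", limit=2)  # same batch: nothing confirmed yet
boundary.confirm_receipt("plant-a-receiver", first[-1]["offset"])
after = boundary.fetch_facts("plant-a-receiver", limit=2)  # resumes after the confirmed offset
```

Because each consumer name has its own cursor, a second consumer with a different name would start from the beginning of the log regardless of what "plant-a-receiver" has confirmed.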

Step 2: Process Facts Idempotently

For each fact in the batch:

all_processed = true

for fact in facts:
  try:
    // Apply business logic idempotently
    result = apply_business_logic(fact)

    // Persist the outcome durably BEFORE confirming
    persist_outcome(result)

  except Error as e:
    // Log the error; do NOT confirm
    log_error(e, fact)
    // Stop here: the unconfirmed batch is returned again on the next fetch
    all_processed = false
    break

// Proceed to Step 3 only if all_processed is still true

Step 3: Confirm Receipt

Only after all facts in the batch are processed and outcomes are persisted:

ConfirmReceipt(
  consumer_name: "plant-a-receiver",
  offset: 42  // highest offset from the batch
) → Confirmation: {cursor_advanced_to: 42}

The confirmation advances the boundary's cursor. On the next FetchFacts call, the batch will start from offset 43.
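Putting the three steps together, one fetch-process-confirm cycle might look like the sketch below. The function `run_one_cycle` and the callables passed to it are hypothetical stand-ins for a real BSFG client and for application code; facts are assumed to be dictionaries carrying an `offset` field.

```python
def run_one_cycle(fetch, handle, confirm, consumer_name, limit=100):
    """Fetch one batch, handle every fact, and confirm only if all succeeded.

    Returns the confirmed offset, or None if nothing was confirmed.
    """
    batch = fetch(consumer_name, limit)
    if not batch:
        return None
    for fact in batch:
        try:
            handle(fact)  # must apply business logic AND persist the outcome
        except Exception as exc:
            # Leave the cursor untouched; the next fetch returns this batch again.
            print(f"fact at offset {fact['offset']} failed: {exc}")
            return None
    highest = max(fact["offset"] for fact in batch)
    confirm(consumer_name, highest)
    return highest


# Usage with in-memory stubs in place of the real calls.
facts = [{"offset": 41, "body": "x"}, {"offset": 42, "body": "y"}]
handled, confirmed = [], []
result = run_one_cycle(
    fetch=lambda name, limit: facts[:limit],
    handle=handled.append,
    confirm=lambda name, offset: confirmed.append(offset),
    consumer_name="plant-a-receiver",
)
```

Passing the three operations in as callables keeps the ordering rule (persist first, confirm last) in one place and makes it easy to test without a live boundary.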

Idempotent Processing

Idempotency is required. Facts may arrive more than once due to:

  • Producer retry with the same message_id
  • Boundary replay after reconnection
  • Consumer restart and recovery from last confirmed offset

Idempotency Strategy 1: Unique Message ID

Store a record of processed message_id values. Before processing, check if already processed:

if (already_processed(fact.envelope.message_id)) {
  // Idempotent retry — skip processing
  return SUCCESS
}

// Process fact
result = apply_business_logic(fact)
persist_outcome(result)

// Record that this message_id was processed
mark_as_processed(fact.envelope.message_id)

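Where possible, write the outcome and the message_id record in one transaction; otherwise a crash between the two writes causes the fact to be processed again on redelivery. The deduplication table can use a TTL at least as long as the boundary's fact retention (default 7 days).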
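A minimal sketch of this strategy, assuming the outcomes and the deduplication records live in the same SQLite database so that both writes commit together. The table names, column names, and the `handle_once` helper are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed (message_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE outcomes (message_id TEXT, value TEXT)")


def handle_once(conn, message_id, value):
    """Persist the outcome and the dedup marker in one transaction.

    Returns True if the fact was processed, False if it was a duplicate.
    """
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            # The primary key rejects a message_id that was already recorded.
            conn.execute("INSERT INTO processed (message_id) VALUES (?)", (message_id,))
            conn.execute(
                "INSERT INTO outcomes (message_id, value) VALUES (?, ?)",
                (message_id, value),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate delivery: nothing written, still safe to confirm


first = handle_once(conn, "msg-001", "temperature=71")
second = handle_once(conn, "msg-001", "temperature=71")  # redelivery of the same fact
rows = conn.execute("SELECT COUNT(*) FROM outcomes").fetchone()[0]
```

Inserting the marker first lets the primary-key constraint do the "already processed" check, which avoids a separate read and the race between checking and writing.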

Idempotency Strategy 2: Natural Key

If the business logic has a natural key (entity ID + operation), upsert instead of insert:

// Idempotent upsert using the fact's business key
entity_key = fact.subject + "_" + fact.predicate
business_key = extract_key_from(fact.object_json)

result = upsert(entity_key, business_key, fact.object_json)
// First call creates; subsequent calls update-or-skip

persist_outcome(result)
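The upsert can be sketched with SQLite's `INSERT OR REPLACE`. The table layout, the `order_id` field used as the business key, and the assumption that `object_json` is a JSON string are all illustrative choices, not part of the BSFG contract.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE entity_state ("
    " entity_key TEXT, business_key TEXT, object_json TEXT,"
    " PRIMARY KEY (entity_key, business_key))"
)


def upsert_fact(conn, fact):
    """Write the fact under its natural key; a replay overwrites, never duplicates."""
    entity_key = f"{fact['subject']}_{fact['predicate']}"
    # Hypothetical business key: an "order_id" field inside the fact body.
    business_key = json.loads(fact["object_json"])["order_id"]
    with conn:  # commit the write before the caller confirms receipt
        conn.execute(
            "INSERT OR REPLACE INTO entity_state VALUES (?, ?, ?)",
            (entity_key, business_key, fact["object_json"]),
        )


fact = {
    "subject": "order",
    "predicate": "shipped",
    "object_json": json.dumps({"order_id": "A-17", "carrier": "rail"}),
}
upsert_fact(conn, fact)
upsert_fact(conn, fact)  # replay of the same fact
count = conn.execute("SELECT COUNT(*) FROM entity_state").fetchone()[0]
```

No separate deduplication table is needed here, because the composite primary key guarantees at most one row per natural key.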

Idempotency Strategy 3: Outcome Idempotency

If the processing outcome is idempotent by nature (e.g., setting a field to a specific value), re-processing the same fact produces the same result:

// Processing is idempotent by design
outcome = execute_idempotent_operation(fact)

persist_outcome(outcome)
// If called again with the same fact, outcome is identical
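The difference between an operation that is idempotent by nature and one that is not can be shown in a few lines. The function names and fact fields below are hypothetical.

```python
def set_status(state, fact):
    """Idempotent: assigning a value gives the same state however often it runs."""
    state[fact["subject"]] = fact["status"]


def increment_counter(state, fact):
    """NOT idempotent: every replay changes the state again."""
    state[fact["subject"]] = state.get(fact["subject"], 0) + 1


fact = {"subject": "valve-7", "status": "closed"}

assigned = {}
set_status(assigned, fact)
set_status(assigned, fact)        # replay leaves the state unchanged

counted = {}
increment_counter(counted, fact)
increment_counter(counted, fact)  # replay double-counts
```

Operations shaped like `increment_counter` need Strategy 1 or Strategy 2 to stay correct under redelivery.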

Cursor Persistence and Recovery

The boundary keeps durable state for each named consumer: the cursor (the last confirmed offset) is persisted and survives restarts.

On Consumer Restart

If the consumer crashes mid-batch:

  1. On restart, call FetchFacts with the same consumer name
  2. The boundary returns facts from last_confirmed_offset + 1
  3. Some facts in the new batch may have been fully or partially processed before the crash
  4. Re-process them; idempotent handling makes the repeat harmless

Confirming Only After Durability

Danger: If you confirm receipt before persisting the outcome, a crash between confirmation and persistence loses the outcome.

// ❌ WRONG ORDER:
ConfirmReceipt(...)  // cursor advances
persist_outcome(...)  // crashes before this — outcome lost!

// ✅ CORRECT ORDER:
persist_outcome(...)  // durable
ConfirmReceipt(...)   // cursor advances only after durable persist

Pull-Driven Fetch Model

Consumers pull facts from the boundary, not the other way around. This means:

  • Consumers control fetch pace. They can batch, throttle, or pause as needed without affecting the boundary.
  • Consumers are responsible for advancing the cursor by confirming receipt. The boundary stores the cursor but never pushes facts to consumers.
  • End-to-end delivery latency depends on how often the consumer polls and how fast it processes, not on the boundary.

Polling pattern:

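while (true) {
  facts = FetchFacts(consumer_name, limit)

  if (facts.length == 0) {
    // No new facts: wait before polling again
    sleep(100ms)
    continue
  }

  // Process batch; stop at the first failure
  ok = true
  for (fact in facts) {
    // ... process idempotently; on error set ok = false and break ...
  }

  // Confirm only if every fact was processed and persisted
  if (ok) {
    ConfirmReceipt(consumer_name, facts.last.offset)
  } else {
    sleep(100ms)  // back off, then retry the same batch
  }
}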
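A runnable variant of the polling pattern, sketched with a stop signal for clean shutdown and an injectable `sleep` so the loop can be tested without waiting. The `poll` function, its parameters, and the stubbed fetch and confirm callables are hypothetical, not BSFG client calls.

```python
import threading
import time


def poll(fetch, process_batch, confirm, consumer_name, stop,
         limit=100, idle_delay=0.1, sleep=time.sleep):
    """Poll until `stop` is set.

    `process_batch` must return True only if every fact in the batch
    was handled and its outcome persisted.
    """
    while not stop.is_set():
        facts = fetch(consumer_name, limit)
        if not facts:
            sleep(idle_delay)  # nothing new: wait before polling again
            continue
        if process_batch(facts):
            confirm(consumer_name, facts[-1]["offset"])
        else:
            sleep(idle_delay)  # failed batch: leave the cursor alone, retry later


# Demo with an in-memory log; the fake sleep stops the loop once it goes idle.
log = [{"offset": 1}, {"offset": 2}, {"offset": 3}]
cursor = {"offset": 0}
stop = threading.Event()
sleeps = []


def fake_fetch(name, limit):
    return [f for f in log if f["offset"] > cursor["offset"]][:limit]


def fake_confirm(name, offset):
    cursor["offset"] = offset


def fake_sleep(seconds):
    sleeps.append(seconds)
    stop.set()


poll(fake_fetch, lambda facts: True, fake_confirm, "plant-a-receiver", stop,
     limit=2, sleep=fake_sleep)
```

Sleeping after a failed batch as well as after an empty one keeps a persistently failing fact from turning the loop into a tight retry spin.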

Handling Artifacts

If a fact references an artifact:

Extract Reference Metadata

artifact_ref = fact.object_json  // or extract specific fields

bucket = artifact_ref["bucket"]
key = artifact_ref["key"]
digest = artifact_ref["digest"]
media_type = artifact_ref["media_type"]

Retrieve Artifact

artifact_blob = GetObject(bucket, key)

Verify Integrity (Optional)

computed_digest = SHA256(artifact_blob)
if (computed_digest != digest) {
  throw IntegrityError("Artifact digest mismatch")
}
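In Python the same check can be written with the standard `hashlib` module. This sketch assumes the reference's `digest` field is a bare hex-encoded SHA-256 string; if the boundary uses a different encoding or a prefix such as an algorithm name, the comparison needs adjusting. `verify_artifact` is a hypothetical helper.

```python
import hashlib


def verify_artifact(blob: bytes, expected_hex_digest: str) -> None:
    """Raise ValueError if the blob's SHA-256 does not match the expected digest."""
    computed = hashlib.sha256(blob).hexdigest()
    # hexdigest() is lowercase, so normalise the expected value before comparing.
    if computed != expected_hex_digest.strip().lower():
        raise ValueError("Artifact digest mismatch")


blob = b"calibration report"
good_digest = hashlib.sha256(blob).hexdigest()
verify_artifact(blob, good_digest)  # matching digest: returns without raising
```

A mismatch should be treated like any other processing error: do not confirm the fact that referenced the artifact.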

Retry on Unavailability

If GetObject fails with "not found," retry before treating it as permanent:

max_retries = 10
backoff = 100ms  // initial delay, doubled after each failed attempt
retry_count = 0

while (retry_count < max_retries) {
  try {
    artifact_blob = GetObject(bucket, key)
    // Success: process artifact
    break
  } catch (NotFound) {
    retry_count++
    if (retry_count == max_retries) {
      throw ArtifactMissingError("Artifact not found after retries")
    }
    wait(backoff)
    backoff *= 2
  }
}
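The retry loop translates to Python roughly as follows. `get_with_retry` is a hypothetical helper; `KeyError` stands in for whatever "not found" error the object store client actually raises, and the `max_delay` cap is an addition that the pseudocode above does not have.

```python
import time


class ArtifactMissingError(Exception):
    """Raised when the artifact is still missing after all attempts."""


def get_with_retry(get_object, bucket, key, max_retries=10,
                   base_delay=0.1, max_delay=30.0, sleep=time.sleep):
    """Call get_object until it succeeds or max_retries attempts have failed."""
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        try:
            return get_object(bucket, key)
        except KeyError:  # stand-in for the store's "not found" error
            if attempt == max_retries:
                raise ArtifactMissingError(
                    f"{bucket}/{key} not found after {max_retries} attempts"
                )
            sleep(delay)
            delay = min(delay * 2, max_delay)  # double the wait, up to the cap


# Demo: a store that fails twice before the artifact becomes available.
attempts = {"n": 0}


def flaky_get(bucket, key):
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise KeyError(key)
    return b"blob"


waits = []
blob = get_with_retry(flaky_get, "artifacts", "report.pdf", sleep=waits.append)
```

Injecting `sleep` keeps the helper testable; production code would leave the default `time.sleep` in place.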

Handling Delayed or Replayed Facts

Facts may arrive after a long delay (hours or days) due to:

  • Network partition and boundary replay
  • Producer batching or delayed emission
  • Consumer restart and recovery

Treat delayed facts the same as fresh facts. Process them idempotently. Do not reject them based on arrival time.

Performance Characteristics

Under normal operation:

  • FetchFacts latency: 1–50ms
  • Processing latency per fact: application-dependent (100ms–seconds)
  • ConfirmReceipt latency: 1–10ms

Batch size tuning:

  • Large batches (1000+): better throughput, higher memory, longer processing time per batch
  • Small batches (1–10): lower latency, more RPC calls, higher overhead
  • Typical: 100–500 facts per batch
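The trade-off can be estimated with a rough model in which each batch costs one fetch, one confirm, and sequential processing of every fact. The function below is a back-of-the-envelope sketch, not a measurement: its defaults take the upper end of the FetchFacts and ConfirmReceipt latencies and the lower end of the per-fact processing latency quoted above, and it ignores parallelism and network variance.

```python
def facts_per_second(batch_size, fetch_s=0.050, confirm_s=0.010, per_fact_s=0.100):
    """Estimate sequential throughput: one fetch, N facts processed, one confirm."""
    batch_time = fetch_s + batch_size * per_fact_s + confirm_s
    return batch_size / batch_time


for size in (1, 10, 100, 1000):
    print(f"batch={size:>4}  ~{facts_per_second(size):.2f} facts/s")
```

In this model the per-batch overhead is amortised quickly, so throughput gains flatten out as the batch grows, while memory use and the amount of work repeated after a failed batch keep increasing.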

Error Handling

Processing Error (Business Logic Fails)

If apply_business_logic throws an error:

  • Log the error with the fact ID and offset
  • Do NOT confirm receipt
  • Wait for the next fetch (batch is incomplete)
  • On the next fetch, the boundary returns facts starting after the last confirmed offset, so the failing fact is retried
  • Consider a dead-letter queue for facts that fail repeatedly
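One possible shape for the dead-letter approach is sketched below. Everything here is an assumption for illustration: the threshold, the in-memory counter and list, and the `handle_or_dead_letter` helper. A real implementation would keep the failure counts and the dead-letter records in durable storage, and would persist the dead-letter record before confirming past the fact.

```python
from collections import Counter

MAX_ATTEMPTS = 3      # hypothetical threshold
failures = Counter()  # message_id -> failed attempts (in memory for brevity)
dead_letters = []     # stand-in for a durable dead-letter store


def handle_or_dead_letter(fact, handle):
    """Return True if the fact is dealt with (handled or dead-lettered),
    False if it should be retried on the next fetch."""
    message_id = fact["message_id"]
    try:
        handle(fact)
        failures.pop(message_id, None)  # success clears any earlier failures
        return True
    except Exception as exc:
        failures[message_id] += 1
        if failures[message_id] >= MAX_ATTEMPTS:
            # Park the fact so the batch can be confirmed past it.
            dead_letters.append({"fact": fact, "error": str(exc)})
            del failures[message_id]
            return True
        return False


def always_fails(fact):
    raise ValueError("malformed payload")


poison = {"message_id": "msg-9", "offset": 7}
outcomes = [handle_or_dead_letter(poison, always_fails) for _ in range(MAX_ATTEMPTS)]
```

Without some limit of this kind, a single fact that can never be processed blocks the cursor indefinitely, because every fetch returns it again.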

Persistence Error (Database Fails)

If persist_outcome throws an error:

  • Do NOT confirm receipt
  • Wait for recovery or manual intervention
  • Once the database is healthy, call FetchFacts again — retry the batch

Artifact Unavailable

If GetObject fails with a transient error (timeout, temporary unavailability):

  • Retry with exponential backoff (up to 10 retries)
  • If it remains unavailable after retries, treat it as a processing error and wait for recovery