Choreography Development Guide

This guide covers how to build distributed protocols using Aura's choreographic programming system. Use it when you need to coordinate multiple parties with session types, CRDTs, and multi-phase workflows.

For theoretical foundations, see MPST and Choreography. For operation categorization, see Operation Categories.

1. When to Use Choreography

Use choreographic protocols when:

  • Multiple parties must coordinate (threshold signing, consensus, sync)
  • Session guarantees matter (no deadlock, no message mismatch)
  • You need formal verification of protocol correctness

Do not use choreography for:

  • Single-party operations (use effect handlers)
  • Simple request-response (use direct transport)

2. Protocol Development Pipeline

This pipeline applies to all Layer 4/5 choreographies and all Category C ceremonies.

Phase 1: Classification and Facts

Classify the operation using Operation Categories:

  • Category A: Local operations, no coordination
  • Category B: Optimistic CRDT operations, eventual consistency
  • Category C: Ceremonies requiring threshold agreement

Define fact types with schema versioning:

use aura_macros::ceremony_facts;

#[ceremony_facts]
pub enum InvitationFact {
    CeremonyInitiated {
        ceremony_id: CeremonyId,
        agreement_mode: Option<AgreementMode>,
        trace_id: Option<String>,
        timestamp_ms: u64,
    },
    CeremonyCommitted {
        ceremony_id: CeremonyId,
        relationship_id: String,
        agreement_mode: Option<AgreementMode>,
        trace_id: Option<String>,
        timestamp_ms: u64,
    },
    CeremonyAborted {
        ceremony_id: CeremonyId,
        reason: String,
        trace_id: Option<String>,
        timestamp_ms: u64,
    },
}

The macro provides canonical ceremony_id() and ceremony_timestamp_ms() accessors.
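For intuition, the accessors behave like the hand-written methods below. This is a sketch, not the macro's actual expansion: CeremonyId is simplified to u64 and the optional fields are omitted.

```rust
// Hypothetical simplification: the real CeremonyId is not a u64.
pub type CeremonyId = u64;

// Trimmed-down fact enum; optional fields from the real type are dropped.
#[derive(Debug, Clone, PartialEq)]
pub enum InvitationFact {
    CeremonyInitiated { ceremony_id: CeremonyId, timestamp_ms: u64 },
    CeremonyCommitted { ceremony_id: CeremonyId, relationship_id: String, timestamp_ms: u64 },
    CeremonyAborted { ceremony_id: CeremonyId, reason: String, timestamp_ms: u64 },
}

impl InvitationFact {
    /// Every variant carries a ceremony_id, so one or-pattern covers them all.
    pub fn ceremony_id(&self) -> CeremonyId {
        match self {
            Self::CeremonyInitiated { ceremony_id, .. }
            | Self::CeremonyCommitted { ceremony_id, .. }
            | Self::CeremonyAborted { ceremony_id, .. } => *ceremony_id,
        }
    }

    /// Same idea for the timestamp shared by every variant.
    pub fn ceremony_timestamp_ms(&self) -> u64 {
        match self {
            Self::CeremonyInitiated { timestamp_ms, .. }
            | Self::CeremonyCommitted { timestamp_ms, .. }
            | Self::CeremonyAborted { timestamp_ms, .. } => *timestamp_ms,
        }
    }
}
```

Reducers and indexes can then key on the ceremony without matching every variant themselves.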

Phase 2: Choreography Specification

Write the choreography specification, either in a .choreo file or inline with the choreography! macro as in this example:

use aura_macros::choreography;

choreography! {
    #[namespace = "secure_request"]
    protocol SecureRequest {
        roles: Client, Server;

        Client[guard_capability = "send_request", flow_cost = 50]
        -> Server: SendRequest(RequestData);

        Server[guard_capability = "send_response", flow_cost = 30, journal_facts = "response_sent"]
        -> Client: SendResponse(ResponseData);
    }
}

Annotation syntax: Role[guard_capability = "...", flow_cost = N, journal_facts = "..."] -> Target: Message

Select the narrowest TimeStamp domain for each time field. See Effect System for time domains.

Phase 3: Runtime Wiring

Create the protocol implementation:

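use aura_agent::runtime::open_manifest_vm_session_admitted;

// my_protocol stands for the module generated from your choreography.
// "Initiator" is the local role this process plays and must match a role
// declared in that choreography.
let (mut engine, handler, vm_sid) = open_manifest_vm_session_admitted(
    &my_protocol::COMPOSITION_MANIFEST,
    "Initiator",
    &my_protocol::global_type(),
    &my_protocol::local_types(),
    scheduler_signals,
).await?;

// Run the admitted session until it completes
let status = engine.run_to_completion(vm_sid)?;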

This wiring opens an admitted VM session from generated choreography metadata. The runtime source of truth is the composition manifest, not an ad hoc adapter. Register the service with the runtime and integrate it with the guard chain. Category C operations must follow the ceremony contract.

Production services should treat the admitted unit as a VM fragment. If the manifest declares link bundles, each linked bundle becomes its own ownership unit. Runtime transfer must use ReconfigurationManager. Do not bypass fragment ownership through service-local state.
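A toy model of the ownership rule: OwnershipRegistry, admit, and transfer are hypothetical names for illustration, not the ReconfigurationManager API. The sketch treats each linked bundle as its own unit and gives ownership a single checked transfer path instead of service-local bookkeeping.

```rust
use std::collections::HashMap;

// Hypothetical identifiers for this sketch only.
pub type BundleId = &'static str;
pub type OwnerId = &'static str;

#[derive(Debug, PartialEq)]
pub enum TransferError {
    UnknownBundle,
    NotOwner,
}

/// Hypothetical registry: one ownership unit per admitted bundle.
pub struct OwnershipRegistry {
    owners: HashMap<BundleId, OwnerId>,
}

impl OwnershipRegistry {
    /// Admission: every bundle declared by the manifest becomes its own unit.
    pub fn admit(bundles: &[BundleId], owner: OwnerId) -> Self {
        Self { owners: bundles.iter().map(|b| (*b, owner)).collect() }
    }

    pub fn owner_of(&self, bundle: BundleId) -> Option<OwnerId> {
        self.owners.get(bundle).copied()
    }

    /// The only way to move a unit: the current owner must be the one releasing it.
    pub fn transfer(&mut self, bundle: BundleId, from: OwnerId, to: OwnerId) -> Result<(), TransferError> {
        let owner = self.owners.get_mut(bundle).ok_or(TransferError::UnknownBundle)?;
        if *owner != from {
            return Err(TransferError::NotOwner);
        }
        *owner = to;
        Ok(())
    }
}
```

Transferring one bundle leaves the others where they were, which is the point of giving each linked bundle its own unit.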

The runtime also derives execution mode from admitted policy. Cooperative protocols stay on the canonical VM path. Replay-deterministic and envelope-bounded protocols select the threaded path only through the admission and hardening surface. Service code should not construct ad hoc threaded runtimes.
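The policy-to-path rule can be read as a pure function of the admitted policy. AdmittedPolicy, ExecutionPath, and select_execution_path are hypothetical. For simplicity, the sketch also assumes that the two non-cooperative policies always take the threaded path once admitted.

```rust
// Hypothetical policy and path types; the real admission surface differs.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AdmittedPolicy {
    Cooperative,
    ReplayDeterministic,
    EnvelopeBounded,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ExecutionPath {
    CanonicalVm,
    Threaded,
}

/// The path is derived from the admitted policy alone, so service code has
/// no input it could use to pick a threaded runtime on its own.
pub fn select_execution_path(policy: AdmittedPolicy) -> ExecutionPath {
    match policy {
        AdmittedPolicy::Cooperative => ExecutionPath::CanonicalVm,
        AdmittedPolicy::ReplayDeterministic | AdmittedPolicy::EnvelopeBounded => {
            ExecutionPath::Threaded
        }
    }
}
```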

Phase 4: Status and Testing

Implement CeremonyStatus for Category C or protocol-specific status views:

pub fn ceremony_status(facts: &[InvitationFact]) -> CeremonyStatus {
    // Reduce facts to current status (placeholder until implemented)
    todo!("reduce facts to current status")
}
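One way to write the reducer is sketched below with simplified stand-in types. Fact and Status are hypothetical, not Aura's InvitationFact and CeremonyStatus. The reducer folds facts in timestamp order and treats committed and aborted as terminal. Terminality is an assumed rule: with it, replicas that hold the same facts with distinct timestamps reach the same status regardless of arrival order.

```rust
// Hypothetical simplified fact and status types.
#[derive(Debug, Clone, PartialEq)]
pub enum Fact {
    Initiated { timestamp_ms: u64 },
    Committed { relationship_id: String, timestamp_ms: u64 },
    Aborted { reason: String, timestamp_ms: u64 },
}

#[derive(Debug, Clone, PartialEq)]
pub enum Status {
    NotStarted,
    Pending,
    Committed { relationship_id: String },
    Aborted { reason: String },
}

fn timestamp(fact: &Fact) -> u64 {
    match fact {
        Fact::Initiated { timestamp_ms }
        | Fact::Committed { timestamp_ms, .. }
        | Fact::Aborted { timestamp_ms, .. } => *timestamp_ms,
    }
}

/// Fold facts in timestamp order; once committed or aborted, the status sticks.
pub fn ceremony_status(facts: &[Fact]) -> Status {
    let mut ordered: Vec<&Fact> = facts.iter().collect();
    ordered.sort_by_key(|f| timestamp(f)); // stable sort: ties keep input order
    ordered.into_iter().fold(Status::NotStarted, |status, fact| match (status, fact) {
        (s @ (Status::Committed { .. } | Status::Aborted { .. }), _) => s,
        (_, Fact::Initiated { .. }) => Status::Pending,
        (_, Fact::Committed { relationship_id, .. }) => Status::Committed {
            relationship_id: relationship_id.clone(),
        },
        (_, Fact::Aborted { reason, .. }) => Status::Aborted { reason: reason.clone() },
    })
}
```

For example, a late Aborted fact arriving after a commit does not reopen or override the committed ceremony.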

Definition of Done:

  • Operation category declared (A/B/C)
  • Facts defined with reducer and schema version
  • Choreography specified with roles/messages documented
  • Runtime wiring added (role runners + registration)
  • Fragment ownership uses manifest admission and runtime ownership APIs
  • delegate and link flows use ReconfigurationManager
  • Threaded or envelope-bounded execution uses admitted policy only
  • Category C uses ceremony runner and emits standard facts
  • Status output implemented
  • Shared-bus integration test added
  • Simulation test added
  • Choreography parity/replay tests added (Category C)

See crates/aura-consensus/src/protocol/ for canonical examples.

3. CRDT Integration

CRDTs handle state consistency in choreographic protocols. See Journal for CRDT theory.

CRDT Coordinator

Use CrdtCoordinator to manage CRDT state in protocols:

use aura_protocol::effects::crdt::CrdtCoordinator;

// State-based (CvRDT) coordinator seeded with an initial journal
let cv_coordinator = CrdtCoordinator::with_cv_state(authority_id, initial_journal);

// Delta CRDT coordinator with a compaction threshold of 100
let delta_coordinator = CrdtCoordinator::with_delta_threshold(authority_id, 100);

// Meet-semilattice coordinator for constraints such as capability sets
let mv_coordinator = CrdtCoordinator::with_mv_state(authority_id, capability_set);
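To make the join/meet distinction concrete, here is a minimal grow-only set, a state-based CRDT whose merge is union, alongside a meet over capability sets, whose merge is intersection. GSet and meet are illustrative and independent of CrdtCoordinator's handlers.

```rust
use std::collections::BTreeSet;

/// Illustrative grow-only set. Merge is set union, which is commutative,
/// associative, and idempotent, so replicas converge no matter how often
/// or in what order states are exchanged.
#[derive(Debug, Clone, PartialEq)]
pub struct GSet<T> {
    items: BTreeSet<T>,
}

impl<T: Ord + Clone> GSet<T> {
    pub fn new() -> Self {
        Self { items: BTreeSet::new() }
    }

    pub fn insert(&mut self, item: T) {
        self.items.insert(item);
    }

    pub fn contains(&self, item: &T) -> bool {
        self.items.contains(item)
    }

    /// Join (least upper bound) of two states.
    pub fn merge(&mut self, other: &Self) {
        self.items.extend(other.items.iter().cloned());
    }
}

/// Meet-semilattice counterpart: merge is intersection, so a constraint set
/// such as a capability set can only shrink as replicas combine.
pub fn meet<T: Ord + Clone>(a: &BTreeSet<T>, b: &BTreeSet<T>) -> BTreeSet<T> {
    a.intersection(b).cloned().collect()
}
```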

Protocol Integration

Protocols consume and return coordinators with updated state:

use aura_sync::choreography::anti_entropy::execute_anti_entropy;

// The coordinator is moved in and returned with the synchronized state
let (result, updated_coordinator) = execute_anti_entropy(
    authority_id,
    config,
    is_requester, // true when this party initiates the exchange
    &effect_system,
    coordinator,
).await?;

let synchronized_state = updated_coordinator.cv_handler().get_state();
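The exchange behind anti-entropy can be sketched with replicas that hold opaque fact ids. Replica and anti_entropy_round are hypothetical. The sketch sends full id sets as digests, whereas a real protocol would typically send a compact summary.

```rust
use std::collections::BTreeSet;

/// Hypothetical replica holding opaque fact identifiers.
#[derive(Debug, Clone, PartialEq)]
pub struct Replica {
    facts: BTreeSet<u64>,
}

impl Replica {
    pub fn new(facts: &[u64]) -> Self {
        Self { facts: facts.iter().copied().collect() }
    }

    /// Digest: here simply the full id set.
    pub fn digest(&self) -> BTreeSet<u64> {
        self.facts.clone()
    }

    /// Facts this replica holds that the peer's digest lacks.
    pub fn missing_from(&self, peer_digest: &BTreeSet<u64>) -> Vec<u64> {
        self.facts.difference(peer_digest).copied().collect()
    }

    pub fn apply(&mut self, facts: &[u64]) {
        self.facts.extend(facts.iter().copied());
    }
}

/// One requester/responder round. Returns (facts received by requester,
/// facts received by responder).
pub fn anti_entropy_round(requester: &mut Replica, responder: &mut Replica) -> (usize, usize) {
    // 1. Requester -> Responder: requester's digest
    let req_digest = requester.digest();
    // 2. Responder -> Requester: what the requester lacks, plus responder's digest
    let for_requester = responder.missing_from(&req_digest);
    let resp_digest = responder.digest();
    // 3. Requester -> Responder: what the responder lacks
    let for_responder = requester.missing_from(&resp_digest);
    requester.apply(&for_requester);
    responder.apply(&for_responder);
    (for_requester.len(), for_responder.len())
}
```

A second round between converged replicas transfers nothing, which is a cheap convergence check in tests.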

4. Protocol Composition

Complex applications require composing multiple protocols.

Sequential Composition

Chain protocols for multi-phase workflows:

pub async fn execute_authentication_flow(
    &self,
    target_device: aura_core::DeviceId,
) -> Result<AuthenticationResult, ProtocolError> {
    // Phase 1: Identity exchange
    let identity_result = self.execute_identity_exchange(target_device).await?;

    // Phase 2: Capability negotiation
    let capability_result = self.execute_capability_negotiation(
        target_device,
        &identity_result,
    ).await?;

    // Phase 3: Session establishment
    let session_result = self.execute_session_establishment(
        target_device,
        &capability_result,
    ).await?;

    Ok(AuthenticationResult {
        identity: identity_result,
        capabilities: capability_result,
        session: session_result,
    })
}

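Each phase consumes the results of earlier phases. Because every phase call ends in ?, the first failing phase returns its error immediately and later phases never run. Completed phases are not undone; see Compensation and Rollback for that.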
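The short-circuit behavior can be checked in isolation with a synchronous sketch. run_flow and PhaseError are hypothetical, and fail_at exists only to inject a failure.

```rust
/// Hypothetical error naming the phase that failed.
#[derive(Debug, PartialEq)]
pub struct PhaseError(pub &'static str);

/// Runs three phases in order; each consumes the previous phase's output.
/// `log` records which phases actually ran.
pub fn run_flow(
    fail_at: Option<&'static str>,
    log: &mut Vec<&'static str>,
) -> Result<(String, String, String), PhaseError> {
    let mut phase = |name: &'static str, input: &str| -> Result<String, PhaseError> {
        log.push(name);
        if fail_at == Some(name) {
            return Err(PhaseError(name));
        }
        Ok(format!("{input}>{name}"))
    };
    let identity = phase("identity", "start")?;
    let capabilities = phase("capability", identity.as_str())?;
    let session = phase("session", capabilities.as_str())?;
    Ok((identity, capabilities, session))
}
```

With a failure injected at "capability", the log shows that "session" never ran.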

Parallel Composition

Execute independent protocols concurrently:

pub async fn execute_distributed_computation(
    &self,
    worker_devices: Vec<aura_core::DeviceId>,
) -> Result<ComputationResult, ProtocolError> {
    // Build one worker future per device; nothing runs until they are awaited
    let worker_futures = worker_devices.iter().map(|device| {
        self.execute_worker_protocol(*device)
    });

    // Await all workers concurrently under one timeout. try_join_all fails
    // fast on the first worker error; the first `?` handles the timeout and
    // assumes ProtocolError: From<tokio::time::error::Elapsed>.
    let worker_results = tokio::time::timeout(
        self.config.worker_timeout,
        futures::future::try_join_all(worker_futures),
    ).await??;

    // Aggregate results
    self.aggregate_worker_results(worker_results).await
}
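The fail-fast collection pattern has a std-only analogue using scoped threads, shown here as a sketch with a hypothetical worker function. Unlike try_join_all, it waits for every worker to finish before returning. It also has no timeout; tokio::time::timeout covers that in the async version.

```rust
use std::thread;

/// Runs one worker per device on scoped threads. Collecting into
/// Result<Vec<_>, _> yields the first error in device order.
pub fn run_workers(devices: &[u32]) -> Result<Vec<u32>, String> {
    thread::scope(|s| {
        // Spawn all workers first so they run concurrently
        let handles: Vec<_> = devices
            .iter()
            .map(|&d| s.spawn(move || worker(d)))
            .collect();
        // Then join in order; the scope joins any handles left unjoined
        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .collect()
    })
}

/// Hypothetical worker: doubles its input and fails on device 0.
fn worker(device: u32) -> Result<u32, String> {
    if device == 0 {
        Err("device 0 unavailable".to_string())
    } else {
        Ok(device * 2)
    }
}
```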

Effect Program Composition

Compose protocols through effect programs:

let composed_protocol = Program::new()
    .ext(ValidateCapability {
        capability: "coordinate".into(),
        role: Coordinator,
    })
    .then(anti_entropy_program)
    .then(threshold_ceremony_program)
    .ext(LogEvent {
        event: "protocols_complete".into(),
    })
    .end();
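Under the assumption that composition is plain sequencing, a minimal builder makes the ordering explicit. This Program type is a hypothetical stand-in, not Aura's effect Program API. Here ext appends a single effect, and then appends another program's steps after the current ones.

```rust
/// Hypothetical, minimal stand-in for an effect program builder.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Program {
    steps: Vec<String>,
    ended: bool,
}

impl Program {
    pub fn new() -> Self {
        Self::default()
    }

    /// Append a single effect step.
    pub fn ext(mut self, effect: &str) -> Self {
        self.steps.push(effect.to_string());
        self
    }

    /// Sequence another program after this one: its steps run after ours.
    pub fn then(mut self, next: Program) -> Self {
        self.steps.extend(next.steps);
        self
    }

    /// Mark the program complete.
    pub fn end(mut self) -> Self {
        self.ended = true;
        self
    }

    pub fn is_ended(&self) -> bool {
        self.ended
    }

    pub fn steps(&self) -> Vec<&str> {
        self.steps.iter().map(String::as_str).collect()
    }
}
```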

5. Error Handling and Resilience

Timeout and Retry

Bound each attempt with a timeout and retry retryable failures with exponential backoff and jitter:

pub async fn execute_with_resilience<T, F, Fut>(
    &self,
    protocol_fn: F,
    operation_name: &str,
) -> Result<T, ProtocolError>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, ProtocolError>>,
{
    for attempt in 0..self.config.max_attempts {
        match tokio::time::timeout(self.config.operation_timeout, protocol_fn()).await {
            Ok(Ok(result)) => return Ok(result),
            // Permanent errors are returned immediately
            Ok(Err(e)) if !e.is_retryable() => return Err(e),
            // Retryable error or timeout: back off unless this was the last attempt
            _ if attempt + 1 < self.config.max_attempts => {
                // Exponential backoff with jitter; saturating so large attempt counts cannot overflow
                let delay = self.config.base_delay.saturating_mul(2_u32.saturating_pow(attempt));
                tokio::time::sleep(self.add_jitter(delay)).await;
            }
            _ => {}
        }
    }

    Err(ProtocolError::MaxRetriesExceeded)
}
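The delay schedule is easier to test as a pure function. backoff_delay and with_jitter are hypothetical helpers. The cap and the "full jitter" strategy (scaling by a fraction in [0, 1]) are assumptions, since add_jitter is not shown above. The caller supplies the fraction so the output is deterministic.

```rust
use std::time::Duration;

/// Delay before retry `attempt` (0-based): base * 2^attempt, capped at `max`.
/// Saturating arithmetic keeps large attempt counts from overflowing.
pub fn backoff_delay(base: Duration, attempt: u32, max: Duration) -> Duration {
    base.saturating_mul(2_u32.saturating_pow(attempt)).min(max)
}

/// Full jitter: scale the delay by `fraction`, clamped to [0, 1].
/// In production the fraction would come from a random source; it must not be NaN.
pub fn with_jitter(delay: Duration, fraction: f64) -> Duration {
    delay.mul_f64(fraction.clamp(0.0, 1.0))
}
```

Capping the delay keeps late retries from waiting for minutes, and jitter spreads out peers that failed at the same moment.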

Compensation and Rollback

For multi-phase protocols, implement compensation for partial failures:

pub async fn execute_compensating_transaction(
    &self,
    operations: Vec<Operation>,
) -> Result<TransactionResult, TransactionError> {
    let mut completed = Vec::new();

    for operation in &operations {
        match self.execute_operation(operation).await {
            Ok(result) => {
                completed.push((operation.clone(), result));
            }
            Err(e) => {
                // Undo the completed operations; execute_compensation must walk
                // `completed` in reverse order. If compensation itself fails,
                // `?` returns that error and `e` is dropped.
                self.execute_compensation(&completed).await?;
                return Err(TransactionError::OperationFailed {
                    operation: operation.clone(),
                    cause: e,
                });
            }
        }
    }

    Ok(TransactionResult { completed })
}
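A synchronous sketch of the reverse-order rule follows, with a hypothetical Op type and a journal vector that records what ran. The most recently completed operation is compensated first, because later operations may depend on earlier ones.

```rust
/// Hypothetical operation: a name plus whether executing it should fail.
pub struct Op {
    pub name: &'static str,
    pub fails: bool,
}

/// Executes ops in order. On the first failure, compensates the completed
/// ops in reverse order and returns the failing op's name.
pub fn run_saga(ops: &[Op], journal: &mut Vec<String>) -> Result<(), &'static str> {
    let mut completed: Vec<&Op> = Vec::new();
    for op in ops {
        if op.fails {
            journal.push(format!("fail {}", op.name));
            // Undo newest first
            for done in completed.iter().rev() {
                journal.push(format!("undo {}", done.name));
            }
            return Err(op.name);
        }
        journal.push(format!("do {}", op.name));
        completed.push(op);
    }
    Ok(())
}
```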

Circuit Breakers

Prevent cascading failures with circuit breakers:

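use std::time::Instant;

pub enum CircuitState {
    Closed { failure_count: usize },
    Open { opened_at: Instant },
    HalfOpen { test_requests: usize },
}

pub async fn execute_with_circuit_breaker<T, F, Fut>(
    &self,
    protocol_fn: F,
) -> Result<T, ProtocolError>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, ProtocolError>>,
{
    // Decide and update state under the lock, releasing it before awaiting.
    // lock() returning a guard directly assumes a non-poisoning mutex such as
    // parking_lot::Mutex.
    let should_execute = {
        let mut state = self.circuit_state.lock();
        match &mut *state {
            CircuitState::Closed { failure_count } =>
                *failure_count < self.config.failure_threshold,
            CircuitState::Open { opened_at } => {
                if opened_at.elapsed() >= self.config.recovery_timeout {
                    // Recovery timeout elapsed: admit this call as the first trial
                    *state = CircuitState::HalfOpen { test_requests: 1 };
                    true
                } else {
                    false
                }
            }
            CircuitState::HalfOpen { test_requests } => {
                if *test_requests < self.config.test_threshold {
                    *test_requests += 1;
                    true
                } else {
                    false
                }
            }
        }
    };

    if !should_execute {
        return Err(ProtocolError::CircuitBreakerOpen);
    }

    // record_success / record_failure own the remaining transitions
    // (HalfOpen -> Closed on success, Closed/HalfOpen -> Open on failure)
    match protocol_fn().await {
        Ok(result) => {
            self.record_success();
            Ok(result)
        }
        Err(e) => {
            self.record_failure();
            Err(e)
        }
    }
}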
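Keeping the state machine separate from the async wrapper, with the clock passed in, makes the transitions testable without sleeping. Breaker and State are hypothetical and simplified: half-open admits exactly one trial call.

```rust
use std::time::{Duration, Instant};

// Hypothetical, simplified breaker state.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Closed { failures: u32 },
    Open { opened_at: Instant },
    HalfOpen,
}

pub struct Breaker {
    pub state: State,
    pub failure_threshold: u32,
    pub recovery_timeout: Duration,
}

impl Breaker {
    pub fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self {
        Self { state: State::Closed { failures: 0 }, failure_threshold, recovery_timeout }
    }

    /// Whether a call may proceed at `now`. Moves Open -> HalfOpen once the
    /// recovery timeout has elapsed and admits that call as the trial.
    pub fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            State::Closed { .. } => true,
            State::Open { opened_at } => {
                if now.duration_since(opened_at) >= self.recovery_timeout {
                    self.state = State::HalfOpen;
                    true
                } else {
                    false
                }
            }
            // The single trial call is still outstanding
            State::HalfOpen => false,
        }
    }

    /// Any success closes the breaker and resets the failure count.
    pub fn record_success(&mut self) {
        self.state = State::Closed { failures: 0 };
    }

    /// Trips to Open when the threshold is reached or the half-open trial fails.
    pub fn record_failure(&mut self, now: Instant) {
        self.state = match self.state {
            State::Closed { failures } if failures + 1 < self.failure_threshold => {
                State::Closed { failures: failures + 1 }
            }
            _ => State::Open { opened_at: now },
        };
    }
}
```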

6. Guard Chain Integration

The guard chain enforces authorization, flow budgets, and journal commits. See Authorization for the full specification.

Guard Chain Pattern

Guards are pure: evaluation runs synchronously over a prepared GuardSnapshot:

// Phase 1: Authorization via Biscuit + policy (async, cached)
let token = effects.verify_biscuit(&request.token).await?;

// Phase 2: Prepare snapshot and evaluate guards (sync)
let snapshot = GuardSnapshot {
    capabilities: token.capabilities(),
    flow_budget: current_budget,
    ..Default::default()
};
let commands = guard_chain.evaluate(&snapshot, &request)?;

// Phase 3: Execute commands (async)
for command in commands {
    interpreter.execute(command).await?;
}

Nothing becomes observable on the transport until the interpreter executes the commands, which it does in order.

Security Annotations

Choreography annotations compile to guard chain commands:

  • guard_capability: Creates capability check before send
  • flow_cost: Charges flow budget
  • journal_facts: Records facts after successful send
  • leak: Records leakage budget charge
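A simplified model of how the annotations turn into commands is a pure function from a snapshot to an ordered command list. Snapshot, Command, GuardError, and evaluate are hypothetical, and leakage charges are left out. The ordering (capability check, flow charge, send, then fact record) follows the list above.

```rust
/// Hypothetical simplified snapshot.
pub struct Snapshot {
    pub capabilities: Vec<&'static str>,
    pub budget_limit: u64,
    pub budget_spent: u64,
}

#[derive(Debug, PartialEq)]
pub enum Command {
    ChargeFlow(u64),
    Send(&'static str),
    RecordFact(&'static str),
}

#[derive(Debug, PartialEq)]
pub enum GuardError {
    MissingCapability(&'static str),
    BudgetExceeded { needed: u64, remaining: u64 },
}

/// Pure: reads the snapshot and returns commands; performs no I/O.
pub fn evaluate(
    snapshot: &Snapshot,
    capability: &'static str,   // from guard_capability
    flow_cost: u64,             // from flow_cost
    message: &'static str,
    fact: Option<&'static str>, // from journal_facts
) -> Result<Vec<Command>, GuardError> {
    if !snapshot.capabilities.contains(&capability) {
        return Err(GuardError::MissingCapability(capability));
    }
    let remaining = snapshot.budget_limit.saturating_sub(snapshot.budget_spent);
    if flow_cost > remaining {
        return Err(GuardError::BudgetExceeded { needed: flow_cost, remaining });
    }
    let mut commands = vec![Command::ChargeFlow(flow_cost), Command::Send(message)];
    if let Some(f) = fact {
        commands.push(Command::RecordFact(f)); // only after the send
    }
    Ok(commands)
}
```

A denied request returns an error before any command exists, so nothing reaches the interpreter or the transport.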

7. Domain Service Pattern

Domain crates define stateless handlers; the agent layer wraps them with services.

Domain Handler

// In domain crate (e.g., aura-chat/src/service.rs)
pub struct ChatHandler;

impl ChatHandler {
    pub async fn send_message<E>(
        &self,
        effects: &E,
        channel_id: ChannelId,
        content: String,
    ) -> Result<MessageId>
    where
        E: StorageEffects + RandomEffects + PhysicalTimeEffects,
    {
        let message_id = effects.random_uuid().await;
        // ... domain logic
        Ok(message_id)
    }
}

Agent Service Wrapper

// In aura-agent/src/handlers/chat_service.rs
pub struct ChatService {
    handler: ChatHandler,
    effects: Arc<RwLock<AuraEffectSystem>>,
}

impl ChatService {
    pub async fn send_message(
        &self,
        channel_id: ChannelId,
        content: String,
    ) -> AgentResult<MessageId> {
        let effects = self.effects.read().await;
        self.handler.send_message(&*effects, channel_id, content)
            .await
            .map_err(Into::into)
    }
}

Benefits: the domain crate stays pure (no tokio or RwLock), handlers are testable with mock effects, and the pattern is consistent across crates.
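Here is the mock-effects benefit in miniature: a stateless handler generic over an effect trait, paired with a fixed test double. RandomSource, new_message_id, and FixedRandom are hypothetical and synchronous for brevity, unlike Aura's async effect traits.

```rust
/// Hypothetical effect trait standing in for a randomness effect.
pub trait RandomSource {
    fn random_u64(&self) -> u64;
}

/// Stateless domain handler: all nondeterminism comes through `effects`.
pub struct ChatHandler;

impl ChatHandler {
    pub fn new_message_id<E: RandomSource>(&self, effects: &E, channel: &str) -> String {
        format!("{channel}:{:016x}", effects.random_u64())
    }
}

/// Test double returning a fixed value, so output is deterministic.
pub struct FixedRandom(pub u64);

impl RandomSource for FixedRandom {
    fn random_u64(&self) -> u64 {
        self.0
    }
}
```

Production code passes a real effect system, and tests pass FixedRandom. The handler itself never changes.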

8. Testing Choreographies

Unit Testing Guard Logic

#[test]
fn test_cap_guard_denies_unauthorized() {
    let snapshot = GuardSnapshot {
        capabilities: vec![],
        flow_budget: FlowBudget { limit: 100, spent: 0, epoch: 0 },
        ..Default::default()
    };
    let result = CapGuard::evaluate(&snapshot, &SendRequest::default());
    assert!(result.is_err());
}

Integration Testing Protocols

#[aura_test]
async fn test_sync_protocol() -> aura_core::AuraResult<()> {
    let fixture = create_test_fixture().await?;

    let coordinator = CrdtCoordinator::with_cv_state(
        fixture.authority_id(),
        fixture.initial_journal(),
    );

    let (result, _) = execute_anti_entropy(
        fixture.authority_id(),
        SyncConfig::default(),
        true, // is_requester
        &fixture.effects(),
        coordinator,
    ).await?;

    assert!(result.is_success());
    Ok(())
}

Simulation Testing

See Simulation Guide for fault injection and adversarial testing.