Choreography Development Guide
This guide covers how to build distributed protocols using Aura's choreographic programming system. Use it when you need to coordinate multiple parties with session types, CRDTs, and multi-phase workflows.
For theoretical foundations, see MPST and Choreography. For operation categorization, see Operation Categories.
1. When to Use Choreography
Use choreographic protocols when:
- Multiple parties must coordinate (threshold signing, consensus, sync)
- Session guarantees matter (no deadlock, no message mismatch)
- You need formal verification of protocol correctness
Do not use choreography for:
- Single-party operations (use effect handlers)
- Simple request-response (use direct transport)
2. Protocol Development Pipeline
This pipeline applies to all Layer 4/5 choreographies and all Category C ceremonies.
Phase 1: Classification and Facts
Classify the operation using Operation Categories:
- Category A: Local operations, no coordination
- Category B: Optimistic CRDT operations, eventual consistency
- Category C: Ceremonies requiring threshold agreement
Define fact types with schema versioning:
```rust
use aura_macros::ceremony_facts;

#[ceremony_facts]
pub enum InvitationFact {
    CeremonyInitiated {
        ceremony_id: CeremonyId,
        agreement_mode: Option<AgreementMode>,
        trace_id: Option<String>,
        timestamp_ms: u64,
    },
    CeremonyCommitted {
        ceremony_id: CeremonyId,
        relationship_id: String,
        agreement_mode: Option<AgreementMode>,
        trace_id: Option<String>,
        timestamp_ms: u64,
    },
    CeremonyAborted {
        ceremony_id: CeremonyId,
        reason: String,
        trace_id: Option<String>,
        timestamp_ms: u64,
    },
}
```
The macro provides canonical ceremony_id() and ceremony_timestamp_ms() accessors.
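The generated code is not shown in this guide. As a rough illustration only, hand-written equivalents of the two accessors for a trimmed-down fact enum might look like the sketch below. CeremonyId is simplified to a String alias and the agreement_mode and trace_id fields are dropped, so this is an assumption about the shape, not the macro's actual output.

```rust
/// Simplified stand-in for Aura's CeremonyId (assumption: a String alias here).
pub type CeremonyId = String;

/// Trimmed-down fact enum; the real InvitationFact carries more fields.
#[derive(Debug, Clone, PartialEq)]
pub enum InvitationFact {
    CeremonyInitiated { ceremony_id: CeremonyId, timestamp_ms: u64 },
    CeremonyCommitted { ceremony_id: CeremonyId, relationship_id: String, timestamp_ms: u64 },
    CeremonyAborted { ceremony_id: CeremonyId, reason: String, timestamp_ms: u64 },
}

impl InvitationFact {
    /// Every variant carries a ceremony_id, so the accessor is total.
    pub fn ceremony_id(&self) -> &CeremonyId {
        match self {
            InvitationFact::CeremonyInitiated { ceremony_id, .. }
            | InvitationFact::CeremonyCommitted { ceremony_id, .. }
            | InvitationFact::CeremonyAborted { ceremony_id, .. } => ceremony_id,
        }
    }

    /// Every variant also carries timestamp_ms.
    pub fn ceremony_timestamp_ms(&self) -> u64 {
        match self {
            InvitationFact::CeremonyInitiated { timestamp_ms, .. }
            | InvitationFact::CeremonyCommitted { timestamp_ms, .. }
            | InvitationFact::CeremonyAborted { timestamp_ms, .. } => *timestamp_ms,
        }
    }
}
```

Because every variant shares these two fields, reducers and status views can group and order facts without matching on each variant themselves.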
Phase 2: Choreography Specification
Write the choreography specification. It can live in a .choreo file; the example below uses the inline choreography! macro:
```rust
use aura_macros::choreography;

choreography! {
    #[namespace = "secure_request"]
    protocol SecureRequest {
        roles: Client, Server;

        Client[guard_capability = "send_request", flow_cost = 50] -> Server: SendRequest(RequestData);

        Server[guard_capability = "send_response", flow_cost = 30, journal_facts = "response_sent"] -> Client: SendResponse(ResponseData);
    }
}
```
Annotation syntax: Role[guard_capability = "...", flow_cost = N, journal_facts = "..."] -> Target: Message
Select the narrowest TimeStamp domain for each time field. See Effect System for time domains.
Phase 3: Runtime Wiring
Create the protocol implementation:
```rust
use aura_agent::runtime::open_manifest_vm_session_admitted;

// `my_protocol` stands for your generated choreography module;
// `scheduler_signals` must be supplied by the caller.
let (mut engine, handler, vm_sid) = open_manifest_vm_session_admitted(
    &my_protocol::COMPOSITION_MANIFEST,
    "Initiator",
    &my_protocol::global_type(),
    &my_protocol::local_types(),
    scheduler_signals,
)
.await?;

let status = engine.run_to_completion(vm_sid)?;
```
This wiring opens an admitted VM session from generated choreography metadata. The runtime source of truth is the composition manifest, not an ad hoc adapter. Register the service with the runtime and integrate it with the guard chain. Category C operations must follow the ceremony contract.
Production services should treat the admitted unit as a VM fragment. If the manifest declares link bundles, each linked bundle becomes its own ownership unit. Ownership transfer at runtime must go through ReconfigurationManager. Do not bypass fragment ownership through service-local state.
The runtime also derives execution mode from admitted policy. Cooperative protocols stay on the canonical VM path. Replay-deterministic and envelope-bounded protocols select the threaded path only through the admission and hardening surface. Service code should not construct ad hoc threaded runtimes.
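One way to read this rule is that execution mode is a pure function of admitted policy, which service code never overrides. The sketch below encodes that reading. The enum names are hypothetical, and the threaded_admitted flag stands in for the admission and hardening decision. Falling back to the canonical VM when that flag is false is also an assumption.

```rust
/// Hypothetical policy classes, named after the modes described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdmittedPolicy {
    Cooperative,
    ReplayDeterministic,
    EnvelopeBounded,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    CanonicalVm,
    Threaded,
}

/// Mode depends only on admitted policy; service code never picks it directly.
/// `threaded_admitted` stands in for the admission and hardening decision.
pub fn execution_mode(policy: AdmittedPolicy, threaded_admitted: bool) -> ExecutionMode {
    match policy {
        AdmittedPolicy::Cooperative => ExecutionMode::CanonicalVm,
        AdmittedPolicy::ReplayDeterministic | AdmittedPolicy::EnvelopeBounded if threaded_admitted => {
            ExecutionMode::Threaded
        }
        // Without admission approval, stay on the canonical VM path (assumed fallback).
        _ => ExecutionMode::CanonicalVm,
    }
}
```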
Phase 4: Status and Testing
Implement CeremonyStatus for Category C or protocol-specific status views:
```rust
pub fn ceremony_status(facts: &[InvitationFact]) -> CeremonyStatus {
    // Reduce facts to current status.
    todo!("reduce facts to current status")
}
```
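As a std-only sketch of what such a reducer could do, the example below folds simplified facts into a status. It ranks statuses so that the resulting variant does not depend on the order in which facts arrive. The Fact and Status types are hypothetical simplifications. Letting an abort outrank a commit is an arbitrary choice made for this sketch.

```rust
/// Simplified facts for illustration (the real InvitationFact has more fields).
#[derive(Debug, Clone, PartialEq)]
pub enum Fact {
    Initiated { ceremony_id: String },
    Committed { ceremony_id: String },
    Aborted { ceremony_id: String, reason: String },
}

#[derive(Debug, Clone, PartialEq)]
pub enum Status {
    NotStarted,
    Pending,
    Committed,
    Aborted { reason: String },
}

/// Higher rank wins, so the resulting status variant is independent of fact order.
fn rank(status: &Status) -> u8 {
    match status {
        Status::NotStarted => 0,
        Status::Pending => 1,
        Status::Committed => 2,
        Status::Aborted { .. } => 3,
    }
}

/// Reduces the facts for the ceremony `target` to its current status.
pub fn ceremony_status(target: &str, facts: &[Fact]) -> Status {
    let mut status = Status::NotStarted;
    for fact in facts {
        let (id, next) = match fact {
            Fact::Initiated { ceremony_id } => (ceremony_id, Status::Pending),
            Fact::Committed { ceremony_id } => (ceremony_id, Status::Committed),
            Fact::Aborted { ceremony_id, reason } => {
                (ceremony_id, Status::Aborted { reason: reason.clone() })
            }
        };
        // Only a strictly higher rank replaces the current status, so among
        // several aborts the first reason seen is kept.
        if id.as_str() == target && rank(&next) > rank(&status) {
            status = next;
        }
    }
    status
}
```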
Definition of Done:
- Operation category declared (A/B/C)
- Facts defined with reducer and schema version
- Choreography specified with roles/messages documented
- Runtime wiring added (role runners + registration)
- Fragment ownership uses manifest admission and runtime ownership APIs
- delegate and link flows use ReconfigurationManager
- Threaded or envelope-bounded execution uses admitted policy only
- Category C uses ceremony runner and emits standard facts
- Status output implemented
- Shared-bus integration test added
- Simulation test added
- Choreography parity/replay tests added (Category C)
See crates/aura-consensus/src/protocol/ for canonical examples.
3. CRDT Integration
CRDTs handle state consistency in choreographic protocols. See Journal for CRDT theory.
CRDT Coordinator
Use CrdtCoordinator to manage CRDT state in protocols:
```rust
use aura_protocol::effects::crdt::CrdtCoordinator;

// State-based CRDT
let coordinator = CrdtCoordinator::with_cv_state(authority_id, initial_journal);

// Delta CRDT with compaction threshold
let coordinator = CrdtCoordinator::with_delta_threshold(authority_id, 100);

// Meet-semilattice for constraints
let coordinator = CrdtCoordinator::with_mv_state(authority_id, capability_set);
```
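The coordinator's handlers are not shown here. The std-only sketch below illustrates the two lattice operations the constructors refer to, using plain sets rather than Aura's handler code. Join (union) merges a state-based grow-only set. Meet (intersection) merges a constraint set such as capabilities, so merging can only narrow it.

```rust
use std::collections::BTreeSet;

/// Join for a state-based grow-only set: union.
/// Commutative, associative, and idempotent, so replicas converge in any merge order.
pub fn join(a: &BTreeSet<String>, b: &BTreeSet<String>) -> BTreeSet<String> {
    a.union(b).cloned().collect()
}

/// Meet for a constraint set such as capabilities: intersection.
/// Merging never grants something that either side lacked.
pub fn meet(a: &BTreeSet<String>, b: &BTreeSet<String>) -> BTreeSet<String> {
    a.intersection(b).cloned().collect()
}
```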
Protocol Integration
Protocols consume and return coordinators with updated state:
```rust
use aura_sync::choreography::anti_entropy::execute_anti_entropy;

// The coordinator is moved in and handed back holding the synchronized state.
let (result, updated_coordinator) = execute_anti_entropy(
    authority_id,
    config,
    is_requester,
    &effect_system,
    coordinator,
)
.await?;

let synchronized_state = updated_coordinator.cv_handler().get_state();
```
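The consume-and-return shape can be sketched without Aura types. In the example below, a hypothetical toy coordinator is moved into a sync function, which merges the peer's missing entries and hands the coordinator back with a result. This mirrors the (result, updated_coordinator) pair above. The entry type and the delta computation are assumptions for illustration.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for a coordinator holding state-based CRDT state.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyCoordinator {
    pub state: BTreeSet<u64>,
}

#[derive(Debug, PartialEq)]
pub struct SyncResult {
    pub received: usize,
}

/// Takes ownership of the local coordinator and returns it with the peer's
/// missing entries merged in, so no other code can observe a half-synced state.
pub fn toy_anti_entropy(mut local: ToyCoordinator, peer: &ToyCoordinator) -> (SyncResult, ToyCoordinator) {
    // Delta: entries the peer has that we lack.
    let missing: Vec<u64> = peer.state.difference(&local.state).copied().collect();
    let received = missing.len();
    local.state.extend(missing);
    (SyncResult { received }, local)
}
```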
4. Protocol Composition
Complex applications require composing multiple protocols.
Sequential Composition
Chain protocols for multi-phase workflows:
```rust
pub async fn execute_authentication_flow(
    &self,
    target_device: aura_core::DeviceId,
) -> Result<AuthenticationResult, ProtocolError> {
    // Phase 1: Identity exchange
    let identity_result = self.execute_identity_exchange(target_device).await?;

    // Phase 2: Capability negotiation
    let capability_result = self
        .execute_capability_negotiation(target_device, &identity_result)
        .await?;

    // Phase 3: Session establishment
    let session_result = self
        .execute_session_establishment(target_device, &capability_result)
        .await?;

    Ok(AuthenticationResult {
        identity: identity_result,
        capabilities: capability_result,
        session: session_result,
    })
}
```
Each phase consumes the result of the previous phase. Because every phase is awaited with ?, the first failure returns early and aborts the rest of the workflow.
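The early-abort behavior of ? can be checked in a synchronous, std-only sketch. The phase names mirror the example above, but the phase logic and the failure switch are hypothetical. The log shows that once a phase fails, later phases never start.

```rust
#[derive(Debug, PartialEq)]
pub enum PhaseError {
    Failed(&'static str),
}

/// Runs three dependent phases; `?` stops at the first failure.
/// `fail_at` names a phase to fail, and `log` records which phases ran.
pub fn run_flow(fail_at: Option<&'static str>, log: &mut Vec<&'static str>) -> Result<u32, PhaseError> {
    let mut phase = |name: &'static str, input: u32| -> Result<u32, PhaseError> {
        log.push(name);
        if fail_at == Some(name) {
            Err(PhaseError::Failed(name))
        } else {
            Ok(input + 1) // each phase builds on the previous result
        }
    };
    let identity = phase("identity", 0)?;
    let capabilities = phase("capabilities", identity)?;
    let session = phase("session", capabilities)?;
    Ok(session)
}
```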
Parallel Composition
Execute independent protocols concurrently:
```rust
pub async fn execute_distributed_computation(
    &self,
    worker_devices: Vec<aura_core::DeviceId>,
) -> Result<ComputationResult, ProtocolError> {
    // Launch parallel worker protocols
    let worker_futures = worker_devices
        .iter()
        .map(|device| self.execute_worker_protocol(*device));

    // Wait for all workers with a timeout. The first `?` handles the timeout
    // (requires ProtocolError: From<tokio::time::error::Elapsed>); the second
    // `?` propagates the first worker error.
    let worker_results = tokio::time::timeout(
        self.config.worker_timeout,
        futures::future::try_join_all(worker_futures),
    )
    .await??;

    // Aggregate results
    self.aggregate_worker_results(worker_results).await
}
```
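The all-or-nothing semantics of try_join_all can be approximated with std scoped threads. The sketch below swaps in std::thread::scope in place of tokio and futures. Unlike try_join_all, it does not cancel the remaining workers when one fails, because scoped threads are always joined. The worker logic, in which input 0 fails, is hypothetical.

```rust
use std::thread;

/// Runs one worker per input on scoped threads and collects the results.
/// Collecting into Result<Vec<_>, _> yields the first error in input order.
pub fn run_workers(inputs: &[u32]) -> Result<Vec<u32>, String> {
    thread::scope(|s| {
        // Spawn every worker before joining any, so they run concurrently.
        let handles: Vec<_> = inputs
            .iter()
            .map(|&x| {
                s.spawn(move || {
                    if x == 0 {
                        Err(format!("worker {x} failed"))
                    } else {
                        Ok(x * 2)
                    }
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .collect()
    })
}
```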
Effect Program Composition
Compose protocols through effect programs:
```rust
let composed_protocol = Program::new()
    .ext(ValidateCapability {
        capability: "coordinate".into(),
        role: Coordinator,
    })
    .then(anti_entropy_program)
    .then(threshold_ceremony_program)
    .ext(LogEvent {
        event: "protocols_complete".into(),
    })
    .end();
```
5. Error Handling and Resilience
Timeout and Retry
Bound each attempt with a timeout and retry retryable failures with exponential backoff:
```rust
use futures::future::BoxFuture;

pub async fn execute_with_resilience<T>(
    &self,
    protocol_fn: impl Fn() -> BoxFuture<'_, Result<T, ProtocolError>>,
    operation_name: &str,
) -> Result<T, ProtocolError> {
    let mut attempt = 0;
    while attempt < self.config.max_attempts {
        match tokio::time::timeout(self.config.operation_timeout, protocol_fn()).await {
            Ok(Ok(result)) => return Ok(result),
            // Non-retryable errors surface immediately.
            Ok(Err(e)) if !e.is_retryable() => return Err(e),
            // Timeouts and retryable errors lead to another attempt.
            _ => {
                attempt += 1;
                // Exponential backoff with jitter; no sleep after the final attempt.
                if attempt < self.config.max_attempts {
                    let delay = self.config.base_delay * 2_u32.pow(attempt - 1);
                    tokio::time::sleep(self.add_jitter(delay)).await;
                }
            }
        }
    }
    Err(ProtocolError::MaxRetriesExceeded)
}
```
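The retry loop leaves add_jitter and any delay cap unspecified. One common approach is capped exponential backoff with "full jitter", sketched below with std only. The cap and the caller-supplied jitter fraction are assumptions, not Aura configuration. Passing the random fraction in keeps the function deterministic and easy to test.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`,
/// then scaled by `jitter`, which is clamped to [0.0, 1.0] and must not be NaN.
/// The caller draws `jitter` from its own RNG.
pub fn backoff_delay(base: Duration, attempt: u32, max: Duration, jitter: f64) -> Duration {
    // checked_pow / checked_mul avoid overflow for large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    let raw = base.checked_mul(factor).unwrap_or(max);
    let capped = raw.min(max);
    capped.mul_f64(jitter.clamp(0.0, 1.0))
}
```

With full jitter, concurrent retriers spread their attempts across [0, capped delay] instead of retrying in lockstep.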
Compensation and Rollback
For multi-phase protocols, implement compensation for partial failures:
```rust
pub async fn execute_compensating_transaction(
    &self,
    operations: Vec<Operation>,
) -> Result<TransactionResult, TransactionError> {
    let mut completed = Vec::new();

    for operation in &operations {
        match self.execute_operation(operation).await {
            Ok(result) => {
                completed.push((operation.clone(), result));
            }
            Err(e) => {
                // Undo completed operations; execute_compensation is expected
                // to walk `completed` in reverse order (newest first).
                self.execute_compensation(&completed).await?;
                return Err(TransactionError::OperationFailed {
                    operation: operation.clone(),
                    cause: e,
                });
            }
        }
    }

    Ok(TransactionResult { completed })
}
```
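The reverse-order rule can be seen in a minimal std-only sketch of the same compensation pattern. Steps are hypothetical (name, should_fail) pairs, and a log stands in for real side effects. On the first failure, completed steps are undone newest first, so each undo runs against the state its own step produced.

```rust
/// Runs steps in order; on the first failure, undoes completed steps in reverse.
pub fn run_saga(steps: &[(&'static str, bool)], log: &mut Vec<String>) -> Result<(), String> {
    let mut completed: Vec<&'static str> = Vec::new();
    for &(name, should_fail) in steps {
        if should_fail {
            // Compensate newest-first.
            for done in completed.iter().rev() {
                log.push(format!("undo {done}"));
            }
            return Err(format!("{name} failed"));
        }
        log.push(format!("do {name}"));
        completed.push(name);
    }
    Ok(())
}
```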
Circuit Breakers
Prevent cascading failures with circuit breakers:
```rust
pub enum CircuitState {
    Closed { failure_count: usize },
    Open { opened_at: Instant },
    HalfOpen { test_requests: usize },
}

pub async fn execute_with_circuit_breaker<T>(
    &self,
    protocol_fn: impl Fn() -> BoxFuture<'_, Result<T, ProtocolError>>,
) -> Result<T, ProtocolError> {
    // The lock guard is a temporary dropped at the end of this statement,
    // so it is not held across the `.await` below.
    let should_execute = match &*self.circuit_state.lock() {
        CircuitState::Closed { failure_count } => *failure_count < self.config.failure_threshold,
        CircuitState::Open { opened_at } => opened_at.elapsed() >= self.config.recovery_timeout,
        CircuitState::HalfOpen { test_requests } => *test_requests < self.config.test_threshold,
    };

    if !should_execute {
        return Err(ProtocolError::CircuitBreakerOpen);
    }

    // record_success / record_failure are assumed to own the state transitions.
    match protocol_fn().await {
        Ok(result) => {
            self.record_success();
            Ok(result)
        }
        Err(e) => {
            self.record_failure();
            Err(e)
        }
    }
}
```
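The example above leaves the state transitions to record_success and record_failure. The std-only sketch below shows one common set of transitions: Closed trips to Open at a failure threshold, Open moves to HalfOpen after the recovery timeout, and the next outcome closes or reopens the breaker. All names are hypothetical. The caller passes the current time in so that tests can control it, and unlike the example above this sketch does not limit HalfOpen probes.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Closed { failures: u32 },
    Open { opened_at: Instant },
    HalfOpen,
}

pub struct Breaker {
    pub state: State,
    pub threshold: u32,
    pub recovery: Duration,
}

impl Breaker {
    /// Decides whether a call may proceed at `now`; an expired Open moves to HalfOpen.
    pub fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            State::Closed { .. } => true,
            State::Open { opened_at } if now.duration_since(opened_at) >= self.recovery => {
                self.state = State::HalfOpen;
                true
            }
            State::Open { .. } => false,
            // This sketch lets every HalfOpen call through as a probe.
            State::HalfOpen => true,
        }
    }

    pub fn record_success(&mut self) {
        self.state = State::Closed { failures: 0 };
    }

    pub fn record_failure(&mut self, now: Instant) {
        self.state = match self.state {
            State::Closed { failures } if failures + 1 < self.threshold => {
                State::Closed { failures: failures + 1 }
            }
            // Threshold reached, or a HalfOpen probe failed: trip open.
            _ => State::Open { opened_at: now },
        };
    }
}
```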
6. Guard Chain Integration
The guard chain enforces authorization, flow budgets, and journal commits. See Authorization for the full specification.
Guard Chain Pattern
Guards are pure. Evaluation runs synchronously over a prepared GuardSnapshot:
```rust
// Phase 1: Authorization via Biscuit + policy (async, cached)
let token = effects.verify_biscuit(&request.token).await?;

// Phase 2: Prepare snapshot and evaluate guards (sync)
let snapshot = GuardSnapshot {
    capabilities: token.capabilities(),
    flow_budget: current_budget,
    ..Default::default()
};
let commands = guard_chain.evaluate(&snapshot, &request)?;

// Phase 3: Execute commands in order (async)
for command in commands {
    interpreter.execute(command).await?;
}
```
No transport-observable effect occurs until the interpreter executes the resulting commands, in order.
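The sketch below illustrates why pure evaluation matters. Evaluation only reads a snapshot and returns either a denial or a list of commands, so a denied request never produces anything for the interpreter to send. Snapshot, Command, and GuardError are hypothetical simplifications, not Aura's GuardSnapshot or guard chain types.

```rust
/// Hypothetical simplified snapshot; the real GuardSnapshot has more fields.
#[derive(Debug, Clone, Default)]
pub struct Snapshot {
    pub capabilities: Vec<String>,
    pub budget_limit: u64,
    pub budget_spent: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Command {
    ChargeFlow(u64),
    Send(String),
    RecordFact(String),
}

#[derive(Debug, PartialEq)]
pub enum GuardError {
    MissingCapability(String),
    BudgetExceeded,
}

/// Pure: reads only its arguments and performs no I/O. A denial returns before
/// any command exists, so nothing can reach the transport.
pub fn evaluate(s: &Snapshot, capability: &str, cost: u64, msg: &str) -> Result<Vec<Command>, GuardError> {
    if !s.capabilities.iter().any(|c| c == capability) {
        return Err(GuardError::MissingCapability(capability.to_string()));
    }
    if s.budget_spent.saturating_add(cost) > s.budget_limit {
        return Err(GuardError::BudgetExceeded);
    }
    Ok(vec![
        Command::ChargeFlow(cost),
        Command::Send(msg.to_string()),
        Command::RecordFact(format!("{msg}_sent")),
    ])
}
```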
Security Annotations
Choreography annotations compile to guard chain commands:
- guard_capability: Creates a capability check before the send
- flow_cost: Charges the flow budget
- journal_facts: Records facts after a successful send
- leak: Records a leakage budget charge
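The ordering these annotations imply can be written as a small lowering function. The capability check comes first, charges precede the send, and facts are recorded only after the send. The sketch below is hypothetical. The Step names, the numeric leak value, and the placement of the leakage charge before the send are all assumptions, not the choreography compiler's actual output.

```rust
/// Hypothetical parsed annotations for one message send.
#[derive(Debug, Default)]
pub struct Annotations {
    pub guard_capability: Option<String>,
    pub flow_cost: Option<u64>,
    pub journal_facts: Option<String>,
    pub leak: Option<u64>, // assumed numeric leakage charge
}

#[derive(Debug, PartialEq)]
pub enum Step {
    CheckCapability(String),
    ChargeFlow(u64),
    ChargeLeakage(u64),
    Send,
    RecordFacts(String),
}

/// Lowers annotations to an ordered step list around the send.
pub fn lower(a: &Annotations) -> Vec<Step> {
    let mut steps = Vec::new();
    if let Some(cap) = &a.guard_capability {
        steps.push(Step::CheckCapability(cap.clone()));
    }
    if let Some(cost) = a.flow_cost {
        steps.push(Step::ChargeFlow(cost));
    }
    if let Some(amount) = a.leak {
        steps.push(Step::ChargeLeakage(amount));
    }
    steps.push(Step::Send);
    // Facts are recorded only after the send step.
    if let Some(facts) = &a.journal_facts {
        steps.push(Step::RecordFacts(facts.clone()));
    }
    steps
}
```

Applied to the Server message from the SecureRequest example, this yields a capability check, a flow charge of 30, the send, and then the response_sent facts.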
7. Domain Service Pattern
Domain crates define stateless handlers; the agent layer wraps them with services.
Domain Handler
```rust
// In domain crate (e.g., aura-chat/src/service.rs)
pub struct ChatHandler;

impl ChatHandler {
    pub async fn send_message<E>(
        &self,
        effects: &E,
        channel_id: ChannelId,
        content: String,
    ) -> Result<MessageId>
    where
        E: StorageEffects + RandomEffects + PhysicalTimeEffects,
    {
        let message_id = effects.random_uuid().await;
        // ... domain logic
        Ok(message_id)
    }
}
```
Agent Service Wrapper
```rust
// In aura-agent/src/handlers/chat_service.rs
pub struct ChatService {
    handler: ChatHandler,
    effects: Arc<RwLock<AuraEffectSystem>>,
}

impl ChatService {
    pub async fn send_message(
        &self,
        channel_id: ChannelId,
        content: String,
    ) -> AgentResult<MessageId> {
        let effects = self.effects.read().await;
        self.handler
            .send_message(&*effects, channel_id, content)
            .await
            .map_err(Into::into)
    }
}
```
Benefits: the domain crate stays pure (no tokio or RwLock), handlers are testable with mock effects, and the pattern is consistent across crates.
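Testability with mock effects comes from routing all nondeterminism through the effect parameter. The synchronous, std-only sketch below uses a hypothetical RandomSource trait in place of the async RandomEffects shown above. A deterministic mock then makes the handler's output fully predictable in tests.

```rust
/// Hypothetical sync stand-in for an effect trait.
pub trait RandomSource {
    fn next_u64(&mut self) -> u64;
}

/// Stateless domain handler: randomness comes only through `effects`.
pub struct IdHandler;

impl IdHandler {
    pub fn new_message_id<E: RandomSource>(&self, effects: &mut E) -> String {
        format!("msg-{:016x}", effects.next_u64())
    }
}

/// Deterministic mock for tests: returns a fixed sequence, then panics if exhausted.
pub struct MockRandom {
    pub values: Vec<u64>,
}

impl RandomSource for MockRandom {
    fn next_u64(&mut self) -> u64 {
        self.values.remove(0)
    }
}
```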
8. Testing Choreographies
Unit Testing Guard Logic
```rust
#[test]
fn test_cap_guard_denies_unauthorized() {
    let snapshot = GuardSnapshot {
        capabilities: vec![],
        flow_budget: FlowBudget { limit: 100, spent: 0, epoch: 0 },
        ..Default::default()
    };

    let result = CapGuard::evaluate(&snapshot, &SendRequest::default());
    assert!(result.is_err());
}
```
Integration Testing Protocols
```rust
#[aura_test]
async fn test_sync_protocol() -> aura_core::AuraResult<()> {
    let fixture = create_test_fixture().await?;
    let coordinator = CrdtCoordinator::with_cv_state(
        fixture.authority_id(),
        fixture.initial_journal(),
    );

    let (result, _) = execute_anti_entropy(
        fixture.authority_id(),
        SyncConfig::default(),
        true, // is_requester
        &fixture.effects(),
        coordinator,
    )
    .await?;

    assert!(result.is_success());
    Ok(())
}
```
Simulation Testing
See Simulation Guide for fault injection and adversarial testing.
Related Documentation
- MPST and Choreography - Session type theory
- Operation Categories - Category A/B/C classification
- Authorization - Guard chain specification
- Journal - CRDT and fact semantics
- Testing Guide - Test patterns
- Simulation Guide - Fault injection testing