Using Rumpsteak Handlers
Overview
The RumpsteakHandler provides a production-ready implementation of choreographic effects using session-typed channels. This guide covers setup, core concepts, the API, common usage patterns, and best practices.
Quick Start
Basic Two-Party Protocol
use rumpsteak_aura_choreography::effects::{
    ChoreoHandler,
    handlers::rumpsteak::{RumpsteakHandler, RumpsteakEndpoint, SimpleChannel},
};
// Needed for the Serialize/Deserialize derives below
use serde::{Deserialize, Serialize};

// Define roles
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Role { Alice, Bob }

impl rumpsteak_aura::Role for Role {
    type Message = MyMessage;
    fn seal(&mut self) {}
    fn is_sealed(&self) -> bool { false }
}

// Define messages
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MyMessage {
    content: String,
}

impl rumpsteak_aura::Message<Box<dyn std::any::Any + Send>> for MyMessage {
    fn upcast(msg: Box<dyn std::any::Any + Send>) -> Self {
        *msg.downcast::<MyMessage>().unwrap()
    }
    fn downcast(self) -> Result<Box<dyn std::any::Any + Send>, Self> {
        Ok(Box::new(self))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoints
    let mut alice_ep = RumpsteakEndpoint::new(Role::Alice);
    let mut bob_ep = RumpsteakEndpoint::new(Role::Bob);

    // Setup channels
    let (alice_ch, bob_ch) = SimpleChannel::pair();
    alice_ep.register_channel(Role::Bob, alice_ch);
    bob_ep.register_channel(Role::Alice, bob_ch);

    // Create handlers
    let mut alice_handler = RumpsteakHandler::<Role, MyMessage>::new();
    let mut bob_handler = RumpsteakHandler::<Role, MyMessage>::new();

    // Send and receive
    let msg = MyMessage { content: "Hello!".to_string() };
    alice_handler.send(&mut alice_ep, Role::Bob, &msg).await?;
    let received: MyMessage = bob_handler.recv(&mut bob_ep, Role::Alice).await?;
    println!("Received: {}", received.content);
    Ok(())
}
This example creates two endpoints and connects them with a SimpleChannel pair. It demonstrates direct send and recv calls through the handler.
Core Concepts
Roles
Roles represent participants in the choreography. They must implement:
- rumpsteak_aura::Role
- Clone, Copy, Debug, PartialEq, Eq, Hash
Messages
Messages are the data exchanged between roles. They must:
- Implement Serialize and Deserialize (via serde)
- Implement rumpsteak_aura::Message
- Be Send and Sync
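The Send and Sync requirement can be checked at compile time. The sketch below uses only the standard library; `assert_send_sync` is a hypothetical helper and `MyMessage` is a local stand-in, neither is part of rumpsteak_aura.

```rust
// Local stand-in for a protocol message. A real message type would also
// derive Serialize and Deserialize.
#[derive(Debug, Clone)]
struct MyMessage {
    content: String,
}

// Hypothetical helper: this only compiles when T is both Send and Sync.
fn assert_send_sync<T: Send + Sync>() -> bool {
    true
}

fn main() {
    // A message holding, for example, an std::rc::Rc would be rejected here
    // by the compiler.
    assert!(assert_send_sync::<MyMessage>());
    let msg = MyMessage { content: "Hello!".to_string() };
    println!("message with content {:?} is Send + Sync", msg.content);
}
```

Placing a check like this next to the message definition surfaces a violated bound at the type itself rather than at the first send call.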
Endpoints
RumpsteakEndpoint<R> manages the channels and session state for a role:
- One endpoint per role in the protocol
- Contains channels to all peers
- Tracks session metadata (operation counts, state descriptions)
Channels
SimpleChannel provides bidirectional async message passing:
- Created in pairs: SimpleChannel::pair()
- Uses mpsc unbounded channels internally
- Automatically serializes/deserializes messages
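To make the pairing idea concrete, here is a minimal sketch of a bidirectional byte channel built from two one-way queues. It is an assumption-laden stand-in, not the SimpleChannel implementation: `SketchChannel` is a hypothetical name, and it uses the blocking `std::sync::mpsc` channel rather than an async one.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical stand-in for SimpleChannel: each side owns one sender and
// one receiver, and both carry raw bytes.
struct SketchChannel {
    tx: Sender<Vec<u8>>,
    rx: Receiver<Vec<u8>>,
}

impl SketchChannel {
    // Build two connected halves: whatever one side sends, the other receives.
    fn pair() -> (Self, Self) {
        let (a_tx, b_rx) = channel();
        let (b_tx, a_rx) = channel();
        (
            SketchChannel { tx: a_tx, rx: a_rx },
            SketchChannel { tx: b_tx, rx: b_rx },
        )
    }

    // Fails if the other half has been dropped.
    fn send(&self, msg: Vec<u8>) -> Result<(), String> {
        self.tx.send(msg).map_err(|e| e.to_string())
    }

    // Blocks until a message arrives or the other half is dropped.
    fn recv(&self) -> Result<Vec<u8>, String> {
        self.rx.recv().map_err(|e| e.to_string())
    }
}

fn main() {
    let (alice, bob) = SketchChannel::pair();
    alice.send(b"ping".to_vec()).unwrap();
    assert_eq!(bob.recv().unwrap(), b"ping".to_vec());
    bob.send(b"pong".to_vec()).unwrap();
    assert_eq!(alice.recv().unwrap(), b"pong".to_vec());
}
```

Each half is cross-wired to the other's queue, which is why the two halves must be created together and then handed to different endpoints.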
Handlers
RumpsteakHandler<R, M> interprets choreographic effects:
- Stateless (can be shared across operations)
- Implements ChoreoHandler trait
- Provides send, recv, choose, offer operations
API Reference
RumpsteakEndpoint
impl<R: Role + Eq + Hash + Clone> RumpsteakEndpoint<R>
This shows the generic bounds required by the endpoint type.
Constructor
pub fn new(local_role: R) -> Self
Create a new endpoint for a role.
Channel Management
pub fn register_channel<T>(&mut self, peer: R, channel: T)
Register a channel with a peer role.
pub fn register_session(&mut self, peer: R, session: RumpsteakSession)
Register a dynamically dispatched session (for example one produced via
RumpsteakSession::from_simple_channel or
RumpsteakSession::from_sink_stream). Use this when you need additional
transport logic such as WebSockets, recording, or custom middleware stacks.
pub fn has_channel(&self, peer: &R) -> bool
Check if a channel exists for a peer.
pub fn close_channel(&mut self, peer: &R) -> bool
Close a specific channel.
pub fn close_all_channels(&mut self) -> usize
Close all channels and return count.
pub fn active_channel_count(&self) -> usize
Get number of active channels.
pub fn is_all_closed(&self) -> bool
Check if all channels are closed.
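The channel-management methods above can be pictured as operations on a per-peer table. The sketch below is a hypothetical model, not the endpoint's actual code: `ChannelTable` is an invented name, and it assumes that closing a channel simply removes its entry.

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Hypothetical model of an endpoint's channel table, keyed by peer role.
// The channel type C is left generic.
struct ChannelTable<R, C> {
    channels: HashMap<R, C>,
}

impl<R: Eq + Hash, C> ChannelTable<R, C> {
    fn new() -> Self {
        Self { channels: HashMap::new() }
    }

    fn register_channel(&mut self, peer: R, channel: C) {
        self.channels.insert(peer, channel);
    }

    fn has_channel(&self, peer: &R) -> bool {
        self.channels.contains_key(peer)
    }

    // Returns true only if a channel was present and has now been removed.
    fn close_channel(&mut self, peer: &R) -> bool {
        self.channels.remove(peer).is_some()
    }

    // Drops every channel and reports how many were closed.
    fn close_all_channels(&mut self) -> usize {
        let closed = self.channels.len();
        self.channels.clear();
        closed
    }

    fn active_channel_count(&self) -> usize {
        self.channels.len()
    }

    fn is_all_closed(&self) -> bool {
        self.channels.is_empty()
    }
}

fn main() {
    let mut table = ChannelTable::new();
    table.register_channel("bob", 1u32);
    table.register_channel("carol", 2u32);
    assert!(table.has_channel(&"bob"));
    assert!(table.close_channel(&"bob"));
    assert!(!table.close_channel(&"bob")); // already closed
    assert_eq!(table.active_channel_count(), 1);
    assert_eq!(table.close_all_channels(), 1);
    assert!(table.is_all_closed());
}
```

Under this model the bool and usize return values let callers detect double closes and confirm how many channels a shutdown actually released.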
Metadata Access
pub fn get_metadata(&self, peer: &R) -> Option<&SessionMetadata>
Get session metadata for a peer.
pub fn all_metadata(&self) -> Vec<(R, &SessionMetadata)>
Get metadata for all sessions.
RumpsteakHandler
impl<R, M> RumpsteakHandler<R, M>
This shows the handler type parameters. The handler is generic over role and message types.
Constructor
pub fn new() -> Self
Create a new handler.
ChoreoHandler Implementation
async fn send<Msg>(&mut self, ep: &mut Endpoint, to: Role, msg: &Msg) -> Result<()>
    where Msg: Serialize + Send + Sync
Send a message to a role.
async fn recv<Msg>(&mut self, ep: &mut Endpoint, from: Role) -> Result<Msg>
    where Msg: DeserializeOwned + Send
Receive a message from a role.
async fn choose(&mut self, ep: &mut Endpoint, who: Role, label: Label) -> Result<()>
Make a choice (internal choice).
async fn offer(&mut self, ep: &mut Endpoint, from: Role) -> Result<Label>
Offer a choice (external choice).
async fn with_timeout<F, T>(&mut self, ep: &mut Endpoint, at: Role, dur: Duration, body: F) -> Result<T>
    where F: Future<Output = Result<T>> + Send
Execute operation with timeout.
SimpleChannel
pub struct SimpleChannel
This type wraps a bidirectional byte channel. It is the default transport for the handler.
Constructor
pub fn pair() -> (Self, Self)
Create a connected pair of channels.
Operations
pub async fn send(&mut self, msg: Vec<u8>) -> Result<(), String>
Send raw bytes.
pub async fn recv(&mut self) -> Result<Vec<u8>, String>
Receive raw bytes.
RumpsteakSession Builders
RumpsteakSession::from_simple_channel(channel: SimpleChannel)
Wraps a legacy channel in the new dynamic session API.
RumpsteakSession::from_sink_stream(sender, receiver)
Accepts any async sink and stream pair carrying Vec<u8> payloads. It exposes the pair to the handler. Use this for custom transports, then call endpoint.register_session(peer, session).
SessionMetadata
pub struct SessionMetadata {
    pub state_description: String,
    pub is_complete: bool,
    pub operation_count: usize,
}
This struct records session state for a peer. It is updated as operations run.
Tracks session progression:
- state_description: Human-readable current state
- is_complete: Whether session has completed
- operation_count: Number of operations performed
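As an illustration of how these fields might evolve during a session, the sketch below mirrors the struct locally and updates it with two hypothetical helpers, `record_operation` and `mark_complete`. How the real handler updates metadata is not specified here, so treat the update rules and the state strings as assumptions.

```rust
// Local mirror of the SessionMetadata fields listed above.
#[derive(Debug, Default)]
struct SessionMetadata {
    state_description: String,
    is_complete: bool,
    operation_count: usize,
}

// Hypothetical helper: record one completed operation and describe the
// new state.
fn record_operation(meta: &mut SessionMetadata, description: &str) {
    meta.operation_count += 1;
    meta.state_description = description.to_string();
}

// Hypothetical helper: flag the session as finished.
fn mark_complete(meta: &mut SessionMetadata) {
    meta.is_complete = true;
    meta.state_description = "complete".to_string();
}

fn main() {
    let mut meta = SessionMetadata::default();
    record_operation(&mut meta, "sent request");
    record_operation(&mut meta, "received response");
    mark_complete(&mut meta);
    println!(
        "operations={} complete={} state={}",
        meta.operation_count, meta.is_complete, meta.state_description
    );
}
```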
Usage Patterns
Pattern 1: Request-Response
// Client side
let request = Request { query: "data".to_string() };
handler.send(&mut endpoint, Role::Server, &request).await?;
let response: Response = handler.recv(&mut endpoint, Role::Server).await?;
This pattern sends a request and waits for a response. It is the simplest round-trip flow.
Pattern 2: Choice with Branches
// Sender
let decision = if condition {
    Label("accept")
} else {
    Label("reject")
};
handler.choose(&mut endpoint, Role::Other, decision).await?;

// Receiver
let choice = handler.offer(&mut endpoint, Role::Other).await?;
match choice.0 {
    "accept" => {
        // Handle accept branch
    }
    "reject" => {
        // Handle reject branch
    }
    _ => unreachable!(),
}
This pattern uses choose and offer to coordinate a branch. The chosen label drives the receiver logic.
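The label dispatch on the receiving side can be exercised in isolation. The following sketch is a standard-library stand-in: `Label` is redefined locally as a newtype over a static string, `handle_choice` is a hypothetical function, and an mpsc channel plays the role of choose and offer. As one option, it returns an error for an unknown label instead of calling unreachable!(), since the label originates from a remote peer.

```rust
use std::sync::mpsc::channel;

// Local stand-in for the Label type used in the pattern.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Label(&'static str);

// Receiver-side branch selection. Unknown labels are reported as errors
// rather than treated as impossible.
fn handle_choice(choice: Label) -> Result<&'static str, String> {
    match choice.0 {
        "accept" => Ok("accepted"),
        "reject" => Ok("rejected"),
        other => Err(format!("unexpected label: {other}")),
    }
}

fn main() {
    let (tx, rx) = channel::<Label>();

    // Sender side: pick a branch and transmit its label (the "choose" step).
    let condition = true;
    let decision = if condition { Label("accept") } else { Label("reject") };
    tx.send(decision).unwrap();

    // Receiver side: wait for the label (the "offer" step), then branch on it.
    let choice = rx.recv().unwrap();
    println!("{:?}", handle_choice(choice));
}
```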
Pattern 3: Sequential Messages
for item in items {
    handler.send(&mut endpoint, Role::Peer, &item).await?;
    let ack: Ack = handler.recv(&mut endpoint, Role::Peer).await?;
}
This pattern sends a batch of items with acknowledgments. Each step waits for the peer response.
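The lock-step behavior of this loop can be reproduced with two threads and standard-library channels. This is only a sketch of the send-then-wait discipline: `send_with_acks` is a hypothetical function, the items are plain integers, and the peer acknowledges by echoing each item back.

```rust
use std::sync::mpsc::channel;
use std::thread;

// Sends each item and blocks for its acknowledgment before sending the next.
// Returns the acknowledgments in the order they were received.
fn send_with_acks(items: Vec<u32>) -> Vec<u32> {
    let (item_tx, item_rx) = channel::<u32>();
    let (ack_tx, ack_rx) = channel::<u32>();

    // Peer: acknowledge every item by echoing it back.
    let peer = thread::spawn(move || {
        for item in item_rx {
            if ack_tx.send(item).is_err() {
                break;
            }
        }
    });

    let mut acks = Vec::new();
    for item in items {
        item_tx.send(item).unwrap();
        // Wait for the peer before moving on to the next item.
        acks.push(ack_rx.recv().unwrap());
    }

    // Dropping the sender closes the channel and ends the peer's loop.
    drop(item_tx);
    peer.join().unwrap();
    acks
}

fn main() {
    let acks = send_with_acks(vec![10, 20, 30]);
    println!("acknowledged: {:?}", acks);
}
```

Because the sender never has more than one unacknowledged item in flight, the acknowledgments arrive in the same order as the items.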
Pattern 4: Multi-Party Coordination
// Coordinator
let offer: Offer = handler.recv(&mut endpoint, Role::Buyer).await?;
handler.send(&mut endpoint, Role::Seller, &offer).await?;
let response: Response = handler.recv(&mut endpoint, Role::Seller).await?;
handler.send(&mut endpoint, Role::Buyer, &response).await?;
This pattern relays messages between two peers. It keeps the coordinator role in control of ordering.
Pattern 5: Timeout Protection
let result = handler.with_timeout(
    &mut endpoint,
    Role::Self_,
    Duration::from_secs(5),
    async {
        handler.recv(&mut endpoint, Role::Peer).await
    }
).await;

match result {
    Ok(msg) => {
        // Process message
    }
    Err(ChoreographyError::Timeout(_)) => {
        // Handle timeout
    }
    Err(e) => {
        // Handle other errors
    }
}
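This pattern wraps a receive in with_timeout and distinguishes timeout errors from other failures. Treat the snippet as schematic: the outer call and the async block both borrow handler and endpoint mutably, which the borrow checker rejects as written.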
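The separation of timeouts from other failures can be shown with the standard library's blocking `recv_timeout`. This sketch does not use the handler's with_timeout: `SketchError` is a local stand-in for ChoreographyError, and `recv_with_timeout` is a hypothetical function.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

// Local stand-in for the two error cases discussed in this pattern.
#[derive(Debug, PartialEq)]
enum SketchError {
    Timeout(Duration),
    Transport(String),
}

// Waits up to `dur` for a message and maps the standard-library error
// into the stand-in error type.
fn recv_with_timeout(rx: &Receiver<Vec<u8>>, dur: Duration) -> Result<Vec<u8>, SketchError> {
    rx.recv_timeout(dur).map_err(|e| match e {
        RecvTimeoutError::Timeout => SketchError::Timeout(dur),
        RecvTimeoutError::Disconnected => {
            SketchError::Transport("peer disconnected".to_string())
        }
    })
}

fn main() {
    let (tx, rx) = channel();

    // Nothing has been sent yet, so this receive times out.
    match recv_with_timeout(&rx, Duration::from_millis(20)) {
        Ok(msg) => println!("got {} bytes", msg.len()),
        Err(SketchError::Timeout(d)) => println!("timed out after {:?}", d),
        Err(e) => println!("other error: {:?}", e),
    }

    // Once a message is queued, the same call succeeds.
    tx.send(vec![1, 2, 3]).unwrap();
    assert_eq!(recv_with_timeout(&rx, Duration::from_millis(20)), Ok(vec![1, 2, 3]));
}
```

Keeping the timeout as its own variant lets callers retry or give up on a slow peer without confusing that case with a broken connection.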
Best Practices
1. Resource Management
Recommended approach:
// Close channels explicitly when done
endpoint.close_all_channels();
This closes channels explicitly when the protocol is complete.
Recommended alternative:
// Use Drop to ensure cleanup
{
    let mut endpoint = RumpsteakEndpoint::new(role);
    // ... use endpoint ...
} // Drop ensures cleanup
This relies on drop to clean up resources at scope end.
Avoid:
// Don't forget to clean up resources
let mut endpoint = RumpsteakEndpoint::new(role);
// ... use endpoint ...
// Forgot to close!
This leaves channels open after the protocol.
2. Error Handling
Recommended approach:
match handler.send(&mut ep, role, &msg).await {
    Ok(()) => { /* success */ }
    Err(ChoreographyError::Transport(e)) => {
        // Handle transport error
        tracing::error!("Send failed: {}", e);
    }
    Err(e) => {
        // Handle other errors
    }
}
This handles transport errors explicitly. It keeps other errors visible.
Avoid:
// Don't ignore errors
handler.send(&mut ep, role, &msg).await.unwrap();
This panics on failures and hides transport details.
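An alternative to unwrap is to propagate the error with `?` and let the caller decide how to react. The sketch below is self-contained and hypothetical: `SketchError` stands in for ChoreographyError, and `send` and `run_step` are invented functions that simulate a missing channel.

```rust
use std::error::Error;
use std::fmt;
use std::time::Duration;

// Local stand-in for ChoreographyError with the two variants named in
// this guide.
#[derive(Debug)]
enum SketchError {
    Transport(String),
    Timeout(Duration),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::Transport(detail) => write!(f, "transport error: {detail}"),
            SketchError::Timeout(dur) => write!(f, "timed out after {dur:?}"),
        }
    }
}

impl Error for SketchError {}

// Hypothetical send that fails when no channel is registered for the peer.
fn send(channel_registered: bool) -> Result<(), SketchError> {
    if channel_registered {
        Ok(())
    } else {
        Err(SketchError::Transport("no channel for peer".to_string()))
    }
}

// Propagates the failure with `?` instead of panicking.
fn run_step(channel_registered: bool) -> Result<&'static str, Box<dyn Error>> {
    send(channel_registered)?;
    Ok("sent")
}

fn main() {
    match run_step(false) {
        Ok(status) => println!("{status}"),
        Err(e) => eprintln!("step failed: {e}"),
    }
    // The other variant formats its duration for diagnostics.
    println!("{}", SketchError::Timeout(Duration::from_secs(5)));
}
```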
3. Channel Setup
Recommended approach:
// Setup all channels before starting protocol
let (ch1, ch2) = SimpleChannel::pair();
alice_ep.register_channel(Role::Bob, ch1);
bob_ep.register_channel(Role::Alice, ch2);

// Then start protocol
protocol_run().await?;
This ensures channels exist before the first send.
Avoid:
// Don't register channels mid-protocol
handler.send(&mut ep, role, &msg).await?; // Might not have channel!
ep.register_channel(role, channel); // Too late!
This can cause send failures when a channel is missing.
4. Metadata Usage
Recommended approach:
// Use metadata for debugging and monitoring
if let Some(meta) = endpoint.get_metadata(&peer) {
    tracing::info!(
        peer = ?peer,
        operations = meta.operation_count,
        state = %meta.state_description,
        "Session status"
    );
}
This reports progress and state for each peer. It is useful for debugging.
5. Testing
Recommended approach:
#[tokio::test]
async fn test_protocol() {
    // Setup test environment
    let mut alice_ep = RumpsteakEndpoint::new(Role::Alice);
    let mut bob_ep = RumpsteakEndpoint::new(Role::Bob);
    let (alice_ch, bob_ch) = SimpleChannel::pair();
    alice_ep.register_channel(Role::Bob, alice_ch);
    bob_ep.register_channel(Role::Alice, bob_ch);

    // The handler is stateless, so one instance can serve both endpoints
    let mut handler = RumpsteakHandler::<Role, TestMessage>::new();

    // Test protocol
    let msg = TestMessage { data: vec![1, 2, 3] };
    handler.send(&mut alice_ep, Role::Bob, &msg).await.unwrap();
    let received: TestMessage = handler.recv(&mut bob_ep, Role::Alice).await.unwrap();
    assert_eq!(received.data, vec![1, 2, 3]);
}
This sets up a local channel pair and exercises a full send and receive. It validates handler wiring in tests.