Workflows
Workflows orchestrate multiple agents into complex, coordinated processes. MoFA provides powerful workflow abstractions for building sophisticated AI systems.
Workflow Overview
A workflow is a directed graph where:
- Nodes are agents or operations
- Edges define the flow of data and control
graph LR
A[Input] --> B[Agent 1]
B --> C{Router}
C -->|Option A| D[Agent 2]
C -->|Option B| E[Agent 3]
D --> F[Agent 4]
E --> F
F --> G[Output]
StateGraph
The StateGraph is MoFA’s core workflow abstraction:
#![allow(unused)]
fn main() {
use mofa_sdk::workflow::{StateGraph, State, Transition};
// Create a workflow graph
let mut graph = StateGraph::new();
// Define states (nodes)
graph.add_state(State::new("start", start_handler));
graph.add_state(State::new("process", process_handler));
graph.add_state(State::new("end", end_handler));
// Define transitions (edges)
graph.add_transition("start", "process");
graph.add_transition("process", "end");
// Set entry point
graph.set_entry("start");
}
Multi-Agent Coordination Patterns
MoFA supports 7 built-in coordination patterns:
1. Request-Response
One-to-one deterministic communication:
#![allow(unused)]
fn main() {
use mofa_sdk::coordination::RequestResponse;
let pattern = RequestResponse::new(agent_a, agent_b);
let response = pattern
.send(AgentInput::text("What is AI?"))
.await?;
}
2. Publish-Subscribe
One-to-many broadcast:
#![allow(unused)]
fn main() {
use mofa_sdk::coordination::PubSub;
let mut pubsub = PubSub::new();
// Subscribe agents
pubsub.subscribe("news", agent_a);
pubsub.subscribe("news", agent_b);
pubsub.subscribe("news", agent_c);
// Broadcast message
pubsub.publish("news", AgentInput::text("Breaking news!")).await?;
}
3. Consensus
Multi-round negotiation for decision making:
#![allow(unused)]
fn main() {
use mofa_sdk::coordination::Consensus;
let consensus = Consensus::new()
.with_agents(vec![agent_a, agent_b, agent_c])
.with_threshold(0.6) // 60% agreement needed
.with_max_rounds(5);
let decision = consensus.decide(&proposal).await?;
}
4. Debate
Alternating discussion for quality improvement:
#![allow(unused)]
fn main() {
use mofa_sdk::coordination::Debate;
let debate = Debate::new()
.with_proposer(agent_a)
.with_opponent(agent_b)
.with_judge(agent_c)
.with_rounds(3);
let result = debate.debide(&topic).await?;
}
5. Parallel
Simultaneous execution with result aggregation:
#![allow(unused)]
fn main() {
use mofa_sdk::coordination::Parallel;
let parallel = Parallel::new()
.with_agents(vec![agent_a, agent_b, agent_c])
.with_aggregation(Aggregation::TakeBest);
let results = parallel.execute(input).await?;
}
6. Sequential
Pipeline execution:
#![allow(unused)]
fn main() {
use mofa_sdk::coordination::Sequential;
let pipeline = Sequential::new()
.add_step(agent_a) // Research
.add_step(agent_b) // Analysis
.add_step(agent_c); // Summary
let result = pipeline.execute(input).await?;
// Output of each step becomes input of next
}
7. Custom
User-defined patterns:
#![allow(unused)]
fn main() {
use mofa_sdk::coordination::CustomPattern;
struct MyCustomPattern {
// Your custom logic
}
impl CoordinationPattern for MyCustomPattern {
async fn execute(&self, input: AgentInput) -> AgentResult<AgentOutput> {
// Your custom coordination logic
}
}
}
Building Workflows
Example: Customer Support Workflow
#![allow(unused)]
fn main() {
use mofa_sdk::workflow::{StateGraph, State, WorkflowContext};
// Define state handlers
async fn triage(input: AgentInput, ctx: &mut WorkflowContext) -> Result<String, WorkflowError> {
let intent = ctx.call_agent("classifier", input.clone()).await?;
match intent.as_text().unwrap_or("") {
"technical" => Ok("technical_support"),
"billing" => Ok("billing_support"),
_ => Ok("general_support"),
}
}
async fn technical_support(input: AgentInput, ctx: &mut WorkflowContext) -> Result<String, WorkflowError> {
let response = ctx.call_agent("tech_agent", input).await?;
ctx.set("response", response);
Ok("satisfaction_check")
}
async fn billing_support(input: AgentInput, ctx: &mut WorkflowContext) -> Result<String, WorkflowError> {
let response = ctx.call_agent("billing_agent", input).await?;
ctx.set("response", response);
Ok("satisfaction_check")
}
async fn satisfaction_check(input: AgentInput, ctx: &mut WorkflowContext) -> Result<String, WorkflowError> {
let response = ctx.get::<AgentOutput>("response").unwrap();
println!("Response: {}", response.as_text().unwrap());
Ok("end")
}
// Build the workflow
let mut workflow = StateGraph::new();
workflow.add_state(State::async_handler("triage", triage));
workflow.add_state(State::async_handler("technical_support", technical_support));
workflow.add_state(State::async_handler("billing_support", billing_support));
workflow.add_state(State::async_handler("general_support", general_support));
workflow.add_state(State::async_handler("satisfaction_check", satisfaction_check));
workflow.add_state(State::terminal("end"));
workflow.add_transition("triage", "technical_support");
workflow.add_transition("triage", "billing_support");
workflow.add_transition("triage", "general_support");
workflow.add_transition("technical_support", "satisfaction_check");
workflow.add_transition("billing_support", "satisfaction_check");
workflow.add_transition("general_support", "satisfaction_check");
workflow.add_transition("satisfaction_check", "end");
workflow.set_entry("triage");
}
Running the Workflow
#![allow(unused)]
fn main() {
let ctx = WorkflowContext::new()
.with_agent("classifier", classifier_agent)
.with_agent("tech_agent", tech_agent)
.with_agent("billing_agent", billing_agent);
let result = workflow.run(AgentInput::text("I can't login to my account"), ctx).await?;
}
Workflow DSL
MoFA provides a DSL for defining workflows:
#![allow(unused)]
fn main() {
use mofa_sdk::workflow_dsl::WorkflowBuilder;
let workflow = WorkflowBuilder::new("customer_support")
.start("triage")
.agent("triage", classifier_agent)
.route("technical", "tech_agent")
.route("billing", "billing_agent")
.default("general_agent")
.agent("tech_agent", tech_agent)
.then("satisfaction")
.agent("billing_agent", billing_agent)
.then("satisfaction")
.agent("general_agent", general_agent)
.then("satisfaction")
.end("satisfaction")
.build();
}
Conditional Transitions
#![allow(unused)]
fn main() {
// Conditional routing
workflow.add_conditional_transition("triage", |ctx| {
let intent = ctx.get::<String>("intent").unwrap();
match intent.as_str() {
"technical" => "technical_support",
"billing" => "billing_support",
_ => "general_support",
}
});
}
Error Handling in Workflows
#![allow(unused)]
fn main() {
// Error recovery states
workflow.add_state(State::new("error_handler", error_handler));
workflow.add_transition("error", "error_handler");
workflow.add_transition("error_handler", "retry");
workflow.add_transition("error_handler", "end");
}
Workflow Visualization
Export workflows to various formats:
#![allow(unused)]
fn main() {
// Export to Mermaid diagram
let mermaid = workflow.to_mermaid();
println!("{}", mermaid);
// Export to DOT (Graphviz)
let dot = workflow.to_dot();
}
Best Practices
- Keep States Simple: Each state should do one thing well
- Use Meaningful Names: State names should describe their purpose
- Handle Errors: Always have error recovery paths
- Log Transitions: Log state transitions for debugging
- Test Paths: Test all possible paths through the workflow
See Also
- Multi-Agent Guide — Detailed coordination guide
- Secretary Agent — Human-in-the-loop workflows
- Examples: Workflows — Workflow examples