运行时系统
MoFA 运行时系统示例,用于智能体生命周期管理。
基础运行时 API
使用运行时 API 创建和管理智能体。
位置: examples/runtime_example/
#![allow(unused)]
fn main() {
use mofa_sdk::kernel::{MoFAAgent, AgentContext, AgentInput, AgentOutput, AgentState};
use mofa_sdk::runtime::{AgentRunner, AgentBuilder, SimpleRuntime, run_agents};
// 定义智能体
struct SimpleRuntimeAgent {
id: String,
name: String,
state: AgentState,
}
#[async_trait]
impl MoFAAgent for SimpleRuntimeAgent {
fn id(&self) -> &str { &self.id }
fn name(&self) -> &str { &self.name }
fn state(&self) -> AgentState { self.state.clone() }
async fn initialize(&mut self, _ctx: &AgentContext) -> AgentResult<()> {
self.state = AgentState::Ready;
Ok(())
}
async fn execute(&mut self, input: AgentInput, _ctx: &AgentContext) -> AgentResult<AgentOutput> {
self.state = AgentState::Executing;
let text = input.to_text();
self.state = AgentState::Ready;
Ok(AgentOutput::text(format!("处理完成: {}", text)))
}
async fn shutdown(&mut self) -> AgentResult<()> {
self.state = AgentState::Shutdown;
Ok(())
}
}
}
批量执行
通过单个智能体运行多个输入:
#![allow(unused)]
fn main() {
let agent = SimpleRuntimeAgent::new("agent_batch", "BatchAgent");
let inputs = vec![
AgentInput::text("task-1"),
AgentInput::text("task-2"),
AgentInput::text("task-3"),
];
let outputs = run_agents(agent, inputs).await?;
for output in outputs {
println!("输出: {}", output.to_text());
}
}
智能体构建器模式
带配置构建智能体:
#![allow(unused)]
fn main() {
let mut runtime = AgentBuilder::new("agent1", "AgentOne")
.with_capability("echo")
.with_capability("event_handler")
.with_agent(agent)
.await?;
runtime.start().await?;
runtime.handle_event(AgentEvent::Custom("test".to_string(), vec![])).await?;
runtime.stop().await?;
}
多智能体运行时
通过消息传递管理多个智能体。
#![allow(unused)]
fn main() {
let runtime = SimpleRuntime::new();
// 注册多个智能体
let metadata1 = AgentBuilder::new("master", "MasterAgent")
.with_capability("master")
.build_metadata();
let metadata2 = AgentBuilder::new("worker", "WorkerAgent")
.with_capability("worker")
.build_metadata();
let mut rx1 = runtime.register_agent(metadata1, config1, "master").await?;
let mut rx2 = runtime.register_agent(metadata2, config2, "worker").await?;
// 订阅主题
runtime.subscribe_topic("master", "commands").await?;
runtime.subscribe_topic("worker", "commands").await?;
// 发送消息
let bus = runtime.message_bus();
bus.publish("commands", AgentEvent::Custom("start".to_string(), vec![])).await?;
bus.send_to("worker", AgentEvent::Custom("task".to_string(), b"data".to_vec())).await?;
}
消息总线背压
处理消息总线中的背压。
位置: examples/runtime_message_bus_backpressure/
#![allow(unused)]
fn main() {
let runtime = SimpleRuntime::new();
// 注册小容量通道的智能体
let mut rx = runtime.register_agent(metadata, config, "worker").await?;
// 填满通道产生背压
runtime.send_to_agent("slow-agent", AgentEvent::Custom("warmup".to_string(), vec![])).await?;
// 生成会被阻塞的任务
let send_task = tokio::spawn({
let bus = bus.clone();
async move {
bus.send_to("slow-agent", AgentEvent::Custom("blocked".to_string(), vec![])).await
}
});
// 其他操作保持响应
timeout(Duration::from_millis(300), runtime.register_agent(other_meta, other_cfg, "observer")).await??;
// 消费消息解除阻塞
let _ = rx.recv().await;
send_task.await??;
}
关键点
send_to在接收者通道满时阻塞publish在任一订阅者通道满时阻塞- 背压时其他运行时操作保持响应
- 使用超时检测慢消费者
运行示例
# 基础运行时示例
cargo run -p runtime_example
# 背压演示
cargo run -p runtime_message_bus_backpressure
可用示例
| 示例 | 描述 |
|---|---|
runtime_example | 基础运行时 API |
runtime_message_bus_backpressure | 消息总线背压处理 |
相关链接
- 架构概览 — 运行时架构
- API 参考:运行时 — 运行时 API