Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

运行时系统

MoFA 运行时系统示例,用于智能体生命周期管理。

基础运行时 API

使用运行时 API 创建和管理智能体。

位置: examples/runtime_example/

#![allow(unused)]
fn main() {
use mofa_sdk::kernel::{MoFAAgent, AgentContext, AgentInput, AgentOutput, AgentState};
use mofa_sdk::runtime::{AgentRunner, AgentBuilder, SimpleRuntime, run_agents};

// 定义智能体
struct SimpleRuntimeAgent {
    id: String,
    name: String,
    state: AgentState,
}

#[async_trait]
impl MoFAAgent for SimpleRuntimeAgent {
    fn id(&self) -> &str { &self.id }
    fn name(&self) -> &str { &self.name }
    fn state(&self) -> AgentState { self.state.clone() }

    async fn initialize(&mut self, _ctx: &AgentContext) -> AgentResult<()> {
        self.state = AgentState::Ready;
        Ok(())
    }

    async fn execute(&mut self, input: AgentInput, _ctx: &AgentContext) -> AgentResult<AgentOutput> {
        self.state = AgentState::Executing;
        let text = input.to_text();
        self.state = AgentState::Ready;
        Ok(AgentOutput::text(format!("处理完成: {}", text)))
    }

    async fn shutdown(&mut self) -> AgentResult<()> {
        self.state = AgentState::Shutdown;
        Ok(())
    }
}
}

批量执行

通过单个智能体运行多个输入:

#![allow(unused)]
fn main() {
let agent = SimpleRuntimeAgent::new("agent_batch", "BatchAgent");
let inputs = vec![
    AgentInput::text("task-1"),
    AgentInput::text("task-2"),
    AgentInput::text("task-3"),
];

let outputs = run_agents(agent, inputs).await?;
for output in outputs {
    println!("输出: {}", output.to_text());
}
}

智能体构建器模式

带配置构建智能体:

#![allow(unused)]
fn main() {
let mut runtime = AgentBuilder::new("agent1", "AgentOne")
    .with_capability("echo")
    .with_capability("event_handler")
    .with_agent(agent)
    .await?;

runtime.start().await?;
runtime.handle_event(AgentEvent::Custom("test".to_string(), vec![])).await?;
runtime.stop().await?;
}

多智能体运行时

通过消息传递管理多个智能体。

#![allow(unused)]
fn main() {
let runtime = SimpleRuntime::new();

// 注册多个智能体
let metadata1 = AgentBuilder::new("master", "MasterAgent")
    .with_capability("master")
    .build_metadata();

let metadata2 = AgentBuilder::new("worker", "WorkerAgent")
    .with_capability("worker")
    .build_metadata();

let mut rx1 = runtime.register_agent(metadata1, config1, "master").await?;
let mut rx2 = runtime.register_agent(metadata2, config2, "worker").await?;

// 订阅主题
runtime.subscribe_topic("master", "commands").await?;
runtime.subscribe_topic("worker", "commands").await?;

// 发送消息
let bus = runtime.message_bus();
bus.publish("commands", AgentEvent::Custom("start".to_string(), vec![])).await?;
bus.send_to("worker", AgentEvent::Custom("task".to_string(), b"data".to_vec())).await?;
}

消息总线背压

处理消息总线中的背压。

位置: examples/runtime_message_bus_backpressure/

#![allow(unused)]
fn main() {
let runtime = SimpleRuntime::new();

// 注册小容量通道的智能体
let mut rx = runtime.register_agent(metadata, config, "worker").await?;

// 填满通道产生背压
runtime.send_to_agent("slow-agent", AgentEvent::Custom("warmup".to_string(), vec![])).await?;

// 生成会被阻塞的任务
let send_task = tokio::spawn({
    let bus = bus.clone();
    async move {
        bus.send_to("slow-agent", AgentEvent::Custom("blocked".to_string(), vec![])).await
    }
});

// 其他操作保持响应
timeout(Duration::from_millis(300), runtime.register_agent(other_meta, other_cfg, "observer")).await??;

// 消费消息解除阻塞
let _ = rx.recv().await;
send_task.await??;
}

关键点

  • send_to 在接收者通道满时阻塞
  • publish 在任一订阅者通道满时阻塞
  • 背压时其他运行时操作保持响应
  • 使用超时检测慢消费者

运行示例

# 基础运行时示例
cargo run -p runtime_example

# 背压演示
cargo run -p runtime_message_bus_backpressure

可用示例

示例描述
runtime_example基础运行时 API
runtime_message_bus_backpressure消息总线背压处理

相关链接