Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

流式持久化

流式 LLM 对话与数据库持久化示例。

自动持久化

使用 PostgreSQL 自动持久化流式对话。

位置: examples/streaming_persistence/

use mofa_sdk::persistence::quick_agent_with_postgres;

#[tokio::main]
async fn main() -> LLMResult<()> {
    // 创建带自动持久化的智能体
    let agent = quick_agent_with_postgres(
        "你是一个专业的 AI 助手。"
    ).await?
    .with_session_id("019bda9f-9ffd-7a80-a9e5-88b05e81a7d4")
    .with_name("流式持久化 Agent")
    .with_sliding_window(2)  // 保留最近 2 轮对话
    .build_async()
    .await;

    // 流式对话,自动持久化
    let mut stream = agent.chat_stream(&user_input).await?;
    while let Some(result) = stream.next().await {
        match result {
            Ok(text) => print!("{}", text),
            Err(e) => eprintln!("错误: {}", e),
        }
    }

    Ok(())
}

特性

  • 自动持久化:消息自动保存到数据库
  • 滑动窗口:可配置上下文窗口大小
  • 会话管理:支持跨重启恢复对话

手动持久化

完全控制持久化的内容和时机。

位置: examples/streaming_manual_persistence/

use mofa_sdk::persistence::{PersistenceContext, PostgresStore};

#[tokio::main]
async fn main() -> LLMResult<()> {
    // 连接数据库
    let store = PostgresStore::shared(&database_url).await?;

    // 创建持久化上下文(新会话或现有会话)
    let ctx = PersistenceContext::new(store, user_id, tenant_id, agent_id).await?;

    // 手动保存用户消息
    let user_msg_id = ctx.save_user_message(&user_input).await?;

    // 流式响应
    let mut stream = agent.chat_stream(&user_input).await?;
    let mut full_response = String::new();

    while let Some(result) = stream.next().await {
        if let Ok(text) = result {
            print!("{}", text);
            full_response.push_str(&text);
        }
    }

    // 手动保存助手响应
    let assistant_msg_id = ctx.save_assistant_message(&full_response).await?;

    Ok(())
}

何时使用手动持久化

  • 精细控制保存内容
  • 消息附加自定义元数据
  • 基于响应质量的条件持久化
  • 与现有事务边界集成

从数据库加载智能体配置

从 PostgreSQL 数据库加载智能体配置。

位置: examples/agent_from_database_streaming/

use mofa_sdk::persistence::{AgentStore, PostgresStore, PersistencePlugin};

#[tokio::main]
async fn main() -> Result<()> {
    let store = PostgresStore::connect(&database_url).await?;

    // 从数据库加载智能体配置
    let config = store
        .get_agent_by_code_and_tenant_with_provider(tenant_id, "chat-assistant")
        .await?
        .ok_or_else(|| anyhow!("Agent not found"))?;

    // 创建持久化插件
    let persistence = PersistencePlugin::from_store(
        "persistence-plugin",
        store,
        user_id,
        tenant_id,
        config.agent.id,
        session_id,
    );

    // 从数据库配置构建智能体
    let agent = LLMAgentBuilder::from_agent_config(&config)?
        .with_persistence_plugin(persistence)
        .build_async()
        .await;

    // 带数据库支持的流式对话
    let mut stream = agent.chat_stream(&user_input).await?;
    // ...

    Ok(())
}

数据库表结构

需要的表:

  • entity_agent — 智能体配置
  • entity_provider — LLM 提供商配置
  • entity_session — 对话会话
  • entity_message — 消息历史

运行示例

# 初始化数据库
psql -d your-database -f scripts/sql/migrations/postgres_init.sql

# 设置环境变量
export DATABASE_URL="postgres://user:pass@localhost:5432/mofa"
export OPENAI_API_KEY="sk-xxx"

# 运行自动持久化
cargo run -p streaming_persistence

# 运行手动持久化
cargo run -p streaming_manual_persistence

# 运行数据库驱动配置
export AGENT_CODE="chat-assistant"
export USER_ID="550e8400-e29b-41d4-a716-446655440003"
cargo run -p agent_from_database_streaming

可用示例

示例描述
streaming_persistence带滑动窗口的自动持久化
streaming_manual_persistence手动消息持久化控制
agent_from_database_streaming从数据库加载智能体配置

相关链接