logo
0
0
WeChat Login

M2: Function Call 实现数据库查询

apt update # 更新软件源 apt install -y python3 python3-pip python3-venv # 安装 Python3 + pip + 虚拟环境模块 python3 -m venv /workspace/venv source /workspace/venv/bin/activate pip install -r requirements.txt

1. 在 /workspace 下创建虚拟环境(命名为 venv,可自定义)

python3 -m venv /workspace/venv

2. 激活虚拟环境(激活后终端提示符会带 (venv) 标识)

source /workspace/venv/bin/activate

bash scripts/download_qwen3_models.sh

模块目标

使用 Function Call 技术让生成的 SQL 在真实数据库上执行并返回结果。

关键功能点

  • ✅ 扩展 State 结构 (添加 execution_result 字段)
  • ✅ 实现数据库工具 (tools/db.py,支持 SQLite)
  • ✅ 创建 SQL 执行节点 (execute_sql_node)
  • ✅ 准备示例数据库 (Chinook)
  • ✅ 图结构: parse_intent → generate_sql → execute_sql → echo → END

新增内容 (相比 M1)

  • graphs/state.py: 新增 execution_resultexecuted_at 字段
  • graphs/nodes/execute_sql.py: SQL 执行节点实现
  • tools/db.py: 数据库客户端封装,支持 SQLite 查询
  • scripts/setup_db.py: 自动下载 Chinook 示例数据库
  • data/chinook.db: Chinook 示例数据库 (音乐商店数据)
  • tests/test_m2_acceptance.py: M2 验收测试 (8 个测试用例)

快速开始

1. 切换到 M2 分支

git checkout 02-func-call-db

2. 配置 API Key (如果还没配置)

# 复制环境变量模板 cp .env.example .env # 编辑 .env 文件,填入你的 API Key # 推荐使用 DeepSeek (国内访问快,价格低)

.env 配置示例:

LLM_PROVIDER=deepseek DEEPSEEK_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx # 数据库配置 (默认值,通常不需要修改) DB_TYPE=sqlite DB_PATH=data/chinook.db

3. 下载示例数据库

python scripts/setup_db.py

这会自动下载 Chinook 数据库 (约 1MB),包含 11 个表:

  • Album: 专辑 (347 条)
  • Artist: 艺术家 (275 条)
  • Customer: 客户 (59 条)
  • Employee: 员工 (8 条)
  • Genre: 音乐风格 (25 条)
  • Invoice: 发票 (412 条)
  • InvoiceLine: 发票明细 (2240 条)
  • MediaType: 媒体类型 (5 条)
  • Playlist: 播放列表 (18 条)
  • PlaylistTrack: 播放列表歌曲 (8715 条)
  • Track: 歌曲 (3503 条)

4. 运行测试

测试数据库工具:

python tools/db.py

测试 SQL 执行节点:

python graphs/nodes/execute_sql.py

运行完整图:

python graphs/base_graph.py

运行 M2 验收测试:

python tests/test_m2_acceptance.py

验收标准

所有测试用例必须成功执行并返回正确结果 (100% 通过率)

测试用例示例

  1. Simple SELECT: "Show all albums"
  2. Count aggregation: "How many tracks are there?"
  3. Top N with ORDER BY: "What are the top 5 longest tracks?"
  4. WHERE clause: "Show albums by AC/DC"
  5. JOIN query: "Show all albums with their artist names"
  6. GROUP BY aggregation: "Count albums by artist"
  7. Multiple tables: "Show customer names and their total invoice amounts"
  8. Date filtering: "Show invoices from 2010"

预期输出

================================================== Starting NL2SQL Graph (M2 - Function Call DB) ================================================== === Parse Intent Node === Question: Show all albums Intent: {...} === Generate SQL Node === Question: Show all albums LLM Response: SELECT * FROM Album; Extracted SQL: SELECT * FROM Album; === Execute SQL Node === SQL: SELECT * FROM Album; ✓ Query successful Rows returned: 100 Columns: AlbumId, Title, ArtistId First row: AlbumId: 1 Title: For Those About To Rock We Salute You ArtistId: 1 === Echo Node === Session ID: xxx Question: Show all albums Generated SQL: SELECT * FROM Album; Execution Result: ✓ Success Rows: 100 Columns: AlbumId, Title, ArtistId First row: {'AlbumId': 1, 'Title': 'For Those About To Rock We Salute You', 'ArtistId': 1} ================================================== SQL Generated: ✓ SQL Executed: ✓

核心实现

1. 数据库客户端 (tools/db.py)

from tools.db import db_client # 执行 SQL 查询 result = db_client.query("SELECT * FROM Album LIMIT 5") if result["ok"]: print(f"Rows: {result['row_count']}") print(f"Columns: {result['columns']}") for row in result['rows']: print(row) else: print(f"Error: {result['error']}")

特点:

  • 统一接口,支持多种数据库 (当前支持 SQLite)
  • 只读模式:只允许 SELECT 查询
  • 自动限制返回行数 (默认 100 行)
  • 完整的错误处理
  • 结果以字典列表形式返回

返回格式:

{ "ok": True, # 是否成功 "rows": [ # 查询结果 {"col1": "value1", ...}, ... ], "columns": ["col1", "col2"], # 列名 "row_count": 10, # 行数 "error": None # 错误信息 }

2. SQL 执行节点 (graphs/nodes/execute_sql.py)

def execute_sql_node(state: NL2SQLState) -> NL2SQLState: # 1. 获取生成的 SQL # 2. 调用数据库客户端执行 # 3. 处理结果和错误 # 4. 返回更新后的 State

特点:

  • 自动从 State 获取 candidate_sql
  • 调用数据库客户端执行查询
  • 将执行结果存入 State 的 execution_result
  • 记录执行时间戳

3. Chinook 数据库

标准的示例数据库,模拟音乐商店业务:

主要表关系:

Artist (艺术家) ↓ (1:N) Album (专辑) ↓ (1:N) Track (歌曲) → Genre (风格) ↓ (N:M) PlaylistTrack → Playlist (播放列表) ↓ InvoiceLine → Invoice → Customer (客户)

技术亮点

1. Function Call 模式

使用 Function Call 模式实现数据库查询:

  1. LLM 生成 SQL (function arguments)
  2. 系统执行 SQL (function execution)
  3. 返回结果给 LLM (function result)

这是典型的 Agent Tool Use 模式。

2. 安全设计

  • 只读模式: 只允许 SELECT 查询,拒绝 INSERT/UPDATE/DELETE
  • 结果限制: 默认最多返回 100 行
  • 错误隔离: SQL 错误不会导致系统崩溃
  • 参数化查询: 支持 prepared statements (防 SQL 注入)

3. 结果结构化

查询结果以字典列表形式返回,便于:

  • JSON 序列化
  • 后续处理和转换
  • 展示给用户
  • 传递给其他节点

数据库工具详解

支持的操作

from tools.db import db_client # 1. 查询数据 result = db_client.query("SELECT * FROM Album LIMIT 10") # 2. 获取表名 tables = db_client.get_table_names() # ['Album', 'Artist', 'Customer', ...] # 3. 获取表结构 schema = db_client.get_table_schema("Album") # {'table_name': 'Album', 'columns': [...]} # 4. 获取所有表结构 all_schemas = db_client.get_all_schemas() # 5. 测试连接 if db_client.test_connection(): print("Connected!")

错误处理

result = db_client.query("SELECT * FROM NonExistent") if not result["ok"]: print(f"Error: {result['error']}") # Error: Database error: no such table: NonExistent

常见错误:

  • 表不存在
  • 列不存在
  • SQL 语法错误
  • 非 SELECT 查询 (被拒绝)

限制与改进方向

当前限制

  • ❌ 只支持 SQLite (M3 可扩展到 MySQL/PostgreSQL)
  • ❌ 没有 SQL 校验 (M4 会添加)
  • ❌ 没有执行沙箱 (M5 会添加)
  • ❌ 结果不展示给用户 (M9 会添加答案生成)
  • ❌ 没有超时控制 (M5 会添加)

改进方向

  • M3: 注入真实 Schema,提升 SQL 生成质量
  • M4: 添加 SQL 语法校验和修复
  • M5: 添加执行沙箱和安全限制
  • M9: 将查询结果转换为自然语言回答
  • M11: 添加查询日志和追踪

常见问题

Q: 数据库文件没下载成功? A:

  1. 检查网络连接
  2. 手动下载: https://github.com/lerocha/chinook-database
  3. Chinook_Sqlite.sqlite 重命名为 chinook.db 并放入 data/ 目录

Q: 查询失败 "no such table"? A:

  1. 运行 python tools/db.py 查看可用表名
  2. 检查表名大小写 (SQLite 区分大小写)
  3. 确认数据库文件路径正确

Q: 如何使用自己的数据库? A: 修改 .env 文件:

DB_TYPE=sqlite DB_PATH=path/to/your/database.db

Q: 如何添加 MySQL/PostgreSQL 支持? A: 在 tools/db.pyDatabaseClient 中添加相应的驱动和连接逻辑:

if self.db_type == "mysql": import mysql.connector conn = mysql.connector.connect(...) elif self.db_type == "postgresql": import psycopg2 conn = psycopg2.connect(...)

Q: 执行时间太长怎么办? A: M5 模块会添加超时控制和查询优化。当前可以通过 fetch_limit 参数限制返回行数:

result = db_client.query(sql, fetch_limit=10)

Chinook 数据库说明

业务场景

模拟一个在线音乐商店,类似 iTunes Store。

主要实体

  • Artist: 艺术家 (如 AC/DC, Iron Maiden)
  • Album: 专辑
  • Track: 歌曲
  • Genre: 音乐风格 (Rock, Jazz, Metal...)
  • Customer: 客户
  • Invoice: 发票
  • Employee: 员工

常见查询示例

-- 1. 查询所有艺术家 SELECT * FROM Artist; -- 2. 查询 AC/DC 的所有专辑 SELECT a.Title FROM Album a JOIN Artist ar ON a.ArtistId = ar.ArtistId WHERE ar.Name = 'AC/DC'; -- 3. 统计每个风格的歌曲数量 SELECT g.Name, COUNT(*) as TrackCount FROM Track t JOIN Genre g ON t.GenreId = g.GenreId GROUP BY g.Name ORDER BY TrackCount DESC; -- 4. 查询消费最多的前 10 个客户 SELECT c.FirstName, c.LastName, SUM(i.Total) as TotalSpent FROM Customer c JOIN Invoice i ON c.CustomerId = i.CustomerId GROUP BY c.CustomerId ORDER BY TotalSpent DESC LIMIT 10;

下一步

  • M3 (03-schema-ingestion): 引入 Schema 感知,大幅提升 SQL 生成质量
  • M4 (04-sql-guardrail): 添加 SQL 校验和自动修复
  • M5 (05-exec-sandbox): 添加执行沙箱和安全限制

许可证

MIT License

About

DeepSeek-R1-0528-Qwen3-8B一键启动

23.51 GiB
0 forks0 stars2 branches0 TagREADMEMIT license
Language
Python97.3%
Shell2.6%
Dockerfile0.1%