import os import dotenv import datetime from pathlib import Path from langchain.agents import create_agent, AgentState from langchain_openai import ChatOpenAI from langchain_core.messages import ( SystemMessage, HumanMessage, BaseMessage, trim_messages, ) from openai import chat from tools.tool_factory import get_all_tools from langchain_core.runnables import RunnableConfig from langchain.agents.middleware import before_model from langgraph.runtime import Runtime from typing import Any, List, Sequence from langchain.messages import RemoveMessage from langgraph.graph.message import REMOVE_ALL_MESSAGES import sqlite3 from config.settings import settings from langchain_core.messages.utils import count_tokens_approximately from core.worker_manager import get_worker_tools from utils.context_helper import safe_context_param from utils.logger import chat_logger dotenv.load_dotenv() def create_system_prompt( backend_url: str = "", token: str = "", username: str = "default", context: str = "" ) -> str: # auth_status = "已认证" if token else "未认证" # backend_available = "API可用" if backend_url and token else "仅数据查询" # knowledge_status = ( # "知识库可用" if settings.KNOWLEDGE_BASE_ENABLED else "知识库已禁用" # ) # echart_status = "图表可用" if settings.ECHARTS_ENABLED else "图表已禁用" # if settings.KNOWLEDGE_BASE_ENABLED: # # 知识库启用时的提示词 # system_prompt = f"""龙嘉软件助手- 用户:{username} 认证:{auth_status} 服务:{backend_available} 知识库:{knowledge_status} 图表:{echart_status} # 职责:ERP数据查询和问题解答,按用户语言回答。 # **核心安全指令 (必遵)**: # 1. **当前凭据 (每次工具调用必须使用)**: # - 后端地址: {backend_url if backend_url else '无'} # - API令牌: {token if token else '无'} # 2. **禁止沿用历史**:**严禁**从对话历史中复制、沿用任何旧的后端地址、令牌或工具参数。历史记录仅用于理解背景,其中的工具详情**不能**作为本次调用的参数来源。 # 3. **调用规范**:调用查询工具时,**必须且只能**使用上方提供的当前凭据。 # 工作流: # 1. 分析问题意图,提取模块关键词 # 2. 如果是数据查询类问题,直接调用相关工具查询数据 # 3. 如果是其他问题,则通过工具搜索知识库,知识库工具使用流程:a.通过关键字获取相关文章列表,b.判断哪些文章最符合,c.再通过工具获取文章内容.严格按文章内容回复,不能编造答案. # 4. 关键词要精准,避免无意义词 # 工具调用规格: # - 如果连续3次调用相同工具相同参数,自动停止 # - 工具返回相同结果但仍在重复调用时,自动停止 # 回答规则: # - 知识库找不到时提示"正在学习该问题" # - {"需要个人数据时验证认证状态" if backend_url else "仅提供数据查询和知识库支持"} # - 保护隐私,专业准确,精炼简要 # 时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} # 数据查询结果尽量以 Markdown 表格格式输出,格式如下: # | 列名1 | 列名2 | 列名3 | # | :--- | :--- | :--- | # | 数据1 | 数据2 | 数据3 | # | 数据4 | 数据5 | 数据6 | # """ # else: # # 知识库禁用时的提示词 - 灵活处理工具返回结果 # system_prompt = f"""龙嘉软件助手- 用户:{username} 认证:{auth_status} 服务:{backend_available} 知识库:{knowledge_status} 图表:{echart_status} # 职责:处理ERP数据查询类问题,按用户语言回答。 # **核心安全指令 (必遵)**: # 1. **当前凭据 (每次工具调用必须使用)**: # - 后端地址: {backend_url if backend_url else '无'} # - API令牌: {token if token else '无'} # 2. **禁止沿用历史**:**严禁**从对话历史中复制、沿用任何旧的后端地址、令牌或工具参数。历史记录仅用于理解背景,其中的工具详情**不能**作为本次调用的参数来源。 # 3. **调用规范**:调用查询工具时,**必须且只能**使用上方提供的当前凭据。 # 工作流: # 1. 分析问题意图,判断是否为数据查询类问题 # 2. 如果是数据查询类问题,直接调用相关工具查询数据 # 3. 根据工具返回的结果进行回答: # - 如果工具返回了具体数据,按数据内容回答 # - 如果工具返回了错误信息(如"API返回错误","查询失败","没有权限"等),如实告知用户错误信息 # - 如果工具返回空数据或"未找到数据",如实告知用户 # 4. 如果是非数据查询类问题(如疑问、流程、操作等),回复:"知识库正在完善,无法回答该问题" # 工具调用规格: # - 禁止连续调用相同工具相同参数 # - 工具返回相同结果但仍在重复调用时,自动停止 # 回答规则: # - 如用户提出非ERP范围的问题(例如:"你好"等闲聊),明确告知用户自己的职责:主要处理ERP数据查询类问题 # - 工具提示没有权限时,明确回复用户没有权限 # - 严格按工具返回的内容回答,不能编造答案,可对结果进行简单总结 # - 当工具返回错误信息时,如实转达给用户,不要添加额外解释 # - 保持专业、准确、简洁的回答风格 # {"- 需要个人数据时验证认证状态" if backend_url else "- 仅提供数据查询支持"} # 当前时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} # 数据查询结果尽量以 Markdown 表格格式输出,格式如下: # | 列名1 | 列名2 | 列名3 | # | :--- | :--- | :--- | # | 数据1 | 数据2 | 数据3 | # | 数据4 | 数据5 | 数据6 | # """ context = safe_context_param(context) system_prompt = f""" # 龙嘉AI助手-多Agent协调系统 ## 用户信息 - 用户名: {username} ##上下文 上下文长度: {len(context)} ## 严格行为规则 你是一个调度器,**不是回答者**。你的唯一职责是决定是否调用工具。 每轮对话只能调用一次工具,不能连续调用。 ## 决策规则 - 就算提供了上下文,你也**必须调用**合适的工具,不能直接回答用户问题。 - 如果无法判断调用那个工具,引导用户提供更多信息。 ## 关键行为约束 ### 当调用工具时: 1. 你**必须**调用合适的工具 2. 调用后**立即停止**,只能输出**"工具调用成功"** ## 重要警告 - 调用工具后,**不要**基于工具返回的结果继续生成回答 - 系统会自动将工具结果返回给用户 - 你在调用工具后,输出"工具调用成功",你的任务就结束了 当需要工具时 - **必须**调用合适的Worker工具 - **必须传递以下4个参数**: 1. query: 用户问题 2. backend_url: {backend_url if backend_url else '无'} 3. token: {token if token else '无'} 4. context: {f"###对话上下文开始###\n{context}\n###对话上下文结束###" if context else '无'} - **输出格式**:必须且只能输出:`工具调用成功` ## 零容忍规则 **严禁参数错误**:每次调用都必须使用当前提供的backend_url/token,不能使用历史值 **严禁猜测**:如果不能确定,就调用工具 ## 当前状态 - 时间: {datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} """ return system_prompt def get_day_number(date=None): """获取日期编号 (YYYYMMDD 格式)""" if date is None: date = datetime.datetime.now() return date.strftime("%Y%m%d") # 格式: 20251229 def get_sqlite_checkpointer(): """创建按天分割的SQLite检查点保存器""" try: from langgraph.checkpoint.sqlite import SqliteSaver # 获取当前日期编号 current_day = get_day_number() # 数据库文件存放目录 project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) base_dir = os.path.join(project_root, "data", "checkpoints") os.makedirs(base_dir, exist_ok=True) # 数据库文件名格式: checkpoints_20251229.db db_filename = f"checkpoints_{current_day}.db" db_path = os.path.join(base_dir, db_filename) # checkpointer = SqliteSaver.from_conn_string(db_path) conn = sqlite3.connect(db_path, check_same_thread=False) conn.execute("PRAGMA wal_autocheckpoint=500") # 2MB 就提交 conn.execute("PRAGMA journal_size_limit=52428800") # 最大 50MB checkpointer = SqliteSaver(conn) return checkpointer except Exception as e: print(f"[ERROR]创建 SQLite 检查器失败: {e}") import traceback traceback.print_exc() # 回退到内存保存器 from langgraph.checkpoint.memory import InMemorySaver print("[WARN]使用 InMemorySaver 作为回退") return InMemorySaver() def cleanup_old_checkpoints(max_days=7): """清理超过指定天数的旧检查点文件(可选功能)""" try: project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) base_dir = os.path.join(project_root, "data", "checkpoints") if not os.path.exists(base_dir): return # 获取当前日期 current_date = datetime.datetime.now() # 遍历目录中的所有.db文件 for filename in os.listdir(base_dir): if filename.startswith("checkpoints_") and filename.endswith(".db"): try: print(f"检查旧检查点文件: {filename}") # 提取日期 (checkpoints_day_20251229.db -> 20251229) date_str = filename.replace("checkpoints_day_", "").replace( ".db", "" ) file_date = datetime.datetime.strptime(date_str, "%Y%m%d") # 计算天数差 days_diff = (current_date - file_date).days # 删除超过 max_days 天的旧数据 if days_diff > max_days: file_path = os.path.join(base_dir, filename) os.remove(file_path) print( f"[CLEAN]清理旧检查点文件: {filename} (超过 {max_days} 天)" ) except (ValueError, IndexError): # 文件名不符合预期,跳过 continue except Exception as e: print(f"[WARN]清理旧检查点失败: {e}") # 创建agent def create_langchain_agent( backend_url: str = "", token: str = "", username: str = "default", thread_id: str = "default", context: str = "", ): llm = ChatOpenAI( model=settings.LLM_MODEL, temperature=settings.LLM_TEMPERATURE, api_key=settings.DEEPSEEK_API_KEY, base_url=settings.DEEPSEEK_BASE_URL, max_tokens=settings.LLM_MAX_TOKENS, ) tools = get_worker_tools() # 添加调试信息 # print(f"[DEBUG]Agent 创建调试信息:") # print(f" - 用户: {username}") # print(f" - Thread ID: {thread_id}") # print(f" - 后端地址: {backend_url}") # print(f" - Token: {'已提供' if token else '未提供'}") # print(f" - worker数量: {len(tools)}") # for i, tool in enumerate(tools): # print(f" - worker {i+1}: {tool.name}") # 获取动态的system_prompt system_prompt = create_system_prompt(backend_url, token, username, context) # print(f"[DEBUG]上下文长度: {len(context)}") # print(system_prompt) # chat_logger.info(f"主Agent System Prompt上下文: {system_prompt}") @before_model def trim_messages_middleware( state: AgentState, runtime: Runtime ) -> dict[str, Any] | None: """使用官方trim_messages函数修剪消息""" messages = state.get("messages", []) print(f"trim_messages_middleware[DEBUG]原始消息数: {len(messages)}") # if len(messages) <= 3: # return None # 不需要修剪 trimmed_messages = trim_messages( messages, max_tokens=500, strategy="last", # 保留最近的对话 token_counter=count_tokens_approximately, # token计数器 start_on="human", # 从human消息开始计算轮次 include_system=True, # 包含系统消息 ) # 添加调试信息 original_count = len(messages) trimmed_count = len(trimmed_messages) print(f"trim_messages_middleware[DEBUG]修剪后消息数: {trimmed_count}") if trimmed_count < original_count: print(f"[INFO]消息修剪: {original_count} -> {trimmed_count} 条消息") return {"messages": trimmed_messages} # 使用SQLiteSaver(按天分割) checkpointer = get_sqlite_checkpointer() # print(f"打印检查点保存器: {checkpointer}") # 可选:清理旧检查点(可配置为定期执行) if os.getenv("AUTO_CLEANUP", "false").lower() == "true": cleanup_old_checkpoints(max_days=7) # 保留最近7天数据 # agent = create_agent( # llm, # tools, # checkpointer=checkpointer, # system_prompt=system_prompt, # middleware=[trim_messages_middleware], # ) agent = create_agent( llm, tools, system_prompt=system_prompt, ) return agent