# base_tool.py
  1. import re
  2. import html
  3. import requests
  4. import json
  5. from typing import List, Dict, Any, Optional, Callable
  6. from pathlib import Path
  7. from utils.logger import chat_logger
  8. def html_to_text(html_content: str) -> str:
  9. """HTML转文本"""
  10. if not html_content:
  11. return ""
  12. clean = re.compile(r"<[^>]+>")
  13. text = clean.sub("", html_content)
  14. text = html.unescape(text)
  15. return re.sub(r"\s+", " ", text).strip()
  16. def get_unique_match_count(search_text: str, filter_words: List[str]) -> int:
  17. """获取唯一匹配计数"""
  18. sorted_keywords = sorted(filter_words, key=len, reverse=True)
  19. match_count = 0
  20. remaining_text = search_text.lower()
  21. for keyword in sorted_keywords:
  22. kw_lower = keyword.lower()
  23. if kw_lower in remaining_text:
  24. match_count += 1
  25. remaining_text = remaining_text.replace(kw_lower, "", 1)
  26. return match_count
  27. def calculate_relevance_score(
  28. doc_name: str, doc_keywords: str, search_keywords: List[str]
  29. ) -> float:
  30. """
  31. 计算文档与搜索关键词的相关性得分
  32. Args:
  33. doc_name: 文档标题
  34. doc_keywords: 文档关键词
  35. search_keywords: 搜索关键词列表
  36. Returns:
  37. 相关性得分(0-100)
  38. """
  39. # 合并搜索文本
  40. search_text = f"{doc_name} {doc_keywords}".lower()
  41. search_keywords_lower = [kw.lower() for kw in search_keywords]
  42. # 权重设置
  43. TITLE_WEIGHT = 0.7 # 标题权重(提高)
  44. KEYWORD_WEIGHT = 0.3 # 关键词权重
  45. EXACT_MATCH_BONUS = 0.5 # 精确匹配奖励(提高)
  46. PARTIAL_MATCH_FACTOR = 0.3 # 部分匹配因子
  47. total_score = 0.0
  48. # 1. 标题匹配得分(重新设计)
  49. title_score = 0.0
  50. doc_name_lower = doc_name.lower()
  51. for keyword in search_keywords_lower:
  52. if keyword in doc_name_lower:
  53. # 基础得分:基于关键词长度和位置
  54. base_score = min(len(keyword) * 2, 10) # 每个字符2分,最多10分
  55. # 位置权重:标题开头和结尾的匹配更重要
  56. if doc_name_lower.startswith(keyword):
  57. base_score *= 1.5
  58. elif doc_name_lower.endswith(keyword):
  59. base_score *= 1.3
  60. # 精确匹配奖励:完全包含关键词
  61. if f" {keyword} " in f" {doc_name_lower} ":
  62. base_score *= 1 + EXACT_MATCH_BONUS
  63. title_score += base_score
  64. # 2. 关键词匹配得分
  65. keyword_score = 0.0
  66. for keyword in search_keywords_lower:
  67. if keyword in doc_keywords.lower():
  68. keyword_score += min(len(keyword) * 1.5, 8) # 每个字符1.5分,最多8分
  69. # 3. 计算总得分
  70. total_score = (title_score * TITLE_WEIGHT) + (keyword_score * KEYWORD_WEIGHT)
  71. # 4. 匹配数量奖励(重要改进)
  72. matched_count = sum(1 for kw in search_keywords_lower if kw in search_text)
  73. if matched_count > 0:
  74. coverage_ratio = matched_count / len(search_keywords_lower)
  75. # 匹配越多,奖励越大
  76. total_score *= 1 + coverage_ratio * 0.5
  77. # 5. 特殊关键词优先级(针对你的具体需求)
  78. priority_keywords = ["交期", "审核", "修改", "终止", "失败", "错误", "明细", "功能"]
  79. for keyword in priority_keywords:
  80. if keyword in search_keywords_lower and keyword in search_text:
  81. total_score *= 1.2 # 提高优先级关键词奖励
  82. # 6. 长标题惩罚调整(避免长标题得分过低)
  83. if len(doc_name) > 30:
  84. # 长标题轻微惩罚,但不要过度惩罚
  85. total_score *= 0.9
  86. # 7. 确保至少匹配一个关键词就有基础分
  87. if matched_count == 0:
  88. return 0.0
  89. return min(total_score, 100.0)
  90. def find_most_relevant_document(
  91. doc_list: List[dict], search_keywords: List[str], max_matches: int = 10
  92. ) -> List[dict]:
  93. """
  94. 找到最相关的文档
  95. Args:
  96. doc_list: 文档列表
  97. search_keywords: 搜索关键词
  98. max_matches: 最大返回数量(增加到10)
  99. Returns:
  100. 按相关性排序的文档列表
  101. """
  102. scored_docs = []
  103. for doc in doc_list:
  104. doc_id = doc["DocID"]
  105. doc_name = doc["DocName"]
  106. doc_keywords = doc.get("keyword", "")
  107. # 计算相关性得分
  108. score = calculate_relevance_score(doc_name, doc_keywords, search_keywords)
  109. # 降低过滤门槛,只要匹配至少一个关键词就考虑
  110. match_count = sum(
  111. 1
  112. for kw in search_keywords
  113. if kw.lower() in f"{doc_name} {doc_keywords}".lower()
  114. )
  115. if match_count > 0:
  116. scored_docs.append(
  117. {
  118. "doc_id": doc_id,
  119. "doc_name": doc_name,
  120. "keywords": doc_keywords,
  121. "relevance_score": score,
  122. "match_count": match_count,
  123. }
  124. )
  125. # 按相关性得分降序排序
  126. scored_docs.sort(key=lambda x: x["relevance_score"], reverse=True)
  127. return scored_docs[:max_matches]
  128. def call_csharp_api(
  129. backend_url: str, token: str, uoName: str, functionName: str, SParms: dict
  130. ) -> str:
  131. """调用C# API的通用方法"""
  132. print(f"🔧 API调用调试信息:")
  133. print(f" - 后端地址: {backend_url}")
  134. print(f" - Token: {'已配置' if token else '未配置'}")
  135. print(f" - 功能: {functionName}")
  136. print(f" - 参数: {SParms}")
  137. if not backend_url or not token:
  138. error_msg = f"错误:未配置后端地址或认证令牌。后端: {backend_url or '未配置'}, Token: {'已配置' if token else '未配置'}"
  139. print(f"❌ {error_msg}")
  140. return error_msg
  141. headers = {
  142. "Accept": "application/json, text/plain, */*",
  143. "Content-Type": "application/json",
  144. "X-TOKEN": token,
  145. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
  146. }
  147. payload = {
  148. "token": token,
  149. "CList": [
  150. {
  151. "uoName": uoName,
  152. "functionName": functionName,
  153. "SParms": SParms,
  154. "ifcommit": True,
  155. "returnStrList": [],
  156. }
  157. ],
  158. "language": "zh-cn",
  159. }
  160. try:
  161. response = requests.post(backend_url, headers=headers, json=payload, timeout=60)
  162. if response.status_code == 200:
  163. data = response.json()
  164. chat_logger.info(
  165. f"API响应数据:uoName={uoName}, functionName={functionName}, SParms={SParms}"
  166. )
  167. # 检查是否存在ErrMsg字段,如果有则直接返回错误信息
  168. if "ErrMsg" in data and data["ErrMsg"]:
  169. error_msg = f"API返回错误: {data['ErrMsg']}"
  170. print(f"❌ {error_msg}")
  171. return error_msg
  172. return process_api_response(data)
  173. else:
  174. error_msg = f"API请求失败,状态码: {response.status_code}"
  175. chat_logger.error(
  176. f"uoName={uoName}, functionName={functionName}, SParms={SParms}, error_msg: {error_msg}"
  177. )
  178. return error_msg
  179. except Exception as e:
  180. error_msg = f"API调用异常: {str(e)}"
  181. chat_logger.error(
  182. f"uoName={uoName}, functionName={functionName}, SParms={SParms}, error_msg: {error_msg}"
  183. )
  184. return error_msg
  185. def process_api_response(data: Dict[str, Any]) -> str:
  186. """处理API响应"""
  187. try:
  188. inner_json_str = data.get("reJob", {}).get("0", "{}")
  189. print("inner_json_str:" + inner_json_str)
  190. inner_data = json.loads(inner_json_str)
  191. if "err_msg" in inner_data:
  192. return f"API返回错误: {inner_data['err_msg']}"
  193. warning_msg = None
  194. if "warning_msg" in inner_data:
  195. warning_msg = inner_data["warning_msg"]
  196. if "data" in inner_data:
  197. data_list = inner_data["data"]
  198. if not data_list:
  199. return "没有数据"
  200. if isinstance(data_list[0], dict):
  201. headers = list(data_list[0].keys())
  202. result = [",".join(headers)]
  203. for row in data_list:
  204. result.append(",".join([str(row.get(h, "")) for h in headers]))
  205. if warning_msg:
  206. result.append(f"# 警告: {warning_msg}")
  207. print(result)
  208. return "\n".join(result)
  209. return json.dumps(data, ensure_ascii=False)
  210. except Exception as e:
  211. return f"响应处理错误: {str(e)}"
  212. # 工具配置管理函数
  213. def load_tool_config(
  214. config_path: Path, get_default_config: Optional[Callable] = None
  215. ) -> Dict[str, Any]:
  216. """
  217. 加载工具配置的通用函数
  218. Args:
  219. config_path: 配置文件路径
  220. get_default_config: 获取默认配置的回调函数,如果不提供则返回空字典
  221. """
  222. if not config_path.exists():
  223. print(f"警告: 配置文件不存在: {config_path}")
  224. if get_default_config:
  225. return get_default_config()
  226. return {}
  227. try:
  228. with open(config_path, "r", encoding="utf-8") as f:
  229. return json.load(f)
  230. except json.JSONDecodeError as e:
  231. print(f"错误: 配置文件格式不正确: {e}")
  232. if get_default_config:
  233. return get_default_config()
  234. return {}
  235. except Exception as e:
  236. print(f"错误: 读取配置文件失败: {e}")
  237. if get_default_config:
  238. return get_default_config()
  239. return {}
  240. def assemble_tool_description(tool_config: Dict[str, Any]) -> str:
  241. """组装工具描述,将所有键值组合成一个完整的字符串"""
  242. if not tool_config:
  243. return ""
  244. description_parts = []
  245. # 基础描述
  246. if "基础描述" in tool_config:
  247. description_parts.append(tool_config["基础描述"])
  248. # 功能说明
  249. if "功能说明" in tool_config:
  250. description_parts.append(f"\n功能: {tool_config['功能说明']}")
  251. # 入参说明
  252. if "入参说明" in tool_config:
  253. if isinstance(tool_config["入参说明"], dict):
  254. description_parts.append("\n参数:")
  255. for param, desc in tool_config["入参说明"].items():
  256. description_parts.append(f" {param}: {desc}")
  257. else:
  258. description_parts.append(f"\n参数说明: {tool_config['入参说明']}")
  259. # 返回值说明
  260. if "返回值说明" in tool_config:
  261. if isinstance(tool_config["返回值说明"], dict):
  262. description_parts.append("\n返回:")
  263. for key, value in tool_config["返回值说明"].items():
  264. if isinstance(value, list):
  265. description_parts.append(f" {key}:")
  266. for item in value:
  267. description_parts.append(f" - {item}")
  268. else:
  269. description_parts.append(f" {key}: {value}")
  270. else:
  271. description_parts.append(f"\n返回结果: {tool_config['返回值说明']}")
  272. # 输出格式要求
  273. if "输出格式要求" in tool_config:
  274. if isinstance(tool_config["输出格式要求"], list):
  275. description_parts.append("\n输出要求:")
  276. for requirement in tool_config["输出格式要求"]:
  277. description_parts.append(f" - {requirement}")
  278. else:
  279. description_parts.append(f"\n注意: {tool_config['输出格式要求']}")
  280. # 使用示例
  281. if "使用示例" in tool_config:
  282. if isinstance(tool_config["使用示例"], list):
  283. description_parts.append("\n示例:")
  284. for example in tool_config["使用示例"]:
  285. description_parts.append(f" - {example}")
  286. else:
  287. description_parts.append(f"\n示例: {tool_config['使用示例']}")
  288. return "\n".join(description_parts)
  289. def get_tool_prompt(
  290. tool_name: str, default_config_func: Optional[Callable] = None
  291. ) -> str:
  292. """
  293. 获取工具的完整提示词
  294. Args:
  295. tool_name: 工具名称
  296. default_config_func: 获取默认配置的函数
  297. """
  298. # 计算配置文件路径
  299. current_file = Path(__file__)
  300. config_path = current_file.parent.parent / "config" / "tool_config.json"
  301. # 加载配置
  302. config = load_tool_config(config_path, default_config_func)
  303. # 获取工具配置
  304. tool_config = config.get(tool_name, {})
  305. # 如果配置为空且提供了默认配置函数,使用默认配置
  306. if not tool_config and default_config_func:
  307. default_config = default_config_func()
  308. if isinstance(default_config, dict) and tool_name in default_config:
  309. tool_config = default_config[tool_name]
  310. elif isinstance(default_config, dict) and not default_config:
  311. # 如果返回的是整个配置字典
  312. tool_config = default_config
  313. else:
  314. tool_config = {}
  315. # 组装描述
  316. if tool_config:
  317. return assemble_tool_description(tool_config)
  318. else:
  319. return f"执行 {tool_name} 功能"