agent_manager.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. import asyncio
  2. import time
  3. import hashlib
  4. from typing import Dict, Optional
  5. from core.agent import create_langchain_agent
  6. from utils.logger import chat_logger
  7. class AgentManager:
  8. def __init__(self):
  9. self._local_agent_cache = {} # 仅缓存agent配置,不缓存实例
  10. self._is_shutdown = False
  11. async def initialize(self):
  12. """异步初始化管理器"""
  13. self._is_shutdown = False
  14. chat_logger.info("Agent管理器初始化完成")
  15. async def shutdown(self):
  16. """异步关闭管理器"""
  17. self._is_shutdown = True
  18. self._local_agent_cache.clear()
  19. chat_logger.info("Agent管理器已关闭")
  20. def _get_agent_config_key(
  21. self,
  22. thread_id: str,
  23. username: str,
  24. backend_url: str,
  25. token: str,
  26. context: str = "",
  27. ) -> str:
  28. """生成agent配置的缓存key"""
  29. key_data = f"{thread_id}:{username}:{backend_url}:{token}:{context}"
  30. return hashlib.md5(key_data.encode()).hexdigest()
  31. def _get_user_identifier(self, username: str, token: str) -> str:
  32. """生成用户标识符"""
  33. if not username or username == "default":
  34. username_part = "anonymous"
  35. else:
  36. username_part = username
  37. if token and len(token) >= 8:
  38. token_part = token[:8]
  39. else:
  40. token_part = "notoken"
  41. return f"{username_part}_{token_part}"
  42. async def get_agent_instance(
  43. self,
  44. thread_id: str,
  45. username: str,
  46. backend_url: str,
  47. token: str,
  48. context: str = "",
  49. ):
  50. if self._is_shutdown:
  51. raise RuntimeError("Agent管理器已关闭")
  52. clean_username = username or "anonymous"
  53. clean_backend = backend_url or ""
  54. clean_token = token or ""
  55. clean_context = context or ""
  56. user_id = self._get_user_identifier(clean_username, clean_token)
  57. config_key = self._get_agent_config_key(
  58. thread_id, clean_username, clean_backend, clean_token, clean_context
  59. )
  60. print(f"config_key: {config_key}")
  61. # 检查本地配置缓存
  62. current_time = time.time()
  63. if config_key in self._local_agent_cache:
  64. agent_instance, timestamp = self._local_agent_cache[config_key]
  65. if current_time - timestamp <= 300: # 5分钟本地缓存
  66. chat_logger.info(f"使用本地缓存的agent配置: 用户={user_id}")
  67. return agent_instance
  68. chat_logger.info(f"创建新的agent实例: 用户={user_id}")
  69. agent_instance = await self._create_agent_async(
  70. backend_url=clean_backend,
  71. token=clean_token,
  72. username=clean_username,
  73. thread_id=thread_id,
  74. context=clean_context,
  75. )
  76. # 缓存agent配置到本地
  77. self._local_agent_cache[config_key] = (agent_instance, current_time)
  78. chat_logger.info(f"Agent配置已缓存: 用户={user_id}")
  79. return agent_instance
  80. async def _create_agent_async(
  81. self,
  82. backend_url: str,
  83. token: str,
  84. username: str,
  85. thread_id: str,
  86. context: str = "",
  87. ):
  88. """创建agent实例"""
  89. def sync_create_agent():
  90. return create_langchain_agent(
  91. backend_url=backend_url,
  92. token=token,
  93. username=username,
  94. thread_id=thread_id,
  95. context=context,
  96. )
  97. loop = asyncio.get_event_loop()
  98. return await loop.run_in_executor(None, sync_create_agent)
  99. async def clear_user_agent(
  100. self, thread_id: str, username: str, backend_url: str, token: str
  101. ):
  102. """清除特定用户的agent配置缓存"""
  103. config_key = self._get_agent_config_key(thread_id, username, backend_url, token)
  104. # 清除本地缓存
  105. if config_key in self._local_agent_cache:
  106. del self._local_agent_cache[config_key]
  107. user_id = self._get_user_identifier(username, token)
  108. chat_logger.info(f"已清除用户Agent配置缓存: {user_id}")
  109. async def get_cache_status(self):
  110. """获取缓存状态"""
  111. if self._is_shutdown:
  112. return {"status": "shutdown", "cache_size": 0}
  113. return {
  114. "local_config_cache_size": len(self._local_agent_cache),
  115. "status": "active",
  116. "message": "",
  117. }
  118. async def clear_cache(self):
  119. """清空本地配置缓存"""
  120. local_count = len(self._local_agent_cache)
  121. self._local_agent_cache.clear()
  122. chat_logger.info(f"清空本地配置缓存: {local_count}个配置")
  123. return local_count
  124. # 全局实例
  125. agent_manager = AgentManager()