proactive_tasks.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. """
  2. 主动消息定时任务
  3. """
  4. from app.tasks.celery_app import celery_app
  5. from app.core.database import SessionLocal
  6. from app.models.conversation import UserActivity, Conversation
  7. from app.models.user import User
  8. from app.models.affection import AffectionScore
  9. from app.services.affection_service import should_send_proactive, calculate_proactive_interval
  10. from app.services.proactive_message_service import ProactiveMessageService
  11. from sqlalchemy import select
  12. from datetime import datetime, timedelta
  13. from loguru import logger
  14. @celery_app.task
  15. def check_proactive_messages_task():
  16. """
  17. 定时任务:检查并发送主动消息
  18. 每分钟执行一次
  19. """
  20. logger.info("开始检查主动消息...")
  21. db = SessionLocal()
  22. try:
  23. # 查询需要检查的对话
  24. now = datetime.now()
  25. result = db.execute(
  26. select(
  27. UserActivity,
  28. Conversation,
  29. AffectionScore,
  30. User
  31. )
  32. .join(Conversation, UserActivity.conversation_id == Conversation.id)
  33. .join(
  34. AffectionScore,
  35. (AffectionScore.user_id == Conversation.user_id) &
  36. (AffectionScore.character_id == Conversation.character_id)
  37. )
  38. .join(User, Conversation.user_id == User.id)
  39. .where(
  40. AffectionScore.current_score >= 40, # 只检查好感度 >= 40 的
  41. UserActivity.last_message_at < now - timedelta(minutes=3), # 超过3分钟未互动
  42. (UserActivity.next_proactive_at == None) |
  43. (UserActivity.next_proactive_at <= now)
  44. )
  45. )
  46. count = 0
  47. for activity, conv, affection, user in result:
  48. try:
  49. # 计算空闲时间
  50. minutes_idle = (now - activity.last_message_at).total_seconds() / 60
  51. # 判断是否应该发送
  52. if should_send_proactive(affection.current_score, minutes_idle):
  53. # 生成并发送消息(这里需要异步转同步)
  54. # 实际生产环境建议使用异步任务队列
  55. logger.info(
  56. f"准备为用户 {user.id} 和角色 {conv.character_id} 发送主动消息"
  57. )
  58. # 这里简化处理,实际应该调用异步函数
  59. # 可以发送到另一个队列或直接保存到待发送表
  60. # 计算下次发送时间
  61. interval = calculate_proactive_interval(affection.current_score)
  62. if interval:
  63. next_time = now + timedelta(minutes=interval)
  64. activity.next_proactive_at = next_time
  65. db.commit()
  66. count += 1
  67. except Exception as e:
  68. logger.error(f"处理对话 {conv.id} 时出错: {e}")
  69. continue
  70. logger.info(f"主动消息检查完成,触发 {count} 条消息")
  71. except Exception as e:
  72. logger.error(f"检查主动消息任务失败: {e}")
  73. finally:
  74. db.close()
  75. @celery_app.task
  76. def send_proactive_message_task(conversation_id: int):
  77. """
  78. 异步任务:生成并发送主动消息
  79. """
  80. # 这里需要实现实际的发送逻辑
  81. # 包括调用AI生成、保存到数据库、发送推送通知等
  82. pass