affection_service.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. """
  2. 好感度计算和管理服务
  3. """
  4. from typing import Dict, Optional
  5. from datetime import datetime
  6. from sqlalchemy.ext.asyncio import AsyncSession
  7. from sqlalchemy import select, update
  8. from app.models.affection import AffectionScore, AffectionLog
  9. from app.models.conversation import Message
  10. import random
  11. import json
  12. # 好感度等级定义
  13. AFFECTION_LEVELS = {
  14. "厌恶": (-100, -60),
  15. "陌生": (-60, -20),
  16. "普通": (-20, 20),
  17. "熟人": (20, 50),
  18. "朋友": (50, 80),
  19. "挚友": (80, 95),
  20. "恋人": (95, 101), # 101是上限
  21. }
  22. def get_affection_level(score: int) -> str:
  23. """根据分数获取好感度等级"""
  24. for level, (min_score, max_score) in AFFECTION_LEVELS.items():
  25. if min_score <= score < max_score:
  26. return level
  27. return "恋人" # 满分
  28. def get_next_level_score(current_score: int) -> Optional[int]:
  29. """获取下一等级所需分数"""
  30. current_level = get_affection_level(current_score)
  31. level_names = list(AFFECTION_LEVELS.keys())
  32. if current_level == "恋人":
  33. return None # 已达最高等级
  34. current_index = level_names.index(current_level)
  35. next_level = level_names[current_index + 1]
  36. return AFFECTION_LEVELS[next_level][0]
  37. class AffectionService:
  38. """好感度服务"""
  39. @staticmethod
  40. async def get_or_create_affection(
  41. db: AsyncSession,
  42. user_id: int,
  43. character_id: int
  44. ) -> AffectionScore:
  45. """获取或创建好感度记录"""
  46. result = await db.execute(
  47. select(AffectionScore).where(
  48. AffectionScore.user_id == user_id,
  49. AffectionScore.character_id == character_id
  50. )
  51. )
  52. affection = result.scalar_one_or_none()
  53. if not affection:
  54. affection = AffectionScore(
  55. user_id=user_id,
  56. character_id=character_id,
  57. current_score=0,
  58. level="普通"
  59. )
  60. db.add(affection)
  61. await db.commit()
  62. await db.refresh(affection)
  63. return affection
  64. @staticmethod
  65. async def update_affection(
  66. db: AsyncSession,
  67. affection_id: int,
  68. score_change: int,
  69. reason: str,
  70. message_id: Optional[int] = None,
  71. sentiment_data: Optional[Dict] = None
  72. ):
  73. """更新好感度"""
  74. # 获取当前好感度
  75. result = await db.execute(
  76. select(AffectionScore).where(AffectionScore.id == affection_id)
  77. )
  78. affection = result.scalar_one()
  79. # 计算新分数(限制在-100到100之间)
  80. new_score = max(-100, min(100, affection.current_score + score_change))
  81. new_level = get_affection_level(new_score)
  82. # 更新好感度
  83. await db.execute(
  84. update(AffectionScore)
  85. .where(AffectionScore.id == affection_id)
  86. .values(
  87. current_score=new_score,
  88. level=new_level,
  89. last_interaction=datetime.now(),
  90. total_interactions=AffectionScore.total_interactions + 1
  91. )
  92. )
  93. # 创建变化记录
  94. log = AffectionLog(
  95. affection_score_id=affection_id,
  96. message_id=message_id,
  97. score_change=score_change,
  98. reason=reason,
  99. sentiment_analysis=sentiment_data
  100. )
  101. db.add(log)
  102. await db.commit()
  103. return new_score, new_level
  104. @staticmethod
  105. def analyze_sentiment_from_ai_response(ai_response: str) -> Dict:
  106. """
  107. 从AI回复中提取情感分析结果
  108. AI回复格式应包含:<sentiment>{"affection_change": 3, "reason": "..."}</sentiment>
  109. """
  110. try:
  111. # 查找<sentiment>标签
  112. start = ai_response.find("<sentiment>")
  113. end = ai_response.find("</sentiment>")
  114. if start != -1 and end != -1:
  115. sentiment_json = ai_response[start + 11:end]
  116. sentiment_data = json.loads(sentiment_json)
  117. # 提取纯文本内容(去除sentiment标签)
  118. clean_content = ai_response[:start] + ai_response[end + 12:]
  119. return {
  120. "content": clean_content.strip(),
  121. "affection_change": sentiment_data.get("affection_change", 0),
  122. "reason": sentiment_data.get("reason", ""),
  123. "sentiment_data": sentiment_data
  124. }
  125. except Exception as e:
  126. # 解析失败,使用默认值
  127. pass
  128. # 默认返回
  129. return {
  130. "content": ai_response,
  131. "affection_change": 1, # 默认+1
  132. "reason": "正常对话",
  133. "sentiment_data": {}
  134. }
  135. def calculate_proactive_interval(affection_score: int) -> Optional[int]:
  136. """
  137. 根据好感度计算主动消息间隔(分钟)
  138. 返回:随机间隔的分钟数,如果不应发送则返回None
  139. """
  140. if affection_score >= 90:
  141. return random.randint(1, 10) # 1-10分钟
  142. elif affection_score >= 80:
  143. return random.randint(10, 30) # 10-30分钟
  144. elif affection_score >= 60:
  145. return random.randint(30, 120) # 0.5-2小时
  146. elif affection_score >= 40:
  147. return random.randint(120, 360) # 2-6小时
  148. else:
  149. return None # 不发送
  150. def should_send_proactive(affection_score: int, minutes_idle: int) -> bool:
  151. """判断是否应该发送主动消息"""
  152. if affection_score >= 90 and minutes_idle >= 3:
  153. return True
  154. elif affection_score >= 80 and minutes_idle >= 10:
  155. return True
  156. elif affection_score >= 60 and minutes_idle >= 30:
  157. return True
  158. elif affection_score >= 40 and minutes_idle >= 120:
  159. return True
  160. return False