security.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. """
  2. 安全相关工具:JWT、密码加密、API Key加密
  3. """
  4. from datetime import datetime, timedelta
  5. from typing import Optional, Dict
  6. from jose import JWTError, jwt
  7. from passlib.context import CryptContext
  8. from cryptography.fernet import Fernet
  9. from app.core.config import settings
  10. # 密码加密上下文
  11. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  12. # API Key加密器
  13. cipher = Fernet(settings.ENCRYPTION_KEY.encode())
  14. def verify_password(plain_password: str, hashed_password: str) -> bool:
  15. """验证密码"""
  16. return pwd_context.verify(plain_password, hashed_password)
  17. def get_password_hash(password: str) -> str:
  18. """生成密码哈希"""
  19. return pwd_context.hash(password)
  20. def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
  21. """
  22. 创建访问令牌
  23. """
  24. to_encode = data.copy()
  25. if expires_delta:
  26. expire = datetime.utcnow() + expires_delta
  27. else:
  28. expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
  29. to_encode.update({"exp": expire, "type": "access"})
  30. encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
  31. return encoded_jwt
  32. def create_refresh_token(data: dict) -> str:
  33. """
  34. 创建刷新令牌
  35. """
  36. to_encode = data.copy()
  37. expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
  38. to_encode.update({"exp": expire, "type": "refresh"})
  39. encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
  40. return encoded_jwt
  41. def decode_token(token: str) -> Optional[Dict]:
  42. """
  43. 解码令牌
  44. """
  45. try:
  46. payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
  47. return payload
  48. except JWTError:
  49. return None
  50. def encrypt_api_key(api_key: str) -> str:
  51. """
  52. 加密API Key
  53. """
  54. return cipher.encrypt(api_key.encode()).decode()
  55. def decrypt_api_key(encrypted_key: str) -> str:
  56. """
  57. 解密API Key
  58. """
  59. return cipher.decrypt(encrypted_key.encode()).decode()