관리-도구
편집 파일: jwt_issuer.py
import random  # no longer used in this module; kept so the import set is unchanged
import os
import secrets
import string
from datetime import datetime, timedelta
from pathlib import Path

from defence360agent.subsys.panels.base import InvalidTokenException
from defence360agent.contracts.config import UIRole, UserType
from defence360agent.utils import atomic_rewrite

# Maps the UI role carried in a token's "user_type" claim to the user type
# returned by JWTIssuer.parse_token().
UIRoleToUserType = {
    UIRole.ADMIN: UserType.ROOT,
    UIRole.CLIENT: UserType.NON_ROOT,
}


class JWTIssuer:
    """Issue and validate JWTs signed with a secret stored on disk.

    The signing secret lives in ``JWT_SECRET_FILE`` and is regenerated once
    it is older than ``SECRET_EXPIRATION_TTL``.  Tokens are also checked
    against ``JWT_SECRET_FILE_PREV`` so that tokens signed with the previous
    secret can still be accepted.
    """

    JWT_SECRET_FILE = Path("/var/imunify360/.api-secret.key")
    JWT_SECRET_FILE_PREV = Path("/var/imunify360/.api-secret-prev.key")
    TOKEN_EXPIRATION_TTL = timedelta(hours=6)
    SECRET_EXPIRATION_TTL = timedelta(days=1)
    # Used for both signing and verification so the two can never diverge.
    JWT_ALGORITHM = "HS256"

    @classmethod
    def is_secret_expired(cls) -> bool:
        """Return True when the secret file is missing, empty or too old.

        The age is the difference between the current time and the file's
        modification time, compared against ``SECRET_EXPIRATION_TTL``.
        """
        try:
            stat = os.stat(cls.JWT_SECRET_FILE)
        except FileNotFoundError:
            return True
        if stat.st_size == 0:
            # _get_secret() touches the file before writing it, so an empty
            # file means a previous write never completed.
            return True
        age = datetime.now().timestamp() - stat.st_mtime
        # total_seconds() is required here: timedelta(days=1).seconds is 0,
        # which would make every secret look expired.
        return age > cls.SECRET_EXPIRATION_TTL.total_seconds()

    @classmethod
    def _get_secret(cls) -> str:
        """Return the current signing secret, regenerating it if expired.

        A new secret is 64 characters drawn from uppercase ASCII letters
        and digits and is written to ``JWT_SECRET_FILE``.
        """
        if not cls.is_secret_expired():
            return cls.JWT_SECRET_FILE.read_text()

        alphabet = string.ascii_uppercase + string.digits
        # The secret signs authentication tokens, so it must come from a
        # cryptographically secure source rather than the `random` module.
        new_secret = "".join(secrets.choice(alphabet) for _ in range(64))
        if not cls.JWT_SECRET_FILE.exists():
            cls.JWT_SECRET_FILE.touch()
        # NOTE(review): presumably `backup` stores the previous content in
        # JWT_SECRET_FILE_PREV and `uid=-1` leaves ownership unchanged;
        # confirm against atomic_rewrite().
        atomic_rewrite(
            str(cls.JWT_SECRET_FILE),
            new_secret,
            backup=str(cls.JWT_SECRET_FILE_PREV),
            uid=-1,
            permissions=0o600,
        )
        return new_secret

    @classmethod
    def get_token(cls, user_name: str, user_type: UIRole) -> str:
        """
        Generates a token with several encoded fields:
        user name, user type, expiration timestamp

        :param user_name: stored in the token's "username" claim
        :param user_type: stored in the token's "user_type" claim
        :return: the encoded token, expiring ``TOKEN_EXPIRATION_TTL`` from now
        """
        # Imported lazily; see the note in _parse_token().
        import jwt

        return jwt.encode(
            {
                "user_type": user_type,
                "username": user_name,
                "exp": (datetime.now() + cls.TOKEN_EXPIRATION_TTL).timestamp(),
            },
            cls._get_secret(),
            algorithm=cls.JWT_ALGORITHM,
        )

    @classmethod
    def _parse_token(cls, token: str, secret: str) -> dict | None:
        """Decode *token* with *secret*.

        :return: the decoded payload, or None if the jwt library rejects
            the token
        """
        # If these exceptions were handled at module level, jwt would have
        # to be imported there, increasing memory consumption.
        import jwt

        try:
            return jwt.decode(token, secret, algorithms=[cls.JWT_ALGORITHM])
        except jwt.PyJWTError:
            return None

    @classmethod
    def parse_token(cls, token: str):
        """Validate *token* and return the user it was issued for.

        The token is checked against the current secret first and then
        against the previous one.

        :return: dict with "user_name" and "user_type" (a ``UserType``)
        :raises InvalidTokenException: if no secret validates the token or
            the payload lacks the expected claims
        """
        for secret_pth in (cls.JWT_SECRET_FILE, cls.JWT_SECRET_FILE_PREV):
            # Read directly instead of exists() + read: the file may be
            # replaced or removed between the two calls.
            try:
                secret = secret_pth.read_text()
            except FileNotFoundError:
                continue
            if not secret:
                # _get_secret() can leave an empty file behind; an empty
                # HMAC key must never be accepted for verification.
                continue
            decoded = cls._parse_token(token, secret)
            if not decoded:
                continue
            try:
                return {
                    "user_name": decoded["username"],
                    "user_type": UIRoleToUserType[decoded["user_type"]],
                }
            except (KeyError, TypeError) as exc:
                # Correctly signed, but the claims are missing or unknown.
                raise InvalidTokenException("INVALID_TOKEN") from exc
        raise InvalidTokenException("INVALID_TOKEN")