# -*- coding: utf-8 -*- """ @File : redis_manager.py @Time : 2023/9/7 18:42 @Author : geekbing @LastEditTime : - @LastEditors : - @Description : redis客户端Manager """ import asyncio import functools import inspect import json import pickle from random import Random from typing import Tuple from awaits.awaitable import awaitable from loguru import logger from redis import ConnectionPool, StrictRedis from rediscluster import RedisCluster, ClusterConnectionPool from backend import settings from apps.exceptions.error import RedisError class RedisManager: """非线程安全,可能存在问题""" _cluster_pool = dict() _pool = dict() @property def client(self): pool = ConnectionPool( host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB, max_connections=100, password=settings.REDIS_PASSWORD, decode_responses=True, ) return StrictRedis(connection_pool=pool, decode_responses=True) @staticmethod def delete_client(redis_id: int, cluster: bool): """ 根据redis_id和是否是集群删除客户端 :param redis_id: :param cluster: :return: """ if cluster: RedisManager._cluster_pool.pop(redis_id) else: RedisManager._pool.pop(redis_id) @staticmethod def get_cluster_client(redis_id: int, address: str): """ 获取redis集群客户端 :param redis_id: :param address: :return: """ cluster = RedisManager._cluster_pool.get(redis_id) if cluster is not None: return cluster client = RedisManager.get_cluster(address) RedisManager._cluster_pool[redis_id] = client return client @staticmethod def get_single_node_client(redis_id: int, address: str, password: str, db: int): """ 获取redis单实例客户端 :param redis_id: :param address: :param password: :param db: :return: """ node = RedisManager._pool.get(redis_id) if node is not None: return node if ":" not in address: raise Exception("redis连接未包含端口号,请检查配置") host, port = address.split(":") pool = ConnectionPool( host=host, port=port, db=db, max_connections=100, password=password, decode_responses=True, ) client = StrictRedis(connection_pool=pool) RedisManager._pool[redis_id] = client return client @staticmethod def refresh_redis_client(redis_id: int, address: str, password: str, db: str): """ 刷新redis客户端 :param redis_id: :param address: :param password: :param db: :return: """ host, port = address.split(":") pool = ConnectionPool( host=host, port=port, db=db, max_connections=100, password=password, decode_responses=True, ) client = StrictRedis(connection_pool=pool, decode_responses=True) RedisManager._pool[redis_id] = client @staticmethod def refresh_redis_cluster(redis_id: int, addr: str): RedisManager._cluster_pool[redis_id] = RedisManager.get_cluster(addr) @staticmethod def get_cluster(address: str): """ 获取集群连接池 :param address: :return: """ try: nodes = address.split(",") startup_nodes = [ {"host": n.split(":")[0], "port": n.split(":")[1]} for n in nodes if ":" in n ] if len(startup_nodes) == 0: raise Exception("找不到集群节点,请检查配置") pool = ClusterConnectionPool( startup_nodes=startup_nodes, max_connections=100, decode_responses=True ) client = RedisCluster(connection_pool=pool, decode_responses=True) return client except Exception as e: raise RedisError(f"获取Redis连接失败, {e}") class RedisHelper: prefix = "fastapi" redis_client = RedisManager().client @staticmethod @awaitable def execute_command(client, command, *args, **kwargs): return client.execute_command(command, *args, **kwargs) @staticmethod @awaitable def ping(): """ test redis client :return: """ return RedisHelper.redis_client.ping() @staticmethod def get_address_record(address: str): """ 获取ip是否已经开启录制 :param address: :return: """ key = RedisHelper.get_key(f"record:ip:{address}") return RedisHelper.redis_client.get(key) @staticmethod def get_user_record(user_id: str): """ 获取当前用户是否已开启录制 :param user_id: :return: """ key = RedisHelper.get_key(f"user:id:{user_id}") return RedisHelper.redis_client.get(key) @staticmethod @awaitable def cache_record(user_id: str, request): """ 缓存录制数据 :param user_id: 开启录制的用户id :param request: 客户端请求流量 :return: """ key = RedisHelper.get_key(f"id:{user_id}:requests") RedisHelper.redis_client.rpush(key, request) ttl = RedisHelper.redis_client.ttl(key) if ttl < 0: RedisHelper.redis_client.expire(key, 3600) @staticmethod def set_address_record( user_id: int, address: str, regex: str, is_local: bool, ): """ 设置录制状态 :param user_id: :param address: :param regex: 录制的url正则 :param is_local: False: 其他端录制,True: 本机录制 :return: """ # 默认录制数据保存1小时 value = json.dumps( { "user_id": user_id, "regex": regex, "ip": address, "local": is_local, }, ensure_ascii=False, ) RedisHelper.redis_client.set( RedisHelper.get_key(f"record:ip:{address}"), value, ex=3600 ) RedisHelper.redis_client.set( RedisHelper.get_key(f"user:id:{user_id}"), value, ex=3600 ) # 清除上次录制数据 RedisHelper.redis_client.delete(RedisHelper.get_key(f"id:{user_id}:requests")) @staticmethod def remove_address_record(address: str): """ 停止录制任务 :param address: :return: """ return RedisHelper.redis_client.delete( RedisHelper.get_key(f"record:ip:{address}") ) @staticmethod def remove_user_record(user_id: str): """ 停止录制任务 :param user_id: :return: """ return RedisHelper.redis_client.delete( RedisHelper.get_key(f"user:id:{user_id}") ) @staticmethod def list_record_data(user_id: str): """ 获取录制数据 :param user_id: 开启录制的用户id :return: """ key = RedisHelper.get_key(f"id:{user_id}:requests") data = RedisHelper.redis_client.lrange(key, 0, -1) return [json.loads(x) for x in data] @staticmethod def remove_record_data(user_id: str, index: int): """ 删除录制数据 :param user_id: 开启录制的用户id :param index: :return: """ key = RedisHelper.get_key(f"id:{user_id}:requests") RedisHelper.redis_client.lset(key, index, "DELETED") RedisHelper.redis_client.lrem(key, 1, "DELETED") @staticmethod def async_delete_prefix(key: str): """ 根据前缀删除数据 :param key: :return: """ for k in RedisHelper.redis_client.scan_iter(f"{key}*"): RedisHelper.redis_client.delete(k) logger.bind(name=None).debug(f"delete redis key: {k}") @staticmethod def delete_prefix(key: str): """ 根据前缀删除数据 :param key: :return: """ for k in RedisHelper.redis_client.scan_iter(f"{key}*"): RedisHelper.redis_client.delete(k) logger.bind(name=None).debug(f"delete redis key: {k}") @staticmethod def get_key(_redis_key: str, args_key: bool = True, *args, **kwargs): if not args_key: return f"{RedisHelper.prefix}:{_redis_key}" filter_args = [ a for a in args if not str(a).startswith((" 0 else ''}" ) @staticmethod def get_key_with_suffix(cls_name: str, key: str, args: tuple, key_suffix): filter_args = [a for a in args if not str(args[0]).startswith("