# -*- coding: utf-8 -*- """ @File : mycelery.py.py @Time : 2023/3/9 16:16 @Author : geekbing @LastEditTime : - @LastEditors : - @Description : 设置环境变量,配置Celery应用程序 """ import logging import os from celery import Celery from celery.signals import after_setup_logger from django.conf import settings # 设置Django的环境变量 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings") app = Celery("backend") # 配置Celery应用程序 app.config_from_object("django.conf:settings", namespace="CELERY") # 自动检测每个已注册应用中的异步任务 app.autodiscover_tasks(settings.INSTALLED_APPS) app.conf.update( CELERY_QUEUES={ "beat_tasks": { "exchange": "beat_tasks", "exchange_type": "direct", "binding_key": "beat_tasks", }, "work_queue": { "exchange": "work_queue", "exchange_type": "direct", "binding_key": "work_queue", }, }, # 定义任务队列 CELERY_DEFAULT_QUEUE="work_queue", # 默认的任务队列 CELERY_FORCE_EXECV=True, # 有些情况下可以防止死锁 task_reject_on_worker_lost=True, # 任务丢失后拒绝执行 task_acks_late=True, # 任务执行完后,不立即删除任务结果 CELERY_WORKER_CONCURRENCY=2, # 并发数, 设置得接近于你的CPU核心数,或者稍微高一点 CELERY_MAX_TASKS_PER_CHILD=50, # 每个worker最多执行50个任务便自我销毁释放内存 CELERY_PREFETCH_MULTIPLIER=1, # 每次从任务队列取任务的数量 CELERY_ACCEPT_CONTENT=[ "application/json", # 指定接受的内容类型 ], CELERY_TASK_SERIALIZER="json", # 指定任务序列化类型 CELERY_RESULT_SERIALIZER="json", # 指定任务结果序列化类型 ) @after_setup_logger.connect def setup_logger(logger, *args, **kwargs): fh = logging.FileHandler("logs/celery.log", "a", encoding="utf-8") fh.setLevel(logging.INFO) # 再创建一个handler, 用于输出到控制台 ch = logging.StreamHandler() ch.setLevel(logging.INFO) # 定义handler的输出格式 formatter = logging.Formatter( "%(asctime)s %(levelname)s [pid:%(process)d] [%(name)s %(filename)s->%(funcName)s:%(lineno)s] %(message)s" ) fh.setFormatter(formatter) ch.setFormatter(formatter) # 给logger添加handler logger.addHandler(fh) logger.addHandler(ch)