网上搜到的huey教程基本上是在python代码内使用python运行huey_consumer.py的方式来运行huey任务消息队列,这种方式简直是画蛇添足。

通过查阅以及查看huey_consumer.py代码原理,最终实现了将huey嵌入项目代码中,使用multiprocessing,通过主进程创建两个子进程分别创建消费者与生产者,进而将huey集成到python项目中。

代码与注释

from huey import SqliteHuey
from huey.consumer import Consumer
from huey.consumer_options import ConsumerConfig
from multiprocessing import Process


# 创建 Huey 对象
huey = SqliteHuey(filename='demo/huey/sql/worker.db', fsync=True)

# 如果在子进程运行生产者,变量需要在主进程输入
# input_beans=input('How many beans? ')

@huey.task()
def my_task(num):
    """创建消费者需要执行的任务

    这里是输出主进程输入豆子数量并执行return返回
    不记录返回值,可用return None,并且生产者不用get
    如果返回了值,必须用get输出返回值,这样数据库内的返回值才会清理
    """
    print(f"输入有{num}个豆子")

    return f"执行print:输入有{num}个豆子完毕"



def start_consumer():
    """创建消费者进程函数
    写法解释:
    https://www.wnark.com/archives/237.html

    配置参数来源:
    https://huey.readthedocs.io/en/latest/contrib.html

    有两个配置没配:
    1."flush_locks" 指定了是否需要清空锁
    2."extra_locks" 可以用来指定额外的锁 该函数的作用是设置和协调工作进程和调度器的执行,并注册信号处理程序。
    """
    config = ConsumerConfig(
        workers=2,  # 消费者数量,压测只允许一次压一种方案
        worker_type='thread',  # 消费者类型,使用多线程的方式调用多进程
        initial_delay=1,   # 最小的轮询间隔,与-d相同。
        backoff=2,  # 指数退避使用这个速率,-b.
        max_delay=10,  # 可能的最大轮询间隔, -m.
        periodic=False,  # 允许周期性任务,控制周期由多进程/多线程运行
        check_worker_health=False,  # 关闭工人健康检查。压测死了就死了。
        health_check_interval=1,  # 指定了健康检查的轮询间隔
    )
    # 调用 ConsumerConfig 实例对象的 validate 方法,用于验证部分关键参数的合法性。
    config.validate()
    # Huey 消费者配置
    consumer = Consumer(huey, **config.values)
    # 到这一步消费者就正在运行了
    consumer.run()


# def start_producer():
#     """创建多进程运行生产者进程函数
#     """
#     beans = input_beans
#     # 启动生产者任务
#     x = my_task(int(beans)).get(blocking=True)
#     print(x)

def start_producer_main():
    """在主进程运行生产者进程函数
    如果是输入,必须在主进程中输入,因为:标准输入流 (stdin) 在创建时未附加到 Process 进程。
    由于多个进程从同一个流读取输入没有意义,默认情况下假定 stdin 将由主进程独占读取。

    https://stackoverflow.com/questions/63893634
    """
    input_beans=input('How many beans? ')
    beans = input_beans
    # 启动生产者任务
    x = my_task(int(beans)).get(blocking=True)
    print(x)


# 在单独的子进程启动消费者进程
consumer_process = Process(target=start_consumer)
consumer_process.start()

# 在单独子进程启动生产者进程
# producer_process = Process(target=start_producer)
# producer_process.start()

# 在主进程开始循环生产者发送命令
while True:
    start_producer_main()



# 等待消费者进程和生产者进程结束
consumer_process.join()
# producer_process.join()

为什么

其实huey作者建议我只使用multiprocessing.Pool标准库来实现我的想法:

如果您希望所有内容都在单个进程中运行,您可以:

huey_consumer作为父进程中的子进程运行,或者 建议:根本不要使用 huey,因为它似乎不适合您的用例。
老实说,我不知道你为什么坚持在这里使用 Huey,因为它通常也意味着运行 Redis 服务器(尽管你可以只使用
Sqlite)。但即便如此,你可能最好只使用multiprocessing.Pool标准库,因为一切都发生在你的单一应用程序的上下文中

但基于以下几个原因,我最后还是将huey集成到我的软件中:
一开始我选择使用fastapi作为api server,因为它足够轻量。但是fastapi自带的后台任务组件Background Tasks不推荐用于重载场景,而且fastapi建议的Celery已经停止了对Windows平台的官方支持。
为了减少跨平台的兼容性问题,没使用redis,而是使用sqlite来存储消息队列数据。
通过比较,我发现 huey 可以在 Windows + Linux 上运行并支持 sqlite,所以我决定使用 huey。

对比

celery: 最新版本的celery不再维护支持Windows了
Python-RQ: 更简洁,可以支持sqlite,使用python的crontab添加任务,但不支持多进程同时运行,只能排队运行。 https://python-rq.org/docs/scheduling/
Huey: 配置项更多,可以支持sqlite,使用类似celery的方式添加任务。
Dramatiq:官方不支持sqlite作为存储后端,虽然理论上redis也够用。

最后修改:2023 年 03 月 03 日
如果觉得我的文章对你有用,请随意赞赏