网上搜到的huey教程基本上是在python代码内使用python运行huey_consumer.py
的方式来运行huey任务消息队列,这种方式简直是画蛇添足。
通过查阅以及查看huey_consumer.py代码原理,最终实现了将huey嵌入项目代码中,使用multiprocessing,通过主进程创建两个子进程分别创建消费者与生产者,进而将huey集成到python项目中。
代码与注释
from huey import SqliteHuey
from huey.consumer import Consumer
from huey.consumer_options import ConsumerConfig
from multiprocessing import Process
# 创建 Huey 对象
huey = SqliteHuey(filename='demo/huey/sql/worker.db', fsync=True)
# 如果在子进程运行生产者,变量需要在主进程输入
# input_beans=input('How many beans? ')
@huey.task()
def my_task(num):
"""创建消费者需要执行的任务
这里是输出主进程输入豆子数量并执行return返回
不记录返回值,可用return None,并且生产者不用get
如果返回了值,必须用get输出返回值,这样数据库内的返回值才会清理
"""
print(f"输入有{num}个豆子")
return f"执行print:输入有{num}个豆子完毕"
def start_consumer():
"""创建消费者进程函数
写法解释:
https://www.wnark.com/archives/237.html
配置参数来源:
https://huey.readthedocs.io/en/latest/contrib.html
有两个配置没配:
1."flush_locks" 指定了是否需要清空锁
2."extra_locks" 可以用来指定额外的锁 该函数的作用是设置和协调工作进程和调度器的执行,并注册信号处理程序。
"""
config = ConsumerConfig(
workers=2, # 消费者数量,压测只允许一次压一种方案
worker_type='thread', # 消费者类型,使用多线程的方式调用多进程
initial_delay=1, # 最小的轮询间隔,与-d相同。
backoff=2, # 指数退避使用这个速率,-b.
max_delay=10, # 可能的最大轮询间隔, -m.
periodic=False, # 允许周期性任务,控制周期由多进程/多线程运行
check_worker_health=False, # 关闭工人健康检查。压测死了就死了。
health_check_interval=1, # 指定了健康检查的轮询间隔
)
# 调用 ConsumerConfig 实例对象的 validate 方法,用于验证部分关键参数的合法性。
config.validate()
# Huey 消费者配置
consumer = Consumer(huey, **config.values)
# 到这一步消费者就正在运行了
consumer.run()
# def start_producer():
# """创建多进程运行生产者进程函数
# """
# beans = input_beans
# # 启动生产者任务
# x = my_task(int(beans)).get(blocking=True)
# print(x)
def start_producer_main():
"""在主进程运行生产者进程函数
如果是输入,必须在主进程中输入,因为:标准输入流 (stdin) 在创建时未附加到 Process 进程。
由于多个进程从同一个流读取输入没有意义,默认情况下假定 stdin 将由主进程独占读取。
https://stackoverflow.com/questions/63893634
"""
input_beans=input('How many beans? ')
beans = input_beans
# 启动生产者任务
x = my_task(int(beans)).get(blocking=True)
print(x)
# 在单独的子进程启动消费者进程
consumer_process = Process(target=start_consumer)
consumer_process.start()
# 在单独子进程启动生产者进程
# producer_process = Process(target=start_producer)
# producer_process.start()
# 在主进程开始循环生产者发送命令
while True:
start_producer_main()
# 等待消费者进程和生产者进程结束
consumer_process.join()
# producer_process.join()
为什么
其实huey作者建议我只使用multiprocessing.Pool标准库来实现我的想法:
如果您希望所有内容都在单个进程中运行,您可以:
huey_consumer作为父进程中的子进程运行,或者 建议:根本不要使用 huey,因为它似乎不适合您的用例。
老实说,我不知道你为什么坚持在这里使用 Huey,因为它通常也意味着运行 Redis 服务器(尽管你可以只使用
Sqlite)。但即便如此,你可能最好只使用multiprocessing.Pool标准库,因为一切都发生在你的单一应用程序的上下文中
但基于以下几个原因,我最后还是将huey集成到我的软件中:
一开始我选择使用fastapi作为api server,因为它足够轻量。但是fastapi自带的后台任务组件Background Tasks不推荐用于重载场景,而且fastapi建议的Celery已经停止了对Windows平台的官方支持。
为了减少跨平台的兼容性问题,没使用redis,而是使用sqlite来存储消息队列数据。
通过比较,我发现 huey 可以在 Windows + Linux 上运行并支持 sqlite,所以我决定使用 huey。
对比
celery: 最新版本的celery不再维护支持Windows了
Python-RQ: 更简洁,可以支持sqlite,使用python的crontab添加任务,但不支持多进程同时运行,只能排队运行。 https://python-rq.org/docs/scheduling/
Huey: 配置项更多,可以支持sqlite,使用类似celery的方式添加任务。
Dramatiq:官方不支持sqlite作为存储后端,虽然理论上redis也够用。
版权属于:寒夜方舟
本文链接:https://www.wnark.com/archives/239.html
本站所有原创文章采用署名-非商业性使用 4.0 国际 (CC BY-NC 4.0)。 您可以自由地转载和修改,但请注明引用文章来源和不可用于商业目的。声明:本博客完全禁止任何商业类网站转载,包括但不限于CSDN,51CTO,百度文库,360DOC,AcFun,哔哩哔哩等网站。