python - Celery: how to limit number of tasks in queue and stop feeding when full? -
i new celery , here question have:
suppose have script supposed fetch new data db , send workers using celery.
tasks.py
# celery task celery import celery app = celery('tasks', broker='amqp://guest@localhost//') @app.task def process_data(x): # x pass
fetch_db.py
# fetch new data db , dispatch workers. tasks import process_data while true: # run db query here fetch new data db fetched_data process_data.delay(fetched_data) sleep(30);
here concern: data being fetched every 30 seconds. process_data() function take longer , depending on amount of workers (especially if few) queue might throttled understand.
- i cannot increase number of workers.
- i can modify code refrain feeding queue when full.
the question how set queue size , how know full? in general, how deal situation?
you can set rabbitmq x-max-length
in queue predeclare using kombu
example :
import time celery import celery kombu import queue, exchange class config(object): broker_url = "amqp://guest@localhost//" celery_queues = ( queue( 'important', exchange=exchange('important'), routing_key="important", queue_arguments={'x-max-length': 10} ), ) app = celery('tasks') app.config_from_object(config) @app.task(queue='important') def process_data(x): pass
or using policies
rabbitmqctl set_policy ten "^one-meg$" '{"max-length-bytes":1000000}' --apply-to queues
Comments
Post a Comment