python - Celery: how to limit number of tasks in queue and stop feeding when full?


I am new to Celery, and here is the question I have:

Suppose I have a script that is supposed to fetch new data from the DB and send it to workers using Celery.

tasks.py

# Celery task
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')


@app.task
def process_data(x):
    # Do something with x
    pass

fetch_db.py

# Fetch new data from the DB and dispatch it to the workers.
from time import sleep

from tasks import process_data

while True:
    # Run the DB query here to fetch new data from the DB into fetched_data
    process_data.delay(fetched_data)
    sleep(30)

Here is my concern: the data is being fetched every 30 seconds. The process_data() function could take much longer than that, and depending on the number of workers (especially if there are too few), the queue might fill up faster than it is drained, as I understand it.

  1. I cannot increase the number of workers.
  2. I can modify the code to refrain from feeding the queue when it is full.

The question is: how do I set the queue size, and how do I know when it is full? In general, how do I deal with this situation?

You can set RabbitMQ's x-max-length argument on the queue when you predeclare it using kombu.

Example:

import time

from celery import Celery
from kombu import Queue, Exchange


class Config(object):
    BROKER_URL = "amqp://guest@localhost//"

    # Declare the queue with a RabbitMQ length limit of 10 messages.
    CELERY_QUEUES = (
        Queue(
            'important',
            exchange=Exchange('important'),
            routing_key="important",
            queue_arguments={'x-max-length': 10}
        ),
    )


app = Celery('tasks')
app.config_from_object(Config)


@app.task(queue='important')
def process_data(x):
    pass
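One thing to be aware of with x-max-length: by default RabbitMQ handles overflow by dropping the oldest messages from the head of the queue, so the producer is never told the queue is full. RabbitMQ also supports an x-overflow argument with the value reject-publish, which refuses new messages instead. A minimal sketch of building those arguments follows; bounded_queue_arguments is a hypothetical helper name, not part of Celery or kombu.

```python
def bounded_queue_arguments(max_length, reject_when_full=True):
    """Build RabbitMQ queue arguments for a length-limited queue.

    Hypothetical helper for illustration; only the argument names
    ('x-max-length', 'x-overflow') come from RabbitMQ itself.
    """
    arguments = {'x-max-length': max_length}
    if reject_when_full:
        # Without this, RabbitMQ uses its default overflow mode, 'drop-head':
        # the oldest messages are discarded to make room for new ones.
        arguments['x-overflow'] = 'reject-publish'
    return arguments
```

The result can be passed as queue_arguments=bounded_queue_arguments(10) in the Queue declaration. Note that a rejected publish is only visible to the producer when publisher confirms are enabled, and that RabbitMQ refuses to redeclare an existing queue with different arguments, so the old queue may need to be deleted first.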

Or you can set the limit with a policy:

rabbitmqctl set_policy ten "^one-meg$" '{"max-length-bytes":1000000}' --apply-to queues 
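As for knowing when the queue is full: one common approach with an AMQP broker is to ask the broker for the current message count through a passive queue declare, and hold off dispatching while the count is at the limit. The sketch below assumes RabbitMQ as the broker; has_room, queue_depth and dispatch_when_room are hypothetical names chosen for illustration. The count returned covers messages that are ready in the queue, not ones already prefetched by workers.

```python
import time


def has_room(depth, limit):
    """Return True while the queue holds fewer than `limit` ready messages."""
    return depth < limit


def queue_depth(app, queue_name):
    """Ask the broker how many messages are ready in `queue_name`.

    Assumes an AMQP broker such as RabbitMQ; `app` is the Celery app.
    """
    with app.connection_or_acquire() as conn:
        # passive=True inspects the queue without creating or changing it.
        reply = conn.default_channel.queue_declare(queue=queue_name, passive=True)
        return reply.message_count


def dispatch_when_room(send, get_depth, limit, poll_seconds=1.0, sleep=time.sleep):
    """Wait until the queue has room, then call `send()` and return its result."""
    while not has_room(get_depth(), limit):
        sleep(poll_seconds)
    return send()
```

In fetch_db.py this could replace the bare delay() call, for example dispatch_when_room(lambda: process_data.delay(fetched_data), lambda: queue_depth(app, 'important'), limit=10). The check and the publish are not atomic, so with several producers the queue can still briefly exceed the limit.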
