In [43]:
from celery import Celery
from time import sleep
# Celery application shared by the example tasks. The broker is AMQP on
# localhost and task results go through the AMQP result backend.
celery = Celery()
celery.config_from_object(dict(
    BROKER_URL='amqp://localhost',
    CELERY_RESULT_BACKEND='amqp://',
    CELERYD_POOL_RESTARTS=True,  # Required for /worker/pool/restart API
))
@celery.task
def add(x, y):
    """Return the sum of ``x`` and ``y``."""
    total = x + y
    return total
@celery.task
def sub(x, y):
    """Return ``x - y`` after a 30-second pause that stands in for real work."""
    sleep(30)  # Simulate work
    difference = x - y
    return difference
In [3]:
# Done once for the whole docs
import requests, json
# Base URL of the REST API, and the prefix shared by every task endpoint
api_root = 'http://localhost:5555/api'
task_api = api_root + '/task'
In [6]:
# Queue tasks.add(1, 2) through the async-apply endpoint and show the reply
args = dict(args=[1, 2])
url = task_api + '/async-apply/tasks.add'
print(url)
resp = requests.post(url, data=json.dumps(args))
reply = resp.json()
reply
Out[6]:
We can see that a new task was created and that it is pending. Note that the API is asynchronous, meaning it won't wait until the task finishes.
To create a task and wait for its result, use the 'apply' API.
In [7]:
# Run tasks.add(1, 2) through the apply endpoint and show the reply
args = dict(args=[1, 2])
url = task_api + '/apply/tasks.add'
print(url)
resp = requests.post(url, data=json.dumps(args))
reply = resp.json()
reply
Out[7]:
In [5]:
# Look up the result of the task whose id came back in the last reply
url = '{base}/result/{task_id}'.format(base=task_api, task_id=reply['task-id'])
print(url)
resp = requests.get(url)
resp.json()
Out[5]:
In [7]:
# Run a task. tasks.sub sleeps for 30 seconds, so it is still in flight when
# the revoke request below is sent.
args = {'args': [1, 2]}
resp = requests.post('{}/async-apply/tasks.sub'.format(task_api), data=json.dumps(args))
reply = resp.json()

# Now revoke it. `terminate` is passed as a form field (a dict) so that
# requests form-encodes it and sets the
# application/x-www-form-urlencoded Content-Type. A bare string body is sent
# without a Content-Type, so the server may never parse the argument.
url = '{}/revoke/{}'.format(task_api, reply['task-id'])
print(url)
resp = requests.post(url, data={'terminate': True})
resp.json()
Out[7]:
Update rate limit for a task.
In [20]:
# Name of the worker to configure. You'll need to get the worker name from
# the worker API (see below).
worker = 'miki-manjaro'
url = '{base}/rate-limit/{name}'.format(base=task_api, name=worker)
print(url)
resp = requests.post(url, params=dict(taskname='tasks.add', ratelimit='10'))
resp.json()
Out[20]:
In [22]:
# Send hard and soft timeouts for tasks.add; you can omit soft or hard
url = '{base}/timeout/{name}'.format(base=task_api, name=worker)
print(url)
resp = requests.post(url, params=dict(taskname='tasks.add', hard='3.14', soft='3'))
resp.json()
Out[22]:
In [55]:
# Prefix shared by the worker endpoints (defined once for the documentation)
worker_api = api_root + '/worker'
In [25]:
# The worker listing is the only worker endpoint that is not under /worker
url = api_root + '/workers'
print(url)
resp = requests.get(url)
workers = resp.json()
workers
Out[25]:
In [30]:
# Take the first worker name from the listing. next(iter(...)) works on both
# Python 2 and Python 3, whereas indexing workers.keys() raises TypeError on
# Python 3 because dict views are not subscriptable.
worker = next(iter(workers))
url = '{}/shutdown/{}'.format(worker_api, worker)
print(url)
resp = requests.post(url)
resp.json()
Out[30]:
Restart a worker pool (you need to have CELERYD_POOL_RESTARTS enabled in your configuration).
In [43]:
# Prefix shared by the pool endpoints, then the restart request for `worker`
pool_api = worker_api + '/pool'
url = '{base}/restart/{name}'.format(base=pool_api, name=worker)
print(url)
resp = requests.post(url)
resp.json()
Out[43]:
In [53]:
# Ask the pool of `worker` to grow, passing n=10
url = '{base}/grow/{name}'.format(base=pool_api, name=worker)
print(url)
resp = requests.post(url, params=dict(n='10'))
resp.json()
Out[53]:
In [54]:
# Ask the pool of `worker` to shrink, passing n=3
url = '{base}/shrink/{name}'.format(base=pool_api, name=worker)
print(url)
resp = requests.post(url, params=dict(n='3'))
resp.json()
Out[54]:
Autoscale a pool.
In [58]:
# Send the autoscale bounds (min=3, max=10) for the pool of `worker`
url = '{base}/autoscale/{name}'.format(base=pool_api, name=worker)
print(url)
resp = requests.post(url, params=dict(min='3', max='10'))
resp.json()
Out[58]:
Add a consumer to a queue.
In [62]:
# Prefix shared by the queue endpoints, then the add-consumer request for
# the 'jokes' queue on `worker`
queue_api = worker_api + '/queue'
url = '{base}/add-consumer/{name}'.format(base=queue_api, name=worker)
print(url)
resp = requests.post(url, params=dict(queue='jokes'))
resp.json()
Out[62]:
Cancel a consumer from a queue.
In [63]:
# Send the cancel-consumer request for the 'jokes' queue on `worker`
url = '{base}/cancel-consumer/{name}'.format(base=queue_api, name=worker)
print(url)
resp = requests.post(url, params=dict(queue='jokes'))
resp.json()
Out[63]:
In [7]:
# Query the queues/length endpoint, which sits directly under the API root
url = api_root + '/queues/length'
print(url)
resp = requests.get(url)
resp.json()
Out[7]: