In [43]:
from celery import Celery
from time import sleep
celery = Celery()
celery.config_from_object({
'BROKER_URL': 'amqp://localhost',
'CELERY_RESULT_BACKEND': 'amqp://',
'CELERYD_POOL_RESTARTS': True, # Required for /worker/pool/restart API
})
@celery.task
def add(x, y):
return x + y
@celery.task
def sub(x, y):
sleep(30) # Simulate work
return x - y
In [2]:
# Done once for the whole docs
import requests, json
api_root = 'http://localhost:5555/api'
task_api = '{}/task'.format(api_root)
In [6]:
args = {'args': [1, 2]}
url = '{}/async-apply/tasks.add'.format(task_api)
print(url)
resp = requests.post(url, data=json.dumps(args))
reply = resp.json()
reply
Out[6]:
We can see that we created a new task and it's pending. Note that the API is async, meaning it won't wait until the task finish.
In [5]:
url = '{}/result/{}'.format(task_api, reply['task-id'])
print(url)
resp = requests.get(url)
resp.json()
Out[5]:
In [7]:
# Run a task
args = {'args': [1, 2]}
resp = requests.post('{}/async-apply/tasks.sub'.format(task_api), data=json.dumps(args))
reply = resp.json()
# Now revoke it
url = '{}/revoke/{}'.format(task_api, reply['task-id'])
print(url)
resp = requests.post(url, data='terminate=True')
resp.json()
Out[7]:
Update rate limit for a task.
In [20]:
worker = 'miki-manjaro' # You'll need to get the worker name from the worker API (seel below)
url = '{}/rate-limit/{}'.format(task_api, worker)
print(url)
resp = requests.post(url, params={'taskname': 'tasks.add', 'ratelimit': '10'})
resp.json()
Out[20]:
In [22]:
url = '{}/timeout/{}'.format(task_api, worker)
print(url)
resp = requests.post(url, params={'taskname': 'tasks.add', 'hard': '3.14', 'soft': '3'}) # You can omit soft or hard
resp.json()
Out[22]:
In [55]:
# Once for the documentation
worker_api = '{}/worker'.format(api_root)
In [25]:
url = '{}/workers'.format(api_root) # Only one not under /worker
print(url)
resp = requests.get(url)
workers = resp.json()
workers
Out[25]:
In [30]:
worker = workers.keys()[0]
url = '{}/shutdown/{}'.format(worker_api, worker)
print(url)
resp = requests.post(url)
resp.json()
Out[30]:
Restart a worker pool, you need to have CELERYD_POOL_RESTARTS enabled in your configuration).
In [43]:
pool_api = '{}/pool'.format(worker_api)
url = '{}/restart/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url)
resp.json()
Out[43]:
In [53]:
url = '{}/grow/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'n': '10'})
resp.json()
Out[53]:
In [54]:
url = '{}/shrink/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'n': '3'})
resp.json()
Out[54]:
Autoscale a pool.
In [58]:
url = '{}/autoscale/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'min': '3', 'max': '10'})
resp.json()
Out[58]:
Add a consumer to a queue.
In [62]:
queue_api = '{}/queue'.format(worker_api)
url = '{}/add-consumer/{}'.format(queue_api, worker)
print(url)
resp = requests.post(url, params={'queue': 'jokes'})
resp.json()
Out[62]:
Cancel a consumer queue.
In [63]:
url = '{}/cancel-consumer/{}'.format(queue_api, worker)
print(url)
resp = requests.post(url, params={'queue': 'jokes'})
resp.json()
Out[63]:
In [ ]: