In [1]:
from celery import Celery

# Celery application for this module. Tasks are sent through the Redis
# broker at localhost:6379, database 0.
# Start a worker with:
#   linux:~> celery -A workflow_sample worker --loglevel=info
app = Celery(
    'workflow_sample',
    broker='redis://localhost:6379/0',
)


# Define a couple of basic tasks.

# "workflow" Task 1

def add(obj, eng):
    """Workflow step: increase ``obj["value"]`` by 2, in place.

    ``eng`` is accepted as the second positional argument but is not
    used by this step. Nothing is returned.
    """
    step = 2
    obj["value"] += step

# "workflow" Task 2

def print_res(obj, eng):
    """Workflow step: print whatever ``obj.get("value")`` returns.

    ``eng`` is accepted as the second positional argument but is not
    used by this step. Nothing is returned.
    """
    current = obj.get("value")
    print(current)

# The workflow itself: the two steps defined above, listed as add first
# and print_res second.
workflow_tasks = [
    add,
    print_res,
]


# "celery" Task

# Mark our execution process as a celery task with this decorator.
@app.task
def run_workflow(data):
    """Celery task: feed ``data`` through the ``workflow_tasks`` workflow.

    Creates a new ``GenericWorkflowEngine``, installs the module-level
    ``workflow_tasks`` list as its workflow, and asks the engine to
    process ``data``. Nothing is returned.
    """
    # The engine is imported here, inside the task body, rather than at
    # module level (original author's note: the task code will not be
    # running in the global context).
    from workflow.engine import GenericWorkflowEngine

    engine = GenericWorkflowEngine()
    engine.setWorkflow(workflow_tasks)
    engine.process(data)


# linux:~> python workflow_sample.py

# Code that runs when we call this script directly. This way we can start
# as many workflows as we wish and let celery handle how they are
# distributed and when they run.

if __name__ == "__main__":
    # Queue a single run_workflow task whose payload is three objects
    # with values 10, 20 and 30.
    run_workflow.delay(
        [{"value": start} for start in (10, 20, 30)]
    )

In [2]:
!pwd && ls


/var/tmp/python3/notebooks/pylibs/workflow/examples/celery_workflow
__pycache__		     workflow_sample.py
workflow_and_celery_1.ipynb  workflow_sample_worker.txt

Worker

celery -A workflow_sample worker --loglevel=info &

Run the Task

python workflow_sample.py 
python workflow_sample.py 
python workflow_sample.py 
python workflow_sample.py 
python workflow_sample.py 
python workflow_sample.py 
python workflow_sample.py 
python workflow_sample.py 
python workflow_sample.py 
python workflow_sample.py 
python workflow_sample.py 

Analyzing the worker's log file


In [3]:
!gvim workflow_sample_worker.txt
# In vim, the ex command  :v/\v(TaskPool)/d  deletes every line that does NOT match "TaskPool".

In [4]:
!grep "TaskPool" workflow_sample_worker.txt


[2017-02-08 15:10:42,471: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', '263146a6-38d4-46e5-975f-51e2e66c1321', {'id': '263146a6-38d4-46e5-975f-51e2e66c1321', 'eta': None, 'kwargsrepr': '{}', 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'timelimit': [None, None], 'reply_to': '274ff7bf-f248-3fe4-953b-1b32173c0055', 'root_id': '263146a6-38d4-46e5-975f-51e2e66c1321', 'retries': 0, 'expires': None, 'lang': 'py', 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': '263146a6-38d4-46e5-975f-51e2e66c1321', 'origin': 'gen20077@goldbeef.anim.odw.com.cn', 'group': None, 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"callbacks": null, "chain": null, "chord": null, "errbacks": null}]', 'application/json', 'utf-8') kwargs:{})
[2017-02-08 15:11:01,597: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', 'f05ca2d8-e7cf-4aff-bb57-b8bfa71214f4', {'id': 'f05ca2d8-e7cf-4aff-bb57-b8bfa71214f4', 'expires': None, 'kwargsrepr': '{}', 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'timelimit': [None, None], 'reply_to': '08ec6981-e5e8-33dd-b221-0cba2c53689a', 'root_id': 'f05ca2d8-e7cf-4aff-bb57-b8bfa71214f4', 'retries': 0, 'eta': None, 'lang': 'py', 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': 'f05ca2d8-e7cf-4aff-bb57-b8bfa71214f4', 'origin': 'gen20095@goldbeef.anim.odw.com.cn', 'group': None, 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"callbacks": null, "chain": null, "chord": null, "errbacks": null}]', 'application/json', 'utf-8') kwargs:{})
[2017-02-08 15:11:14,586: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', '4e0ae8ab-5d47-4a9e-8d54-4cb89399afb0', {'id': '4e0ae8ab-5d47-4a9e-8d54-4cb89399afb0', 'expires': None, 'group': None, 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'lang': 'py', 'reply_to': 'eb33a9d6-2f76-3dff-b1e9-e72309b1068b', 'root_id': '4e0ae8ab-5d47-4a9e-8d54-4cb89399afb0', 'retries': 0, 'eta': None, 'timelimit': [None, None], 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': '4e0ae8ab-5d47-4a9e-8d54-4cb89399afb0', 'origin': 'gen20112@goldbeef.anim.odw.com.cn', 'kwargsrepr': '{}', 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"chord": null, "chain": null, "errbacks": null, "callbacks": null}]', 'application/json', 'utf-8') kwargs:{})
[2017-02-08 15:11:18,585: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', '23594755-7d69-4fa1-85be-3020a658bbc4', {'id': '23594755-7d69-4fa1-85be-3020a658bbc4', 'expires': None, 'kwargsrepr': '{}', 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'timelimit': [None, None], 'reply_to': 'a6e207cf-7acc-3609-8364-e8afb1356822', 'root_id': '23594755-7d69-4fa1-85be-3020a658bbc4', 'retries': 0, 'eta': None, 'lang': 'py', 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': '23594755-7d69-4fa1-85be-3020a658bbc4', 'origin': 'gen20124@goldbeef.anim.odw.com.cn', 'group': None, 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"errbacks": null, "callbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2017-02-08 15:11:24,623: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', '3c380deb-3a51-4850-ad68-a003f8bd2784', {'id': '3c380deb-3a51-4850-ad68-a003f8bd2784', 'expires': None, 'kwargsrepr': '{}', 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'lang': 'py', 'reply_to': '00e4704c-6426-3b2a-a51d-4317c139afbd', 'root_id': '3c380deb-3a51-4850-ad68-a003f8bd2784', 'retries': 0, 'eta': None, 'timelimit': [None, None], 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': '3c380deb-3a51-4850-ad68-a003f8bd2784', 'origin': 'gen20135@goldbeef.anim.odw.com.cn', 'group': None, 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"errbacks": null, "callbacks": null, "chord": null, "chain": null}]', 'application/json', 'utf-8') kwargs:{})
[2017-02-08 15:11:26,011: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', '205701ca-bd70-4e29-81ef-2716bb6d1b4c', {'id': '205701ca-bd70-4e29-81ef-2716bb6d1b4c', 'expires': None, 'kwargsrepr': '{}', 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'lang': 'py', 'reply_to': 'cf485662-4ee0-3412-b798-69bbba40d35e', 'root_id': '205701ca-bd70-4e29-81ef-2716bb6d1b4c', 'retries': 0, 'eta': None, 'timelimit': [None, None], 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': '205701ca-bd70-4e29-81ef-2716bb6d1b4c', 'origin': 'gen20145@goldbeef.anim.odw.com.cn', 'group': None, 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"callbacks": null, "chain": null, "chord": null, "errbacks": null}]', 'application/json', 'utf-8') kwargs:{})
[2017-02-08 15:11:27,266: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', '59438c33-f4c3-4b4b-a60c-06b50446aa2e', {'id': '59438c33-f4c3-4b4b-a60c-06b50446aa2e', 'expires': None, 'group': None, 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'lang': 'py', 'reply_to': '6b8c287c-7e31-32af-bed7-4db6af06cffc', 'root_id': '59438c33-f4c3-4b4b-a60c-06b50446aa2e', 'retries': 0, 'eta': None, 'timelimit': [None, None], 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': '59438c33-f4c3-4b4b-a60c-06b50446aa2e', 'origin': 'gen20155@goldbeef.anim.odw.com.cn', 'kwargsrepr': '{}', 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"callbacks": null, "chord": null, "errbacks": null, "chain": null}]', 'application/json', 'utf-8') kwargs:{})
[2017-02-08 15:11:28,408: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', '84ea61a9-49fe-48d3-8cd5-ec1efba2227a', {'id': '84ea61a9-49fe-48d3-8cd5-ec1efba2227a', 'eta': None, 'group': None, 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'lang': 'py', 'reply_to': '84582448-6ebd-351a-8efb-c20075200a38', 'root_id': '84ea61a9-49fe-48d3-8cd5-ec1efba2227a', 'retries': 0, 'expires': None, 'timelimit': [None, None], 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': '84ea61a9-49fe-48d3-8cd5-ec1efba2227a', 'origin': 'gen20166@goldbeef.anim.odw.com.cn', 'kwargsrepr': '{}', 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"chord": null, "errbacks": null, "callbacks": null, "chain": null}]', 'application/json', 'utf-8') kwargs:{})
[2017-02-08 15:11:29,580: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', 'd30e007d-7613-4385-b54b-2331f85c82fe', {'id': 'd30e007d-7613-4385-b54b-2331f85c82fe', 'eta': None, 'group': None, 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'lang': 'py', 'reply_to': '62a8dfce-76a8-3369-8af4-f9e4eb38bd36', 'root_id': 'd30e007d-7613-4385-b54b-2331f85c82fe', 'retries': 0, 'expires': None, 'timelimit': [None, None], 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': 'd30e007d-7613-4385-b54b-2331f85c82fe', 'origin': 'gen20176@goldbeef.anim.odw.com.cn', 'kwargsrepr': '{}', 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"errbacks": null, "chord": null, "callbacks": null, "chain": null}]', 'application/json', 'utf-8') kwargs:{})
[2017-02-08 15:11:30,699: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', '3faab6c9-dab8-423f-8c6d-ece0daee80b3', {'id': '3faab6c9-dab8-423f-8c6d-ece0daee80b3', 'eta': None, 'kwargsrepr': '{}', 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'timelimit': [None, None], 'reply_to': '5cfeae2b-f616-30d0-b29a-a72318861314', 'root_id': '3faab6c9-dab8-423f-8c6d-ece0daee80b3', 'retries': 0, 'expires': None, 'lang': 'py', 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': '3faab6c9-dab8-423f-8c6d-ece0daee80b3', 'origin': 'gen20188@goldbeef.anim.odw.com.cn', 'group': None, 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"callbacks": null, "errbacks": null, "chord": null, "chain": null}]', 'application/json', 'utf-8') kwargs:{})
[2017-02-08 15:11:31,833: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7f34d426b6a8> (args:('workflow_sample.run_workflow', 'e790b947-1b05-4a5c-9c12-b76808d90099', {'id': 'e790b947-1b05-4a5c-9c12-b76808d90099', 'eta': None, 'group': None, 'argsrepr': "([{'value': 10}, {'value': 20}, {'value': 30}],)", 'lang': 'py', 'reply_to': '9e714d2f-a115-38c9-a152-6757492ac4a2', 'root_id': 'e790b947-1b05-4a5c-9c12-b76808d90099', 'retries': 0, 'expires': None, 'timelimit': [None, None], 'parent_id': None, 'delivery_info': {'priority': 0, 'routing_key': 'celery', 'redelivered': None, 'exchange': ''}, 'correlation_id': 'e790b947-1b05-4a5c-9c12-b76808d90099', 'origin': 'gen20198@goldbeef.anim.odw.com.cn', 'kwargsrepr': '{}', 'task': 'workflow_sample.run_workflow'}, b'[[[{"value": 10}, {"value": 20}, {"value": 30}]], {}, {"callbacks": null, "chain": null, "chord": null, "errbacks": null}]', 'application/json', 'utf-8') kwargs:{})

In [5]:
!grep "TaskPool" workflow_sample_worker.txt | wc -l


11