Datapackage Pipelines Tutorial

This tutorial is built as a Jupyter notebook which allows you to run and modify the code inline and can be used as a starting point for new Datapackage Pipelines projects.

Installation

Follow the DataFlows Tutorial installation instructions.

Save this tutorial in your current working directory (right-click the following link and save it): https://raw.githubusercontent.com/frictionlessdata/datapackage-pipelines/master/TUTORIAL.ipynb

Start Jupyter Lab in the dataflows environment and open the datapackage pipelines tutorial notebook you downloaded.

Install datapackage-pipelines


In [1]:
%%sh
python3 -m pip install -qU datapackage-pipelines[speedup]

This installs datapackage-pipelines with speed optimizations. If you encounter problems installing it, remove the [speedup] suffix.

Verify you have the latest datapackage-pipelines version


In [6]:
%%sh
dpp version


Installed version: 2.0.0
Latest version: 2.0.0
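
The installed version can also be checked from Python; a small sketch using importlib.metadata (part of the standard library in Python 3.8+):


In [ ]:
# Query the installed distribution metadata for the package version
from importlib.metadata import version

print(version('datapackage-pipelines'))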

Create a flow

Datapackage-pipelines uses the DataFlows library's Flow objects as the basic building blocks for larger pipeline systems.

It's recommended to follow the DataFlows Tutorial to get a better understanding of the DataFlows concepts which will be used here.
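
If you are new to DataFlows, here is a minimal Flow for orientation; a small sketch that is not specific to Datapackage Pipelines:


In [ ]:
from dataflows import Flow, printer

# A Flow chains data sources (iterables or generators) with processing steps;
# process() executes the chain.
Flow(
    [{'city': 'London', 'population': 8908081}],
    printer()
).process()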

Run the following cell to create a file called countries_population_flow.py which scrapes a list of country populations from Wikipedia.

This flow is copied from the DataFlows tutorial. The processing function country_population is exactly the same; only the flow function and the way we run it change to integrate with Datapackage Pipelines:


In [1]:
%%writefile countries_population_flow.py

# notice that we don't import any datapackage-pipelines modules
# all the flow code is written purely with the DataFlows library
from dataflows import Flow, printer, update_resource
from xml.etree import ElementTree
from urllib.request import urlopen


# Generator flow step, copied from the DataFlows tutorial
# it simply yields rows of data - in this case, country populations scraped from Wikipedia
def country_population():
    # Read the Wikipedia page and parse it using etree
    page = urlopen('https://en.wikipedia.org/wiki/List_of_countries_and_dependencies_by_population').read()
    tree = ElementTree.fromstring(page)
    # Iterate on all tables, rows and cells
    for table in tree.findall('.//table'):
        if 'wikitable' in table.attrib.get('class', ''):
            for row in table.find('tbody').findall('tr'):
                cells = row.findall('td')
                if len(cells) > 3:
                    # If a matching row is found...
                    name = cells[1].find('.//a').attrib.get('title')
                    population = cells[2].text
                    # ... yield a row with the information
                    yield dict(
                        name=name,
                        population=population
                    )


# The main entrypoint for Datapackage Pipelines; each flow file should expose a single flow() function
def flow(*args):
    return Flow(
        country_population(),
        update_resource('res_1', **{
            # Set a proper name for the resource
            'name': 'countries_population',
            # Always set a path as well, even if you don't intend to save it to the filesystem
            'path': 'countries_population.csv',
            # dpp:streaming property is required to let Datapackage Pipelines know it should handle this resource
            'dpp:streaming': True,
        })
    )


# Entrypoint for running the flow directly, without Datapackage Pipelines
if __name__ == '__main__':
    # Add a printer step and run the flow
    Flow(flow(), printer(num_rows=1, tablefmt='html')).process()


Overwriting countries_population_flow.py

Run the flow:


In [3]:
%run countries_population_flow.py


countries_population

#    name (string)        population (string)
1    China                1,394,640,000
2    India                1,338,310,000
...
240  Pitcairn Islands     50

This is standard DataFlows library usage. Now let's see what datapackage-pipelines provides on top of it.

Create a pipeline spec

Datapackage-pipelines uses YAML files to define pipelines of flow steps.

Create a spec that runs the countries population flow and dumps the output to a path:


In [4]:
%%writefile pipeline-spec.yaml
countries-population:
  pipeline:
  - flow: countries_population_flow
  - run: dump.to_path
    parameters:
      out-path: data/countries_population


Overwriting pipeline-spec.yaml

Using dpp

dpp is the CLI interface to the datapackage-pipelines library. It is used to list and run available pipelines.

Let's list the pipelines to check that our countries-population pipeline was picked up:


In [6]:
%%sh
dpp


Available Pipelines:
- ./countries-population 

Run the pipeline:


In [7]:
%%sh
dpp run ./countries-population


./countries-population: WAITING FOR OUTPUT

./countries-population: RUNNING, processed 100 rows

./countries-population: RUNNING, processed 200 rows

./countries-population: RUNNING, processed 240 rows

./countries-population: SUCCESS, processed 240 rows
INFO    :RESULTS:
INFO    :SUCCESS: ./countries-population {'bytes': 6425, 'count_of_rows': 240, 'dataset_name': '_', 'hash': '1b1585349acef8e155d112fe0cb4b3fc'}

Pipeline Dependencies

Let's add another pipeline which depends on the countries-population pipeline.

This time we will write the pipeline using only the pipeline-spec.yaml, without any DataFlows code (although the DataFlows library implements the processors we use here):


In [8]:
%%writefile pipeline-spec.yaml

countries-population:
  pipeline:
  - flow: countries_population_flow
  - run: dump.to_path
    parameters:
      out-path: data/countries_population

sorted_countries_by_name:
  dependencies:
  - pipeline: ./countries-population
  - datapackage: data/countries_population/datapackage.json
  pipeline:
  - run: load
    parameters:
      from: data/countries_population/datapackage.json
      resources: ['countries_population']
  - run: sort
    parameters:
      resources: ['countries_population']
      sort-by: '{name}'
  - run: dump.to_path
    parameters:
      out-path: data/sorted_countries_by_name


Overwriting pipeline-spec.yaml

Clear the pipelines state using dpp init and list the available pipelines:


In [10]:
%%sh
dpp init
dpp


Available Pipelines:
- ./countries-population (*)
- ./sorted_countries_by_name (*)(E)
	Dirty dependency: Cannot run until dependency is executed: ./countries-population

You can see that the new pipeline can't run until its dependency is executed.

Let's run all the "dirty" pipelines:


In [11]:
%%sh
dpp run --dirty all


./countries-population: WAITING FOR OUTPUT

./countries-population: RUNNING, processed 100 rows

./countries-population: RUNNING, processed 200 rows

./countries-population: RUNNING, processed 240 rows

./countries-population: SUCCESS, processed 240 rows

./countries-population: SUCCESS, processed 240 rows
./sorted_countries_by_name: WAITING FOR OUTPUT

./countries-population: SUCCESS, processed 240 rows
./sorted_countries_by_name: RUNNING, processed 100 rows

./countries-population: SUCCESS, processed 240 rows
./sorted_countries_by_name: RUNNING, processed 200 rows

./countries-population: SUCCESS, processed 240 rows
./sorted_countries_by_name: RUNNING, processed 240 rows

./countries-population: SUCCESS, processed 240 rows
./sorted_countries_by_name: SUCCESS, processed 240 rows
INFO    :RESULTS:
INFO    :SUCCESS: ./countries-population {'bytes': 6425, 'count_of_rows': 240, 'dataset_name': '_', 'hash': '1b1585349acef8e155d112fe0cb4b3fc'}
INFO    :SUCCESS: ./sorted_countries_by_name {'bytes': 6492, 'count_of_rows': 240, 'dataset_name': '_', 'hash': 'a63e74300bbe619d4a8efba26bc43688'}

Inspect the created datapackage


In [12]:
from dataflows import Flow, load, printer

Flow(
    load('data/sorted_countries_by_name/datapackage.json'),
    printer(num_rows=1, tablefmt='html')
).process()


countries_population

#    name (string)    population (string)
1    Abkhazia         240,705
2    Afghanistan      31,575,018
...
240  Zimbabwe         14,848,905
Out[12]:
(<datapackage.package.Package at 0x7f070f4db668>, {})
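
The dumped datapackage.json can also be opened directly with the datapackage library (already installed here, as the Flow result above returns a datapackage.package.Package object); a short sketch:


In [ ]:
from datapackage import Package

# Open the datapackage produced by dump.to_path and read a few rows
package = Package('data/sorted_countries_by_name/datapackage.json')
print(package.resource_names)
for row in package.get_resource('countries_population').read(keyed=True)[:3]:
    print(row)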

Pipeline processors

Datapackage Pipelines has a standard library of processors, such as the sort processor used previously. These processors correspond to the DataFlows standard library processors.
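
For example, the sort processor used in the sorted_countries_by_name spec maps to DataFlows' sort_rows; a rough DataFlows equivalent of that pipeline (a sketch, assuming the data/countries_population datapackage from the earlier run exists):


In [ ]:
from dataflows import Flow, load, sort_rows, printer

Flow(
    # load and sort_rows are the DataFlows counterparts of the load and sort processors
    load('data/countries_population/datapackage.json', resources=['countries_population']),
    sort_rows('{name}'),
    printer(num_rows=1, tablefmt='html')
).process()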

See the Datapackage Pipelines README for reference and usage examples.

An example showing usage of common processors:


In [16]:
%%writefile pipeline-spec.yaml

double-winners:
  pipeline:
  - run: load
    parameters:
      name: emmies
      from: https://raw.githubusercontent.com/datahq/dataflows/master/data/emmy.csv
  - run: load
    parameters:
      name: oscars
      from: https://raw.githubusercontent.com/datahq/dataflows/master/data/academy.csv
  - run: filter
    parameters:
      resources: ['emmies']
      in:
      - winner: 1
  - run: concatenate
    parameters:
      target: {'name': 'emmies_filtered'}
      resources: ['emmies']
      fields:
        emmy_nominee: ['nominee']
  - run: join
    parameters:
      source:
        name: 'emmies_filtered'
        key: ['emmy_nominee']
        delete: true
      target:
        name: 'oscars'
        key: ['Name']
      fields: {}
      full: false
  - run: filter
    parameters:
      in:
      - Winner: "1"
  - run: dump.to_path
    parameters:
      out-path: data/double_winners


Overwriting pipeline-spec.yaml

Run the pipeline:


In [17]:
%%sh
dpp run ./double-winners


./double-winners: WAITING FOR OUTPUT

./double-winners: RUNNING, processed 98 rows

./double-winners: SUCCESS, processed 98 rows
INFO    :RESULTS:
INFO    :SUCCESS: ./double-winners {'bytes': 6766, 'count_of_rows': 98, 'dataset_name': '_', 'hash': 'bc61b69dc87b0da0348049802c616d95'}

Print the datapackage:


In [18]:
from dataflows import Flow, printer, load
Flow(load('data/double_winners/datapackage.json'), printer(tablefmt='html', num_rows=1)).process()


oscars

#   Year (string)  Ceremony (integer)  Award (string)   Winner (string)  Name (string)       Film (string)
1   1931/1932      5                   Actress          1                Helen Hayes         The Sin of Madelon Claudet
2   1932/1933      6                   Actress          1                Katharine Hepburn   Morning Glory
...
98  2015           88                  Honorary Award   1                Gena Rowlands
Out[18]:
(<datapackage.package.Package at 0x7f0724018940>, {})

Pipelines Server

Running pipelines on your laptop is fine for many use cases, but sometimes you want to run pipelines in a more reproducible, scalable and automated fashion.

The Datapackage Pipelines Server is a Docker image which provides the core functionality to achieve this.

To start a local pipelines server for development, you will need to install Docker for Windows, Mac or Linux.

Pull the datapackage-pipelines image:


In [120]:
%%sh
docker pull frictionlessdata/datapackage-pipelines


Using default tag: latest
latest: Pulling from frictionlessdata/datapackage-pipelines
Digest: sha256:50fd5b40523146af0e46275f836357bf27097c1d9c83726b03da884e56d385bb
Status: Image is up to date for frictionlessdata/datapackage-pipelines:latest

Start a local pipelines server, mounting the current working directory into the container:


In [31]:
%%sh
docker run -d --name dpp -v `pwd`:/pipelines:rw -p 5000:5000 frictionlessdata/datapackage-pipelines server


33254bf2b410b28d20cb7d4989144d49488b9b9eea1af7dfe3861d97a58216d6

After a few seconds, the pipelines dashboard should be available at http://localhost:5000
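
You can verify from Python that the server is responding before refreshing the browser; a small sketch using only the standard library (the URL assumes the default port mapping used above):


In [ ]:
import time
from urllib.request import urlopen
from urllib.error import URLError

# Poll the dashboard until the server responds (give up after ~30 seconds)
for _ in range(30):
    try:
        print('dashboard HTTP status:', urlopen('http://localhost:5000').status)
        break
    except URLError:
        time.sleep(1)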

New / modified pipelines and dirty dependencies are executed by the pipelines server automatically.

The server also supports scheduled pipelines for periodic execution.

Let's see this in action:


In [32]:
%%writefile pipeline-spec.yaml

countries-population:
  schedule:
    # minute hour day_of_week day_of_month month_of_year
    crontab: '* * * * *'
  pipeline:
  - flow: countries_population_flow
  - run: dump.to_path
    parameters:
      out-path: data/countries_population

sorted_countries_by_name:
  dependencies:
  - pipeline: ./countries-population
  - datapackage: data/countries_population/datapackage.json
  pipeline:
  - run: load
    parameters:
      from: data/countries_population/datapackage.json
      resources: ['countries_population']
  - run: sort
    parameters:
      resources: ['countries_population']
      sort-by: '{name}'
  - run: dump.to_path
    parameters:
      out-path: data/sorted_countries_by_name


Overwriting pipeline-spec.yaml

Inspect the pipelines server logs and wait for the Update Pipelines task to complete and for the pipelines to start running:


In [56]:
%%sh
docker logs dpp --tail 5


[2018-10-16 13:32:00,346: INFO/ForkPoolWorker-1(86)] Update Pipelines (scheduled)
[2018-10-16 13:32:00,353: INFO/ForkPoolWorker-1(86)] 4017b8d9 QUEUEING SCHEDULED task ./countries-population
[2018-10-16 13:32:00,356: INFO/MainProcess(38)] Received task: datapackage_pipelines.celery_tasks.celery_tasks.execute_pipeline_task[9c70f3a8-f598-4232-9c59-5ced466a3ae2]  
[2018-10-16 13:32:00,357: INFO/ForkPoolWorker-2(87)] 4017b8d9 RUNNING ./countries-population
[2018-10-16 13:32:00,588: INFO/ForkPoolWorker-1(86)] Task datapackage_pipelines.celery_tasks.celery_tasks.update_pipelines[064cd648-04c0-43ed-9b11-2ec476b8d693] succeeded in 0.24168486997950822s: None

Refresh the dashboard to see the new pipelines and execution logs: http://localhost:5000

Next Steps


In [ ]: