Follow the DataFlows Tutorial installation instructions.
Save this tutorial notebook to your current working directory (right-click and save the following link): https://raw.githubusercontent.com/frictionlessdata/datapackage-pipelines/master/TUTORIAL.ipynb
Start Jupyter Lab in the dataflows environment and open the datapackage pipelines tutorial notebook you downloaded.
In [1]:
%%sh
python3 -m pip install -qU datapackage-pipelines[speedup]
This installs datapackage-pipelines with speed optimizations. If you encounter problems installing it, remove the [speedup]
suffix.
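For example, the same install without the optional speedup extra:
In [ ]:
%%sh
python3 -m pip install -qU datapackage-pipelines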
Verify you have the latest datapackage-pipelines version
In [6]:
%%sh
dpp version
Datapackage-pipelines uses the DataFlows library's Flow objects as the basic building blocks for larger pipeline systems.
It's recommended to follow the DataFlows Tutorial to get a better understanding of the DataFlows concepts which will be used here.
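If you haven't gone through the DataFlows tutorial yet, here is a minimal, illustrative sketch of the core idea (the hello generator and its rows are examples only, not part of this tutorial's pipelines): a Flow chains steps that produce and process rows represented as Python dicts.
In [ ]:
from dataflows import Flow, printer

# A generator step: yields rows as plain dicts
def hello():
    yield {'greeting': 'hello'}
    yield {'greeting': 'world'}

# Chain the generator with a printer step and run the flow
Flow(hello(), printer()).process()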
Run the following cell to create a file called countries_population_flow.py
which scrapes a list of country populations from Wikipedia.
This flow is copied from the DataFlows tutorial: the country_population processing function
is exactly the same; only the Flow definition and how we run it are changed to integrate with Datapackage Pipelines:
In [1]:
%%writefile countries_population_flow.py
# notice that we don't import any datapackage-pipelines modules
# all the flow code is written purely with the DataFlows library
from dataflows import Flow, dump_to_path, load, add_metadata, printer, update_resource
from xml.etree import ElementTree
from urllib.request import urlopen


# Generator flow step, copied from the DataFlows tutorial
# it just spews rows of data - in this case, countries populations scraped from Wikipedia
def country_population():
    # Read the Wikipedia page and parse it using etree
    page = urlopen('https://en.wikipedia.org/wiki/List_of_countries_and_dependencies_by_population').read()
    tree = ElementTree.fromstring(page)
    # Iterate on all tables, rows and cells
    for table in tree.findall('.//table'):
        if 'wikitable' in table.attrib.get('class', ''):
            for row in table.find('tbody').findall('tr'):
                cells = row.findall('td')
                if len(cells) > 3:
                    # If a matching row is found...
                    name = cells[1].find('.//a').attrib.get('title')
                    population = cells[2].text
                    # ... yield a row with the information
                    yield dict(
                        name=name,
                        population=population
                    )


# The main entrypoint for Datapackage Pipelines, each flow file should have a single flow function
def flow(*args):
    return Flow(
        country_population(),
        update_resource('res_1', **{
            # Set a proper name for the resource
            'name': 'countries_population',
            # Always set a path as well, even if you don't intend to save it to the filesystem
            'path': 'countries_population.csv',
            # dpp:streaming property is required to let Datapackage Pipelines know it should handle this resource
            'dpp:streaming': True,
        })
    )


# Entrypoint for running the flow directly, without Datapackage Pipelines
if __name__ == '__main__':
    # Add a printer step and run the flow
    Flow(flow(), printer(num_rows=1, tablefmt='html')).process()
Run the flow:
In [3]:
%run countries_population_flow.py
This is standard DataFlows library usage. Now let's see what datapackage-pipelines provides.
In [4]:
%%writefile pipeline-spec.yaml
countries-population:
  pipeline:
    - flow: countries_population_flow
    - run: dump.to_path
      parameters:
        out-path: data/countries_population
List the available pipelines defined in pipeline-spec.yaml:
In [6]:
%%sh
dpp
Run the pipeline:
In [7]:
%%sh
dpp run ./countries-population
Now extend pipeline-spec.yaml with a second pipeline that depends on the output of the first:
In [8]:
%%writefile pipeline-spec.yaml
countries-population:
  pipeline:
    - flow: countries_population_flow
    - run: dump.to_path
      parameters:
        out-path: data/countries_population

sorted_countries_by_name:
  dependencies:
    - pipeline: ./countries-population
    - datapackage: data/countries_population/datapackage.json
  pipeline:
    - run: load
      parameters:
        from: data/countries_population/datapackage.json
        resources: ['countries_population']
    - run: sort
      parameters:
        resources: ['countries_population']
        sort-by: '{name}'
    - run: dump.to_path
      parameters:
        out-path: data/sorted_countries_by_name
Clear the pipelines state using dpp init
and list the available pipelines:
In [10]:
%%sh
dpp init
dpp
You can see that the new pipeline can't run until its dependency is executed.
Let's run all the "dirty" pipelines:
In [11]:
%%sh
dpp run --dirty all
Inspect the created datapackage
In [12]:
from dataflows import Flow, load, printer
Flow(
load('data/sorted_countries_by_name/datapackage.json'),
printer(num_rows=1, tablefmt='html')
).process()
Out[12]:
Datapackage Pipelines has a standard library of processors, like the sort
processor used previously. These processors correspond to DataFlows standard library processors.
See the Datapackage Pipelines README for reference and usage examples.
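For example, the sort processor used above mirrors the DataFlows sort_rows step; a rough DataFlows-only equivalent of the sorted_countries_by_name pipeline (a sketch, assuming the data/countries_population output from the first pipeline already exists) would be:
In [ ]:
from dataflows import Flow, load, sort_rows, printer

Flow(
    # Load the datapackage produced by the countries-population pipeline
    load('data/countries_population/datapackage.json'),
    # Sort rows by the name field, like the dpp sort processor
    sort_rows('{name}'),
    printer(num_rows=1, tablefmt='html')
).process()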
An example showing usage of common processors:
In [16]:
%%writefile pipeline-spec.yaml
double-winners:
  pipeline:
    - run: load
      parameters:
        name: emmies
        from: https://raw.githubusercontent.com/datahq/dataflows/master/data/emmy.csv
    - run: load
      parameters:
        name: oscars
        from: https://raw.githubusercontent.com/datahq/dataflows/master/data/academy.csv
    - run: filter
      parameters:
        resources: ['emmies']
        in:
          - winner: 1
    - run: concatenate
      parameters:
        target: {'name': 'emmies_filtered'}
        resources: ['emmies']
        fields:
          emmy_nominee: ['nominee']
    - run: join
      parameters:
        source:
          name: 'emmies_filtered'
          key: ['emmy_nominee']
          delete: true
        target:
          name: 'oscars'
          key: ['Name']
        fields: {}
        full: false
    - run: filter
      parameters:
        in:
          - Winner: "1"
    - run: dump.to_path
      parameters:
        out-path: data/double_winners
Run the pipeline:
In [17]:
%%sh
dpp run ./double-winners
Print the datapackage:
In [18]:
from dataflows import Flow, printer, load
Flow(load('data/double_winners/datapackage.json'), printer(tablefmt='html', num_rows=1)).process()
Out[18]:
Running pipelines on your laptop is fine for many use cases, but sometimes you want to run pipelines in a more reproducible, scalable and automated fashion.
The Datapackage Pipelines Server is a Docker image which provides the core functionality to achieve this.
To start a local pipelines server for development, you will need to install Docker (for Windows, Mac or Linux).
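A quick way to verify that Docker is installed and the daemon is reachable:
In [ ]:
%%sh
docker --version
docker info --format '{{.ServerVersion}}'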
Pull the datapackage-pipelines image:
In [120]:
%%sh
docker pull frictionlessdata/datapackage-pipelines
Start a local pipelines server, mounting the current working directory into the container:
In [31]:
%%sh
docker run -d --name dpp -v `pwd`:/pipelines:rw -p 5000:5000 frictionlessdata/datapackage-pipelines server
After a few seconds, the pipelines dashboard should be available at http://localhost:5000
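If the dashboard doesn't come up, you can check that the container is running and that the server responds (assuming curl is available locally):
In [ ]:
%%sh
docker ps --filter name=dpp
curl -s -o /dev/null -w '%{http_code}\n' http://localhost:5000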
New / modified pipelines and dirty dependencies are executed by the pipelines server automatically.
The server also supports scheduled pipelines for periodical execution.
Let's see this in action:
In [32]:
%%writefile pipeline-spec.yaml
countries-population:
  schedule:
    # minute hour day_of_week day_of_month month_of_year
    crontab: '* * * * *'
  pipeline:
    - flow: countries_population_flow
    - run: dump.to_path
      parameters:
        out-path: data/countries_population

sorted_countries_by_name:
  dependencies:
    - pipeline: ./countries-population
    - datapackage: data/countries_population/datapackage.json
  pipeline:
    - run: load
      parameters:
        from: data/countries_population/datapackage.json
        resources: ['countries_population']
    - run: sort
      parameters:
        resources: ['countries_population']
        sort-by: '{name}'
    - run: dump.to_path
      parameters:
        out-path: data/sorted_countries_by_name
Inspect the pipelines server logs and wait for the Update Pipelines
task to complete and for the pipelines to start running:
In [56]:
%%sh
docker logs dpp --tail 5
Refresh the dashboard to see the new pipelines and execution logs: http://localhost:5000
In [ ]: