Environment Setup


In [4]:
from os import path as p, chdir

# if running from inside the `examples` directory, move up to the project
# root so that relative paths used in the examples resolve correctly
if 'examples' in p.abspath('.'):
    chdir('..')

Examples

Fetch a webpage

In this example, we fetch the title of a webpage.


In [5]:
from riko.modules.fetchpage import pipe

url = 'https://news.ycombinator.com/'
next(pipe(conf={'url': url, 'start': '<title>', 'end': '</title>'}))


Out[5]:
{'content': b'Hacker News'}

Fetch a webpage using an xpath

Here, we fetch the first hackernews story link using an xpath.


In [6]:
from riko.modules.xpathfetchpage import pipe

xpath = '/html/body/center/table/tr[3]/td/table/tr[1]/td[3]/a'
next(pipe(conf={'url': 'https://news.ycombinator.com/', 'xpath': xpath}))


Out[6]:
{'class': 'storylink',
 'content': 'Quintuple: a Python 5-qubit quantum computer simulator',
 'href': 'https://arxiv.org/abs/1606.09225'}

Word Count

Here, we use several pipes to count the number of words on a webpage.


In [8]:
### Create a SyncPipe flow ###
#
# `SyncPipe` is a convenience class that creates chainable flows
# and allows for parallel processing.
from riko.collections import SyncPipe

### Set the pipe configurations ###
#
# Notes:
#   1. the `detag` option will strip all html tags from the result
#   2. fetch the text contained inside the 'body' tag of the hackernews homepage
#   3. replace newlines with spaces and assign the result to 'content'
#   4. tokenize the resulting text using whitespace as the delimiter
#   5. count the number of times each token appears
#   6. obtain the raw stream
#   7. extract the first word and its count
url = 'https://news.ycombinator.com/'
fetch_conf = {'url': url, 'start': '<body>', 'end': '</body>', 'detag': True}  # 1
replace_conf = {'rule': [{'find': '\r\n', 'replace': ' '}, {'find': '\n', 'replace': ' '}]}

flow = (
    SyncPipe('fetchpage', conf=fetch_conf)                                     # 2
        .strreplace(conf=replace_conf, assign='content')                       # 3
        .tokenizer(conf={'delimiter': ' '}, emit=True)                   # 4
        .count(conf={'count_key': 'content'}))                                 # 5

stream = flow.output                                                           # 6 
next(stream)                                                                   # 7


Out[8]:
{"'sad": 1}

Fetching feeds

riko can fetch RSS feeds from both local and remote filepaths via "source" pipes. Each "source" pipe returns a stream, i.e., an iterator of dictionaries, aka items.


In [8]:
from riko.modules.fetch import pipe

### Fetch an RSS feed ###
stream = pipe(conf={'url': 'https://news.ycombinator.com/rss'})
item = next(stream)
item['title'], item['link'], item['comments']


Out[8]:
('Quintuple: a Python 5-qubit quantum computer simulator',
 'https://arxiv.org/abs/1606.09225',
 'https://news.ycombinator.com/item?id=12106163')
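
The fetch pipe works the same way with a local filepath. Below is a minimal sketch; the filename 'feed.xml' is purely illustrative, and depending on your setup you may need an absolute path or a file:// URL.

from riko.modules.fetch import pipe

### Fetch a local RSS feed ###
#
# Note: 'feed.xml' is a hypothetical local file; the conf is otherwise
# identical to the remote case above
stream = pipe(conf={'url': 'feed.xml'})
item = next(stream)
item['title']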

In [9]:
from riko.modules.fetchsitefeed import pipe 

### Fetch the first RSS feed found ###
#
# Note: regardless of how you fetch an RSS feed, it will have the same
# structure
stream = pipe(conf={'url': 'http://arstechnica.com/rss-feeds/'})
item = next(stream)
item.keys()


Out[9]:
dict_keys(['title_detail', 'author.uri', 'tags', 'summary_detail', 'author_detail', 'author.name', 'y:published', 'y:title', 'content', 'title', 'pubDate', 'guidislink', 'id', 'summary', 'dc:creator', 'authors', 'published_parsed', 'links', 'y:id', 'author', 'link', 'published'])

In [10]:
item['title'], item['author'], item['id']


Out[10]:
('Gravity doesn’t care about quantum spin',
 'Chris Lee',
 'http://arstechnica.com/?p=924009')

Please see the FAQ for a complete list of supported file types and protocols. Please see Fetching data and feeds for more examples.

Synchronous processing

riko can modify streams via the 40 built-in pipes.


In [11]:
from riko.collections import SyncPipe

### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}

### Create a SyncPipe flow ###
#
# `SyncPipe` is a convenience class that creates chainable flows
# and allows for parallel processing.
#
# The following flow will:
#   1. fetch the hackernews RSS feed
#   2. filter for items with '.com' in the link
#   3. sort the items ascending by title
#   4. fetch the first comment from each item
#   5. flatten the result into one raw stream
#   6. extract the first item's content
#
# Note: sorting is not lazy so take caution when using this pipe

flow = (
    SyncPipe('fetch', conf=fetch_conf)               # 1
        .filter(conf={'rule': filter_rule})          # 2
        .sort(conf={'rule': {'sort_key': 'title'}})  # 3
        .xpathfetchpage(conf=xpath_conf))            # 4

stream = flow.output                                 # 5
next(stream)['content']                              # 6


Out[11]:
'Open Artificial Pancreas home:'

Please see alternate workflow creation for an alternative (function based) method for creating a stream. Please see pipes for a complete list of available pipes.

Parallel processing

An example using riko's parallel API to spawn a ThreadPool. You can instead enable a ProcessPool by additionally passing threads=False to SyncPipe, i.e., SyncPipe('fetch', conf={'url': url}, parallel=True, threads=False).


In [12]:
from riko.collections import SyncPipe

### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}

### Create a parallel SyncPipe flow ###
#
# The following flow will:
#   1. fetch the hackernews RSS feed 
#   2. filter for items with '.com' in the article link
#   3. fetch the first comment from all items in parallel (using 4 workers)
#   4. flatten the result into one raw stream
#   5. extract the first item's content
#
# Note: no point in sorting after the filter since parallel fetching doesn't guarantee 
# order
flow = (
    SyncPipe('fetch', conf=fetch_conf, parallel=True, workers=4)  # 1
        .filter(conf={'rule': filter_rule})                       # 2
        .xpathfetchpage(conf=xpath_conf))                         # 3

stream = flow.output                                              # 4
next(stream)['content']                                           # 5


Out[12]:
'He uses the following example for when to throw your own errors:'

Asynchronous processing

To enable asynchronous processing, you must install the async module.

pip install riko[async]

An example using riko's asynchronous API.


In [2]:
from riko.bado import coroutine, react
from riko.collections import AsyncPipe

### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}

### Create an AsyncPipe flow ###
#
# The following flow will:
#   1. fetch the hackernews RSS feed
#   2. filter for items with '.com' in the article link
#   3. asynchronously fetch the first comment from each item (using 4 connections)
#   4. flatten the result into one raw stream
#   5. extract the first item's content
#
# Note: no point in sorting after the filter since async fetching doesn't guarantee 
# order
@coroutine
def run(reactor):
    stream = yield (
        AsyncPipe('fetch', conf=fetch_conf, connections=4)  # 1
            .filter(conf={'rule': filter_rule})             # 2
            .xpathfetchpage(conf=xpath_conf)                # 3
            .output)                                        # 4
    
    print(next(stream)['content'])                          # 5
    
try:
    react(run)
except SystemExit:
    pass


Here's how iteration works ():

Design Principles

The primary data structures in riko are the item and stream. An item is just a python dictionary, and a stream is an iterator of items. You can create a stream manually with something as simple as [{'content': 'hello world'}]. You manipulate streams in riko via pipes. A pipe is simply a function that accepts either a stream or item, and returns a stream. pipes are composable: you can use the output of one pipe as the input to another pipe.
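
For example, since the reverse and count pipes (both shown below) each accept a stream and return a stream, the output of one can be fed straight into the other. A minimal sketch:

from riko.modules import count, reverse

stream = [{'content': 'hello world'}, {'content': 'goodbye world'}]

# the stream produced by `reverse` becomes the input of `count`
composed = count.pipe(reverse.pipe(stream))
next(composed)  # -> {'count': 2}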

riko pipes come in two flavors: operators and processors. operators operate on an entire stream at once and are unable to handle individual items. Example operators include pipecount, pipefilter, and pipereverse.


In [14]:
from riko.modules.reverse import pipe

stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
next(pipe(stream))


Out[14]:
{'title': 'riko pt. 2'}

processors process individual items and can be parallelized across threads or processes. Example processors include pipefetchsitefeed, pipehash, pipeitembuilder, and piperegex.


In [15]:
from riko.modules.hash import pipe

item = {'title': 'riko pt. 1'}
stream = pipe(item, field='title')
next(stream)


Out[15]:
{'hash': 1323840151, 'title': 'riko pt. 1'}

Some processors, e.g., pipetokenizer, return multiple results.


In [16]:
from riko.modules.tokenizer import pipe

item = {'title': 'riko pt. 1'}
tokenizer_conf = {'delimiter': ' '}
stream = pipe(item, conf=tokenizer_conf, field='title')
next(stream)


Out[16]:
{'tokenizer': [{'content': 'riko'},
  {'content': 'pt.'},
  {'content': '1'}],
 'title': 'riko pt. 1'}

In [17]:
# In this case, if we just want the result, we can `emit` it instead
stream = pipe(item, conf=tokenizer_conf, field='title', emit=True)
next(stream)


Out[17]:
{'content': 'riko'}

operators are split into sub-types of aggregators and composers. aggregators, e.g., count, combine all items of an input stream into a new stream with a single item, while composers, e.g., filter, create a new stream containing some or all items of an input stream.


In [26]:
from riko.modules.count import pipe

stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
next(pipe(stream))


Out[26]:
{'count': 2}
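
A composer such as filter, by contrast, keeps only the items of the input stream that match a rule. A minimal sketch, reusing the rule format from the flows above:

from riko.modules.filter import pipe

stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
rule = {'field': 'title', 'op': 'contains', 'value': 'pt. 2'}

# only items whose 'title' contains 'pt. 2' pass through
next(pipe(stream, conf={'rule': rule}))  # -> {'title': 'riko pt. 2'}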

processors are split into sub-types of source and transformer. sources, e.g., itembuilder, can create a stream, while transformers, e.g., hash, can only transform items in a stream.


In [27]:
from riko.modules.itembuilder import pipe

attrs = {'key': 'title', 'value': 'riko pt. 1'}
next(pipe(conf={'attrs': attrs}))


Out[27]:
{'title': 'riko pt. 1'}

The following table summarizes these observations:

type       sub-type     input   output    parallelizable?  creates streams?
operator   aggregator   stream  stream *
operator   composer     stream  stream
processor  source       item    stream    ✓                ✓
processor  transformer  item    stream    ✓

* the output stream of an aggregator is an iterator of only 1 item.

If you are unsure of the type of pipe you have, check its metadata.


In [3]:
from riko.modules import fetchpage, count

fetchpage.async_pipe.__dict__


Out[3]:
{'__wrapped__': <function riko.modules.fetchpage.async_pipe>,
 'name': 'fetchpage',
 'sub_type': 'source',
 'type': 'processor'}

In [29]:
count.pipe.__dict__


Out[29]:
{'__wrapped__': <function riko.modules.count.pipe>,
 'name': 'count',
 'sub_type': 'aggregator',
 'type': 'operator'}

The SyncPipe and AsyncPipe classes (among other things) perform this check for you to allow for convenient method chaining and transparent parallelization.


In [30]:
from riko.collections import SyncPipe

attrs = [
    {'key': 'title', 'value': 'riko pt. 1'},
    {'key': 'content', 'value': "Let's talk about riko!"}]
flow = SyncPipe('itembuilder', conf={'attrs': attrs}).hash()
flow.list[0]


Out[30]:
{'content': "Let's talk about riko!",
 'hash': 2092196356,
 'title': 'riko pt. 1'}

Please see the cookbook for advanced examples including how to wire in values from other pipes or accept user input.

Command-line Interface

riko provides a command, runpipe, to execute workflows. A workflow is simply a file containing a function named pipe that creates a flow and processes the resulting stream.

CLI Setup

flow.py


In [31]:
from __future__ import print_function
from riko.collections import SyncPipe

conf1 = {'attrs': [{'value': 'https://google.com', 'key': 'content'}]}
conf2 = {'rule': [{'find': 'com', 'replace': 'co.uk'}]}

def pipe(test=False):
    kwargs = {'conf': conf1, 'test': test}
    flow = SyncPipe('itembuilder', **kwargs).strreplace(conf=conf2)
    stream = flow.output
    
    for i in stream:
        print(i)

CLI Usage

Now to execute flow.py, type the command runpipe flow. You should then see the following output in your terminal:

https://google.co.uk

runpipe will also search the examples directory for workflows. Type runpipe demo and you should see the following output:

Deadline to clear up health law eligibility near 682