In [4]:
# If this notebook is run from the `examples` directory, move up to the project root
from os import path as p, chdir

if 'examples' in p.abspath('.'):
    chdir('..')
In this example, we fetch the title of a webpage.
In [5]:
from riko.modules.fetchpage import pipe
url = 'https://news.ycombinator.com/'
next(pipe(conf={'url': url, 'start': '<title>', 'end': '</title>'}))
Out[5]:
Here, we fetch the first hackernews story link using an XPath.
In [6]:
from riko.modules.xpathfetchpage import pipe
xpath = '/html/body/center/table/tr[3]/td/table/tr[1]/td[3]/a'
next(pipe(conf={'url': 'https://news.ycombinator.com/', 'xpath': xpath}))
Out[6]:
Here, we use several pipes to count the number of words on a webpage.
In [8]:
### Create a SyncPipe flow ###
#
# `SyncPipe` is a convenience class that creates chainable flows
# and allows for parallel processing.
from riko.collections import SyncPipe
### Set the pipe configurations ###
#
# Notes:
# 1. the `detag` option will strip all html tags from the result
# 2. fetch the text contained inside the 'body' tag of the hackernews homepage
# 3. replace newlines with spaces and assign the result to 'content'
# 4. tokenize the resulting text using whitespace as the delimiter
# 5. count the number of times each token appears
# 6. obtain the raw stream
# 7. extract the first word and its count
url = 'https://news.ycombinator.com/'
fetch_conf = {'url': url, 'start': '<body>', 'end': '</body>', 'detag': True} # 1
replace_conf = {'rule': [{'find': '\r\n', 'replace': ' '}, {'find': '\n', 'replace': ' '}]}
flow = (
    SyncPipe('fetchpage', conf=fetch_conf)                  # 2
        .strreplace(conf=replace_conf, assign='content')    # 3
        .tokenizer(conf={'delimiter': ' '}, emit=True)      # 4
        .count(conf={'count_key': 'content'}))              # 5
stream = flow.output # 6
next(stream) # 7
Out[8]:
riko can fetch RSS feeds from both local and remote filepaths via "source"
pipes. Each "source" pipe returns a stream, i.e., an iterator of
dictionaries, aka items.
In [8]:
from riko.modules.fetch import pipe
### Fetch an RSS feed ###
stream = pipe(conf={'url': 'https://news.ycombinator.com/rss'})
item = next(stream)
item['title'], item['link'], item['comments']
Out[8]:
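Source pipes accept local filepaths too. Here is a minimal sketch, assuming a hypothetical local RSS file named feed.xml (any readable path works):
from riko.modules.fetch import pipe
### Fetch an RSS feed from a local file ###
#
# Note: 'feed.xml' is a hypothetical path used only for illustration
stream = pipe(conf={'url': 'feed.xml'})
item = next(stream)
item['title']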
In [9]:
from riko.modules.fetchsitefeed import pipe
### Fetch the first RSS feed found ###
#
# Note: regardless of how you fetch an RSS feed, it will have the same
# structure
stream = pipe(conf={'url': 'http://arstechnica.com/rss-feeds/'})
item = next(stream)
item.keys()
Out[9]:
In [10]:
item['title'], item['author'], item['id']
Out[10]:
Please see the FAQ for a complete list of supported file types and protocols, and Fetching data and feeds for more examples.
riko can modify streams via the 40 built-in pipes.
In [11]:
from riko.collections import SyncPipe
### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
### Create a SyncPipe flow ###
#
# `SyncPipe` is a convenience class that creates chainable flows
# and allows for parallel processing.
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the link
# 3. sort the items ascending by title
# 4. fetch the first comment from each item
# 5. flatten the result into one raw stream
# 6. extract the first item's content
#
# Note: sorting is not lazy, so use caution when applying this pipe
flow = (
    SyncPipe('fetch', conf=fetch_conf)                 # 1
        .filter(conf={'rule': filter_rule})            # 2
        .sort(conf={'rule': {'sort_key': 'title'}})    # 3
        .xpathfetchpage(conf=xpath_conf))              # 4
stream = flow.output # 5
next(stream)['content'] # 6
Out[11]:
Please see alternate workflow creation for an alternative (function based) method for
creating a stream. Please see pipes for a complete list of available pipes.
An example using riko's parallel API to spawn a ThreadPool. You can instead enable a ProcessPool by additionally passing threads=False to SyncPipe, i.e., SyncPipe('fetch', conf={'url': url}, parallel=True, threads=False).
In [12]:
from riko.collections import SyncPipe
### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
### Create a parallel SyncPipe flow ###
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the article link
# 3. fetch the first comment from all items in parallel (using 4 workers)
# 4. flatten the result into one raw stream
# 5. extract the first item's content
#
# Note: no point in sorting after the filter since parallel fetching doesn't guarantee
# order
flow = (
    SyncPipe('fetch', conf=fetch_conf, parallel=True, workers=4)    # 1
        .filter(conf={'rule': filter_rule})                         # 2
        .xpathfetchpage(conf=xpath_conf))                           # 3
stream = flow.output # 4
next(stream)['content'] # 5
Out[12]:
To enable asynchronous processing, you must install the async module.
pip install riko[async]
An example using riko's asynchronous API.
In [2]:
from riko.bado import coroutine, react
from riko.collections import AsyncPipe
### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
### Create an AsyncPipe flow ###
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the article link
# 3. asynchronously fetch the first comment from each item (using 4 connections)
# 4. flatten the result into one raw stream
# 5. extract the first item's content
#
# Note: no point in sorting after the filter since async fetching doesn't guarantee
# order
@coroutine
def run(reactor):
    stream = yield (
        AsyncPipe('fetch', conf=fetch_conf, connections=4)    # 1
            .filter(conf={'rule': filter_rule})               # 2
            .xpathfetchpage(conf=xpath_conf)                  # 3
            .output)                                          # 4

    print(next(stream)['content'])                            # 5

try:
    react(run)
except SystemExit:
    pass
The primary data structures in riko are the item and stream. An item
is just a Python dictionary, and a stream is an iterator of items. You can
create a stream manually with something as simple as
[{'content': 'hello world'}]. You manipulate streams in
riko via pipes. A pipe is simply a function that accepts either a
stream or item, and returns a stream. pipes are composable: you
can use the output of one pipe as the input to another pipe.
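For example, here is a minimal sketch of composing pipes by hand, feeding the stream returned by one pipe into another (the particular pipes are chosen just for illustration):
from riko.modules.tokenizer import pipe as tokenizer
from riko.modules.count import pipe as count
item = {'content': 'hello world'}
# `tokenizer` returns a stream of tokens; that stream is then the input to `count`
tokens = tokenizer(item, conf={'delimiter': ' '}, field='content', emit=True)
next(count(tokens))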
riko pipes come in two flavors: operators and processors.
operators operate on an entire stream at once and are unable to handle
individual items. Example operators include pipecount, pipefilter,
and pipereverse.
In [14]:
from riko.modules.reverse import pipe
stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
next(pipe(stream))
Out[14]:
processors process individual items and can be parallelized across
threads or processes. Example processors include pipefetchsitefeed,
pipehash, pipeitembuilder, and piperegex.
In [15]:
from riko.modules.hash import pipe
item = {'title': 'riko pt. 1'}
stream = pipe(item, field='title')
next(stream)
Out[15]:
Some processors, e.g., pipetokenizer, return multiple results.
In [16]:
from riko.modules.tokenizer import pipe
item = {'title': 'riko pt. 1'}
tokenizer_conf = {'delimiter': ' '}
stream = pipe(item, conf=tokenizer_conf, field='title')
next(stream)
Out[16]:
In [17]:
# In this case, if we just want the result, we can `emit` it instead
stream = pipe(item, conf=tokenizer_conf, field='title', emit=True)
next(stream)
Out[17]:
operators are split into sub-types of aggregators
and composers. aggregators, e.g., count, combine
all items of an input stream into a new stream with a single item;
while composers, e.g., filter, create a new stream containing
some or all items of an input stream.
In [26]:
from riko.modules.count import pipe
stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
next(pipe(stream))
Out[26]:
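By contrast, a composer such as filter passes through some or all of the input items. A minimal sketch, using the same rule syntax as the filter examples above:
from riko.modules.filter import pipe
stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
rule = {'field': 'title', 'op': 'contains', 'value': 'pt. 2'}
# only the matching item remains in the output stream
next(pipe(stream, conf={'rule': rule}))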
processors are split into sub-types of source and transformer.
sources, e.g., itembuilder, can create a stream, while
transformers, e.g., hash, can only transform items in a stream.
In [27]:
from riko.modules.itembuilder import pipe
attrs = {'key': 'title', 'value': 'riko pt. 1'}
next(pipe(conf={'attrs': attrs}))
Out[27]:
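For instance, a transformer such as hash can only operate on items that already exist, e.g., those created by the itembuilder source. A minimal sketch:
from riko.modules.itembuilder import pipe as itembuilder
from riko.modules.hash import pipe as hash_pipe
attrs = {'key': 'title', 'value': 'riko pt. 1'}
# `itembuilder` (a source) creates the stream; `hash` (a transformer) only transforms its items
item = next(itembuilder(conf={'attrs': attrs}))
next(hash_pipe(item, field='title'))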
The following table summarizes these observations:
| type | sub-type | input | output | parallelizable? | creates streams? |
|---|---|---|---|---|---|
| operator | aggregator | stream | stream * | | |
| operator | composer | stream | stream | | |
| processor | source | item | stream | √ | √ |
| processor | transformer | item | stream | √ | |
If you are unsure of the type of pipe you have, check its metadata.
* the output stream of an aggregator is an iterator of only 1 item.
In [3]:
from riko.modules import fetchpage, count
fetchpage.async_pipe.__dict__
Out[3]:
In [29]:
count.pipe.__dict__
Out[29]:
The SyncPipe and AsyncPipe classes (among other things) perform this
check for you to allow for convenient method chaining and transparent
parallelization.
In [30]:
from riko.collections import SyncPipe
attrs = [
    {'key': 'title', 'value': 'riko pt. 1'},
    {'key': 'content', 'value': "Let's talk about riko!"}]
flow = SyncPipe('itembuilder', conf={'attrs': attrs}).hash()
flow.list[0]
Out[30]:
Please see the cookbook for advanced examples, including how to wire in values from other pipes or accept user input.
riko provides a command, runpipe, to execute workflows. A
workflow is simply a file containing a function named pipe that creates
a flow and processes the resulting stream.
flow.py
In [31]:
from __future__ import print_function
from riko.collections import SyncPipe
conf1 = {'attrs': [{'value': 'https://google.com', 'key': 'content'}]}
conf2 = {'rule': [{'find': 'com', 'replace': 'co.uk'}]}
def pipe(test=False):
    kwargs = {'conf': conf1, 'test': test}
    flow = SyncPipe('itembuilder', **kwargs).strreplace(conf=conf2)
    stream = flow.output

    for i in stream:
        print(i)
Now to execute flow.py, type the command runpipe flow. You should
then see the following output in your terminal:
https://google.co.uk
runpipe will also search the examples directory for workflows. Type
runpipe demo and you should see the following output:
Deadline to clear up health law eligibility near 682