In [4]:
from os import path as p, chdir
if 'examples' in p.abspath('.'):
    chdir('..')
In this example, we fetch the title of a webpage.
In [5]:
from riko.modules.fetchpage import pipe
url = 'https://news.ycombinator.com/'
next(pipe(conf={'url': url, 'start': '<title>', 'end': '</title>'}))
Out[5]:
Here, we fetch the first hackernews story link using an XPath.
In [6]:
from riko.modules.xpathfetchpage import pipe
xpath = '/html/body/center/table/tr[3]/td/table/tr[1]/td[3]/a'
next(pipe(conf={'url': 'https://news.ycombinator.com/', 'xpath': xpath}))
Out[6]:
Here, we use several pipes to count the number of words on a webpage.
In [8]:
### Create a SyncPipe flow ###
#
# `SyncPipe` is a convenience class that creates chainable flows
# and allows for parallel processing.
from riko.collections import SyncPipe
### Set the pipe configurations ###
#
# Notes:
# 1. the `detag` option will strip all html tags from the result
# 2. fetch the text contained inside the 'body' tag of the hackernews homepage
# 3. replace newlines with spaces and assign the result to 'content'
# 4. tokenize the resulting text using whitespace as the delimiter
# 5. count the number of times each token appears
# 6. obtain the raw stream
# 7. extract the first word and its count
url = 'https://news.ycombinator.com/'
fetch_conf = {'url': url, 'start': '<body>', 'end': '</body>', 'detag': True} # 1
replace_conf = {'rule': [{'find': '\r\n', 'replace': ' '}, {'find': '\n', 'replace': ' '}]}
flow = (
    SyncPipe('fetchpage', conf=fetch_conf)                 # 2
        .strreplace(conf=replace_conf, assign='content')   # 3
        .tokenizer(conf={'delimiter': ' '}, emit=True)     # 4
        .count(conf={'count_key': 'content'}))             # 5
stream = flow.output # 6
next(stream) # 7
Out[8]:
riko can fetch RSS feeds from both local and remote filepaths via "source" pipes. Each "source" pipe returns a stream, i.e., an iterator of dictionaries, aka items.
In [8]:
from riko.modules.fetch import pipe
### Fetch an RSS feed ###
stream = pipe(conf={'url': 'https://news.ycombinator.com/rss'})
item = next(stream)
item['title'], item['link'], item['comments']
Out[8]:
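Source pipes also accept a local filepath as the url (per the note above); a rough sketch, assuming a hypothetical local RSS file named feed.xml in the working directory:

from riko.modules.fetch import pipe
# assumption: 'feed.xml' is a local copy of an RSS feed
stream = pipe(conf={'url': 'feed.xml'})
next(stream)['title']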
In [9]:
from riko.modules.fetchsitefeed import pipe
### Fetch the first RSS feed found ###
#
# Note: regardless of how you fetch an RSS feed, it will have the same
# structure
stream = pipe(conf={'url': 'http://arstechnica.com/rss-feeds/'})
item = next(stream)
item.keys()
Out[9]:
In [10]:
item['title'], item['author'], item['id']
Out[10]:
Please see the FAQ for a complete list of supported file types and protocols. Please see Fetching data and feeds for more examples.
riko can modify streams via the 40 built-in pipes.
In [11]:
from riko.collections import SyncPipe
### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
### Create a SyncPipe flow ###
#
# `SyncPipe` is a convenience class that creates chainable flows
# and allows for parallel processing.
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the link
# 3. sort the items ascending by title
# 4. fetch the first comment from each item
# 5. flatten the result into one raw stream
# 6. extract the first item's content
#
# Note: sorting is not lazy so take caution when using this pipe
flow = (
    SyncPipe('fetch', conf=fetch_conf)                  # 1
        .filter(conf={'rule': filter_rule})             # 2
        .sort(conf={'rule': {'sort_key': 'title'}})     # 3
        .xpathfetchpage(conf=xpath_conf))               # 4
stream = flow.output # 5
next(stream)['content'] # 6
Out[11]:
Please see alternate workflow creation for an alternative (function-based) method of creating a stream. Please see pipes for a complete list of available pipes.

An example using riko's parallel API to spawn a ThreadPool. You can instead enable a ProcessPool by additionally passing threads=False to SyncPipe, i.e., SyncPipe('fetch', conf={'url': url}, parallel=True, threads=False).
In [12]:
from riko.collections import SyncPipe
### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
### Create a parallel SyncPipe flow ###
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the article link
# 3. fetch the first comment from all items in parallel (using 4 workers)
# 4. flatten the result into one raw stream
# 5. extract the first item's content
#
# Note: no point in sorting after the filter since parallel fetching doesn't guarantee
# order
flow = (
    SyncPipe('fetch', conf=fetch_conf, parallel=True, workers=4)  # 1
        .filter(conf={'rule': filter_rule})                       # 2
        .xpathfetchpage(conf=xpath_conf))                         # 3
stream = flow.output # 4
next(stream)['content'] # 5
Out[12]:
To enable asynchronous processing, you must install the async module:

pip install riko[async]

An example using riko's asynchronous API.
In [2]:
from riko.bado import coroutine, react
from riko.collections import AsyncPipe
### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
### Create an AsyncPipe flow ###
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the article link
# 3. asynchronously fetch the first comment from each item (using 4 connections)
# 4. flatten the result into one raw stream
# 5. extract the first item's content
#
# Note: no point in sorting after the filter since async fetching doesn't guarantee
# order
@coroutine
def run(reactor):
    stream = yield (
        AsyncPipe('fetch', conf=fetch_conf, connections=4)  # 1
            .filter(conf={'rule': filter_rule})             # 2
            .xpathfetchpage(conf=xpath_conf)                # 3
            .output)                                        # 4

    print(next(stream)['content'])                          # 5
try:
    react(run)
except SystemExit:
    pass
The primary data structures in riko are the item and stream. An item is just a Python dictionary, and a stream is an iterator of items. You can create a stream manually with something as simple as [{'content': 'hello world'}]. You manipulate streams in riko via pipes. A pipe is simply a function that accepts either a stream or an item, and returns a stream. pipes are composable: you can use the output of one pipe as the input to another pipe.
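As a minimal sketch of that composability (reusing only the reverse and count pipes shown later in this section), you can build a stream by hand and pass the output of one pipe straight into another:

from riko.modules.reverse import pipe as reverse
from riko.modules.count import pipe as count

# a stream is just an iterable of items (plain dicts)
stream = [{'content': 'hello world'}, {'content': 'hello riko'}]

# the stream returned by one pipe is valid input for the next
next(count(reverse(stream)))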
riko pipes come in two flavors: operators and processors. operators operate on an entire stream at once and are unable to handle individual items. Example operators include pipecount, pipefilter, and pipereverse.
In [14]:
from riko.modules.reverse import pipe
stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
next(pipe(stream))
Out[14]:
processors process individual items and can be parallelized across threads or processes. Example processors include pipefetchsitefeed, pipehash, pipeitembuilder, and piperegex.
In [15]:
from riko.modules.hash import pipe
item = {'title': 'riko pt. 1'}
stream = pipe(item, field='title')
next(stream)
Out[15]:
Some processors, e.g., pipetokenizer, return multiple results.
In [16]:
from riko.modules.tokenizer import pipe
item = {'title': 'riko pt. 1'}
tokenizer_conf = {'delimiter': ' '}
stream = pipe(item, conf=tokenizer_conf, field='title')
next(stream)
Out[16]:
In [17]:
# In this case, if we just want the result, we can `emit` it instead
stream = pipe(item, conf=tokenizer_conf, field='title', emit=True)
next(stream)
Out[17]:
operators are split into sub-types of aggregators and composers. aggregators, e.g., count, combine all items of an input stream into a new stream with a single item, while composers, e.g., filter, create a new stream containing some or all items of an input stream.
In [26]:
from riko.modules.count import pipe
stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
next(pipe(stream))
Out[26]:
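For the composer side, a rough sketch with the filter pipe, assuming its module-level pipe accepts the same rule conf used in the SyncPipe flows above:

from riko.modules.filter import pipe
stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
rule = {'field': 'title', 'op': 'contains', 'value': 'pt. 1'}
# only items matching the rule appear in the new stream
next(pipe(stream, conf={'rule': rule}))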
processors are split into sub-types of source and transformer. sources, e.g., itembuilder, can create a stream, while transformers, e.g., hash, can only transform items in a stream.
In [27]:
from riko.modules.itembuilder import pipe
attrs = {'key': 'title', 'value': 'riko pt. 1'}
next(pipe(conf={'attrs': attrs}))
Out[27]:
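And a rough sketch of the transformer side, chaining the itembuilder output from above into the hash pipe shown earlier (the chaining itself is the only assumption here):

from riko.modules.hash import pipe as hash_pipe
# a transformer needs an existing item to operate on, here the one built above
item = next(pipe(conf={'attrs': attrs}))
next(hash_pipe(item, field='title'))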
The following table summarizes these observations:
type | sub-type | input | output | parallelizable? | creates streams? |
---|---|---|---|---|---|
operator | aggregator | stream | stream * | | |
operator | composer | stream | stream | | |
processor | source | item | stream | √ | √ |
processor | transformer | item | stream | √ | |
If you are unsure of the type of pipe you have, check its metadata.

* the output stream of an aggregator is an iterator of only 1 item.
In [3]:
from riko.modules import fetchpage, count
fetchpage.async_pipe.__dict__
Out[3]:
In [29]:
count.pipe.__dict__
Out[29]:
The SyncPipe and AsyncPipe classes (among other things) perform this check for you to allow for convenient method chaining and transparent parallelization.
In [30]:
from riko.collections import SyncPipe
attrs = [
    {'key': 'title', 'value': 'riko pt. 1'},
    {'key': 'content', 'value': "Let's talk about riko!"}]
flow = SyncPipe('itembuilder', conf={'attrs': attrs}).hash()
flow.list[0]
Out[30]:
Please see the cookbook for advanced examples, including how to wire in values from other pipes or accept user input.
riko provides a command, runpipe, to execute workflows. A workflow is simply a file containing a function named pipe that creates a flow and processes the resulting stream.
flow.py
In [31]:
from __future__ import print_function
from riko.collections import SyncPipe
conf1 = {'attrs': [{'value': 'https://google.com', 'key': 'content'}]}
conf2 = {'rule': [{'find': 'com', 'replace': 'co.uk'}]}
def pipe(test=False):
    kwargs = {'conf': conf1, 'test': test}
    flow = SyncPipe('itembuilder', **kwargs).strreplace(conf=conf2)
    stream = flow.output

    for i in stream:
        print(i)
Now to execute flow.py, type the command runpipe flow. You should then see the following output in your terminal:
https://google.co.uk
runpipe will also search the examples directory for workflows. Type runpipe demo and you should see the following output:
Deadline to clear up health law eligibility near 682