See the DataFlows documentation for more details about the Flow object and processing functions.
Feel free to modify this notebook and commit changes that demonstrate additional functionality or relevant data.
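To illustrate the pattern used throughout this notebook, here is a minimal, self-contained Flow. The field names are made up for the example and are not taken from the Knesset data:

from dataflows import Flow

def add_double(row):
    # row processors receive each row as a dict and may modify it in place
    row['double'] = row['value'] * 2

# an inline list of dicts can serve as a data source;
# results() returns a (results, datapackage, stats) tuple
results, datapackage, stats = Flow(
    [{'value': 1}, {'value': 2}],
    add_double,
).results()
print(results)  # [[{'value': 1, 'double': 2}, {'value': 2, 'double': 4}]]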
In [1]:
import yaml
from dataflows import Flow, filter_rows, cache, dump_to_path
from datapackage_pipelines_knesset.common_flow import load_knesset_data, load_member_names
import tabulator
In [2]:
# Limit processing of protocol parts for development
PROCESS_PARTS_LIMIT = 500
# Enable caching of protocol parts data (not efficient; should only be used for local development with a sensible PROCESS_PARTS_LIMIT)
PROCESS_PARTS_CACHE = True
# Filter the meetings to be processed; these kwargs are passed to the DataFlows filter_rows processor for the meetings resource
MEETINGS_FILTER_ROWS_KWARGS = {'equals': [{'KnessetNum': 20}]}
# Don't use local data - load everything from the Knesset data remote storage.
# When set to False, caching is also enabled, so you won't download from remote storage on the 2nd run.
USE_DATA = False
In [3]:
# Load a dict mapping Knesset member IDs to member names
member_names = load_member_names(use_data=USE_DATA)

# Define flow steps for loading the source committee meetings data;
# the actual loading is done later, when the Flow runs
load_steps = (
    load_knesset_data('people/committees/meeting-attendees/datapackage.json', USE_DATA),
    filter_rows(**MEETINGS_FILTER_ROWS_KWARGS)
)

if not USE_DATA:
    # when loading from a URL - enable caching, which will skip the load on the 2nd run
    load_steps = (cache(*load_steps, cache_path='.cache/people-committee-meeting-attendees-knesset-20'),)
The last command's output log should contain URLs of datapackage.json files. Open them and check the table schema to see each resource's metadata and the fields available to the processing functions.
See the Frictionless Data docs for more details about the datapackage file format.
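If you prefer to inspect a schema programmatically, here is a minimal sketch using the datapackage library. The URL is a placeholder - replace it with one of the datapackage.json URLs from the output log:

from datapackage import Package

# placeholder URL - substitute a datapackage.json URL from the output log above
package = Package('https://example.com/datapackage.json')
for resource in package.resources:
    # each resource's table schema lists the fields available to processing functions
    print(resource.name, resource.schema.field_names)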
In [4]:
from collections import defaultdict
stats = defaultdict(int)
member_attended_meetings = defaultdict(int)
def process_meeting_protocol_part(row):
    # called for every row of a meeting's protocol-parts resource
    stats['processed parts'] += 1
    if row['body'] and 'אנחנו ככנסת צריכים להיות ערוכים' in row['body']:
        stats['meetings contain text: we as knesset need to be prepared'] += 1

def process_meeting(row):
    # called for every committee meeting row; counts attendance per member
    # and loads the meeting's protocol parts for further processing
    stats['total meetings'] += 1
    if row['attended_mk_individual_ids']:
        for mk_id in row['attended_mk_individual_ids']:
            member_attended_meetings[mk_id] += 1
    parts_filename = row['parts_parsed_filename']
    if parts_filename:
        if PROCESS_PARTS_LIMIT and stats['processed parts'] < PROCESS_PARTS_LIMIT:
            steps = (load_knesset_data('committees/meeting_protocols_parts/' + parts_filename, USE_DATA),)
            if not USE_DATA and PROCESS_PARTS_CACHE:
                steps = (cache(*steps, cache_path='.cache/committee-meeting-protocol-parts/' + parts_filename),)
            steps += (process_meeting_protocol_part,)
            Flow(*steps).process()
process_steps = (process_meeting,)
In [5]:
Flow(*load_steps, *process_steps, dump_to_path('data/committee-meeting-attendees-parts')).process()
Out[5]:
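For reference, Flow.process() returns a (datapackage, stats) tuple, which is what the Out[5] value held. A sketch of capturing it explicitly (note that re-running the flow would also re-run the processing functions and double-count into the stats dicts):

datapackage, flow_stats = Flow(
    *load_steps, *process_steps,
    dump_to_path('data/committee-meeting-attendees-parts')
).process()
print(flow_stats)  # per-resource row counts and dump stats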
In [6]:
from collections import deque
# sort ascending by attendance count; a deque with maxlen=5 retains only the
# last (i.e. highest) 5 items - the top attended members
top_attended_member_names = [member_names[mk_id] for mk_id, num_attended in
                             deque(sorted(member_attended_meetings.items(), key=lambda kv: kv[1]), maxlen=5)]
print('\n')
print('-- top attended members --')
print(top_attended_member_names)
print('\n')
print('-- stats --')
print(yaml.dump(dict(stats), default_flow_style=False, allow_unicode=True))
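The deque trick above works because sorted() returns ascending order and the deque keeps only the last five items. An equivalent, arguably clearer alternative is heapq.nlargest:

import heapq

# the 5 members with the highest attendance counts, highest first
top_5 = heapq.nlargest(5, member_attended_meetings.items(), key=lambda kv: kv[1])
print([member_names[mk_id] for mk_id, num_attended in top_5])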