Parallel Processing in Snorkel

In this notebook, we'll run the same preprocessing as in the introduction tutorial, but use multiple processes to do the work in parallel.

Initializing a SnorkelSession

First, we initialize a SnorkelSession, which manages a database connection for us and lets us save intermediate results.

Because we'll be running several worker processes, we need to point Snorkel at a database that supports concurrent connections, such as PostgreSQL; the default SQLite backend does not.


In [ ]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

import os

# Point Snorkel at a PostgreSQL database. This must be set before
# importing from snorkel, which reads SNORKELDB at import time.
os.environ['SNORKELDB'] = 'postgres:///snorkel'

from snorkel import SnorkelSession
session = SnorkelSession()
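
Before going further, it's worth confirming that the session can actually reach the database. This is an optional sanity check; it assumes the PostgreSQL database named snorkel already exists (for example, created with createdb snorkel).


In [ ]:
from sqlalchemy import text

# Optional sanity check: confirm the session can reach the database.
# Assumes the `snorkel` Postgres database was created beforehand.
session.execute(text('SELECT 1'))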

Loading the Corpus


In [ ]:
from snorkel.parser import TSVDocPreprocessor

# The corpus contains 2591 documents; we parse all of them below.
n_docs = 2591

doc_preprocessor = TSVDocPreprocessor('../intro/data/articles.tsv', max_docs=n_docs)
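
As a quick check on the input, we can peek at the first line of the file; TSVDocPreprocessor expects each line to be a tab-separated (document name, document text) pair. This sketch just reads the same file passed to the preprocessor above.


In [ ]:
# Peek at the raw input: each line should be a tab-separated
# (document name, document text) pair.
with open('../intro/data/articles.tsv') as f:
    print(f.readline()[:200])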

Running a CorpusParser

The only thing we do differently from the introduction tutorial is pass a parallelism argument specifying how many worker processes to use:


In [ ]:
from snorkel.parser.spacy_parser import Spacy
from snorkel.parser import CorpusParser

corpus_parser = CorpusParser(parser=Spacy())
%time corpus_parser.apply(doc_preprocessor, count=2591, parallelism=4)
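
After parsing completes, we can verify the results by counting the documents and sentences committed to the database:


In [ ]:
from snorkel.models import Document, Sentence

print("Documents:", session.query(Document).count())
print("Sentences:", session.query(Sentence).count())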

Generating Candidates

We can also do candidate generation in parallel. First, we repeat some setup code from the introduction tutorial.


In [ ]:
from snorkel.models import candidate_subclass

Spouse = candidate_subclass('Spouse', ['person1', 'person2'])

In [ ]:
from snorkel.candidates import Ngrams, CandidateExtractor
from snorkel.matchers import PersonMatcher

ngrams         = Ngrams(n_max=7)                         # candidate spans of up to 7 tokens
person_matcher = PersonMatcher(longest_match_only=True)  # match person names, keeping only maximal spans
cand_extractor = CandidateExtractor(Spouse, [ngrams, ngrams], [person_matcher, person_matcher])

In [ ]:
from snorkel.models import Document
from util import number_of_people

docs = session.query(Document).order_by(Document.name).all()

train_sents = set()
dev_sents   = set()
test_sents  = set()

# Assign documents to train/dev/test roughly 80/10/10 by document index,
# skipping sentences that mention more than five people.
for i, doc in enumerate(docs):
    for s in doc.sentences:
        if number_of_people(s) <= 5:
            if i % 10 == 8:
                dev_sents.add(s)
            elif i % 10 == 9:
                test_sents.add(s)
            else:
                train_sents.add(s)
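
Before extracting candidates, a quick check that the split sizes look reasonable:


In [ ]:
print("Train sentences:", len(train_sents))
print("Dev sentences:  ", len(dev_sents))
print("Test sentences: ", len(test_sents))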

Finally, we apply the candidate extractor to each split, again specifying the number of parallel processes to use.


In [ ]:
%%time
# Extract candidates for each split: 0 = train, 1 = dev, 2 = test.
for i, sents in enumerate([train_sents, dev_sents, test_sents]):
    cand_extractor.apply(sents, split=i, parallelism=4)
    print("Number of candidates:", session.query(Spouse).filter(Spouse.split == i).count())