Define functions
In [56]:
import csv
import xml.etree.ElementTree as ET
from os import listdir
import re
import subprocess
from tempfile import mkdtemp
from glob import glob
# Name of the XML attribute holding a segment's pointer into the text; the
# character range is parsed from the part after '=' as "begin,end".
target = 'target'
# Name of the XML attribute on a segment that references an annotation id
# (the first character of the value is dropped before it is used as a key).
ana_name = 'ana'
# Separator placed between the talker and the annotated text when annotations
# are returned as one tab-joined string instead of a list.
TALKER_SEP = '_TALKER_'
def get_talker(all_talkers, indices, b):
    """Return the talker whose header most recently precedes offset ``b``.

    Args:
        all_talkers: talker header strings, in order of appearance.
        indices: start offset of each header in the text, parallel to
            ``all_talkers``.
        b: character offset to attribute to a talker.

    Returns:
        The stripped header of the last talker starting at or before ``b``,
        or ``None`` when ``b`` lies before the first header (or there are no
        headers at all).
    """
    for i, start in enumerate(indices):
        if start > b:
            # The first header is already past ``b``: nobody has spoken yet.
            # (Indexing with i - 1 == -1 here would wrongly pick the LAST talker.)
            if i == 0:
                return None
            return all_talkers[i - 1].strip()
    # ``b`` is at or after every header, so it belongs to the final talker.
    if indices:
        return all_talkers[len(indices) - 1].strip()
    return None
def get_annotations_text(root, text, as_list=False):
    """Collect the annotated text snippets of a TEI annotation tree, per category.

    Args:
        root: root element of the parsed annotation XML.
        text: the annotated document text; segment character ranges index
            into this string.
        as_list: when true, each category maps to a list of
            ``(talker, text)`` tuples; otherwise to a single string in which
            every annotation is appended as
            ``"\\t" + talker + TALKER_SEP + text``.

    Returns:
        dict mapping category name (the ``fsDescr`` text of each ``fsDecl``)
        to its annotations. Categories without annotations map to ``[]`` or
        ``''``. The talker is ``"?"`` when none could be determined.

    Raises:
        KeyError: if an annotated ``fs`` element has a ``type`` that no
            ``fsDecl`` declares.
    """
    tei = '{http://www.tei-c.org/ns/1.0}'
    xml_id = '{http://www.w3.org/XML/1998/namespace}id'

    # Talker headers are whole lines ending in ':'. Lines whose match is longer
    # than 40 characters are treated as ordinary text, not as a talker name.
    # Offsets come straight from the regex match, so each header is paired with
    # its own position rather than with whatever a second text search finds.
    all_talkers = []
    talkers_indices = []
    for match in re.finditer('\n.*:\n', text):
        header = match.group(0)
        if 2 < len(header) <= 40:
            all_talkers.append(header)
            talkers_indices.append(match.start())

    # annotation id -> list of text pieces / talker of the first piece
    texts = {}
    talkers = {}
    for seg in root.iter(tei + 'seg'):
        ana = ''
        for node in seg.iter():
            attr = node.attrib
            if ana_name in attr:
                # Drop the leading character of the reference (a '#' prefix).
                # NOTE(review): a value holding several references would be
                # kept as one key and never match an fs id - confirm whether
                # the export can produce that.
                ana = attr[ana_name][1:]
            if target in attr:
                begin, end = attr[target].split('=')[1].split(',')
                begin = int(begin)
                snippet = text[begin:int(end) + 1]
                if len(snippet) >= 1:
                    if ana in texts:
                        texts[ana].append(snippet)
                    else:
                        texts[ana] = [snippet]
                        talkers[ana] = get_talker(all_talkers, talkers_indices, begin)

    # fsDecl type -> category name, taken from the first fsDescr found under
    # the first declaration of that type. root.iter() already visits nested
    # declarations, so a single pass is enough.
    cats = {}
    for decl in root.iter(tei + 'fsDecl'):
        decl_type = decl.attrib.get('type')
        if decl_type is None or decl_type in cats:
            continue
        descr = next(decl.iter(tei + 'fsDescr'), None)
        if descr is not None:
            cats[decl_type] = descr.text

    annotations_text = {name: ([] if as_list else '') for name in cats.values()}

    for fs in root.iter(tei + 'fs'):
        fs_id = fs.attrib.get(xml_id)
        fs_type = fs.attrib.get('type')
        # Skip feature structures that are untyped or that no segment refers to.
        if fs_type is None or fs_id not in texts:
            continue
        anno_cat = cats[fs_type]
        anno_text = ''.join(texts[fs_id])
        anno_talker = "?"
        if talkers[fs_id] is not None:
            anno_talker = talkers[fs_id].strip()
        if as_list:
            annotations_text[anno_cat].append((anno_talker, anno_text))
        else:
            annotations_text[anno_cat] = (
                annotations_text[anno_cat] + "\t" + anno_talker + TALKER_SEP + anno_text
            )
    return annotations_text
def get_annotations(annotation_path, as_list=False, with_text=False):
    """Load the single text file and single annotation XML found under a path.

    Args:
        annotation_path: directory holding exactly one ``*.txt`` file and
            exactly one ``*/*.xml`` file (asserted).
        as_list: forwarded to ``get_annotations_text``.
        with_text: when true, also return the (newline-padded) document text.

    Returns:
        The annotations dict, or ``(annotations, text)`` if ``with_text``.
    """
    txt_matches = glob('{}/*.txt'.format(annotation_path))
    assert len(txt_matches) == 1
    xml_matches = glob('{}/*/*.xml'.format(annotation_path))
    assert len(xml_matches) == 1

    with open(txt_matches[0], 'r', encoding='utf-8') as handle:
        # A space is inserted after every newline before offsets are applied.
        # NOTE(review): presumably compensates for two-character line endings
        # in the annotation offsets - confirm.
        text = handle.read().replace("\n", '\n ')

    root = ET.parse(xml_matches[0]).getroot()
    annotations = get_annotations_text(root, text, as_list=as_list)
    return (annotations, text) if with_text else annotations
Catma is used by Bar Ilan University (BIU) to do manual classification / tagging of protocol parts
Original protocol files are uploaded to Catma, which parses them into text. BIU then manually tags the text according to certain predefined tags (related to law).
To use this notebook, export the corpus from Catma and provide the resulting .tar.gz file as input.
In [48]:
# Path to the corpus archive (.tar.gz) exported from Catma; it is extracted
# with tar in the next cell. Adjust this path for your environment.
corpus_filename = '/pipelines/data/catma/ההסדרים_אקראיים1909171124.tar.gz'
In [49]:
# Extract the exported corpus into a fresh temporary directory.
corpus_dir = mkdtemp()
# Pass tar an argument list rather than a shell string, so a path containing
# quotes or other shell metacharacters cannot break or alter the command.
subprocess.check_call(['tar', '-xzvf', corpus_filename, '-C', corpus_dir])
# glob() returns paths in arbitrary order; sort so the printed indices (and
# any later choice such as annotation_paths[0]) are the same on every run.
annotation_paths = sorted(glob('{}/*/*'.format(corpus_dir)))
for i, p in enumerate(annotation_paths):
    print(i, p)
Choose an annotation path to get annotations from
In [50]:
# Pick one entry by the index printed in the previous cell; change the index
# to inspect a different annotation path.
annotation_path = annotation_paths[0]
annotation_path
Out[50]:
In [51]:
# Display the chosen path's annotations: each category maps to a list of
# (talker, text) tuples.
get_annotations(annotation_path, as_list=True)
Out[51]:
Get annotation statistics
In [68]:
from dataflows import Flow, printer
# Annotation categories expected in the corpus. They become the count columns
# of the statistics rows, and get_annotation_file_stats asserts that every
# category it encounters is listed here.
known_categories = [
'Judicial decision',
'constitutional turns',
'Doubt',
'Anticipating Judicial Review'
]
def get_year(text):
    """Return the first four-digit year of the form 20xx found in ``text``.

    The year must stand alone as a number: digits embedded in a longer digit
    run (e.g. the "2019" inside "120191") are not accepted.

    Args:
        text: document text to search.

    Returns:
        The year as a four-character string.

    Raises:
        IndexError: if ``text`` contains no such year.
    """
    # Lookarounds reject matches that are part of a longer number.
    years = re.findall('(?<![0-9])20[0-9][0-9](?![0-9])', text)
    if not years:
        raise IndexError('no 20xx year found in text')
    return years[0]
# year -> {category: annotation count}; filled in as a side effect of iterating
# get_annotation_file_stats and read back by get_yearly_counts.
yearly_counts = {}


def get_annotation_file_stats(annotation_paths):
    """Yield one statistics row per annotation path.

    Each row holds the document's year, its directory relative to the corpus
    directory, and the number of annotations in every known category. The
    per-year totals in ``yearly_counts`` are updated along the way.
    """
    for path in annotation_paths:
        annotations, text = get_annotations(path, as_list=True, with_text=True)
        year = get_year(text)
        if not yearly_counts.get(year):
            yearly_counts[year] = dict.fromkeys(known_categories, 0)

        row = {
            'year': year,
            'dirname': path.replace(corpus_dir, '').strip('/'),
        }
        row.update(dict.fromkeys(known_categories, 0))

        for category, items in annotations.items():
            assert category in known_categories
            count = len(items)
            row[category] = count
            yearly_counts[year][category] += count
        yield row
def get_yearly_counts():
    """Yield one row per year: the year followed by its per-category totals.

    Reads the module-level ``yearly_counts`` at iteration time.
    """
    for year, counts in yearly_counts.items():
        row = {'year': year}
        row.update(counts)
        yield row
# Build two tables - per-document statistics and per-year totals - and render
# them as HTML.
# NOTE(review): get_yearly_counts() reads yearly_counts, which is only filled
# while get_annotation_file_stats(...) is being iterated; this relies on the
# flow consuming the first generator before the second - confirm.
Flow(
get_annotation_file_stats(annotation_paths),
get_yearly_counts(),
printer(tablefmt='html')
).process()
Out[68]: