In [10]:
STONES = ['parse_by_line', 'parse_by_window', 'parse_by_phrase', 'n_syll_project']
In [2]:
eg_path1='/Volumes/Present/DH/corpora/chadwyck_poetry/xml/african-american/beadlesa/Z200265018.xml'
eg_path2='/Volumes/Present/DH/corpora/chadwyck_poetry/xml/english/miscell2/Z200439011.xml'
eg_path3='/Volumes/Present/DH/corpora/chadwyck_poetry/xml/faber/fa0101/Z200557409.xml'
In [3]:
MAXLINES=1000
In [4]:
import prosodic as p
In [9]:
from llp import tools
def read_file(path_to_txt_or_xml_file):
    try:
        if path_to_txt_or_xml_file.endswith('.xml'):
            # text_plain() is assumed to be defined elsewhere (an XML-to-plain-text converter for the Chadwyck files)
            txt=text_plain(path_to_txt_or_xml_file)
        else:
            #with open(path_to_txt_or_xml_file) as f:
            #    txt=f.read()
            txt=tools.read(path_to_txt_or_xml_file)
        return txt
    except (IOError,UnicodeDecodeError) as e:
        print('!!',e)
        print('!!',path_to_txt_or_xml_file)
        print()
        return ''
In [9]:
#read_file(eg_path1)
In [10]:
def counts(string, sub):
    """Count occurrences of sub in string (overlapping matches included)."""
    count = start = 0
    while True:
        start = string.find(sub, start) + 1
        if start > 0:
            count+=1
        else:
            return count
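In [ ]:
# Quick sanity check of counts() on illustrative strings (not from the original notebook):
# because the search restarts one character after each match, overlapping occurrences are counted.
print(counts('UUUU','UU'))    # -> 3 (overlapping matches)
print(counts('PUPUP','PU'))   # -> 2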
In [11]:
def get_data_from_line(line,meter,require_parse_data=True):
    """
    Get data from the prosodic line object, with its meter.
    """
    # get phonological info
    weight_str=line.str_weight()
    sonority_str=line.str_sonority()
    stress_str=line.str_stress()

    # get the best metrical parse
    bp=line.bestParse(meter)

    # require parse data?
    if require_parse_data:
        if not bp:
            return {}

    ap=line.allParses(meter)

    output_dict={}
    output_dict['prosodic_line']=line.txt
    output_dict['parse']=bp.posString(viols=True) if bp else ''
    meter_str=output_dict['meter']=bp.str_meter() if bp else ''
    output_dict['num_parses']=len(ap)
    output_dict['num_viols'] = bp.totalCount if bp else ''
    output_dict['score_viols'] = bp.score() if bp else ''
    output_dict['num_sylls']=bp.num_sylls if bp else ''
    output_dict['num_words']=len(line.words())

    # store metrical constraint stats
    for c in meter.constraints:
        sumviol = sum([parse.constraintCounts[c] if c in parse.constraintCounts else 0 for parse in ap])
        output_dict[c.name_weight+'_bestparse']=bp.constraintCounts[c] if bp and c in bp.constraintCounts else 0
        output_dict[c.name_weight+'_allparse_sum']=sumviol if sumviol else 0

    ## store phonological constraint stats
    output_dict['prosodic_stress']=stress_str
    output_dict['prosodic_weight']=weight_str
    output_dict['prosodic_sonority']=sonority_str
    output_dict['num_monosylls']=len([w for w in line.words() if w.numSyll==1])
    output_dict['[*clash_across]']=counts(stress_str,'P#P') + counts(stress_str,'P#S') + counts(stress_str,'S#P') + counts(stress_str,'S#S')
    output_dict['[*clash_within]']=counts(stress_str,'PP') + counts(stress_str,'PS') + counts(stress_str,'SP') + counts(stress_str,'SS')
    output_dict['[*clash_across_primary]']=counts(stress_str,'P#P')
    output_dict['[*clash_within_primary]']=counts(stress_str,'PP')
    output_dict['[*lapse_across]']=counts(stress_str,'U#U')
    output_dict['[*lapse_within]']=counts(stress_str,'UU')
    output_dict['[*WSP]']=0
    output_dict['[*PEAKPROM]']=0
    output_dict['[*High_Stress]']=0
    output_dict['[*Low_Unstress]']=0
    output_dict['[*High_Strong]']=0
    output_dict['[*Low_Weak]']=0
    for s,w,hml,mtr in zip(stress_str,weight_str,sonority_str,meter_str):
        if s=='U' and w=='H':
            output_dict['[*WSP]']+=1
        if (s=='P' or s=='S') and w=='L':
            output_dict['[*PEAKPROM]']+=1
        if hml=='H' and s in {'P','S'}:
            output_dict['[*High_Stress]']+=1
        if hml=='L' and s=="U":
            output_dict['[*Low_Unstress]']+=1
        if hml=='H' and mtr == 's':
            output_dict['[*High_Strong]']+=1
        if hml=='L' and mtr == 'w':
            output_dict['[*Low_Weak]']+=1
    return output_dict
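In [ ]:
# Hedged sketch of calling get_data_from_line() directly on a single parsed line,
# reusing only the prosodic calls already used above (p.Text, get_meter, iparse).
# Commented out, like the other examples, since it requires prosodic's dictionaries.
# t = p.Text("With what attractive charms this goodly frame")
# m = t.get_meter('default_english')
# for parsed_line in t.iparse(meter=m, num_processes=1):
#     print(get_data_from_line(parsed_line, m))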
In [12]:
def parse_string(text_str, meter='default_english', num_processes=1, maxlines=MAXLINES):
    """
    Parse the string, treating each (newline-delimited) line as the unit of parsing.
    """
    # truncate to the first maxlines lines
    if maxlines: text_str='\n'.join(text_str.split('\n')[:maxlines])
    # prosodic parse
    text = p.Text(text_str)
    meter = text.get_meter(meter)
    out_ld=[]
    for i,line in enumerate(text.iparse(meter=meter, num_processes=num_processes)):
        line_d=get_data_from_line(line,meter)
        if not line_d or 'score_viols' not in line_d: continue
        line_d['line_id']=i+1
        out_ld.append(line_d)
    return out_ld
In [13]:
# import pandas as pd
# pd.DataFrame(parse_string("""With what attractive charms this goodly frame
# Of nature touches the consenting hearts
# Of mortal men; and what the pleasing stores
# Which beauteous imitation thence derives
# To deck the poet's, or the painter's toil;
# My verse unfolds."""))
In [14]:
def parse_by_line(path_to_txt_or_xml_file, meter='default_english', num_processes=1):
    # get txt
    txt=read_file(path_to_txt_or_xml_file)
    # return parse
    return parse_string(txt, meter=meter, num_processes=num_processes)
In [15]:
# import pandas as pd
# pd.DataFrame(parse_by_line(eg_path1)).sort_values('score_viols').head()
In [16]:
# import pandas as pd
# df_window1=pd.DataFrame(parse_by_line(eg_path1))
# df_window2=pd.DataFrame(parse_by_line(eg_path2))
# print(df_window1.mean()['score_viols'], df_window2.mean()['score_viols'])
# df_window2.sort_values('num_viols').head()
In [17]:
def slice(l,num_slices=None,slice_length=None,runts=True,random=False):
    """
    Return a new list of evenly sized segments of the original list.
    (Note: shadows the built-in slice; kept for backward compatibility.)
    """
    if random:
        import random
        random.shuffle(l)
    if not num_slices and not slice_length: return l
    if not slice_length: slice_length=int(len(l)/num_slices)
    newlist=[l[i:i+slice_length] for i in range(0, len(l), slice_length)]
    if runts: return newlist
    return [lx for lx in newlist if len(lx)==slice_length]

def ngram(l,n=3):
    """Return a list of overlapping n-grams (tuples) from the list."""
    grams=[]
    gram=[]
    for x in l:
        gram.append(x)
        if len(gram)<n: continue
        grams.append(tuple(gram))
        gram.pop(0)  # slide the window forward by one
    return grams
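In [ ]:
# Small runnable check of the two windowing helpers on illustrative data:
# slice() gives non-overlapping chunks, ngram() gives overlapping ones.
print(slice(list('abcdefg'), slice_length=3))               # [['a','b','c'], ['d','e','f'], ['g']]
print(slice(list('abcdefg'), slice_length=3, runts=False))  # drops the short final chunk
print(ngram(list('abcde'), n=3))                            # [('a','b','c'), ('b','c','d'), ('c','d','e')]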
In [18]:
#slice(read_file(eg_path1).split(), slice_length=5)
#len(ngram(read_file(eg_path1).split(), n=5))
In [19]:
def parse_by_window(path_to_txt_or_xml_file, meter='default_english', window_size=5, overlapping_windows=False, max_slices=100000, num_processes=1):
    # get txt
    txt=read_file(path_to_txt_or_xml_file)
    words=txt.split()
    # split the word list into windows of window_size words
    if overlapping_windows:
        word_slices = ngram(words,n=window_size)
    else:
        word_slices = slice(words,slice_length=window_size)
    word_slices = word_slices[:max_slices]
    # treat each window as a "line" and parse
    txt = '\n'.join([' '.join(slicex) for slicex in word_slices])
    return parse_string(txt, meter=meter, num_processes=num_processes)
In [20]:
# import pandas as pd
# df_window1=pd.DataFrame(parse_by_window(eg_path1))
# df_window2=pd.DataFrame(parse_by_window(eg_path2))
# print(df_window1.mean()['score_viols'], df_window2.mean()['score_viols'])
# df_window2.sort_values('num_viols').head()
In [21]:
# Phrase-level parsing: splits on punctuation rather than on the poem's own lines
def parse_by_phrase(path_to_txt_or_xml_file, meter='default_english', minword=5):
    # get txt
    txt=read_file(path_to_txt_or_xml_file)
    # split into phrases at punctuation and line breaks
    import re
    phrases=re.split('[?.,;:\n]', txt)
    # recombine short phrases until each has at least minword words
    if minword:
        phrases2=[]
        phrase=[]
        for px in phrases:
            phrase+=px.split()
            if len(phrase)>=minword:
                phrases2+=[' '.join(phrase)]
                phrase=[]
        phrases=phrases2
    # make txt, one phrase per line
    txt = '\n'.join(phrases)
    # return parsed
    return parse_string(txt, meter=meter)
In [22]:
# import pandas as pd
# df_window1=pd.DataFrame(parse_by_phrase(eg_path1))
# df_window2=pd.DataFrame(parse_by_phrase(eg_path2))
# print(df_window1.mean()['score_viols'], df_window2.mean()['score_viols'])
# df_window2.sort_values('score_viols',ascending=False).head()
In [23]:
def loopsum(l,sumval=10,overlapping=True,overlapping_offset=1):
    """
    Yield runs of (index, value) pairs from l whose values sum to exactly sumval.
    """
    stack=[]
    for i,x in enumerate(l):
        stack.append((i,x))
        stack_sum=sum([y for x,y in stack])
        if stack_sum < sumval: continue
        # drop items from the front until the running sum no longer exceeds the target
        while sum([y for x,y in stack]) > sumval:
            stack.pop(0)
        stack_sum=sum([y for x,y in stack])
        if stack_sum == sumval:
            yield stack
            stack=stack[overlapping_offset:] if overlapping else []
    # a final window may remain that sums exactly to the target
    if stack and sum([y for x,y in stack])==sumval:
        yield stack
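In [ ]:
# Runnable illustration of loopsum() on illustrative numbers: it slides over the list
# and yields runs of (index, value) pairs whose values sum to exactly sumval (here 5).
for stack in loopsum([2,1,2,3,2,1,1,3], sumval=5, overlapping=False):
    print(stack)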
In [24]:
def tokenize_fast(line):
    """Simple regex tokenizer: lowercases the line and returns word-like tokens."""
    import re
    return re.findall(r"[A-Z]{2,}(?![a-z])|[A-Z][a-z]+(?=[A-Z])|[\'\w\-]+", line.lower())
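In [ ]:
# Runnable example of tokenize_fast() on an illustrative line: punctuation is dropped,
# while apostrophes and hyphens are kept inside tokens.
print(tokenize_fast("O'er the hill-side, the poet's verse unfolds."))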
In [32]:
import random
def n_syll_project(path_to_txt_or_xml_file, n_syll=10, meter='default_english',
                   within_phrases=True, within_lines=False, maxlines=MAXLINES,
                   overlapping=False, overlapping_offset=2, shuffle_lines=True,
                   can_end_on_unstressed=False, can_end_on_maybe_stressed=False,
                   return_postprocessed=True):
    # get txt
    txt=read_file(path_to_txt_or_xml_file)
    # words
    all_tokens=tokenize_fast(txt)
    all_types=set(all_tokens)
    # dictionary
    pDict = p.dict['en']
    # getting the numsyll
    word2numsyll=dict((w,pDict.get(w)[0].getNumSyll()) for w in all_types)
    # phrases
    import re
    if within_lines:
        phrase_splitter='[?.,;:\n]' if within_phrases else '[\n]'
    else:
        phrase_splitter='[?.,;:]' if within_phrases else ''
    if phrase_splitter:
        phrases=re.split(phrase_splitter, txt)
    else:
        phrases = [txt]
    if not can_end_on_unstressed: unstressed = set(p.dict['en'].unstressedWords)
    # NB: the original reused unstressedWords here; a maybe-stressed word list seems intended
    if not can_end_on_maybe_stressed: maybestressed = set(p.dict['en'].unstressedWords)
    # loop over phrases to produce the lines
    LINES = []
    for phrase in phrases:
        phrase_words=tokenize_fast(phrase)
        phrase_nsylls=[word2numsyll.get(w,0) for w in phrase_words]
        for stack in loopsum(phrase_nsylls,sumval=n_syll,overlapping=overlapping,overlapping_offset=overlapping_offset):
            stack_words=[phrase_words[i] for i,x in stack]
            if not can_end_on_unstressed and stack_words[-1] in unstressed: continue
            if not can_end_on_maybe_stressed and stack_words[-1] in maybestressed: continue
            line = ' '.join(stack_words)
            LINES.append(line)
    # make txt
    print('>> # lines:',len(LINES),'in',path_to_txt_or_xml_file)
    if shuffle_lines: random.shuffle(LINES)
    LINES=LINES[:maxlines]   # cap at the maxlines argument (the original used the global MAXLINES)
    txt = '\n'.join(LINES)
    # return parsed
    data=parse_string(txt,meter=meter)
    if return_postprocessed:
        data=postprocess_avg((path_to_txt_or_xml_file,data))
    return data
In [26]:
#n_syll_project('ewrwrerewre')
In [27]:
# import pandas as pd
# pd.DataFrame(n_syll_project(eg_path3))
In [28]:
#n_syll_project(eg_path3,return_postprocessed=True)
In [29]:
#n_syll_project(eg_path3)
In [30]:
def postprocess_avg(individual_slingshot_result, split_bys=['/xml/','/txt/','/_txt_chadwyck/','/_txt_ecco_tcp/','/_txt_sellars/']):
    # imports
    import os, pandas as pd
    # split the (path, per-line data) pair
    path,data=individual_slingshot_result
    # make pandas dataframe
    df=pd.DataFrame(data)
    # normalize: express numeric columns per 100 syllables
    for col in df.columns:
        try:
            df[col+'_per_100_sylls'] = df[col] / df['num_sylls'] * 100
        except (TypeError,ValueError):
            pass
    # summarize: mean of each numeric column across lines
    d=dict(df.mean())
    # number of lines that were parsed
    d['num_lines_parsed']=len(data)
    # add an id derived from the file path (extension stripped, corpus prefix removed)
    idx=os.path.splitext(path)[0]
    for split_fn_by in split_bys:
        idx=idx.split(split_fn_by)[-1]
    d['id'] = idx
    # return dict
    return d
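In [ ]:
# Runnable sketch of postprocess_avg() on synthetic data (hypothetical path and values),
# showing the shape of its output: per-line stats are averaged, a *_per_100_sylls column
# is added for each numeric field, and an id is derived from the file path.
fake_result = ('/Volumes/Present/DH/corpora/chadwyck_poetry/xml/english/miscell2/FAKE000.xml',
               [{'num_sylls': 10, 'score_viols': 2.0, 'num_viols': 3},
                {'num_sylls': 10, 'score_viols': 1.0, 'num_viols': 1}])
postprocess_avg(fake_result)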