In [93]:
import numpy as np
import math
import matplotlib.pyplot as plt
import random
from numpy.random import rand
from copy import copy
read text
In [20]:
def read_text_words(filename, wordsnumber):
    # the corpora are assumed to be preprocessed to one lowercase word per line,
    # so the first `wordsnumber` lines give the first `wordsnumber` words
    with open(filename) as f:
        X = f.readlines()
    X = ''.join(X[:wordsnumber])
    X = X.replace('\n', '{')  # '{' (code 123) marks the word delimiter
    return X

def read_text_whole(filename):
    with open(filename) as f:
        X = f.read()
    X = X.replace('\n', '{')  # '{' (code 123) marks the word delimiter
    return X

def chop_text_to_size(text, size):
    return text[:1024*1024*size]

def read_text_filesize(filename, size):
    # read the first `size` megabytes of the file
    with open(filename) as f:
        X = f.read(1024*1024*size)
    X = X.replace('\n', '{')  # '{' (code 123) marks the word delimiter
    return X
counts
In [21]:
def get_unicount(text):
    length = len(text)
    counts = np.zeros(27)
    for i in xrange(length):
        c = ord(text[i])
        counts[c-97] += 1
    # codes 97-122 are 'a'-'z'; 123 is the word delimiter '{'
    return counts[:26]
bigram statistics
In [22]:
def get_bigram_stats_dic(text):
    length = len(text)
    dic = {}
    for i in xrange(length-1):
        # skip pairs that touch the word delimiter '{' (code 123)
        if ord(text[i]) == 123 or ord(text[i+1]) == 123:
            continue
        if (text[i], text[i+1]) in dic:
            dic[(text[i], text[i+1])] += 1
        else:
            dic[(text[i], text[i+1])] = 1
    # normalize by the number of occurrences of the first letter
    # (uses the global `counts` computed by get_unicount on the same text)
    for k, v in dic.items():
        r = 0
        if (k[0], '{') in dic:
            r = dic[(k[0], '{')]
        dic[k] = v / (counts[ord(k[0])-97] - r)
    return dic
quality
In [23]:
def quality(decrypted, original):
    # fraction of symbols that differ from the original (error ratio)
    l = len(decrypted)
    zipped = zip(decrypted, original)
    return sum(1 for x, y in zipped if x != y) / float(l)
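A quick hypothetical sanity check (not in the original notebook): quality returns the fraction of mismatched symbols, so one wrong symbol out of four gives 0.25.
In [ ]:
# hypothetical check: one mismatch out of four symbols
print(quality('abcd', 'abca'))  # 0.25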
crypt
In [24]:
def crypt(text):
    # random substitution cipher: a random permutation of the 26 letters,
    # with the word delimiter '{' (index 26) mapped to itself
    p = range(26)
    random.shuffle(p)
    p.append(26)
    output = ''
    for ch in text:
        try:
            x = ord(ch) - ord('a')
            output += chr(p[x] + ord('a'))
        except:
            pass  # symbols outside 'a'-'z' and '{' are dropped
    return output, p
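Decryption is the same substitution with a decoding permutation, i.e. the inverse of the p returned by crypt (the inverse is built explicitly as bp in the test cell further down). A hypothetical helper, not part of the original notebook:
In [ ]:
def decrypt(text, permutation):
    # hypothetical helper: apply a decoding permutation
    # (cipher letter index -> plain letter index) to every symbol;
    # index 26 keeps the word delimiter '{' in place
    output = ''
    for ch in text:
        output += chr(permutation[ord(ch) - ord('a')] + ord('a'))
    return output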
metropolis
In [140]:
# -*- coding: utf-8 -*-
# from random import random
"""
This module implements the Metropolis-Hastings algorithm
for random variable generation.
The algorithm generates random variables from a desired
distribution (which may be unnormalized).
"""
def metropolis( desiredPDF, initValue, computableRVS, skipIterations = 10 ):
    """
    This function returns a generator, which generates random variables
    from some space S with a desired distribution, using the
    Metropolis-Hastings algorithm.
    Args:
        desiredPDF (func) : PDF of the desired distribution p( T ), where T is from S
        initValue : an object from S to initialize the starting point
            of the iterative process
        computableRVS (func) : a generator of random values from space S
            with a given parameter T, which is also from S
        skipIterations (int) : number of iterations to skip
            (skipping more iterations leads to better accuracy,
            but greater time consumption)
    Returns: generator, which produces values from S
        and their density according to the distribution desiredPDF
    """
    random_variable = initValue
    random_variableDensityValue = desiredPDF( random_variable )
    """
    A state of MCMC
    """
    #ignore the first iterations to let the iterative process
    #converge to some distribution close to the desired one
    for i in xrange( skipIterations ):
        candidate = computableRVS( random_variable )
        # print candidate
        candidateDensityValue = desiredPDF( candidate )
        """
        next candidate for the sample, generated by computableRVS
        """
        #acceptanceProb = min( 1, candidateDensityValue / random_variableDensityValue )
        #desiredPDF_bigram returns logp, so the ratio becomes a difference of logs
        acceptanceProb = min( 0, candidateDensityValue - random_variableDensityValue )
        """
        log-probability to accept the candidate into the sample
        """
        # print acceptanceProb
        if math.log(random.random()) < acceptanceProb:
            print candidate
            print candidateDensityValue
            print acceptanceProb
            random_variable = candidate
            random_variableDensityValue = candidateDensityValue
    #now that the process has converged to the desired distribution,
    #return acceptable candidates
    print "-----"
    while True:
        candidate = computableRVS( random_variable )
        # print candidate
        candidateDensityValue = desiredPDF( candidate )
        """
        next candidate for the sample, generated by computableRVS
        """
        #acceptanceProb = min( 1, candidateDensityValue / random_variableDensityValue )
        #desiredPDF_bigram returns logp, so the ratio becomes a difference of logs
        acceptanceProb = min( 0, candidateDensityValue - random_variableDensityValue )
        """
        log-probability to accept the candidate into the sample
        """
        # print acceptanceProb
        if math.log(random.random()) < acceptanceProb:
            random_variable = candidate
            random_variableDensityValue = candidateDensityValue
        yield random_variable, random_variableDensityValue
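Since desiredPDF is expected to return a log-density (the acceptance step compares log values), a minimal hypothetical sanity check of the generator protocol can use a toy discrete target; the names toy_logpdf, toy_proposal and toy_gen are illustrative and not part of the decryption pipeline.
In [ ]:
# hypothetical sanity check: sample integers under the log-density -x^2/2
# (a discrete Gaussian-like target) with a symmetric +/-1 random-walk proposal
toy_logpdf = lambda x: -x * x / 2.0
toy_proposal = lambda x: x + random.choice([-1, 1])
# the chain starts at the mode (0), so the burn-in phase is skipped
toy_gen = metropolis(toy_logpdf, 0, toy_proposal, skipIterations=0)
toy_sample = [toy_gen.next()[0] for _ in xrange(1000)]
print(np.mean(toy_sample))  # roughly 0
print(np.std(toy_sample))   # roughly 1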
density maximization
In [129]:
def densityMaximization( desiredPDF, initValue, computableRVS, skipIterations = 200 ):
    """
    This function returns a generator, which generates random variables
    from some space S by trying to maximize the given density.
    The algorithm is a modification of Metropolis-Hastings.
    It rejects all objects which decrease the density.
    Args:
        desiredPDF (func) : PDF of the desired distribution p( T ), where T is from S
        initValue : an object from S to initialize the starting point
            of the iterative process
        computableRVS (func) : a generator of random values from space S
            with a given parameter T, which is also from S
        skipIterations (int) : number of iterations to skip
            (skipping more iterations leads to better accuracy,
            but greater time consumption)
    Returns: generator, which produces values from S,
        where each next value has no smaller density, and their density
    """
    random_variable = initValue
    random_variableDensityValue = desiredPDF( random_variable )
    """
    A state of MCMC
    """
    #ignore the first iterations to let the iterative process reach
    #the high density regions
    for i in xrange( skipIterations ):
        candidate = computableRVS( random_variable )
        candidateDensityValue = desiredPDF( candidate )
        """
        next candidate for the sample, generated by computableRVS
        """
        if random_variableDensityValue < candidateDensityValue:
            # print candidate
            # print candidateDensityValue
            random_variable = candidate
            random_variableDensityValue = candidateDensityValue
    #now that the process is in the high density regions,
    #return acceptable candidates
    while True:
        candidate = computableRVS( random_variable )
        candidateDensityValue = desiredPDF( candidate )
        """
        next candidate for the sample, generated by computableRVS
        """
        if random_variableDensityValue < candidateDensityValue:
            # print candidate
            # print candidateDensityValue
            random_variable = candidate
            random_variableDensityValue = candidateDensityValue
        yield random_variable, random_variableDensityValue
permutation generator and computableRVS
In [97]:
"""
This module provide some functions,
that generate random permutations with different distributions.
There are a uniform distribution and a symmetric distribution,
which depends on some other permutation.
"""
def uniform( n ):
"""
Generates random permutation using Knuth algorithm.
Args:
n (int) : length of permutation
Returns: random permutation of length n from uniform distribution
"""
#initialize permutation with identical
permutation = [ i for i in xrange( n ) ]
#swap ith object with random onject from i to n - 1 enclusively
for i in xrange( n ):
j = random.randint( i, n - 1 )
permutation[ i ], permutation[ j ] = permutation[ j ], permutation[ i ]
permutation.append(26)
return permutation
def applyedTranspostions( basePermutation ):
"""
This function returns random permutation by applying random traonspositions to given permutation.
The result distribution is not uniform and summetric assuming parameter.
Args:
basePermutation (array) : parameter of distribution
Returns: random permutation generated from basePermutation
"""
n = len( basePermutation )-1
"""
length of permutation
"""
permutation = copy( basePermutation )
"""
permutation to retur after some mpdifications
we use a copy method, because initial arguments must leaved uncahnge
"""
#apply n random transpositions (including identical) to base permutation
for i in xrange( n ):
k, l = random.randint( 0, n - 1 ), random.randint( 0, n - 1 )
permutation[ k ], permutation[ l ] = permutation[ l ], permutation[ k ]
return permutation
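A small hypothetical check of the proposal used by the samplers: it must not modify its argument and must keep the delimiter index 26 fixed (base, before and prop are illustrative names, not from the original notebook).
In [ ]:
# hypothetical check of the transposition proposal
base = uniform(26)
before = list(base)
prop = applyedTranspostions(base)
print(base == before)                      # True: the argument is left unchanged
print(base[26] == 26 and prop[26] == 26)   # True: the word delimiter stays fixed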
desiredPDF (improved)
In [131]:
def get_bigram_stats_array(text):
    length = len(text)
    stats = np.zeros((27,27))
    for i in xrange(length-1):
        c = ord(text[i])
        d = ord(text[i+1])
        stats[c-97, d-97] += 1
    # codes 97-122 are 'a'-'z'; 123 is the word delimiter '{'
    # normalize each row to get conditional probabilities P(second letter | first letter)
    for i in xrange(26):
        stats[i] = stats[i]/sum(stats[i,:26])
    return stats[:26,:26]
def get_desiredPDF_bigram(permutation):
    # log-likelihood of the encrypted text decoded with `permutation`
    # under the bigram model (`encrypted` and `stats` are globals set below)
    logp = 0
    for i in xrange(len(encrypted)-1):
        if ord(encrypted[i]) != 123 and ord(encrypted[i+1]) != 123:
            if stats[permutation[ord(encrypted[i])-97],
                     permutation[ord(encrypted[i+1])-97]] > 0:
                logp += math.log(stats[permutation[ord(encrypted[i])-97],
                                       permutation[ord(encrypted[i+1])-97]])
            else:
                logp += -9  # penalty for pairs that never occur in the training text
    return logp
Fix a large encrypted text (e.g. 5000 or more words) and explore how the ratio of correctly decrypted symbols depends on the size of the training text (using the same number of MCMC iterations)
In [32]:
fname = 'main/oliver_twist.txt'
original = read_text_words(fname, 5000)
encrypted,p = crypt(original)
print encrypted[:20]
In [ ]:
sizes = [2,4,8,16]
for s in sizes:
    i = 0
    train_text = read_text_filesize('main/super.txt', s)
    counts = get_unicount(train_text)
    bistats = get_bigram_stats_dic(train_text)
    # print chr(np.argmax(counts)+97)
    # print max(bistats.iteritems(), key=operator.itemgetter(1))[0]
    #Metropolis here
    #decryption
    #output - decrypted text
    # qs = np.zeros(len(sizes))
    # qs[i] = quality(decrypted, original)
    # i+=1
    print train_text[:1000]
# plt.plot(sizes, qs)
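A hypothetical completion of the loop above (a sketch, not the original experiment): it uses the array-based bigram statistics, runs the same number of MCMC iterations for every training size, decodes with the best permutation found, and plots the error ratio. The decrypt helper and the iteration counts are assumptions; the sampler also prints accepted candidates during its burn-in phase.
In [ ]:
# hypothetical completion of the experiment above
sizes = [2, 4, 8, 16]
qs = np.zeros(len(sizes))
computableGen = lambda t: applyedTranspostions(t)
for i, s in enumerate(sizes):
    train_text = read_text_filesize('main/super.txt', s)
    stats = get_bigram_stats_array(train_text)   # global read by get_desiredPDF_bigram
    sampler = metropolis(get_desiredPDF_bigram, uniform(26), computableGen)
    best_p, best_logp = sampler.next()
    for _ in xrange(1500):                       # illustrative iteration count
        cand_p, cand_logp = sampler.next()
        if cand_logp > best_logp:
            best_p, best_logp = cand_p, cand_logp
    decrypted = decrypt(encrypted, best_p)       # hypothetical helper sketched earlier
    qs[i] = quality(decrypted, original)
plt.plot(sizes, qs)
plt.xlabel('training text size, MB')
plt.ylabel('error ratio')
plt.show()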
In [132]:
#TEST TEXT
fname = 'main/oliver_twist.txt'
original = read_text_words(fname, 1000)
encrypted, p = crypt(original)
#TRAIN TEXT
length = 575514
train_text = read_text_words('main/war_and_peace.txt', length)
# counts = get_unicount(train_text)
stats = get_bigram_stats_array(train_text)
# print stats
print p
# bp is the inverse permutation of p, i.e. the true decoding permutation
bp = np.zeros(27, dtype=int)
for i in p:
    bp[p[i]] = i
q = get_desiredPDF_bigram(bp)
print bp
print q
ra = uniform(26)
q = get_desiredPDF_bigram(ra)
print q
In [141]:
import time
st = time.time()
computableGen = lambda t: applyedTranspostions(t)
# computableGen = applyedTranspostions
init_p = uniform(26)
metropolisgenerator = \
metropolis(get_desiredPDF_bigram, init_p, computableGen )
# densityMaximization(get_desiredPDF_bigram, init_p, computableGen )
x = []
y = []
for i in xrange( 10 ):
    a, b = metropolisgenerator.next()
    x.append(a)
    y.append(b)
et = time.time() - st
print et
In [142]:
for i in xrange(10):
    print x[i], ' | ', y[i]
In [143]:
# check, letter by letter, whether the sampled permutation inverts the cipher
per = x[9]
for i in xrange(len(per)):
    print (ord('a') + i) == (ord('a') + per[p[i]])
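To turn the per-letter check into a single score, a hypothetical follow-up (assuming the decrypt helper sketched earlier) decodes the ciphertext with the same permutation and reports the error ratio; after only 10 iterations it will still be far from zero.
In [ ]:
# hypothetical follow-up: decode with the last sampled permutation
# and measure the fraction of wrong symbols (uses the decrypt helper above)
decrypted = decrypt(encrypted, per)
print(decrypted[:60])
print(quality(decrypted, original))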
In [ ]: