In [20]:
%pprint ON #pretty printing
import pdb #debugger
import math
import numpy as np
MapReduce is a programming approach to big data, used when we have to distribute a dataset across multiple nodes. This approach is required because with many servers (nodes) the chance of a failure somewhere grows rapidly -- we need a way of preventing such events from affecting our calculations. MapReduce simplifies the programming model, moves computation close to the data and maintains redundancy, guarding against system failure. The whole approach consists of the following steps: map each input record to a list of (key, value) pairs, group the pairs by key, then reduce each group to a final value per key.
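As a minimal sketch of those three steps (a word-count toy in plain Python; mapper/reducer are illustrative names, not a real MapReduce framework):
In [ ]:
from itertools import groupby

def mapper(line): #map: record -> list of (key, value) pairs
    return [(word, 1) for word in line.split()]

def reducer(key, values): #reduce: (key, [values]) -> (key, aggregate)
    return (key, sum(values))

lines = ["big data", "big nodes"]
pairs = [kv for line in lines for kv in mapper(line)] #map step
pairs.sort(key=lambda kv: kv[0]) #group step: equal keys become adjacent
result = [reducer(k, [v for _, v in group])
          for k, group in groupby(pairs, key=lambda kv: kv[0])]
print result #[('big', 2), ('data', 1), ('nodes', 1)]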
For simple systems we could use Gaussian elimination, but for any real-life problem we need the matrix approach, in which the flow equation can be written as $r = Mr$. This is an eigenvector problem: r is the principal eigenvector (eigenvalue 1, the largest possible for a stochastic matrix) of the stochastic web matrix M (pages x links, where entry (j, i) describes the link from page i to page j). This means we can use power iteration to compute r.
Let's see this in code:
In [51]:
#returns converged r; requires web matrix M
def PowerIteration(M, allowedIterations=300):
    iterationIdx = 0
    #init r as (n,1) matrix
    numPages = M.shape[0]
    r = np.asmatrix(np.ones(numPages)/float(numPages)).T #[1/N...]
    #iterate until r stops changing (or the iteration limit is hit)
    iterationDiff = 1
    while iterationDiff > 0:
        if iterationIdx >= allowedIterations:
            print "Iteration limit of %i reached, exiting..." %iterationIdx
            break
        r_old = r
        r = M*r
        iterationDiff = max(abs(r - r_old))
        iterationIdx += 1
    return r
In [52]:
#define M for pages (y,a,m)
M = np.matrix([[0.5, 0.5, 0],
               [0.5, 0,   1],
               [0,   0.5, 0]])
NormalRanking = PowerIteration(M)
print(NormalRanking)
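As a quick sanity check (not in the original flow), we can confirm with numpy's eigendecomposition that the ranking found by power iteration really is the eigenvector of M with eigenvalue 1:
In [ ]:
#sanity check: the principal eigenvector of M (eigenvalue 1) should match PowerIteration's result
eigenvalues, eigenvectors = np.linalg.eig(M)
principalIdx = np.argmax(eigenvalues.real) #index of the largest eigenvalue (~1 for stochastic M)
principal = eigenvectors[:, principalIdx].real
principal = principal/principal.sum() #rescale so the entries sum to 1, like r
print eigenvalues.real[principalIdx] #~1.0
print principal #should match NormalRanking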
If we consider our algorithm from a surfer's perspective, we have to phrase the previous equation in terms of probabilities, so for each epoch t we have $p_{t+1}=Mp_t$. Hence our flow equation can be interpreted as the stationary distribution of a random walk (reached when $p_{t+1}=p_t$), and from the theory of Markov processes we know that, if all conditions are fulfilled, this stationary distribution is unique.
Given that, does our algorithm always converge to the correct values? Not always:
In [54]:
#define M with spider trap at page m
M = np.matrix([[0.5, 0.5, 0],
               [0.5, 0,   0],
               [0,   0.5, 1]])
SpiderTrapRanking = PowerIteration(M)
#SpiderTrapRanking - NormalRanking
SpiderTrapRanking
Out[54]:
In [56]:
#define M with dead end at page m
M = np.matrix([[0.5, 0.5, 0],
               [0.5, 0,   0],
               [0,   0.5, 0]])
DeadEndRanking = PowerIteration(M)
DeadEndRanking
Out[56]:
In the spider-trap case all of the importance ends up absorbed by page m, while in the dead-end case most of our ranking has leaked out and the PageRank no longer sums to 1. The solution to the dead-end problem is to teleport with probability 1 once we reach a dead end. Addressing both problems at once, we can rewrite our original equation as $r_j = \sum_{i\rightarrow j} \beta \frac{r_i}{d_i}+(1-\beta) \frac{1}{n}$.
This formulation follows Markov chain theory. For it to apply, our matrix M has to be stochastic (all columns sum to 1; the teleport guarantees this for dead ends), irreducible (we cannot get stuck in a given state) and aperiodic (there is no fixed cycle in the walk). All three are achieved by introducing random jumps. In matrix form this means introducing a new matrix $A = \beta M +(1-\beta) \frac{1}{n}ee^T$, which we then use in the flow equation, so the final equation is $r_{t+1}= A r_t$. (Note that a dead-end column of M still sums to 0, so strictly speaking such columns must first be replaced by the teleport with probability 1, as described above, before A is stochastic.) The code below demonstrates this:
In [57]:
#returns (matrix A, converged r); requires web matrix M and stayProbability; allowedIterations is optional
#stayProbability, \beta - how often the random walker just follows a link
#in other words: JumpProbability = 1 - stayProbability
def PowerIteration(M, stayProbability, allowedIterations=300):
    convergenceAccuracy = 10**-15
    iterationDiff = 1
    iterationIdx = 1
    #pdb.set_trace()
    #init r as (n,1) matrix normalised to 1
    numPages = M.shape[0]
    r = np.asmatrix(np.ones(numPages)/float(numPages)).T #[1/N...]
    #calculate matrix A
    A = stayProbability*M + np.ones((numPages,numPages))*(1-stayProbability)/float(numPages)
    #iterate until converged (within given accuracy) or the iteration limit is exceeded
    while iterationDiff > convergenceAccuracy:
        if iterationIdx > allowedIterations:
            print "Iteration limit of %i reached, exiting..." %iterationIdx
            break
        r_old = r
        r = A*r
        iterationDiff = max(abs(r - r_old))
        iterationIdx += 1
    return (A, r*numPages)
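As a quick usage check (a sketch reusing the spider-trap matrix from above; the β value of 0.8 is an arbitrary choice), teleportation now stops page m from absorbing all of the importance:
In [ ]:
#re-run the spider-trap example with teleportation (beta = 0.8 chosen arbitrarily)
M = np.matrix([[0.5, 0.5, 0],
               [0.5, 0,   0],
               [0,   0.5, 1]])
A, r = PowerIteration(M, 0.8)
print r #all three pages keep a non-zero rank; entries sum to numPages (3)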
Matrix A is dense and therefore the most memory-hungry part, so let's avoid materialising it. To do so we work with matrix M directly and "tax" it for the leaked ranking after each iteration, using the equation $r = \beta M r + [\frac{1-\beta}{n}]_n$. This way we only need the smaller matrix M for our calculations, corrected after each iteration by the constant vector $[\frac{1-\beta}{n}]_n$, which redistributes part of the weight equally over all pages.
In [35]:
#sparse variant: avoids building A; returns converged r
#requires web matrix M and stayProbability; allowedIterations is optional
#stayProbability, \beta - how often the random walker just follows a link
#in other words: JumpProbability = 1 - stayProbability
def PowerIterationSparse(M, stayProbability, allowedIterations=300):
    convergenceAccuracy = 10**-15
    iterationDiff = 1
    iterationIdx = 1
    #init r as (n,1) matrix normalised to 1
    numPages = M.shape[0]
    r = np.asmatrix(np.ones(numPages)/float(numPages)).T #[1/N...]
    #"tax" matrix M
    M = stayProbability*M
    #create the leak correction added to each entry of r
    leakageCorrection = np.asmatrix(np.ones(numPages)*(1-stayProbability)/float(numPages)).T
    #pdb.set_trace()
    #iterate until converged (within given accuracy) or the iteration limit is exceeded
    while iterationDiff > convergenceAccuracy:
        if iterationIdx > allowedIterations:
            print "Iteration limit of %i reached, exiting..." %iterationIdx
            break
        r_old = r
        r = M*r + leakageCorrection
        iterationDiff = max(abs(r - r_old))
        iterationIdx += 1
    return r*numPages #A is never built here, so only r is returned
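A quick check (again on the spider-trap matrix, with an arbitrary β of 0.8) that the sparse variant agrees with the dense matrix-A variant:
In [ ]:
#the sparse variant should produce the same ranking as the dense one
M = np.matrix([[0.5, 0.5, 0],
               [0.5, 0,   0],
               [0,   0.5, 1]])
A, rDense = PowerIteration(M, 0.8)
rSparse = PowerIterationSparse(M, 0.8)
print float(max(abs(rDense - rSparse))) #difference should be ~0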
Suppose we compute PageRank with a β of 0.7, and we introduce the additional constraint that the sum of the PageRanks of the three pages must be 3, to handle the problem that otherwise any multiple of a solution will also be a solution. Compute the PageRanks a, b, and c of the three pages A, B, and C, respectively. Then, identify from the list below, the true statement.
To solve it we use the PowerIteration variant defined in In [57] (it also returns matrix A), since it handles both dead ends and spider traps. The link diagram can be read off M below: page A links to B and C, page B links to C, and page C links only to itself. We define M accordingly, remembering that in M the page in column i points to the page in row j. Hence:
In [58]:
M = np.matrix([[0,   0, 0],
               [0.5, 0, 0],
               [0.5, 1, 1]])
A,PageRank = PowerIteration(M,0.7)
print PageRank
print "a+c = %.3f" %(PageRank[0]+PageRank[2])
print "b+c = %.3f" %(PageRank[1]+PageRank[2])
print "a+b = %.3f" %(PageRank[0]+PageRank[1])
In [59]:
M = np.matrix([[0,   0, 1],
               [0.5, 0, 0],
               [0.5, 1, 0]])
A,PageRank = PowerIteration(M,0.85)
#show calculated matrix A
print A
#convert matrix to equation
print "a= %.3fb+ %.3fc" %(A[0,1]/(1-A[0,0]),A[0,2]/(1-A[0,0]))
print "b= %.3fa+ %.3fc" %(A[1,0]/(1-A[1,1]),A[1,2]/(1-A[1,1]))
print "c= %.3fa+ %.3fb" %(A[2,0]/(1-A[2,2]),A[2,1]/(1-A[2,2]))
Consider three Web pages with the following links: A links to B and C, B links to C, and C links to A (the structure encoded in matrix M below).
Assuming no "taxation," compute the PageRanks a, b, and c of the three pages A, B, and C, using iteration, starting with the "0th" iteration where all three pages have rank a = b = c = 1. Compute as far as the 5th iteration, and also determine what the PageRanks are in the limit. Then, identify the true statement from the list below.
In [60]:
M = np.matrix([[0,   0, 1],
               [0.5, 0, 0],
               [0.5, 1, 0]])
r = np.asmatrix(np.ones(3)).T #[1,1,1] - the "0th" iteration
for idx in range(5): # 5 iterations
    r_old = r
    r = M*r #no "taxation", so we iterate with the raw matrix M
    iterationDiff = r - r_old
print "Last iteration value:"
print r
print "\nDifference from previous iteration:"
print iterationDiff
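The exercise also asks for the PageRanks in the limit. A small extra loop (not part of the original cell) simply iterates well past the 5th step:
In [ ]:
#iterate well past the 5th step to approximate the limiting PageRanks
r = np.asmatrix(np.ones(3)).T
for idx in range(1000):
    r = M*r
print "PageRanks in the limit:"
print r #approaches a = 6/5, b = 3/5, c = 6/5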
Suppose our input data to a map-reduce operation consists of integer values (the keys are not important). The map function takes an integer i and produces the list of pairs (p, i) such that p is a prime divisor of i. For example, map(12) = [(2,12), (3,12)]. The reduce function is addition. That is, $\text{reduce}(p, [i_1, i_2, \ldots, i_k]) = (p, i_1+i_2+\cdots+i_k)$.
Compute the output, if the input is the set of integers 15, 21, 24, 30, 49. Then, identify, in the list below, one of the pairs in the output.
In [11]:
def GetAllPrimeFactors(inputInteger):
    #trial division, collecting every prime factor
    idx = 2
    factors = []
    while idx*idx <= inputInteger:
        if inputInteger % idx:
            idx += 1
        else:
            inputInteger //= idx
            factors.append(idx)
    if inputInteger > 1:
        factors.append(inputInteger)
    #keep only unique factors
    factors = list(set(factors))
    return factors
######################################
#Our mapping function returning a list of (prime factor, integer) pairs
def mappingFunction(inputInteger):
    allPrimeFactors = GetAllPrimeFactors(inputInteger)
    mappedKeys = []
    for key in allPrimeFactors:
        mappedKeys.append((key, inputInteger))
    return mappedKeys
######################################
#Our reduce function summing the integers for each key (input must be sorted by key)
def reduceFunction(mappedKeys):
    reducedKeys = dict()
    lastKey = None
    for pair in mappedKeys:
        #pdb.set_trace()
        key = pair[0]
        if key == lastKey: #same key as before - add to the running sum
            reducedKeys[key] += pair[1]
        else: #first time we see this key
            reducedKeys[key] = pair[1]
        lastKey = key
    return reducedKeys
In [14]:
integers = [15, 21, 24, 30, 49] #the input set from the exercise
#First let's map the values
mappedValues = map(mappingFunction, integers) #list of lists
mappedValues = [val for sublist in mappedValues for val in sublist] #flatten it
#then sort them by key, so equal keys are adjacent for the reduce step
mappedValues = sorted(mappedValues, key=lambda pair: pair[0])
print "Those are our mapped values"
print mappedValues
#Then let's reduce them
reducedValues = reduceFunction(mappedValues)
print "\nThose are our reduced (final) values"
print reducedValues
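As a sanity check, for the input {15, 21, 24, 30, 49} the reduced output works out to (2, 54), (3, 90), (5, 45) and (7, 70); for example, key 3 collects 15 + 21 + 24 + 30 = 90.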