In [20]:
%pprint #toggle pretty printing (the argument is ignored; %pprint simply toggles)
import pdb #debugger
import math
import numpy as np


Pretty printing has been turned OFF

Mining Massive Datasets, week 01

MapReduce

MapReduce is a programming approach to big data, used when we have to distribute a dataset across multiple nodes. Such an approach is required because with many servers (nodes) the chance of a failure grows rapidly -- we need a way to keep such events from affecting our calculations. MapReduce simplifies the programming model, moves computation close to the data and maintains redundancy, so the system tolerates failures. The whole approach consists of the following steps:

  • take input values
  • map them to (key, value) pairs and group by key (the equivalent of sorting the values)
  • reduce those groups (e.g. by summing them)
  • output a single value per key (see the sketch below)
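
A minimal sketch of these steps in plain Python (single process, no cluster; the toyMapReduce helper and the word-count task are illustrative, not part of the lecture notes):


#toy single-machine MapReduce: map, group by key, reduce
def toyMapReduce(records, mapFunction, reduceFunction):
  #map: emit (key, value) pairs from every input record
  pairs = []
  for record in records:
    pairs.extend(mapFunction(record))
  #group by key (the equivalent of sorting)
  groups = {}
  for key, value in pairs:
    groups.setdefault(key, []).append(value)
  #reduce each group to a single value per key
  return dict((key, reduceFunction(values)) for key, values in groups.items())

#classic word count: map emits (word, 1), reduce sums the ones
print toyMapReduce(["a b a", "b c"],
                   lambda line: [(word, 1) for word in line.split()],
                   sum) #{'a': 2, 'b': 2, 'c': 1}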

PageRank

For small systems we could solve the flow equations with Gaussian elimination, but for any real-life problem we need the matrix approach, in which the flow equation can be written as $r = Mr$. This is an eigenvector problem: r is the principal eigenvector (with eigenvalue 1, the largest possible) of the column-stochastic web matrix M, where entry $M_{ji} = \frac{1}{d_i}$ if page i (column) links to page j (row) and $d_i$ is the out-degree of page i. This means we can use power iteration to compute r.

Power iteration


  • initialise with $r^0 = [\frac{1}{N} \cdots \frac{1}{N}]$
  • iterate $r^{t+1} = Mr^t$ until $|r^{t+1} - r^t|_1 < \varepsilon$, i.e. using the L1 norm (the code below uses the max-norm instead, which is equivalent for convergence purposes)
    • note that in matrix M the page in column i points to the page in row j.

Let's see this in code:


In [51]:
import numpy as np


#returns the converged rank vector r; requires a column-stochastic web matrix M
def PowerIteration(M, allowedIterations=300):

  iterationIdx = 0

  #init r as (n,1) matrix
  numPages = M.shape[0]
  r = np.asmatrix(np.ones(numPages)/float(numPages)).T #[1/N,...,1/N]

  #iterate until r stops changing at float precision, or the iteration cap is hit
  iterationDiff = 1
  while iterationDiff > 0:
    if iterationIdx >= allowedIterations:
      print "Iteration limit of %i reached, exiting..." %iterationIdx
      break
    r_old = r
    r = M * r
    iterationDiff = np.abs(r - r_old).max() #largest per-page change
    iterationIdx += 1
  return r

In [52]:
#define M for pages (y,a,m)
M = np.matrix([[0.5, 0.5, 0],
               [.5, 0, 1],
               [0, .5, 0]])

NormalRanking = PowerIteration(M)
print(NormalRanking)


Iteration limit of 300 reached, exiting...
[[ 0.4]
 [ 0.4]
 [ 0.2]]
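
As a quick sanity check (not part of the original lecture code), we can confirm that power iteration found the principal eigenvector of M using numpy's standard eigensolver:


#the leading eigenvector of M (eigenvalue 1), rescaled to sum to 1,
#should match the power-iteration result (0.4, 0.4, 0.2)
eigenvalues, eigenvectors = np.linalg.eig(M)
leading = eigenvectors[:, np.argmax(eigenvalues.real)]
leading = np.real(leading / leading.sum()) #drop negligible imaginary parts
print leading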

Random walk

If we consider our algorithm from a surfer's perspective, we have to include probability in the previous equation, so for each epoch t we have $p_{t+1}=Mp_t$. Hence our flow equation can be interpreted as the stationary distribution of a random walk (that is, when $p_{t+1}=p_t$), and from the theory of Markov processes we know that, if all conditions are fulfilled, the stationary distribution is unique.

Given that, does our algorithm always converge to correct values? Not always:

  • spider trap problem - a page links only to itself, absorbing all the importance score
  • dead end problem - web pages without outgoing links make our score leak out

Spider trap problem


In [54]:
#define M with spider trap at page m
M = np.matrix([[0.5, 0.5, 0],
               [.5, 0, 0],
               [0, .5, 1]])

SpiderTrapRanking = PowerIteration(M)
#SpiderTrapRanking - NormalRanking 
SpiderTrapRanking


Iteration limit of 300 reached, exiting...
Out[54]:
matrix([[  9.52055792e-29],
        [  5.88402839e-29],
        [  1.00000000e+00]])

The spider trap sucks up all our ranking and we end up with (0,0,1). The solution is to add teleports: with probability $1-\beta$ (commonly 0.1-0.2) the walker does not follow a random link but instead teleports to a random page.

Dead end problem


In [56]:
#define M with dead end at page m
M = np.matrix([[0.5, 0.5, 0],
               [.5, 0, 0],
               [0, .5, 0]])

DeadEndRanking = PowerIteration(M)
DeadEndRanking


Iteration limit of 300 reached, exiting...
Out[56]:
matrix([[  9.52055792e-29],
        [  5.88402839e-29],
        [  3.63652953e-29]])

Most of our ranking has leaked out and the PageRank no longer sums to 1. The solution is to teleport with probability 1 once we reach a dead end. Combining the fixes for both problems, we can rewrite our original equation as $r_j = \sum_{i\rightarrow j} \beta \frac{r_i}{d_i}+(1-\beta) \frac{1}{n}$.

Explanation of teleports

All of this follows from Markov chain theory: for a unique stationary distribution, our matrix M has to be stochastic (all columns sum to 1; teleports ensure this for dead ends), irreducible (we can't get stuck in a given state) and aperiodic (there is no periodicity in the walk). Introducing random jumps gives us all three properties. In matrix form this means introducing a new matrix $A = \beta M +(1-\beta) \frac{1}{n}ee^T$, which we then use in the flow equation, so the final equation is $r_{t+1}= A r_t$. The code below demonstrates this.


In [57]:
#returns matrix A and the converged rank vector r (scaled to sum to n)
#requires web matrix M and stayProbability; allowedIterations is optional
#stayProbability, \beta - how often the random walker just follows a link
#in other words: jumpProbability = 1 - stayProbability
def PowerIteration(M, stayProbability, allowedIterations=300):
  convergeAccuracy = 10**-15
  iterationDiff = 1
  iterationIdx = 1

  #init r as (n,1) matrix normalised to sum to 1
  numPages = M.shape[0]
  r = np.asmatrix(np.ones(numPages)/float(numPages)).T #[1/N,...,1/N]

  #calculate matrix A = beta*M + (1-beta)/n * ee^T
  A = stayProbability*M + np.ones((numPages,numPages))*(1-stayProbability)/float(numPages)

  #iterate until converged (within the given accuracy) or the iteration limit is exceeded
  while iterationDiff > convergeAccuracy:
    if iterationIdx > allowedIterations:
      print "Iteration limit of %i reached, exiting..." %iterationIdx
      break
    r_old = r
    r = A * r
    iterationDiff = np.abs(r - r_old).max()
    iterationIdx += 1

  return (A, r*numPages) #scale r so the ranks sum to numPages
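
As a usage sketch (with $\beta = 0.8$ chosen arbitrarily, giving a jump probability of 0.2 within the suggested range), re-running the spider-trap matrix from above through this version no longer collapses to (0,0,1):


#spider-trap matrix from earlier: with teleports the trap can no longer absorb
#all the rank, so pages y and a keep non-zero scores (m still ranks highest)
M = np.matrix([[0.5, 0.5, 0],
               [.5, 0, 0],
               [0, .5, 1]])
A, r = PowerIteration(M, 0.8)
print r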

What if it does not fit into the memory of our machine?

Matrix A is the most memory-hungry part, so let's avoid materialising it. Instead we work with matrix M and "tax" it for the leaked ranking after each iteration, using the equation $r = \beta M r + [\frac{1-\beta}{n}]_n$. This way we only need the (typically sparse) matrix M for our calculations, correcting r after each iteration by the constant vector $[\frac{1-\beta}{n}]_n$, which redistributes part of the weight equally to all pages.


In [35]:
#returns the converged rank vector r (scaled to sum to n)
#requires web matrix M and stayProbability; allowedIterations is optional
#stayProbability, \beta - how often the random walker just follows a link
#in other words: jumpProbability = 1 - stayProbability
def PowerIteration(M, stayProbability, allowedIterations=300):
  convergeAccuracy = 10**-15
  iterationDiff = 1
  iterationIdx = 1

  #init r as (n,1) matrix normalised to sum to 1
  numPages = M.shape[0]
  r = np.asmatrix(np.ones(numPages)/float(numPages)).T #[1/N,...,1/N]

  #"tax" matrix M by the follow probability
  M = stayProbability*M

  #constant correction redistributing the leaked rank equally to all pages
  leakageCorrection = np.asmatrix(np.ones((numPages))*(1-stayProbability)/float(numPages)).T

  #iterate until converged (within the given accuracy) or the iteration limit is exceeded
  while iterationDiff > convergeAccuracy:
    if iterationIdx > allowedIterations:
      print "Iteration limit of %i reached, exiting..." %iterationIdx
      break
    r_old = r
    r = M * r + leakageCorrection
    iterationDiff = np.abs(r - r_old).max()
    iterationIdx += 1

  return r*numPages #no matrix A is ever built -- that is the whole point
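
A quick consistency check (a sketch, assuming the taxed-M definition above is the active one): since $\beta M r + [\frac{1-\beta}{n}]_n = Ar$ whenever r sums to 1, this version should converge to the same ranks as the A-based one, without ever building the dense matrix A:


#same spider-trap matrix as before; expect the same ranking as the A-based version
M = np.matrix([[0.5, 0.5, 0],
               [.5, 0, 0],
               [0, .5, 1]])
r = PowerIteration(M, 0.8)
print r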

Week 1 tests

Question 1

Suppose we compute PageRank with a β of 0.7, and we introduce the additional constraint that the sum of the PageRanks of the three pages must be 3, to handle the problem that otherwise any multiple of a solution will also be a solution. Compute the PageRanks a, b, and c of the three pages A, B, and C, respectively. Then, identify from the list below the true statement.

Solution

To start with, we use the PowerIteration function defined above (the teleport version that returns both A and r), which handles both dead ends and spider traps.

Then we can define matrix M, remembering that in matrix M the page in column i points to the page in row j. Hence:


In [58]:
M = np.matrix([
        [0,0,0],
        [.5,0,0],
        [0.5,1,1]      
    ])
A,PageRank = PowerIteration(M,0.7)
print PageRank

print "a+c = %.3f" %(PageRank[0]+PageRank[2])
print "b+c = %.3f" %(PageRank[1]+PageRank[2])
print "a+b = %.3f" %(PageRank[0]+PageRank[1])


[[ 0.3  ]
 [ 0.405]
 [ 2.295]]
a+c = 2.595
b+c = 2.700
a+b = 0.705

Question 2

Consider three Web pages with the following links:

Suppose we compute PageRank with β=0.85. Write the equations for the PageRanks a, b, and c of the three pages A, B, and C, respectively. Then, identify, in the list below, one of the equations.


In [59]:
M = np.matrix([
        [0,0,1],
        [.5,0,0],
        [0.5,1,0]      
    ])
A,PageRank = PowerIteration(M,0.85)

#show calculated matrix A
print A

#convert each matrix row to an equation: a = A00*a + A01*b + A02*c,
#rearranged as a*(1-A00) = A01*b + A02*c, then divided through by (1-A00)
print "a= %.3fb+ %.3fc" %(A[0,1]/(1-A[0,0]),A[0,2]/(1-A[0,0]))
print "b= %.3fa+ %.3fc" %(A[1,0]/(1-A[1,1]),A[1,2]/(1-A[1,1]))
print "c= %.3fa+ %.3fb" %(A[2,0]/(1-A[2,2]),A[2,1]/(1-A[2,2]))


[[ 0.05   0.05   0.9  ]
 [ 0.475  0.05   0.05 ]
 [ 0.475  0.9    0.05 ]]
a= 0.053b+ 0.947c
b= 0.500a+ 0.053c
c= 0.500a+ 0.947b

Question 3

Consider three Web pages with the following links:

Assuming no "taxation," compute the PageRanks a, b, and c of the three pages A, B, and C, using iteration, starting with the "0th" iteration where all three pages have rank a = b = c = 1. Compute as far as the 5th iteration, and also determine what the PageRanks are in the limit. Then, identify the true statement from the list below.


In [60]:
M = np.matrix([
        [0,0,1],
        [.5,0,0],
        [0.5,1,0]
    ])
r = np.asmatrix(np.ones(3)).T #0th iteration: a = b = c = 1

for idx in range(5): #iterate to the 5th iteration; no taxation, so plain r = M*r
  r_old = r
  r = M * r
  iterationDiff = r - r_old


print "Last iteration value:"
print r
print "\nDifference from previous iteration:"
print iterationDiff


Last iteration value:
[[ 1.25 ]
 [ 0.625]
 [ 1.125]]

Difference from previous iteration:
[[ 0.   ]
 [ 0.125]
 [-0.125]]

In the limit (solving $r = Mr$ with the constraint $a+b+c=3$) we get $a = c = 1.2$ and $b = 0.6$.

Question 4

Suppose our input data to a map-reduce operation consists of integer values (the keys are not important). The map function takes an integer i and produces the list of pairs (p,i) such that p is a prime divisor of i. For example, map(12) = [(2,12), (3,12)]. The reduce function is addition. That is, reduce(p, $[i_1, i_2, ...,i_k])$ is $(p,i_1+i_2+...+i_k)$.

Compute the output, if the input is the set of integers 15, 21, 24, 30, 49. Then, identify, in the list below, one of the pairs in the output.


In [11]:
def GetAllPrimeFactors(inputInteger):
  #trial division up to sqrt(inputInteger)
  idx = 2
  factors = []
  while idx * idx <= inputInteger:
    if inputInteger % idx:
      idx += 1
    else:
      inputInteger //= idx
      factors.append(idx)

  if inputInteger > 1:
    factors.append(inputInteger)

  #keep only unique prime factors
  factors = list(set(factors))
  return factors

######################################
#Our mapping function, returning the list of pairs (prime factor, integer)
def mappingFunction(inputInteger):
  allPrimeFactors = GetAllPrimeFactors(inputInteger)
  mappedKeys = []
  for key in allPrimeFactors:
    mappedKeys.append((key, inputInteger))
  return mappedKeys

######################################
#Our reduce function, summing the integers grouped under each key
def reduceFunction(mappedKeys):
  reducedKeys = dict()
  for key, value in mappedKeys:
    if key in reducedKeys:
      reducedKeys[key] += value #key seen before -- accumulate
    else:
      reducedKeys[key] = value #first occurrence of this key
  return reducedKeys
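
A quick check against the example given in the question (map(12) should produce [(2,12), (3,12)]):


#the question's example: 12 = 2*2*3, so its unique prime divisors are 2 and 3
print mappingFunction(12) #expect [(2, 12), (3, 12)] -- order within the list may vary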

In [14]:
integers = [4, 12, 15, 21, 24, 30, 49] #the question's input plus 4 and 12 as extra checks

#First let's map the values

mappedValues = map(mappingFunction, integers) #list of lists
mappedValues = [val for sublist in mappedValues for val in sublist] #flatten it
#then sort them by key (the group-by-key step)
mappedValues = sorted(mappedValues, key=lambda pair: pair[0])
print "Those are our mapped values"
print mappedValues


#Then let's reduce them
reducedValues = reduceFunction(mappedValues)
print "\nThose are our reduced (final) values"
print reducedValues


Those are our mapped values
[(2, 4), (2, 12), (2, 24), (2, 30), (3, 12), (3, 15), (3, 21), (3, 24), (3, 30), (5, 15), (5, 30), (7, 21), (7, 49)]

Those are our reduced (final) values
{2: 70, 3: 102, 5: 45, 7: 70}

Interesting links to follow