Count-Min Sketch

Twitter users attach free-form hashtags to tweets to tie them to an idea or a cause, e.g. #tacotuesday, #firstworldproblems, #metoo.

The typical stream runs at 6-10k tweets/second throughout the day, with spikes up to 10x larger, and a single tweet can carry anywhere from zero to tens of hashtags.

Imagine we want to estimate the rank of hashtags for multiple timescales.

#WednesdayWisdom Example Stream

Strategy 1: Keep exact count

Build a data structure:

  • represents all unique values ($i_t$) of the hashtags
  • counts the occurrences ($c_t$)
  • keeps track of the time period of an occurrence ($c_t^\Delta$)

The size of this structure is (the cardinality of the stream, i.e. the number of unique hashtags in time period $\Delta$) x (the number of time periods we need to keep to make our rankings).
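For illustration (hypothetical numbers): if an hour-long period sees on the order of $10^6$ unique hashtags and we keep 24 hourly periods, the structure needs on the order of $2.4 \times 10^7$ counters.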

Computation time is determined by:

  • key lookup ($O(1)$)
  • counter increment by 1

So this solution is fast, but its memory footprint grows linearly with the cardinality of the stream.

Strategy 2: Keep approximate count

Come up with a data structure that does not grow linearly with cardinality and gives a bounded-error estimate of the ranking.

Basic Idea of the Count-Min Sketch

The strategy:

  • Map the input value ($i_t$) to multiple points in a relatively small output space, and increment each of those locations when we see that item.
  • In this strategy, the count associated with a given input is spread across multiple counters in the output space.
  • Collisions will occur. That is, different values may sometimes increment the same cell.
  • But, the minimum count associated with a given input will have some desirable properties, including the ability to be used to estimate the largest N counts.

http://debasishg.blogspot.com/2014/01/count-min-sketch-data-structure-for.html

So assume we have $d$ pairwise-independent hash functions $\{h_1, \dots, h_d\}$, each hashing our inputs into the range $\{1, \dots, w\}$.
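To make the mechanics concrete, here is a toy, self-contained illustration with a hypothetical 2 x 8 table and two ad-hoc (not pairwise-independent) hash functions; the real implementation below draws its hashes from the family $((ax + b) \bmod p) \bmod w$ and uses 0-indexed columns.


In [ ]:
import numpy as np

# toy 2 x 8 count table (hypothetical sizes, for illustration only)
d, w = 2, 8
table = np.zeros((d, w), dtype=int)
# two ad-hoc hash functions, one per row
hashes = [lambda x: hash(('row0', x)) % w,
          lambda x: hash(('row1', x)) % w]

def update(item):
    # increment one cell in every row
    for row, h in enumerate(hashes):
        table[row, h(item)] += 1

def query(item):
    # the minimum across rows is the least collision-inflated estimate
    return min(table[row, h(item)] for row, h in enumerate(hashes))

for tag in ['#metoo', '#metoo', '#tacotuesday']:
    update(tag)
print(query('#metoo'))  # 2, unless collisions inflated both of its cells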

Parameters of the sketch:

  • $\epsilon$
  • $\delta$

These parameters determine the sketch dimensions: the width $w$ is inversely proportional to $\epsilon$, and the depth $d$ grows logarithmically as $\delta$ shrinks (equivalently, the failure probability drops exponentially in $d$).

The reason to parameterize the data structure by $\epsilon$ and $\delta$ is that they bound the answer quality directly: a count estimate overshoots the true count by at most $\epsilon \|a\|_1$ (where $\|a\|_1$ is the total of all counts) with probability at least $1 - \delta$.
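Concretely, here is how the constructor below sizes the table from $(\delta, \epsilon)$, using the docstring's example parameters:


In [ ]:
import numpy as np

# sketch dimensions implied by (delta, epsilon), matching the constructor below
delta, epsilon = 10**-7, 0.005
w = int(np.ceil(np.exp(1) / epsilon))  # width: inversely proportional to epsilon
d = int(np.ceil(np.log(1 / delta)))    # depth: logarithmic in 1/delta
print(d, w, d * w)                     # 17 hashes x 544 cells = 9248 counters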

Strategy 1: Implementation of Exact Counting


In [ ]:
# define a function to return a list of the exact top users, sorted by count
def exact_top_users(f, top_n = 10):
    import operator
    counts = {}
    for user in f:
        user = user.rstrip('\n')
        counts[user] = counts.get(user, 0) + 1
    print("exact data structure counters = {}".format(len(counts)))
    results = []
    # sort by count, descending, and keep the first top_n entries
    for user, count in sorted(counts.items(), key=operator.itemgetter(1), reverse=True)[:top_n]:
        results.append('{},{}'.format(user, count))
    return results
# note that the output format is '[user],[count]'

Strategy 2: Implementation of the CM sketch


In [ ]:
import sys
import random
import numpy as np
import heapq
import json
import time

BIG_PRIME = 9223372036854775783

def random_parameter():
    return random.randrange(0, BIG_PRIME - 1)


class Sketch:
    def __init__(self, delta, epsilon, k):
        """
        Setup a new count-min sketch with parameters delta, epsilon and k

        The parameters delta and epsilon control the accuracy of the
        estimates of the sketch

        Cormode and Muthukrishnan prove that for an item i with count a_i, the
        estimate from the sketch a_i_hat will satisfy the relation

        a_hat_i <= a_i + epsilon * ||a||_1

        with probability at least 1 - delta, where a is the vector of all
        counts and ||x||_1 is the L1 norm of a vector x

        Parameters
        ----------
        delta : float
            A value in the unit interval; the error bound holds with
            probability at least 1 - delta
        epsilon : float
            A value in the unit interval that sets the error bound of the sketch
        k : int
            A positive integer that sets the number of top items counted

        Examples
        --------
        >>> s = Sketch(10**-7, 0.005, 40)

        Raises
        ------
        ValueError
            If delta or epsilon are not in the unit interval, or if k is
            not a positive integer

        """
        if delta <= 0 or delta >= 1:
            raise ValueError("delta must be between 0 and 1, exclusive")
        if epsilon <= 0 or epsilon >= 1:
            raise ValueError("epsilon must be between 0 and 1, exclusive")
        if k < 1:
            raise ValueError("k must be a positive integer")

        self.w = int(np.ceil(np.exp(1) / epsilon))
        self.d = int(np.ceil(np.log(1 / delta)))
        print("cm data Structure: {} hashes x {} cells = {} counters".format(self.d, self.w, self.d*self.w))
        self.k = k
        self.hash_functions = [self.__generate_hash_function() for i in range(self.d)]
        self.count = np.zeros((self.d, self.w), dtype='int32')
        self.heap, self.top_k = [], {} # top_k => [estimate, key] pairs

    def update(self, key, increment):
        """
        Updates the sketch for the item with name of key by the amount
        specified in increment

        Parameters
        ----------
        key : string
            The item to update the value of in the sketch
        increment : integer
            The amount to update the sketch by for the given key

        Examples
        --------
        >>> s = Sketch(10**-7, 0.005, 40)
        >>> s.update('http://www.cnn.com/', 1)

        """
        for row, hash_function in enumerate(self.hash_functions):
            column = hash_function(abs(hash(key)))
            self.count[row, column] += increment

        self.update_heap(key)

    def update_heap(self, key):
        """
        Updates the class's heap that keeps track of the top k items for a
        given key

        For the given key, it checks whether the key is present in the heap,
        updating accordingly if so, and adding it to the heap if it is
        absent

        Parameters
        ----------
        key : string
            The item to check against the heap

        """
        estimate = self.get(key)

        if not self.heap or estimate >= self.heap[0][0]:
            if key in self.top_k:
                # the same pair object lives in both the heap and top_k, so an
                # in-place update followed by re-heapify keeps them in sync
                old_pair = self.top_k.get(key)
                old_pair[0] = estimate
                heapq.heapify(self.heap)
            else:
                if len(self.top_k) < self.k:
                    pair = [estimate, key]
                    heapq.heappush(self.heap, pair)
                    self.top_k[key] = pair
                else:
                    new_pair = [estimate, key]
                    old_pair = heapq.heappushpop(self.heap, new_pair)
                    # update top_k only if the new key actually displaced the
                    # old minimum (the new pair may be popped right back out)
                    if new_pair[1] != old_pair[1]:
                        del self.top_k[old_pair[1]]
                        self.top_k[key] = new_pair

    def get(self, key):
        """
        Fetches the sketch estimate for the given key

        Parameters
        ----------
        key : string
            The item to produce an estimate for

        Returns
        -------
        estimate : int
            The best estimate of the count for the given key based on the
            sketch

        Examples
        --------
        >>> s = Sketch(10**-7, 0.005, 40)
        >>> s.update('http://www.cnn.com/', 1)
        >>> s.get('http://www.cnn.com/')
        1

        """
        value = sys.maxsize
        for row, hash_function in enumerate(self.hash_functions):
            column = hash_function(abs(hash(key)))
            value = min(self.count[row, column], value)

        return value

    def __generate_hash_function(self):
        """
        Returns a hash function from a family of pairwise-independent hash
        functions

        """
        a, b = random_parameter(), random_parameter()
        return lambda x: (a * x + b) % BIG_PRIME % self.w

In [ ]:
! head CM_small.txt
! cat CM_small.txt | sort | uniq -c | sort -n

In [ ]:
f = open('CM_small.txt')
results_exact = sorted(exact_top_users(f))
print("\n".join(results_exact))

In [ ]:
# define a function to return a list of the estimated top users, sorted by count
def CM_top_users(f, s, top_n = 10):
    for user_name in f:
        s.update(user_name.rstrip('\n'),1)
    
    results = []
    counter = 0
    for value in reversed(sorted(s.top_k.values())):
        if counter >= top_n:
            break
        results.append('{},{}'.format(value[1], value[0]))
        counter += 1
    return results
# note that the output format is '[user],[count]'

In [ ]:
# instantiate a Sketch object
s = Sketch(10**-3, 0.1, 10)

In [ ]:
f = open('CM_small.txt')
results_CM = sorted(CM_top_users(f,s))
print("\n".join(results_CM))

In [ ]:
for item in zip(results_exact,results_CM):
    print(item)

Is it possible to make the sketch so coarse that its estimates are wrong even for this data set?


In [ ]:
s = Sketch(0.9, 0.9, 10)
f = open('CM_small.txt')
results_coarse_CM = CM_top_users(f,s)
print("\n".join(results_coarse_CM))

Yes! (if you try enough) Why?

  • The 'w' parameter goes like $\lceil e/\epsilon \rceil$, which is always $\geq 3$.
  • The 'd' parameter goes like $\lceil \ln(1/\delta) \rceil$, which is always $\geq 1$.

So in the worst case you're dealing with a table of minimum size 3 x 1. With 10 records and that few cells, it's entirely possible that all 4 distinct users map their counts to the same cell, so you can see an estimate as high as 10 in this case.
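A quick check of the dimensions produced by the coarse parameters used above:


In [ ]:
import numpy as np

# Sketch(0.9, 0.9, 10) above sets delta = epsilon = 0.9
print(int(np.ceil(np.log(1 / 0.9))), "x", int(np.ceil(np.exp(1) / 0.9)))  # 1 x 4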

Now for a larger data set...


In [ ]:
! wc -l CM_large.txt
! cat CM_large.txt | sort | uniq | wc -l
! cat CM_large.txt | sort | uniq -c | sort -rn

In [ ]:
f = open('CM_large.txt')
%time results_exact = exact_top_users(f)
print("\n".join(results_exact))

In [ ]:
# this could take a few minutes
f = open('CM_large.txt')
s = Sketch(10**-4, 10**-4, 10)
%time results_CM = CM_top_users(f,s)
print("\n".join(results_CM))

For this precision and dataset size, the CM sketch takes much longer than the exact solution: each update costs d hash evaluations plus heap maintenance, versus a single dict operation for the exact count. In fact, the number of entries at which the CM sketch can achieve reasonable accuracy in the same time as the exact solution is very large.


In [ ]:
for item in zip(results_exact,results_CM):
    print(item)

In [ ]:
# the CM sketch gets the top entry (an outlier) correct but doesn't do well
# estimating the order of the remaining counts, which are nearly tied

# let's decrease the precision via both the epsilon and delta parameters, 
# and see whether it still gets the "heavy-hitter" correct

f = open('CM_large.txt')
s = Sketch(10**-3, 10**-2, 50)
%time results_CM = CM_top_users(f,s)
print("\n".join(results_CM))

In [ ]:
# nope...sketch is too coarse, too many collisions, and the prominence of user 'user_0 129' is obscured
for item in zip(results_exact,results_CM):
    print(item)

The most common use of the CM sketch is analysis of streaming data. Why?

  • Because the data arrive in real time, hashing the inputs is not the bottleneck it becomes when the data are already collected.
  • Similarly, the recalculation of the top-k list (implemented as a heap, in this case) is done on insert. No need to sort the entire list.
  • The sketches are associative: two sketches built with the same parameters and hash functions can be merged by adding their count tables, so the work is easily parallelized and the results combined at the end (see the sketch after this list).
  • They also support other queries, such as dot-product (inner-product) estimates between the frequency vectors of combinations of items.
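A minimal sketch of the merge operation mentioned in the list above; the Sketch class here has no merge method, so this helper is illustrative and assumes both sketches were constructed with identical dimensions and the very same hash functions.


In [ ]:
def merge_sketches(s1, s2):
    # counts are additive cell by cell, but only if both sketches share
    # (d, w) AND the same hash functions
    assert s1.count.shape == s2.count.shape, "sketch dimensions must match"
    s1.count += s2.count
    # note: the heap/top_k bookkeeping would need to be rebuilt after a
    # merge; omitted here for brevity
    return s1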

Take away: use the CM sketch to estimate the top-k most frequent elements in a streaming environment.

