Deep Learning with TensorFlow

Credits: Forked from TensorFlow by Google

Setup

Refer to the setup instructions.

Exercise 5

The goal of this exercise is to train a skip-gram model over Text8 data.

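As a quick illustration of what the skip-gram objective consumes, every word in the corpus is treated as a center word and paired with the words around it. A minimal sketch of those (center, context) pairs; the sentence and window size are made up for illustration and are not part of the exercise:

# Toy illustration of skip-gram (center, context) pairs.
sentence = "the quick brown fox jumps".split()
window = 1  # one word of context on each side
pairs = []
for i, center in enumerate(sentence):
  for j in range(max(0, i - window), min(len(sentence), i + window + 1)):
    if j != i:
      pairs.append((center, sentence[j]))
print(pairs)
# [('the', 'quick'), ('quick', 'the'), ('quick', 'brown'), ...]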

In [3]:
# These are all the modules we'll be using later. Make sure you can import them
# before proceeding further.
import collections
import math
import numpy as np
import os
import random
import tensorflow as tf
import urllib
import zipfile
from matplotlib import pylab
from sklearn.manifold import TSNE

Download the data from the source website if necessary.


In [4]:
import urllib.request

url = 'http://mattmahoney.net/dc/'  # base URL for the Text8 corpus
def maybe_download(filename, expected_bytes):
  """Download a file if not present, and make sure it's the right size."""
  if not os.path.exists(filename):
    filename, _ = urllib.request.urlretrieve(url + filename, filename)
  statinfo = os.stat(filename)
  if statinfo.st_size == expected_bytes:
    print ('Found and verified', filename)
  else:
    print (statinfo.st_size)
    raise Exception('Failed to verify ' + filename + '. Can you get to it with a browser?')
  return filename

#filename = maybe_download("text8.zip",31344016)
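
The download call is left commented out above because a local copy of text8.zip is assumed. A minimal sketch to fetch or verify the archive before moving on (the expected size is the one from the commented-out call):

if os.path.exists('text8.zip'):
  filename = 'text8.zip'
  print('Using local copy:', filename, os.stat(filename).st_size, 'bytes')
else:
  filename = maybe_download('text8.zip', 31344016)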

Read the data into a list of words.


In [5]:
filename = "text8.zip"

def read_data(filename):
  """Extract the first file inside the zip archive as a list of words."""
  with zipfile.ZipFile(filename) as f:
    return f.read(f.namelist()[0]).split()
  
words = read_data(filename)
print ('Data size', len(words))


Data size 17005207

Build the dictionary and replace rare words with UNK token.


In [6]:
vocabulary_size = 50000

def build_dataset(words):
  count = [['UNK', -1]]
  count.extend(collections.Counter(words).most_common(vocabulary_size - 1))
  dictionary = dict()
  for word, _ in count:
    dictionary[word] = len(dictionary)
  data = list()
  unk_count = 0
  for word in words:
    if word in dictionary:
      index = dictionary[word]
    else:
      index = 0  # dictionary['UNK']
      unk_count = unk_count + 1
    data.append(index)
  count[0][1] = unk_count
  reverse_dictionary = dict(zip(dictionary.values(), dictionary.keys())) 
  return data, count, dictionary, reverse_dictionary

data, count, dictionary, reverse_dictionary = build_dataset(words)
print ('Most common words (+UNK)', count[:5])
print ('Sample data', data[:10])
del words  # Hint to reduce memory.


Most common words (+UNK) [['UNK', 418391], (b'the', 1061396), (b'of', 593677), (b'and', 416629), (b'one', 411764)]
Sample data [5234, 3081, 12, 6, 195, 2, 3135, 46, 59, 156]
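
The indices in `data` can be mapped back to words through `reverse_dictionary`; a small check that the encoding round-trips (the decoded tokens are the opening words of text8, stored as bytes objects):

# Decode the first ten indices back into tokens.
print([reverse_dictionary[index] for index in data[:10]])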

Function to generate a training batch for the skip-gram model. Each center word is paired with num_skips context words drawn at random from a window of skip_window words on either side.


In [7]:
data_index = 0

def generate_batch(batch_size, num_skips, skip_window):
  global data_index
  assert batch_size % num_skips == 0
  assert num_skips <= 2 * skip_window
  batch = np.ndarray(shape=(batch_size), dtype=np.int32)
  labels = np.ndarray(shape=(batch_size, 1), dtype=np.int32)
  span = 2 * skip_window + 1 # [ skip_window target skip_window ]
  buffer = collections.deque(maxlen=span)
  for _ in range(span):
    buffer.append(data[data_index])
    data_index = (data_index + 1) % len(data)
  for i in range(int(batch_size / num_skips)):
    target = skip_window  # target label at the center of the buffer
    targets_to_avoid = [ skip_window ]
    for j in range(num_skips):
      while target in targets_to_avoid:
        target = random.randint(0, span - 1)
      targets_to_avoid.append(target)
      batch[i * num_skips + j] = buffer[skip_window]
      labels[i * num_skips + j, 0] = buffer[target]
    buffer.append(data[data_index])
    data_index = (data_index + 1) % len(data)
  return batch, labels

batch, labels = generate_batch(batch_size=8, num_skips=2, skip_window=1)
for i in range(8):
    print (batch[i], '->', labels[i, 0])
    print (reverse_dictionary[batch[i]], '->', reverse_dictionary[labels[i, 0]])


3081 -> 12
b'originated' -> b'as'
3081 -> 5234
b'originated' -> b'anarchism'
12 -> 3081
b'as' -> b'originated'
12 -> 6
b'as' -> b'a'
6 -> 195
b'a' -> b'term'
6 -> 12
b'a' -> b'as'
195 -> 6
b'term' -> b'a'
195 -> 2
b'term' -> b'of'
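
With num_skips=2 and skip_window=1, each center word contributes two consecutive rows, one per neighbor, which is why every index above appears twice in a row. A small sanity check along those lines (note that generate_batch advances the global data_index, so it is rewound afterwards):

data_index = 0
batch, labels = generate_batch(batch_size=8, num_skips=2, skip_window=1)
for i in range(0, 8, 2):
  # consecutive rows share the same center word; only the sampled context differs
  assert batch[i] == batch[i + 1]
data_index = 0  # rewind so training starts from the beginning of the corpus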

Train a skip-gram model.


In [8]:
batch_size = 128
embedding_size = 128 # Dimension of the embedding vector.
skip_window = 1 # How many words to consider left and right.
num_skips = 2 # How many times to reuse an input to generate a label.
# We pick a random validation set to sample nearest neighbors. here we limit the
# validation samples to the words that have a low numeric ID, which by
# construction are also the most frequent. 
valid_size = 16 # Random set of words to evaluate similarity on.
valid_window = 100 # Only pick dev samples in the head of the distribution.
valid_examples = np.array(random.sample(range(valid_window), valid_size))
num_sampled = 64 # Number of negative examples to sample.

graph = tf.Graph()

with graph.as_default():

  # Input data.
  train_dataset = tf.placeholder(tf.int32, shape=[batch_size])
  train_labels = tf.placeholder(tf.int32, shape=[batch_size, 1])
  valid_dataset = tf.constant(valid_examples, dtype=tf.int32)
  
  # Variables.
  embeddings = tf.Variable(
    tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))
  softmax_weights = tf.Variable(
    tf.truncated_normal([vocabulary_size, embedding_size],
                         stddev=1.0 / math.sqrt(embedding_size)))
  softmax_biases = tf.Variable(tf.zeros([vocabulary_size]))
  
  # Model.
  # Look up embeddings for inputs.
  embed = tf.nn.embedding_lookup(embeddings, train_dataset)
  # Compute the softmax loss, using a sample of the negative labels each time.
  # Note: pass the arguments by keyword; newer TensorFlow releases reordered the
  # labels/inputs parameters, and the old positional order raises a dtype error.
  loss = tf.reduce_mean(
    tf.nn.sampled_softmax_loss(weights=softmax_weights, biases=softmax_biases,
                               labels=train_labels, inputs=embed,
                               num_sampled=num_sampled, num_classes=vocabulary_size))

  # Optimizer.
  optimizer = tf.train.AdagradOptimizer(1.0).minimize(loss)
  
  # Compute the similarity between minibatch examples and all embeddings.
  # We use the cosine distance:
  norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1, keep_dims=True))
  normalized_embeddings = embeddings / norm
  valid_embeddings = tf.nn.embedding_lookup(
    normalized_embeddings, valid_dataset)
  similarity = tf.matmul(valid_embeddings, tf.transpose(normalized_embeddings))


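The similarity matrix above relies on the fact that, for L2-normalized rows, a plain dot product equals cosine similarity. A small numpy check of that identity (toy vectors, independent of the graph above):

a = np.array([3.0, 4.0])
b = np.array([4.0, 3.0])
cosine = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
a_hat, b_hat = a / np.linalg.norm(a), b / np.linalg.norm(b)
print(cosine, np.dot(a_hat, b_hat))  # both are 0.96 up to floating-point rounding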

In [10]:
num_steps = 100001

with tf.Session(graph=graph) as session:
  tf.global_variables_initializer().run()
  print ("Initialized")
  average_loss = 0
  for step in range(num_steps):
    batch_data, batch_labels = generate_batch(
      batch_size, num_skips, skip_window)
    feed_dict = {train_dataset : batch_data, train_labels : batch_labels}
    _, l = session.run([optimizer, loss], feed_dict=feed_dict)
    average_loss += l
    if step % 2000 == 0:
      if step > 0:
        average_loss = average_loss / 2000
      # The average loss is an estimate of the loss over the last 2000 batches.
      print ("Average loss at step", step, ":", average_loss)
      average_loss = 0
    # note that this is expensive (~20% slowdown if computed every 500 steps)
    if step % 10000 == 0:
      sim = similarity.eval()
      for i in range(valid_size):
        valid_word = reverse_dictionary[valid_examples[i]]
        top_k = 8 # number of nearest neighbors
        nearest = (-sim[i, :]).argsort()[1:top_k+1]
        log = "Nearest to %s:" % valid_word
        for k in range(top_k):
          close_word = reverse_dictionary[nearest[k]]
          log = "%s %s," % (log, close_word)
        print (log)
  final_embeddings = normalized_embeddings.eval()


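Once final_embeddings is available, nearest neighbors for any vocabulary word can be looked up directly with numpy. A minimal sketch (the query word is arbitrary; any key of `dictionary` works, and keys are bytes objects because of the tokenization above):

def neighbors(word, k=8):
  """Return the k nearest words to `word` by cosine similarity in final_embeddings."""
  idx = dictionary[word]
  sims = np.dot(final_embeddings, final_embeddings[idx])  # rows are already L2-normalized
  return [reverse_dictionary[j] for j in (-sims).argsort()[1:k + 1]]

print(neighbors(b'three'))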

In [50]:
num_points = 400

tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000)
two_d_embeddings = tsne.fit_transform(final_embeddings[1:num_points+1, :])



In [51]:
def plot(embeddings, labels):
  assert embeddings.shape[0] >= len(labels), 'More labels than embeddings'
  pylab.figure(figsize=(15,15))  # in inches
  for i, label in enumerate(labels):
    x, y = embeddings[i,:]
    pylab.scatter(x, y)
    pylab.annotate(label, xy=(x, y), xytext=(5, 2), textcoords='offset points',
                   ha='right', va='bottom')
  pylab.show()

words = [reverse_dictionary[i] for i in range(1, num_points+1)]
plot(two_d_embeddings, words)



In [7]:
from pyspark import SparkContext
from pyspark.mllib.feature import Word2Vec

# sc = SparkContext(appName='Word2Vec')  # reuse the SparkContext already running in this notebook
inp = sc.textFile("url.txt").map(lambda row: row.split(" "))
word2vec = Word2Vec()
model = word2vec.fit(inp)
print(model.getVectors)  # prints the bound method; call model.getVectors() for the word -> vector map
# Other methods on the fitted model: findSynonyms, transform, save, load
model.getVectors


<bound method Word2VecModel.getVectors of <pyspark.mllib.feature.Word2VecModel object at 0x114cb27f0>>
Out[7]:
<bound method Word2VecModel.getVectors of <pyspark.mllib.feature.Word2VecModel object at 0x114cb27f0>>
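
getVectors is a method, so the lines above only display its bound-method repr. A hedged sketch of how the fitted model would normally be inspected (assuming the fit above produced a non-trivial vocabulary from url.txt):

vectors = dict(model.getVectors())           # word -> vector (list of floats)
print(len(vectors), 'words in the vocabulary')
probe = next(iter(vectors))                  # an arbitrary word guaranteed to be in the vocabulary
print(model.transform(probe).toArray()[:5])  # first five components of its embedding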

In [1]:
sc


Out[1]:
<pyspark.context.SparkContext at 0x10f0036d8>

In [14]:
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.mllib.feature import Word2Vec

USAGE = ("bin/spark-submit --driver-memory 4g "
         "examples/src/main/python/mllib/word2vec.py text8_lines")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(USAGE)
        sys.exit("Argument for file not provided")
    file_path = sys.argv[1]
    file_path = "url.txt"  # hard-coded for the notebook, overriding any command-line argument
    # sc = SparkContext(appName='Word2Vec')  # reuse the notebook's existing SparkContext
    inp = sc.textFile(file_path).map(lambda row: row.split(" "))

    word2vec = Word2Vec()
    model = word2vec.fit(inp)

    # Note: the query token must exist in the training vocabulary;
    # '1' does not occur in url.txt, which is what triggers the error below.
    synonyms = model.findSynonyms('1', 5)
    

    for word, cosine_distance in synonyms:
        print("{}: {}".format(word, cosine_distance))
    sc.stop()


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-14-0c36f52dbe7d> in <module>()
     22     model = word2vec.fit(inp)
     23 
---> 24     synonyms = model.findSynonyms('1', 5)
     25 
     26     for word, cosine_distance in synonyms:

/usr/local/spark/python/pyspark/mllib/feature.py in findSynonyms(self, word, num)
    522         if not isinstance(word, basestring):
    523             word = _convert_to_vector(word)
--> 524         words, similarity = self.call("findSynonyms", word, num)
    525         return zip(words, similarity)
    526 

/usr/local/spark/python/pyspark/mllib/common.py in call(self, name, *a)
    144     def call(self, name, *a):
    145         """Call method of java_model"""
--> 146         return callJavaFunc(self._sc, getattr(self._java_model, name), *a)
    147 
    148 

/usr/local/spark/python/pyspark/mllib/common.py in callJavaFunc(sc, func, *args)
    121     """ Call Java Function """
    122     args = [_py2java(sc, a) for a in args]
--> 123     return _java2py(sc, func(*args))
    124 
    125 

/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(

Py4JJavaError: An error occurred while calling o393.findSynonyms.
: java.lang.IllegalStateException: 1 not in vocabulary
	at org.apache.spark.mllib.feature.Word2VecModel.transform(Word2Vec.scala:516)
	at org.apache.spark.mllib.api.python.Word2VecModelWrapper.transform(Word2VecModelWrapper.scala:34)
	at org.apache.spark.mllib.api.python.Word2VecModelWrapper.findSynonyms(Word2VecModelWrapper.scala:47)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:211)
	at java.lang.Thread.run(Thread.java:748)
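
The IllegalStateException above simply means the literal query '1' never occurs in url.txt, so the model has no vector for it. A hedged workaround is to probe with a word taken from the model's own vocabulary instead:

vocab = dict(model.getVectors())
probe = next(iter(vocab))  # any word the model actually learned from url.txt
for word, cosine_distance in model.findSynonyms(probe, 5):
    print("{}: {}".format(word, cosine_distance))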

In [18]:
from pyspark.mllib.feature import HashingTF, IDF

# Load documents (one per line).
documents = sc.textFile("url.txt").map(lambda line: line.split(" "))

hashingTF = HashingTF()
tf = hashingTF.transform(documents)

# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)
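
A small sketch of how the resulting RDDs can be inspected; each document becomes a sparse vector of hashed term indices and TF-IDF weights (the actual numbers depend on url.txt, so none are shown here):

first_doc = documents.first()
print(first_doc[:10])       # first ten tokens of the first document
print(tf.first())           # its raw term-frequency SparseVector
print(tfidf.first())        # the same document reweighted by IDF
print(tfidfIgnore.first())  # IDF variant where terms with document frequency < 2 get weight 0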

In [19]:
from pyspark.mllib.feature import Word2Vec

# This relative path assumes the notebook runs from the root of a Spark distribution;
# the traceback below shows the file is not present relative to this notebook.
inp = sc.textFile("data/mllib/sample_lda_data.txt").map(lambda row: row.split(" "))

word2vec = Word2Vec()
model = word2vec.fit(inp)

synonyms = model.findSynonyms('1', 5)

for word, cosine_distance in synonyms:
    print("{}: {}".format(word, cosine_distance))


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-19-d348dc2d018c> in <module>()
      4 
      5 word2vec = Word2Vec()
----> 6 model = word2vec.fit(inp)
      7 
      8 synonyms = model.findSynonyms('1', 5)

/usr/local/spark/python/pyspark/mllib/feature.py in fit(self, data)
    678                                float(self.learningRate), int(self.numPartitions),
    679                                int(self.numIterations), int(self.seed),
--> 680                                int(self.minCount), int(self.windowSize))
    681         return Word2VecModel(jmodel)
    682 

/usr/local/spark/python/pyspark/mllib/common.py in callMLlibFunc(name, *args)
    128     sc = SparkContext.getOrCreate()
    129     api = getattr(sc._jvm.PythonMLLibAPI(), name)
--> 130     return callJavaFunc(sc, api, *args)
    131 
    132 

/usr/local/spark/python/pyspark/mllib/common.py in callJavaFunc(sc, func, *args)
    121     """ Call Java Function """
    122     args = [_py2java(sc, a) for a in args]
--> 123     return _java2py(sc, func(*args))
    124 
    125 

/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(

Py4JJavaError: An error occurred while calling o622.trainWord2VecModel.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/recepkabatas/Documents/GitHub/data-science-ipython-notebooks/deep-learning/tensor-flow-exercises/data/mllib/sample_lda_data.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:53)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
	at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:328)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:328)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:327)
	at org.apache.spark.mllib.feature.Word2Vec.learnVocab(Word2Vec.scala:186)
	at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:307)
	at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:452)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainWord2VecModel(PythonMLLibAPI.scala:693)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:211)
	at java.lang.Thread.run(Thread.java:748)
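
The sample file ships with the Spark distribution itself, not with this notebook, hence the missing-path error. A hedged fix is to build the path from the Spark installation directory (using SPARK_HOME if set, otherwise the /usr/local/spark prefix visible in the traceback):

import os

spark_home = os.environ.get("SPARK_HOME", "/usr/local/spark")
sample_path = os.path.join(spark_home, "data", "mllib", "sample_lda_data.txt")
inp = sc.textFile(sample_path).map(lambda row: row.split(" "))
model = Word2Vec().fit(inp)  # after this, the findSynonyms('1', 5) call above can be retried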

In [ ]: