In [29]:
import json
from pyspark.mllib.recommendation import ALS

Parsing the ratings file


In [30]:
def parseRelation(line):
    fields = line.split("\t")

    rating = json.loads(fields[3])["rating"]
    properties_js = json.loads(fields[4])
    user = properties_js["subject"].split(":")[1]    # id after the ':' in e.g. "user:10196"
    movie = properties_js["object"].split(":")[1]    # id after the ':' in e.g. "movie:11242"

    # Key each rating by timestamp % 10; these buckets drive the
    # training/validation/test split further down.
    return long(fields[2]) % 10, (int(user), int(movie), long(rating))

ratings = (sc.textFile('/vagrant/vagrant/relations.dat')
             .filter(lambda line: "rating.explicit" in line)
             .map(parseRelation))
ratings.first()


Out[30]:
(9L, (10196, 11242, 3L))
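For reference, parseRelation implies a tab-separated layout: a timestamp in field 2, a JSON rating payload in field 3, and a JSON subject/object payload in field 4. The notebook never shows a raw relations.dat line, so the record below is a hypothetical reconstruction (field 0 and the exact position of the "rating.explicit" type string are guesses) used only to exercise the parser:

# Hypothetical record, reconstructed from the field indices parseRelation
# reads; the real relations.dat layout may differ in the untouched fields.
sample = "\t".join([
    "rel0",                                 # field 0: unknown, assumed to be an id
    "rating.explicit",                      # field 1: assumed home of the type string the filter matches
    "893286638",                            # field 2: timestamp, keyed on % 10
    json.dumps({"rating": 3}),              # field 3: rating payload
    json.dumps({"subject": "user:10196",
                "object": "movie:11242"}),  # field 4: subject/object URIs
])
parseRelation(sample)                       # -> (8L, (10196, 11242, 3L))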

In [31]:
def parseEntity(line):
    fields = line.split("\t")
    js = json.loads(fields[2])
    return int(fields[1]), js["title"]   # (movie id, movie title)

movies = (sc.textFile('/vagrant/vagrant/entities.dat')
            .filter(lambda line: "movie" in line)
            .map(parseEntity))
movies.first()



In [32]:
numRatings = ratings.count()
numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
numMovies = ratings.values().map(lambda r: r[1]).distinct().count()

print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)


Got 100000 ratings from 943 users on 1682 movies.

Splitting the data into training, validation, and test sets

parseRelation keyed every rating by timestamp % 10, so filtering on that key gives a roughly 60/20/20 split: buckets 0-5 for training, 6-7 for validation, and 8-9 for test.


In [33]:
numPartitions = 4

training = ratings.filter(lambda x: x[0] < 6).values().repartition(numPartitions).cache()
validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8).values().repartition(numPartitions).cache()
test = ratings.filter(lambda x: x[0] >= 8).values().cache()

numTraining = training.count()
numValidation = validation.count()
numTest = test.count()

print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)


Training: 60024, validation: 20435, test: 19541
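Splitting on fixed timestamp buckets has the advantage of being fully reproducible. If your Spark version provides RDD.randomSplit (added to PySpark in 1.3), the same effect is available in one call; a sketch, noting that randomSplit yields approximate rather than exact fractions:

# Sketch: an approximate 60/20/20 split via randomSplit instead of the
# deterministic timestamp buckets; the seed keeps it repeatable.
train2, val2, test2 = ratings.values().randomSplit([0.6, 0.2, 0.2], seed=0)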

Writing the training, validation, and test sets to disk


In [34]:
# repartition(1) collapses each set into a single partition, so each
# output directory holds one part file.
training.repartition(1).saveAsTextFile("/tmp/training")
validation.repartition(1).saveAsTextFile("/tmp/validation")
test.repartition(1).saveAsTextFile("/tmp/test")


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-34-9c47187f425f> in <module>()
----> 1 training.repartition(1).saveAsTextFile("/tmp/training")
      2 validation.repartition(1).saveAsTextFile("/tmp/validation")
      3 test.repartition(1).saveAsTextFile("/tmp/test")

/usr/lib/spark/python/pyspark/rdd.pyc in saveAsTextFile(self, path)
   1286         keyed = self.mapPartitionsWithIndex(func)
   1287         keyed._bypass_serializer = True
-> 1288         keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
   1289 
   1290     # Pair functions

/usr/lib/spark/python/build/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/lib/spark/python/build/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o721.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/tmp/training already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1041)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:849)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1164)
	at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:443)
	at org.apache.spark.api.java.JavaRDD.saveAsTextFile(JavaRDD.scala:32)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
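The FileAlreadyExistsException means exactly what it says: saveAsTextFile never overwrites, and file:/tmp/training is left over from a previous run. Clearing the output directories first avoids it; a minimal sketch, assuming the local-filesystem paths shown in the traceback:

import shutil

# saveAsTextFile refuses to write into an existing directory, so remove
# any leftovers from earlier runs before saving (local filesystem only).
for path in ["/tmp/training", "/tmp/validation", "/tmp/test"]:
    shutil.rmtree(path, ignore_errors=True)

training.repartition(1).saveAsTextFile("/tmp/training")
validation.repartition(1).saveAsTextFile("/tmp/validation")
test.repartition(1).saveAsTextFile("/tmp/test")

Since we are only shrinking the partition count, coalesce(1) would also work here and avoids the full shuffle that repartition(1) triggers.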