In [2]:
import findspark
findspark.init()


import sys
from pyspark.sql import SparkSession, functions, types

# Obtain the Spark session used by everything below (getOrCreate returns the
# already-running session if there is one, otherwise starts a new one).
spark = SparkSession.builder.appName('reddit relative scores').getOrCreate()

assert sys.version_info >= (3, 4) # make sure we have Python 3.4+
# Compare (major, minor) numerically: as plain strings, a version such as
# '10.0.0' would sort below '2.1' and wrongly fail this check.
assert tuple(int(part) for part in spark.version.split('.')[:2]) >= (2, 1) # make sure we have Spark 2.1+

# Explicit schema for the comment records. Only the three columns used by the
# analysis are declared, so no other key of the JSON objects is read.
#
# Fields that can be added back if they are ever needed (all declared
# non-nullable unless noted):
#   BooleanType: archived, score_hidden
#   LongType:    controversiality, downs, gilded, retrieved_on, ups
#   StringType:  author_flair_css_class, author_flair_text, body, created_utc,
#                distinguished, edited, id, link_id, name,
#                parent_id (nullable), subreddit_id
schema = types.StructType([
    types.StructField(column, make_type(), False)
    for column, make_type in (
        ('author', types.StringType),
        ('score', types.LongType),
        ('subreddit', types.StringType),
    )
])


def main(in_directory="reddit-1", out_directory="output"):
    """Write, for every subreddit, the author(s) of its best comment by relative score.

    A comment's relative score is its ``score`` divided by the average score
    of all comments in the same subreddit. Subreddits whose average score is
    not positive are excluded, together with their comments. When several
    comments tie for the highest relative score in a subreddit, every tied
    row is kept.

    Intermediate DataFrames are printed with ``show()`` along the way.

    Args:
        in_directory: Path handed to ``spark.read.json`` (read with the
            module-level ``schema``).
        out_directory: Path the result is written to as JSON; existing
            output there is overwritten.
    """
    # The paths are parameters with fixed defaults instead of sys.argv reads:
    # an argv lookup whose value is then replaced by a literal contributes
    # nothing except a possible IndexError.
    comments = spark.read.json(in_directory, schema=schema).cache()
    comments.show()

    # Average score per subreddit; keep only subreddits with a positive
    # average so the division below yields a meaningful ratio.
    averages_by_subreddit = comments.groupby("subreddit").agg(
        functions.avg("score").alias("avg(score)")
    )
    averages_by_subreddit = averages_by_subreddit.where(
        averages_by_subreddit["avg(score)"] > 0
    )
    averages_by_subreddit.show()

    # One row per subreddit: hint Spark to broadcast this side of the join
    # rather than shuffle the full comments table.
    comments_with_avg = functions.broadcast(averages_by_subreddit).join(comments, "subreddit")
    comments_with_avg = comments_with_avg.withColumn(
        "rel_score",
        comments_with_avg["score"] / comments_with_avg["avg(score)"],
    ).cache()
    comments_with_avg.show()

    # Group by column name so the key is resolved against the DataFrame being
    # grouped, not against a column object borrowed from `comments`.
    max_relative_score = comments_with_avg.groupby("subreddit").agg(
        functions.max("rel_score").alias("max(rel_score)")
    )
    max_relative_score.show()

    # Again one row per subreddit, so broadcast the aggregate side.
    comments_with_max = functions.broadcast(max_relative_score).join(
        comments_with_avg, "subreddit"
    ).cache()
    comments_with_max.show()

    # Exact equality is safe here: the maximum was taken over these very
    # rel_score values, so the winning rows match it exactly.
    max_comments = comments_with_max.where(
        comments_with_max["max(rel_score)"] == comments_with_max["rel_score"]
    )
    max_comments.show()

    best_author = max_comments.select(
        max_comments["subreddit"],
        max_comments["author"],
        max_comments["rel_score"],
    )
    best_author.write.json(out_directory, mode='overwrite')


# Run the analysis only when this file is executed directly, not on import.
if __name__ == '__main__':
    main()


+-------------------+-----+---------+
|             author|score|subreddit|
+-------------------+-----+---------+
|           Tbone139|    1|     xkcd|
|         BoonTobias|    4|     xkcd|
|       mynameisdave|   -2|     xkcd|
|             ani625|    5|     xkcd|
|           Treberto|   -1|     xkcd|
|            Norther|    1|     xkcd|
|             dakk12|    2|     xkcd|
|            jtp8736|    1|     xkcd|
|RichardPeterJohnson|    7|     xkcd|
|            wthulhu|    3|     xkcd|
|          retronym_|    2|    scala|
|              jdh30|    1|    scala|
|           rsshilli|   25|     xkcd|
|           rsshilli|    5|     xkcd|
|            Shindig|    1|     xkcd|
|          [deleted]|    1|     xkcd|
|            Wilibus|    1|     xkcd|
|             ilogik|    2|     xkcd|
|     conservohippie|    3|     xkcd|
|             csours|    4|     xkcd|
+-------------------+-----+---------+
only showing top 20 rows

+---------+------------------+
|subreddit|        avg(score)|
+---------+------------------+
|     xkcd| 5.272939881689366|
|    scala| 1.928939237899073|
|optometry|1.4701986754966887|
|  Cameras|1.2222222222222223|
|Genealogy| 1.871313672922252|
+---------+------------------+

+---------+-----------------+------------------+-----+-------------------+
|subreddit|       avg(score)|            author|score|          rel_score|
+---------+-----------------+------------------+-----+-------------------+
|     xkcd|5.272939881689366|            Glorin|    1| 0.1896475253724334|
|     xkcd|5.272939881689366|            iKs279|    1| 0.1896475253724334|
|     xkcd|5.272939881689366|        Supersnazz|    9| 1.7068277283519007|
|     xkcd|5.272939881689366|          hotpixel|    3| 0.5689425761173001|
|     xkcd|5.272939881689366|           V2Blast|    1| 0.1896475253724334|
|     xkcd|5.272939881689366|         [deleted]|    1| 0.1896475253724334|
|     xkcd|5.272939881689366|           mind404|    2| 0.3792950507448668|
|     xkcd|5.272939881689366|      AndrewBenton|   10|  1.896475253724334|
|     xkcd|5.272939881689366|            daemen|    2| 0.3792950507448668|
|     xkcd|5.272939881689366| Quicksilver_Johny|    2| 0.3792950507448668|
|     xkcd|5.272939881689366|           fashraf|    0|                0.0|
|     xkcd|5.272939881689366|         [deleted]|    1| 0.1896475253724334|
|     xkcd|5.272939881689366|         SMTRodent|    1| 0.1896475253724334|
|     xkcd|5.272939881689366|             Kuhva|    0|                0.0|
|     xkcd|5.272939881689366| ChrisHansensVoice|    0|                0.0|
|     xkcd|5.272939881689366|            dmitch|   -1|-0.1896475253724334|
|     xkcd|5.272939881689366|         SMTRodent|    2| 0.3792950507448668|
|     xkcd|5.272939881689366|         [deleted]|    2| 0.3792950507448668|
|     xkcd|5.272939881689366|supersonicsongbird|    1| 0.1896475253724334|
|     xkcd|5.272939881689366|        myotheralt|   17|  3.224007931331368|
+---------+-----------------+------------------+-----+-------------------+
only showing top 20 rows

+---------+------------------+
|subreddit|    max(rel_score)|
+---------+------------------+
|     xkcd| 63.15262594902032|
|    scala| 8.813134009610252|
|optometry| 4.081081081081082|
|  Cameras|1.6363636363636362|
|Genealogy| 6.412607449856734|
+---------+------------------+

+---------+-----------------+-----------------+------------------+-----+-------------------+
|subreddit|   max(rel_score)|       avg(score)|            author|score|          rel_score|
+---------+-----------------+-----------------+------------------+-----+-------------------+
|     xkcd|63.15262594902032|5.272939881689366|            Glorin|    1| 0.1896475253724334|
|     xkcd|63.15262594902032|5.272939881689366|            iKs279|    1| 0.1896475253724334|
|     xkcd|63.15262594902032|5.272939881689366|        Supersnazz|    9| 1.7068277283519007|
|     xkcd|63.15262594902032|5.272939881689366|          hotpixel|    3| 0.5689425761173001|
|     xkcd|63.15262594902032|5.272939881689366|           V2Blast|    1| 0.1896475253724334|
|     xkcd|63.15262594902032|5.272939881689366|         [deleted]|    1| 0.1896475253724334|
|     xkcd|63.15262594902032|5.272939881689366|           mind404|    2| 0.3792950507448668|
|     xkcd|63.15262594902032|5.272939881689366|      AndrewBenton|   10|  1.896475253724334|
|     xkcd|63.15262594902032|5.272939881689366|            daemen|    2| 0.3792950507448668|
|     xkcd|63.15262594902032|5.272939881689366| Quicksilver_Johny|    2| 0.3792950507448668|
|     xkcd|63.15262594902032|5.272939881689366|           fashraf|    0|                0.0|
|     xkcd|63.15262594902032|5.272939881689366|         [deleted]|    1| 0.1896475253724334|
|     xkcd|63.15262594902032|5.272939881689366|         SMTRodent|    1| 0.1896475253724334|
|     xkcd|63.15262594902032|5.272939881689366|             Kuhva|    0|                0.0|
|     xkcd|63.15262594902032|5.272939881689366| ChrisHansensVoice|    0|                0.0|
|     xkcd|63.15262594902032|5.272939881689366|            dmitch|   -1|-0.1896475253724334|
|     xkcd|63.15262594902032|5.272939881689366|         SMTRodent|    2| 0.3792950507448668|
|     xkcd|63.15262594902032|5.272939881689366|         [deleted]|    2| 0.3792950507448668|
|     xkcd|63.15262594902032|5.272939881689366|supersonicsongbird|    1| 0.1896475253724334|
|     xkcd|63.15262594902032|5.272939881689366|        myotheralt|   17|  3.224007931331368|
+---------+-----------------+-----------------+------------------+-----+-------------------+
only showing top 20 rows

+---------+------------------+------------------+-----------+-----+------------------+
|subreddit|    max(rel_score)|        avg(score)|     author|score|         rel_score|
+---------+------------------+------------------+-----------+-----+------------------+
|Genealogy| 6.412607449856734| 1.871313672922252|  ackbar420|   12| 6.412607449856734|
|  Cameras|1.6363636363636362|1.2222222222222223|  [deleted]|    2|1.6363636363636362|
|  Cameras|1.6363636363636362|1.2222222222222223|TogOfStills|    2|1.6363636363636362|
|    scala| 8.813134009610252| 1.928939237899073|   TheSmoke|   17| 8.813134009610252|
|optometry| 4.081081081081082|1.4701986754966887|Klinefelter|    6| 4.081081081081082|
|     xkcd| 63.15262594902032| 5.272939881689366|  shigawire|  333| 63.15262594902032|
+---------+------------------+------------------+-----------+-----+------------------+