Tom Meagher & Fangling Zhang

DS 503 Project 3

To get started install Jupyter and Apache Toree.

$ pip install https://dist.apache.org/repos/dist/dev/incubator/toree/0.2.0/snapshots/dev1/toree-pip/toree-0.2.0.dev1.tar.gz
$ jupyter toree install

Once you start the notebook with jupyter notebook, run the following command (if everything worked, you should see something that looks like org.apache.spark.SparkContext@5fa635fb):


In [1]:
sc


Out[1]:
org.apache.spark.SparkContext@37e20441

Problem 1 SparkSQL (Transaction Data Processing) [30 points]

Guide to SparkSQL commands

Use the Transaction dataset T that you created in Project 1 and create a Spark workflow to do the following. [Use SparkSQL to write this workflow.]


In [2]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val transactionsRDD = sc.textFile("data/transactions.csv")
val schemaString = "transaction_id customer_id total_amount item_count description"

val fields = schemaString.split(" ").
    map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val rowRDD = transactionsRDD.
    map(_.split(",")).
    map(attributes => Row(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4).trim))
val transactionsDF = spark.createDataFrame(rowRDD, schema)
transactionsDF.createOrReplaceTempView("transactions")

val results = spark.sql("""
    SELECT *
    FROM transactions
    """)
results.show()


+--------------+-----------+------------+----------+--------------------+
|transaction_id|customer_id|total_amount|item_count|         description|
+--------------+-----------+------------+----------+--------------------+
|             1|          1|      705.88|         3|F4NYY9eZp0fLoOQEQ...|
|             2|          1|    710.3965|         1|bIoRFNFd55cR2rEmh...|
|             3|          1|   643.23785|         5|0WPnJAZsOeIlTdE1C...|
|             4|          1|    823.4931|         8|XxAqgGnkAI4tvYmBP...|
|             5|          1|   768.90283|         3|3mwOIi3HWKL0RvVYs...|
|             6|          1|   665.63586|         7|3nzN85bvXybkNpCs0...|
|             7|          1|   476.02887|         4|U2rNi2mphaSYCTaLk...|
|             8|          1|   630.26965|         8|ip1CFmNLbC5FEv43F...|
|             9|          1|   116.05951|         1|cCNufRWg7qVKW1LGp...|
|            10|          1|    939.7608|         3|11ACsLbqkjKarSOmO...|
|            11|          1|    672.5814|         4|RTSvOg40mg2R7fJTb...|
|            12|          1|    454.1324|         5|ZAcDqqSWN1HhOUjrQ...|
|            13|          1|    459.1774|         2|9WblccyFZ1OQjHKic...|
|            14|          1|   160.99246|         1|qFq5XeGUyhdcfIamV...|
|            15|          1|    738.1266|         4|wIkGDs3K4h979cw34...|
|            16|          1|   459.06564|        10|KAoxX3v1uiD3Gziv4...|
|            17|          1|    798.7524|         9|29AZGr1KuQrknudoYer1|
|            18|          1|    579.0614|         8|1Mtm7hJmqWXS52AMohxK|
|            19|          1|    771.7162|         5|HQ1FbpG5tczroyOL7...|
|            20|          1|    677.1439|        10|uuhffShG83X4AUIuw...|
+--------------+-----------+------------+----------+--------------------+
only showing top 20 rows

1) T1: Filter out (drop) the transactions from T whose total amount is less than $200


In [3]:
val T1 = spark.sql("""
    SELECT *
    FROM transactions
    WHERE total_amount > 200
    """)
transactionsDF.registerTempTable("T1")
T1.show()


+--------------+-----------+------------+----------+--------------------+
|transaction_id|customer_id|total_amount|item_count|         description|
+--------------+-----------+------------+----------+--------------------+
|             1|          1|      705.88|         3|F4NYY9eZp0fLoOQEQ...|
|             2|          1|    710.3965|         1|bIoRFNFd55cR2rEmh...|
|             3|          1|   643.23785|         5|0WPnJAZsOeIlTdE1C...|
|             4|          1|    823.4931|         8|XxAqgGnkAI4tvYmBP...|
|             5|          1|   768.90283|         3|3mwOIi3HWKL0RvVYs...|
|             6|          1|   665.63586|         7|3nzN85bvXybkNpCs0...|
|             7|          1|   476.02887|         4|U2rNi2mphaSYCTaLk...|
|             8|          1|   630.26965|         8|ip1CFmNLbC5FEv43F...|
|            10|          1|    939.7608|         3|11ACsLbqkjKarSOmO...|
|            11|          1|    672.5814|         4|RTSvOg40mg2R7fJTb...|
|            12|          1|    454.1324|         5|ZAcDqqSWN1HhOUjrQ...|
|            13|          1|    459.1774|         2|9WblccyFZ1OQjHKic...|
|            15|          1|    738.1266|         4|wIkGDs3K4h979cw34...|
|            16|          1|   459.06564|        10|KAoxX3v1uiD3Gziv4...|
|            17|          1|    798.7524|         9|29AZGr1KuQrknudoYer1|
|            18|          1|    579.0614|         8|1Mtm7hJmqWXS52AMohxK|
|            19|          1|    771.7162|         5|HQ1FbpG5tczroyOL7...|
|            20|          1|    677.1439|        10|uuhffShG83X4AUIuw...|
|            21|          1|     916.906|         1|0hWBshEkMsUnnlaJl...|
|            22|          1|   312.02075|         6|liJ9fcZD3VtJM3jNV...|
+--------------+-----------+------------+----------+--------------------+
only showing top 20 rows

2) T2: Over T1, group the transactions by the Number of Items it has, and for each group calculate the sum of total amounts, the average of total amounts, the min and the max of the total amounts.


In [4]:
var T2 = spark.sql("""
    SELECT item_count, sum(total_amount) as sum, avg(total_amount) as avg, min(total_amount) as min, max(total_amount) as max
    FROM T1
    GROUP BY item_count
    """)
transactionsDF.registerTempTable("T2")

3) Report back T2 to the client side


In [5]:
T2.show()


+----------+--------------------+-----------------+----------+---------+
|item_count|                 sum|              avg|       min|      max|
+----------+--------------------+-----------------+----------+---------+
|         7|1.0032024631134991E8|550.2668300021387| 100.00456| 999.9923|
|         3| 9.986017844713731E7|549.9211324805183| 100.00295|999.99896|
|         8|1.0004597069404551E8|549.2323650829262| 100.00547|999.99615|
|         5| 9.979772892034207E7|550.7661724761978|100.000854| 999.9972|
|         6|1.0025006083922471E8|549.6949172536914| 100.00928|999.99536|
|         9| 9.990064196736786E7|550.5957416866522| 100.00365| 999.9996|
|         1|1.0018220245438981E8| 550.276299060683| 100.00193| 999.9967|
|        10|1.0022187393882251E8|549.7966083462113| 100.00204|999.99243|
|         4| 9.958202809550999E7|548.3380491694152| 100.00327|999.97284|
|         2|1.0031883233221298E8|550.0961381628866| 100.00756| 999.9979|
+----------+--------------------+-----------------+----------+---------+

4) T3: Over T1, group the transactions by customer ID, and for each group report the customer ID, and the transactions’ count.


In [6]:
var T3 = spark.sql("""
    SELECT customer_id, count(*) as transaction_count
    FROM T1
    GROUP BY customer_id
    """)
T3.registerTempTable("T3")
T3.show()


+-----------+-----------------+
|customer_id|transaction_count|
+-----------+-----------------+
|        296|              102|
|        467|              102|
|        675|              102|
|        691|              102|
|        829|              102|
|       1090|              102|
|       1159|              102|
|       1436|              102|
|       1512|              102|
|       1572|              102|
|       2069|              102|
|       2088|              102|
|       2136|              102|
|       2162|              102|
|       2294|              102|
|       2904|              102|
|       3210|              102|
|       3414|              102|
|       3606|              102|
|       3959|              102|
+-----------+-----------------+
only showing top 20 rows


In [7]:
T3


Out[7]:
[customer_id: string, transaction_count: bigint]

5) T4: Filter out (drop) the transactions from T whose total amount is less than $600


In [8]:
var T4 = spark.sql("""
    SELECT *
    FROM transactions
    WHERE total_amount < 600
    """)
transactionsDF.registerTempTable("T4")
T4.show()


+--------------+-----------+------------+----------+--------------------+
|transaction_id|customer_id|total_amount|item_count|         description|
+--------------+-----------+------------+----------+--------------------+
|             7|          1|   476.02887|         4|U2rNi2mphaSYCTaLk...|
|             9|          1|   116.05951|         1|cCNufRWg7qVKW1LGp...|
|            12|          1|    454.1324|         5|ZAcDqqSWN1HhOUjrQ...|
|            13|          1|    459.1774|         2|9WblccyFZ1OQjHKic...|
|            14|          1|   160.99246|         1|qFq5XeGUyhdcfIamV...|
|            16|          1|   459.06564|        10|KAoxX3v1uiD3Gziv4...|
|            18|          1|    579.0614|         8|1Mtm7hJmqWXS52AMohxK|
|            22|          1|   312.02075|         6|liJ9fcZD3VtJM3jNV...|
|            25|          1|   280.80206|         5|4q9AJH152KKIriNsa...|
|            27|          1|    544.7473|         3|1bSnaCqf2b1vpZWYt...|
|            28|          1|   327.94125|         6|bA2y4FUJ3GHM8Lllr...|
|            29|          1|   543.52203|         1|1KnV9qjo76afvdafDIZX|
|            32|          1|   534.67126|         7|DSAUTxcNXg7W5QBX7...|
|            33|          1|   347.82996|         8|3UIqqppYQ5wBJmuA3...|
|            34|          1|   340.93652|         6|cjjKWVM6aAxMJiuyZ...|
|            35|          1|   369.94492|         3|tNkr7OISqvwVsBlSa...|
|            37|          1|   326.20517|         4|Ne7iejfV5XwYo6PgUlqu|
|            38|          1|    497.7991|         5|7OeiywgYeUESiJXNp...|
|            40|          1|   327.44507|         1|002Xsp4ZpC322tggk...|
|            42|          1|   166.89853|         3|LWOAuLXoKdaQPib9w...|
+--------------+-----------+------------+----------+--------------------+
only showing top 20 rows

6) T5: Over T4, group the transactions by customer ID, and for each group report the customer ID, and the transactions’ count.


In [9]:
var T5 = spark.sql("""
    SELECT customer_id, count(*) as transaction_count
    FROM T4
    GROUP BY customer_id 
    """)
transactionsDF.registerTempTable("T5")
T5.show()


+-----------+-----------------+
|customer_id|transaction_count|
+-----------+-----------------+
|        296|              102|
|        467|              102|
|        675|              102|
|        691|              102|
|        829|              102|
|       1090|              102|
|       1159|              102|
|       1436|              102|
|       1512|              102|
|       1572|              102|
|       2069|              102|
|       2088|              102|
|       2136|              102|
|       2162|              102|
|       2294|              102|
|       2904|              102|
|       3210|              102|
|       3414|              102|
|       3606|              102|
|       3959|              102|
+-----------+-----------------+
only showing top 20 rows

7) T6: Select the customer IDs whose T5.count * 3 < T3.count


In [10]:
var T6 = spark.sql("""
    SELECT T5.customer_id
    FROM T5
    JOIN T3
    ON T5.customer_id = T3.customer_id
    WHERE T5.transaction_count * 3 < T3.transaction_count
    GROUP BY T5.customer_id
    """)
transactionsDF.registerTempTable("T6")

8) Report back T6 to the client side


In [11]:
T6.show()


+-----------+
|customer_id|
+-----------+
+-----------+


Problem 2 Spark-RDDs (Grid Cells of High Relative-Density Index) [70 points]

Assume a two-dimensional space that extends from 1...10,000 in each dimension as shown in Figure 1. There are points scattered all around the space. The space is divided into pre-defined grid-cells, each of size 20x20. That is, there is 500,000 grid cell in the space. Each cell has a unique ID as indicated in the Figure. Given an ID of a grid cell, you can calculate the row and the column it belongs to using a simple mathematical equation.

Step 1 (Create the Datasets) [10 Points] //You can re-use your code from Project 2

  • Your task in this step is to create one dataset P (set of 2D points). Assume the space extends from 1...10,000 in the both the X and Y axis. Each line in the file should contain one point in the format (a, b), where a is the value in the X axis, and b is the value in the Y axis.
  • Scale the dataset to be at least 100MB.
  • Choose the appropriate random function (of your choice) to create the points.
  • Upload and store the file into HDFS

In [2]:
import java.io._

In [33]:
val RECORD_COUNT = 10000000 // 97.8MB
val LOWER = 1
val HIGHER = 10000

val writer = new PrintWriter(new File("data/points.csv"))
for (_ <- 1 to RECORD_COUNT) {
    val x = scala.util.Random.nextInt(HIGHER - LOWER) + LOWER
    val y = scala.util.Random.nextInt(HIGHER - LOWER) + LOWER
    writer.println(s"$x,$y")
}
writer.close

Step 2 (Report the TOP 50 grid cells w.r.t Relative-Density Index) [40 Points]

In this step, you will write Scala or Java code (it is your choice) to manipulate the file and report the top 50 grid cells (the grid cell IDs not the points inside) that have the highest I index. Write the workflow that reports the cell IDs along with their relative-density index. Your code should be fully parallelizable (distributed) and scalable.


In [101]:
class CellCoordinates(val x1: Int, val x2: Int, val y1: Int, val y2: Int) extends Serializable {
    override def toString(): String = s"($x1, $x2, $y1, $y2)"
}

def get_cell_coordinates(cell_number: Int, row_length: Int = 500): CellCoordinates = {
    val x = (cell_number - 1) % row_length
    val y = Math.floor((cell_number - 1) / row_length).toInt

    val x1 = x * 20
    val x2 = (x + 1) * 20

    val y1 = y * 20
    val y2 = (y + 1) * 20
    return new CellCoordinates(x1, x2, y1, y2)
}

def is_point_in_cell(point: Array[String], cell: CellCoordinates): Boolean = {
    val x = point(0).toInt
    val y = point(1).toInt
    return (x >= cell.x1) && (x < cell.x2) && (y >= cell.y1) && (y < cell.y2)
}

def get_cell_number_for_point(point: Array[String], row_length: Int = 500, cell_size: Int = 20): Int = {
    val x = point(0).toInt
    val y = point(1).toInt
    return (499 - y / cell_size) * row_length + x / cell_size + 1
}

def get_cell_neighbors(cell_number: Int, row_length: Int = 500): Array[Int] = {
    val top_neighbor_number = cell_number - row_length
    val bottom_neighbor_number = cell_number + row_length
    var neighbor_numbers = Array[Int](top_neighbor_number, bottom_neighbor_number)
    
    if ((cell_number - 1) % 500 != 0) {
        val left_neighbor_number = cell_number - 1
        val top_left_neighbor_number = top_neighbor_number - 1
        val bottom_left_neighbor_number = bottom_neighbor_number - 1
        neighbor_numbers = neighbor_numbers ++ Array[Int](left_neighbor_number,
                                                          top_left_neighbor_number,
                                                          bottom_left_neighbor_number)
    }
    
    if (cell_number % 500 != 0) {
        val right_neighbor_number = cell_number + 1
        val top_right_neighbor_number = top_neighbor_number + 1
        val bottom_right_neighbor_number = bottom_neighbor_number + 1
        neighbor_numbers = neighbor_numbers ++ Array[Int](right_neighbor_number,
                                                          top_right_neighbor_number,
                                                          bottom_right_neighbor_number)
    }

    return neighbor_numbers.filter(number => number > 0)
}

In [99]:
val pointsRDD = sc.textFile("data/points.csv").
                   map(line => line.split(",").map(elem => elem.trim))

Cells with count of points


In [105]:
val cellPointsRDD = pointsRDD.map({point => (get_cell_number_for_point(point), 1)}).
                              reduceByKey((x, y) => x + y)

Density for cell


In [6]:
val cell_number = 502
val cell = get_cell_coordinates(cell_number)
val cell501RDD = pointsRDD.filter(point => is_point_in_cell(point, cell))
val x_count = cell501RDD.count

val cell501NeighborNumbers = get_cell_neighbors(cell_number)
val y_count = cell501NeighborNumbers.map(number => get_cell_coordinates(number)).
                                     map(neighbor => pointsRDD.filter(point => is_point_in_cell(point, neighbor))).
                                     map(rdd => rdd.count)
val y_count_average = y_count.sum / y_count.length

println(s"$x_count / $y_count_average = ${x_count.toFloat / y_count_average.toFloat}")


32 / 42 = 0.7619048

Step 3 (Report the TOP 50 grid cells w.r.t Relative-Density Index)[20 Points]

Continue over the results from Step 2, and for each of the reported top 50 grid cells, report the IDs and the relative-density indexes of its neighbor cells.


In [ ]: