Tom Meagher & Fangling Zhang
To get started, install Jupyter and Apache Toree:
$ pip install https://dist.apache.org/repos/dist/dev/incubator/toree/0.2.0/snapshots/dev1/toree-pip/toree-0.2.0.dev1.tar.gz
$ jupyter toree install
Once you start the notebook with jupyter notebook, run the following command (if everything worked, you should see something that looks like org.apache.spark.SparkContext@5fa635fb):
In [1]:
sc
Out[1]:
In [2]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val transactionsRDD = sc.textFile("data/transactions.csv")
val schemaString = "transaction_id customer_id total_amount item_count description"
val fields = schemaString.split(" ").
  map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rowRDD = transactionsRDD.
  map(_.split(",")).
  map(attributes => Row(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4).trim))
val transactionsDF = spark.createDataFrame(rowRDD, schema)
transactionsDF.createOrReplaceTempView("transactions")
val results = spark.sql("""
SELECT *
FROM transactions
""")
results.show()
In [3]:
val T1 = spark.sql("""
SELECT *
FROM transactions
WHERE total_amount > 200
""")
transactionsDF.registerTempTable("T1")
T1.show()
In [4]:
val T2 = spark.sql("""
SELECT item_count, sum(total_amount) as sum, avg(total_amount) as avg, min(total_amount) as min, max(total_amount) as max
FROM T1
GROUP BY item_count
""")
T2.createOrReplaceTempView("T2")
In [5]:
T2.show()
In [6]:
val T3 = spark.sql("""
SELECT customer_id, count(*) as transaction_count
FROM T1
GROUP BY customer_id
""")
T3.createOrReplaceTempView("T3")
T3.show()
In [7]:
T3
Out[7]:
In [8]:
val T4 = spark.sql("""
SELECT *
FROM transactions
WHERE total_amount < 600
""")
T4.createOrReplaceTempView("T4")
T4.show()
In [9]:
val T5 = spark.sql("""
SELECT customer_id, count(*) as transaction_count
FROM T4
GROUP BY customer_id
""")
T5.createOrReplaceTempView("T5")
T5.show()
In [10]:
val T6 = spark.sql("""
SELECT T5.customer_id
FROM T5
JOIN T3
ON T5.customer_id = T3.customer_id
WHERE T5.transaction_count * 3 < T3.transaction_count
GROUP BY T5.customer_id
""")
T6.createOrReplaceTempView("T6")
In [11]:
T6.show()
Assume a two-dimensional space that extends from 1 to 10,000 in each dimension, as shown in Figure 1. Points are scattered all around the space. The space is divided into pre-defined grid cells, each of size 20x20; that is, there are 250,000 grid cells in the space (500 rows by 500 columns). Each cell has a unique ID, as indicated in the figure. Given the ID of a grid cell, you can calculate the row and the column it belongs to with a simple mathematical equation.
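For example, with cells numbered row by row, 500 per row (the row_length = 500 default used by the helper functions later in this section), the 0-based row and column of a cell ID can be recovered with integer division and a modulo. The small helper below is only an illustration (rowAndColumn is a made-up name, not one of the notebook's functions):

def rowAndColumn(cellId: Int, rowLength: Int = 500): (Int, Int) = {
  val row = (cellId - 1) / rowLength // 0-based row index
  val col = (cellId - 1) % rowLength // 0-based column index
  (row, col)
}

rowAndColumn(502) // (1, 1): second row, second column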
In [2]:
import java.io._
In [33]:
val RECORD_COUNT = 10000000 // roughly 97.8MB on disk
val LOWER = 1
val HIGHER = 10000

// Write RECORD_COUNT random points, one comma-separated (x, y) pair per line.
// nextInt(HIGHER - LOWER) returns values in [0, 9998], so coordinates fall in [1, 9999].
val writer = new PrintWriter(new File("data/points.csv"))
for (_ <- 1 to RECORD_COUNT) {
  val x = scala.util.Random.nextInt(HIGHER - LOWER) + LOWER
  val y = scala.util.Random.nextInt(HIGHER - LOWER) + LOWER
  writer.println(s"$x,$y")
}
writer.close()
In this step, you will write Scala or Java code (your choice) to process the file and report the top 50 grid cells (the grid cell IDs, not the points inside them) that have the highest relative-density index I. Write a workflow that reports the cell IDs along with their relative-density index. Your code should be fully parallelizable (distributed) and scalable.
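For reference, the index computed in the single-cell demonstration later in this section is the cell's own point count divided by the average point count of its (up to eight) neighboring cells, with empty neighbors counting as zero:

    I(cell) = count(cell) / average(count(neighbor) over all neighbors of cell)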
In [101]:
class CellCoordinates(val x1: Int, val x2: Int, val y1: Int, val y2: Int) extends Serializable {
  override def toString(): String = s"($x1, $x2, $y1, $y2)"
}

// Bounding box [x1, x2) x [y1, y2) of a cell, given its 1-based ID
// (cells are numbered 500 per row; cell 1 maps to the box at the origin).
def get_cell_coordinates(cell_number: Int, row_length: Int = 500): CellCoordinates = {
  val x = (cell_number - 1) % row_length
  val y = (cell_number - 1) / row_length
  val x1 = x * 20
  val x2 = (x + 1) * 20
  val y1 = y * 20
  val y2 = (y + 1) * 20
  new CellCoordinates(x1, x2, y1, y2)
}

def is_point_in_cell(point: Array[String], cell: CellCoordinates): Boolean = {
  val x = point(0).toInt
  val y = point(1).toInt
  (x >= cell.x1) && (x < cell.x2) && (y >= cell.y1) && (y < cell.y2)
}

// Cell ID for a point; rows are numbered from the top of the space downward
// (the reverse row order of get_cell_coordinates).
def get_cell_number_for_point(point: Array[String], row_length: Int = 500, cell_size: Int = 20): Int = {
  val x = point(0).toInt
  val y = point(1).toInt
  (499 - y / cell_size) * row_length + x / cell_size + 1
}

// IDs of the (up to eight) adjacent cells, keeping only IDs that fall inside the grid.
def get_cell_neighbors(cell_number: Int, row_length: Int = 500): Array[Int] = {
  val top_neighbor_number = cell_number - row_length
  val bottom_neighbor_number = cell_number + row_length
  var neighbor_numbers = Array[Int](top_neighbor_number, bottom_neighbor_number)
  if ((cell_number - 1) % row_length != 0) { // not in the first column
    val left_neighbor_number = cell_number - 1
    val top_left_neighbor_number = top_neighbor_number - 1
    val bottom_left_neighbor_number = bottom_neighbor_number - 1
    neighbor_numbers = neighbor_numbers ++ Array[Int](left_neighbor_number,
      top_left_neighbor_number,
      bottom_left_neighbor_number)
  }
  if (cell_number % row_length != 0) { // not in the last column
    val right_neighbor_number = cell_number + 1
    val top_right_neighbor_number = top_neighbor_number + 1
    val bottom_right_neighbor_number = bottom_neighbor_number + 1
    neighbor_numbers = neighbor_numbers ++ Array[Int](right_neighbor_number,
      top_right_neighbor_number,
      bottom_right_neighbor_number)
  }
  neighbor_numbers.filter(number => number > 0 && number <= row_length * row_length)
}
In [99]:
val pointsRDD = sc.textFile("data/points.csv").
  map(line => line.split(",").map(elem => elem.trim))
In [105]:
// Number of points in each cell, keyed by cell ID.
val cellPointsRDD = pointsRDD.map(point => (get_cell_number_for_point(point), 1)).
  reduceByKey((x, y) => x + y)
In [6]:
// Demonstration for a single cell: count the points in cell 502 and in each of its
// neighbors, then compute the relative-density index (own count / average neighbor count).
val cell_number = 502
val cell = get_cell_coordinates(cell_number)
val cellRDD = pointsRDD.filter(point => is_point_in_cell(point, cell))
val x_count = cellRDD.count

val neighborNumbers = get_cell_neighbors(cell_number)
val y_count = neighborNumbers.map(number => get_cell_coordinates(number)).
  map(neighbor => pointsRDD.filter(point => is_point_in_cell(point, neighbor))).
  map(rdd => rdd.count)
val y_count_average = y_count.sum / y_count.length

println(s"$x_count / $y_count_average = ${x_count.toFloat / y_count_average.toFloat}")
In [ ]: