We need some libraries and initial parameters...


In [ ]:
library(SparkR)
library(leaflet)
library(sp)
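options(digits=15)  # print coordinates with full precision

epsilon = 100  # distance threshold for pairing points; disks are later drawn with radius epsilon/2
source("examples.R")  # expected to provide the helpers used below (transformCoords, calculateDisk, transformCenters)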

Let's load some data...


In [ ]:
data = read.csv("sample_small.csv")
names(data) = c("ID","lat","lng")
head(data)

and visualize it on a map...


In [ ]:
map = leaflet() %>%
  addTiles() %>%
  addCircleMarkers(lng=data$lng, lat=data$lat,weight=2,fillOpacity=1,color="blue",radius=2)

file = 'map1.html'
htmlwidgets::saveWidget(map, file = file, selfcontained = F)
IRdisplay::display_html(paste0("<iframe width='100%' height='400' src='", file, "'></iframe>"))

Connecting to Simba and getting a SQLContext...


In [ ]:
sc <- sparkR.init("local[*]", "SparkR")
sqlContext <- sparkRSQL.init(sc)

Let's read the data into Simba and apply a transformation...


In [ ]:
dataRDD = SparkR:::textFile(sc,"sample_small.csv")
dataRDD = SparkR:::map(dataRDD, transformCoords)
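The `transformCoords` helper comes from `examples.R` and is not shown here. As a rough idea of what such a step might do, the sketch below parses one `ID,lat,lng` line and projects it to approximate metres with a local equirectangular projection. The function name `toMetres`, the projection, and the reference point (the map centre used later in this notebook) are assumptions for illustration, not the actual helper.

```r
# Hypothetical stand-in for transformCoords (assumption, not the real helper).
# Projects lat/lng to approximate metres around a reference point.
EARTH_RADIUS_M <- 6371000   # mean Earth radius in metres
REF_LAT <- 39.990010        # reference point: the map centre used later
REF_LNG <- 116.317406

toMetres <- function(line) {
  # Assumes a headerless "ID,lat,lng" text line.
  fields <- as.numeric(strsplit(line, ",")[[1]])
  lat <- fields[2]
  lng <- fields[3]
  # East-west distances shrink with the cosine of the latitude.
  x <- EARTH_RADIUS_M * (lng - REF_LNG) * pi / 180 * cos(REF_LAT * pi / 180)
  y <- EARTH_RADIUS_M * (lat - REF_LAT) * pi / 180
  list(fields[1], x, y)     # id, x (east), y (north)
}

toMetres("1,39.991010,116.317406")
```

Working in a metric coordinate system is what lets `epsilon` be treated as a plain distance in the join that follows.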

Now we create a DataFrame and cache the data...


In [ ]:
schema <- structType(structField("id", "double"), structField("lng", "double"), structField("lat", "double"))
points <- createDataFrame(sqlContext, dataRDD, schema = schema)
cache(points)

Let's have a look...


In [ ]:
head(points)
count(points)

Now we need to register a pair of temporary tables...


In [ ]:
registerTempTable(points, "p1")
registerTempTable(points, "p2")

It is time to execute some SQL...

SELECT
    * 
FROM 
    p1 
JOIN 
    p2
ON 
    POINT(p2.lng, p2.lat) IN CIRCLERANGE(POINT(p1.lng, p1.lat), epsilon)
WHERE 
    p2.id < p1.id

In [ ]:
query = paste0("SELECT * FROM p1 JOIN p2 ON POINT(p2.lng, p2.lat) IN CIRCLERANGE(POINT(p1.lng, p1.lat), ",epsilon,") WHERE p2.id < p1.id")
pairs = sql(sqlContext, query)
head(pairs)
nrow(pairs)
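To make the meaning of the join concrete, here is a minimal brute-force sketch of the same idea in plain R: keep every pair of points at most `epsilon` apart, and use the id comparison to report each pair only once. The data frame `pts` and its values are made up for illustration, and whether Simba treats the circle boundary as inside is not assumed here (no pair sits exactly at distance `epsilon`).

```r
# Brute-force equivalent of the distance self-join on a tiny, made-up data set.
# x and y are assumed to be in the same metric units as epsilon.
pts <- data.frame(id = 1:4,
                  x = c(0, 60, 200, 230),
                  y = c(0, 70, 0, 40))
epsilon <- 100

# Pairwise Euclidean distances as a full square matrix.
distances <- as.matrix(dist(pts[, c("x", "y")]))

# Keep pairs within epsilon where the second id is smaller (p2.id < p1.id),
# so each pair shows up once and no point is paired with itself.
idx <- which(distances <= epsilon & outer(pts$id, pts$id, ">"), arr.ind = TRUE)
localPairs <- data.frame(id1 = pts$id[idx[, "row"]],
                         id2 = pts$id[idx[, "col"]])
localPairs
```

This approach compares all points against each other, so it only makes sense for small samples; the spatial join in Simba is what makes the same query practical on larger data.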

Now we need to calculate the disk locations for each pair...


In [ ]:
centers <- SparkR:::map(pairs, calculateDisk)
d <- createDataFrame(sqlContext, centers)
head(d)
count(d)
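The `calculateDisk` helper lives in `examples.R` and is not shown. Since the disks are later drawn with radius `epsilon/2` and each pair yields two centers, a plausible reading is that it finds the two disks whose boundary passes through both points. The sketch below shows that geometry in plain R; the function name `diskCenters` and its signature are assumptions for illustration.

```r
# Sketch of the geometry (assumption, not the real calculateDisk):
# centres of the two disks of radius epsilon / 2 whose boundary
# passes through both points of a pair.
diskCenters <- function(x1, y1, x2, y2, epsilon) {
  r  <- epsilon / 2
  dx <- x2 - x1
  dy <- y2 - y1
  d2 <- dx^2 + dy^2                        # squared distance between the points
  # No well-defined pair of disks if the points coincide or are too far apart.
  if (d2 == 0 || d2 > epsilon^2) return(NULL)
  mx <- (x1 + x2) / 2                      # midpoint of the segment
  my <- (y1 + y2) / 2
  h  <- sqrt(max(0, r^2 - d2 / 4))         # distance from midpoint to each centre
  ux <- -dy / sqrt(d2)                     # unit vector perpendicular to the segment
  uy <-  dx / sqrt(d2)
  c(cx1 = mx + h * ux, cy1 = my + h * uy,
    cx2 = mx - h * ux, cy2 = my - h * uy)
}

diskCenters(0, 0, 60, 0, epsilon = 100)
```

For two points 60 units apart and `epsilon = 100`, the centers sit 40 units on either side of the midpoint, and each is exactly 50 units from both points.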

Let's transform the centers back to lng/lat coordinates and collect them...


In [ ]:
centers_lnglat <- SparkR:::map(centers, transformCenters)
disks <- as.data.frame(createDataFrame(sqlContext,centers_lnglat))
names(disks) = c("id1","id2","lng1","lat1","lng2","lat2")
head(disks)
nrow(disks)

Let's have a look at the results...


In [ ]:
p = sort(unique(c(disks$id1,disks$id2)))
data2 = data[data$ID %in% p,]  # select by ID rather than by row position
map = leaflet() %>% setView(lat = 39.990010, lng = 116.317406, zoom = 15) %>%
        addTiles(group = "OSM (default)") %>%
        addCircles(lng=disks$lng1, lat=disks$lat1, weight=2, fillOpacity=0.25, color="red", radius = epsilon/2) %>%
        addCircles(lng=disks$lng2, lat=disks$lat2, weight=2, fillOpacity=0.25, color="red", radius = epsilon/2) %>%
        addCircleMarkers(lng=data$lng, lat=data$lat, weight=2, fillOpacity=1,radius = 2) %>%
        addCircleMarkers(lng=data2$lng, lat=data2$lat, weight=2, fillOpacity=1, color="purple", radius = 2) %>%
        addProviderTiles("Esri.WorldImagery", group = "ESRI") %>%
        addLayersControl(baseGroups = c("OSM (default)", "ESRI"))

file = 'map2.html'
htmlwidgets::saveWidget(map, file = file, selfcontained = F)
IRdisplay::display_html(paste0("<iframe width='100%' height='400' src='", file, "'></iframe>"))
