We need some libraries and initial parameters...


In [4]:
library(SparkR)
library(leaflet)
library(sp)
options(digits=15)


the_lat = 39.976057
the_lng = 116.330243
the_zoom = 15
epsilon = 200
source("pbfe.R")


Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var

The following objects are masked from ‘package:base’:

    colnames, colnames<-, endsWith, intersect, rank, rbind, sample,
    startsWith, subset, summary, table, transform

Let load some data...


In [5]:
data = read.csv("sample_small.csv")
names(data) = c("ID","lat","lng")
head(data)


IDlatlng
1 39.5905830 116.2719450
2 39.6334840 116.7282400
3 39.6558133 116.6641916
4 39.6596780 116.6711490
5 39.6654583 116.3098399
6 39.6728416 116.3086649

and visualize them in a map...


In [6]:
map = leaflet() %>%
  addTiles() %>%
  addCircleMarkers(lng=data$lng, lat=data$lat,weight=2,fillOpacity=1,color="blue",radius=2)

file = 'map1.html'
htmlwidgets::saveWidget(map, file = file, selfcontained = F)
IRdisplay::display_html(paste("<iframe width=100% height=400 src=' ", file, " ' ","/>"))


Connecting with Simba and getting a SQLContext...


In [7]:
sc <- sparkR.init("local[*]", "SparkR")
sqlContext <- sparkRSQL.init(sc)


Launching java with spark-submit command /opt/Simba/bin/spark-submit   sparkr-shell /tmp/RtmpzeJA5w/backend_port58fb4c057ff4 

Let read the data into Simba and apply a transformation...


In [8]:
dataRDD = SparkR:::textFile(sc,"sample_small.csv")
dataRDD = SparkR:::map(dataRDD, transformCoords)

Now We create a DataFrame and cache the data...


In [9]:
schema <- structType(structField("id", "double"), structField("lng", "double"), structField("lat", "double"))
points <- createDataFrame(sqlContext, dataRDD, schema = schema)
cache(points)


DataFrame[id:double, lng:double, lat:double]

Let's have a look...


In [10]:
head(points)
count(points)


idlnglat
0 -300629.9302907524419848.70124370
1 -336418.9337940244429641.88724925
2 -296625.4025613554430225.49965138
3 -301876.5474525004433286.74767265
4 -301232.9041596294433654.10554999
5 -332247.2125765584437621.06989079
899

Now, We need register a pair of temporal tables...


In [11]:
registerTempTable(points, "p1")
registerTempTable(points, "p2")

It is time to execute the SQL statement...

SELECT
    * 
FROM 
    p1 
DISTANCE JOIN 
    p2
ON 
    POINT(p2.lng, p2.lat) IN CIRCLERANGE(POINT(p1.lng, p1.lat), epsilon)
WHERE 
    p2.id < p1.id

In [12]:
sql = paste0("SELECT * FROM p1 DISTANCE JOIN p2 ON POINT(p2.lng, p2.lat) IN CIRCLERANGE(POINT(p1.lng, p1.lat), ",epsilon,") WHERE p2.id < p1.id")
pairs = sql(sqlContext,sql)
head(pairs)
nrow(pairs)


idlnglatidlnglat
26 -332354.6972521514456150.25607556 24 -332269.4098346874456108.23475632
45 -334252.4270143544459698.61791281 44 -334344.7314288494459655.45188833
46 -334242.7453464364459715.58979402 44 -334344.7314288494459655.45188833
46 -334242.7453464364459715.58979402 45 -334252.4270143544459698.61791281
76 -332954.8985128074460728.70522263 74 -333132.9223859184460639.54370124
121 -353770.2694354424466969.59768473 120 -353766.6524938624466963.17659202
923

Now We need to calculate the disk locations for each pair...


In [13]:
centers <- SparkR:::map(pairs, calculateDisk)
schema <- structType(structField("id1", "double"), structField("id2", "double"), structField("lng1", "double"), structField("lat1", "double"), structField("lng2", "double"), structField("lat2", "double"))
d <- createDataFrame(sqlContext, centers, schema = schema)
head(d)
count(d)


id1id2lng1lat1lng2lat2
26 24 -332273.1701484524456208.16403151 -332350.9369383864456050.32680037
45 44 -334262.1282117164459599.08959136 -334335.0302314874459754.98020978
46 44 -334252.8011970144459616.09667934 -334334.6755782714459754.94500301
46 45 -334161.1409175824459657.79085639 -334334.0314432084459756.41685044
76 74 -333039.6755602484460675.66889331 -333048.1453384774460692.58003055
121 120 -353681.3920350674467015.43225359 -353855.5298942374466917.34202316
923

Let's collect the data back to LatLong coordinates...


In [14]:
centers_lnglat <- SparkR:::map(centers, transformCenters)
disks <- as.data.frame(createDataFrame(sqlContext,centers_lnglat))
names(disks) = c("id1","id2","lng1","lat1","lng2","lat2")
head(disks)
nrow(disks)


id1id2lng1lat1lng2lat2
26 24 116.28617230792039.8304366534153116.28547621686139.8289598550172
45 44 116.25897956039039.8585969529074116.25794235208939.8599094498003
46 44 116.25906543037539.8587570188828116.25794648142439.8599094837162
46 45 116.26006828265639.8592166161994116.25795203476039.8599231771293
76 74 116.27169661016139.8693462236874116.27157763091439.8694880711692
121 120 116.02580757143439.9052021013708116.02392952863239.9041577846355
923

Let's have a look at the results...


In [15]:
p = sort(unique(c(disks$id1,disks$id2)))
data2 = data[p,]
map = leaflet() %>% setView(lat = the_lat, lng = the_lng, zoom = the_zoom) %>% addTiles() %>% 
        addCircles(lng=disks$lng1, lat=disks$lat1, weight=2, fillOpacity=0.25, color="red", radius = epsilon/2) %>%
        addCircles(lng=disks$lng2, lat=disks$lat2, weight=2, fillOpacity=0.25, color="red", radius = epsilon/2) %>%
        addCircleMarkers(lng=data$lng, lat=data$lat, weight=2, fillOpacity=1,radius = 2) %>%
        addCircleMarkers(lng=data2$lng, lat=data2$lat, weight=2, fillOpacity=1, color="purple", radius = 2, popup=paste(data2$ID)) %>% 
        addProviderTiles("Esri.WorldImagery", group = "ESRI") %>% 
        addLayersControl(baseGroup = c("OSM(default)", "ESRI"))

file = 'map2.html'
htmlwidgets::saveWidget(map, file = file, selfcontained = F)
IRdisplay::display_html(paste("<iframe width=100% height=400 src=' ", file, " ' ","/>"))



In [16]:
registerTempTable(d, "d")
registerTempTable(points, "p")

In [17]:
sql = paste0("SELECT d.lng1 AS lng, d.lat1 AS lat, id AS id_member FROM d DISTANCE JOIN p ON POINT(p.lng, p.lat) IN CIRCLERANGE(POINT(d.lng1, d.lat1), ",(epsilon/2)+0.01,")")
mdisks = sql(sqlContext,sql)
registerTempTable(mdisks, "m")
sql = "SELECT lng, lat FROM m GROUP BY lng, lat HAVING count(id_member) >= 3"
mdisks1 = sql(sqlContext,sql)

In [18]:
sql = paste0("SELECT d.lng2 AS lng, d.lat2 AS lat, id AS id_member FROM d DISTANCE JOIN p ON POINT(p.lng, p.lat) IN CIRCLERANGE(POINT(d.lng2, d.lat2), ",(epsilon/2)+0.01,")")
mdisks = sql(sqlContext,sql)
registerTempTable(mdisks, "m")
sql = "SELECT lng, lat FROM m GROUP BY lng, lat HAVING count(id_member) >= 3"
mdisks2 = sql(sqlContext,sql)

In [19]:
mdisks = as.data.frame(rbind(mdisks1, mdisks2))
id = seq(1,nrow(mdisks))
mdisks$id = id

coordinates(mdisks) = ~lng+lat
proj4string(mdisks) = mercator
mdisks = spTransform(mdisks, wgs84)
mdisks$lng1 = coordinates(mdisks)[,1]
mdisks$lat1 = coordinates(mdisks)[,2]

In [20]:
map = leaflet() %>% setView(lat = the_lat, lng = the_lng, zoom = the_zoom) %>% addTiles() %>% 
        addCircles(lng=mdisks$lng1, lat=mdisks$lat1, weight=2, fillOpacity=0.25, color="blue", radius = epsilon/2) %>%
        addCircleMarkers(lng=data$lng, lat=data$lat, weight=2, fillOpacity=1,radius = 2) %>%
        addCircleMarkers(lng=data2$lng, lat=data2$lat, weight=2, fillOpacity=1, color="purple", radius = 2) %>% 
        addProviderTiles("Esri.WorldImagery", group = "ESRI") %>% 
        addLayersControl(baseGroup = c("OSM(default)", "ESRI"))

file = 'map3.html'
htmlwidgets::saveWidget(map, file = file, selfcontained = F)
IRdisplay::display_html(paste("<iframe width=100% height=400 src=' ", file, " ' ","/>"))



In [21]:
v1 = "656,659,660"

v1 = as.numeric(unlist(strsplit(v1,',')))

v1

v2 = c(656,657,659,660)

v2

prod(is.element(v1, v2))

4:3


  1. 656
  2. 659
  3. 660
  1. 656
  2. 657
  3. 659
  4. 660
1
  1. 4
  2. 3

In [22]:
m <- as.data.frame(rbind(mdisks1, mdisks2))
m$id = seq(1,nrow(m))
m = createDataFrame(sqlContext, m)
head(m)
count(m)
registerTempTable(m, "m")

head(points)
count(points)

sql = paste0("SELECT m.id AS mid, p.id AS pid FROM m DISTANCE JOIN p ON POINT(p.lng, p.lat) IN CIRCLERANGE (POINT(m.lng, m.lat), ",(epsilon/2)+0.01,") ORDER BY mid, pid")
t = sql(sqlContext,sql)
head(t, 30)
count(t)


lnglatid
-326817.2491651094473000.32009028 1
-326733.4791148374472020.94361649 2
-326441.6770068374473064.94544083 3
-326769.9033811074472499.05395445 4
-326466.4606296244473654.18613708 5
-326636.7269381504474740.14306566 6
1248
idlnglat
0 -300629.9302907524419848.70124370
1 -336418.9337940244429641.88724925
2 -296625.4025613554430225.49965138
3 -301876.5474525004433286.74767265
4 -301232.9041596294433654.10554999
5 -332247.2125765584437621.06989079
899
midpid
1 569
1 584
1 591
2 447
2 451
2 457
2 458
2 464
2 467
2 470
3 587
3 590
3 610
4 528
4 529
4 532
4 536
5 656
5 659
5 660
5 665
5 670
5 679
6 721
6 724
6 731
6 736
7 553
7 561
7 566
5573

In [23]:
library(sqldf)


Loading required package: gsubfn
Loading required package: proto
Loading required package: RSQLite
Loading required package: DBI

In [24]:
t = as.data.frame(t)
g = sqldf("SELECT mid, group_concat(CAST(pid AS INT)) AS pids FROM t GROUP BY mid ORDER BY count(pid)")
head(g)
nrow(g)
tail(g)


Loading required package: tcltk
midpids
1 569,584,591
3 587,590,610
9 864,865,867
10 550,551,556
16 618,627,632
17 698,699,708
1248
midpids
1243 594 403,405,406,407,409,411,412,413,415,417
1244 722 403,405,406,407,409,411,412,413,415,417
1245 730 403,405,406,407,409,411,412,413,415,417
1246 907 403,405,406,407,409,411,412,413,415,417
12471024 438,443,444,447,451,457,458,464,467,470
12481214 438,443,444,447,451,457,458,464,467,470

In [25]:
n = c()
r = nrow(g)
for(i in 1:r){
    disk_i = as.numeric(unlist(strsplit(g[i,2], ',')))
    flag = 1
    for(j in (i+1):r){
        disk_j = as.numeric(unlist(strsplit(g[j,2], ',')))
        # print(paste0("Disk 1: ", disk_i, " Disk 2: ", disk_j, " Result: ", is.element(disk_i, disk_j)))
              
        if(prod(is.element(disk_i, disk_j)) == 1){
            flag = 0
            break
        }
    }
    if(flag == 1){
        n = c(n, i)
    }
}
n = c(n, r)
n = unique(n)

In [26]:
g = g[n,]

m = as.data.frame(m)
maximal = sqldf("SELECT lng, lat, pids FROM g JOIN m ON(id = mid)")
head(maximal)
nrow(maximal)

coordinates(maximal) = ~lng+lat
proj4string(maximal) = mercator
maximal = spTransform(maximal, wgs84)
maximal$lng1 = coordinates(maximal)[,1]
maximal$lat1 = coordinates(maximal)[,2]


lnglatpids
-326512.3922495774471574.61187925 394,403,405
-326924.0996868264473135.22402740 584,591,619
-316480.1644359324472109.23487005 614,626,633
-325995.0461849774472418.12312925 523,535,537
-327206.7902347724475419.52784507 753,754,759
-326924.9326821564471864.92765717 416,418,432
161

In [27]:
map = leaflet() %>% setView(lat = the_lat, lng = the_lng, zoom = the_zoom) %>% addTiles() %>% 
        addCircles(lng=maximal$lng1, lat=maximal$lat1, weight=2, fillOpacity=0.25, color="orange", radius = epsilon/2, popup = maximal$pids) %>%
        addCircleMarkers(lng=data$lng, lat=data$lat, weight=2, fillOpacity=1,radius = 2) %>%
        addCircleMarkers(lng=data2$lng, lat=data2$lat, weight=2, fillOpacity=1, color="purple", radius = 2) %>% 
        addProviderTiles("Esri.WorldImagery", group = "ESRI") %>% 
        addLayersControl(baseGroup = c("OSM(default)", "ESRI"))

file = 'map4.html'
htmlwidgets::saveWidget(map, file = file, selfcontained = F)
IRdisplay::display_html(paste("<iframe width=100% height=400 src=' ", file, " ' ","/>"))



In [29]:
map = leaflet() %>% setView(lat = the_lat, lng = the_lng, zoom = the_zoom) %>% addTiles() %>% 
        addCircleMarkers(lng=data$lng, lat=data$lat, weight=2, fillOpacity=1,radius = 2, group = "Points") %>%
        
        addCircles(lng=disks$lng1, lat=disks$lat1, weight=2, fillOpacity=0.10, color="red", radius = epsilon/2, group = "Initial set") %>%
        addCircles(lng=disks$lng2, lat=disks$lat2, weight=2, fillOpacity=0.10, color="red", radius = epsilon/2, group = "Initial set") %>%
        addCircleMarkers(lng=data2$lng, lat=data2$lat, weight=2, fillOpacity=1, color="purple", radius=2, group = "Initial set") %>% 
        
        addCircles(lng=mdisks$lng1, lat=mdisks$lat1, weight=2, fillOpacity=0.20, color="blue", radius = epsilon/2, group = "Prune less than mu") %>%
        addCircleMarkers(lng=data2$lng, lat=data2$lat, weight=2, fillOpacity=1, color="purple", radius=2, group = "Prune less than mu") %>% 
        
        addCircles(lng=maximal$lng1, lat=maximal$lat1, weight=2, fillOpacity=0.40, color="orange", radius = epsilon/2, popup = maximal$pids, group = "Prune redundant") %>%
        addCircleMarkers(lng=data2$lng, lat=data2$lat, weight=2, fillOpacity=1, color="purple", radius=2, group = "Prune redundant") %>% 
        
        addProviderTiles("Esri.WorldImagery", group = "ESRI") %>% 
        addLayersControl(baseGroup = c("OSM(default)", "ESRI"),
                        overlayGroups = c("Initial set", "Prune less than mu", "Prune redundant", "Points"),
                        options = layersControlOptions(collapsed = FALSE, autoZIndex = FALSE)) 


file = 'map5.html'
htmlwidgets::saveWidget(map, file = file, selfcontained = F)
IRdisplay::display_html(paste("<iframe width=100% height=400 src=' ", file, " ' ","/>"))



In [ ]: