In [1]:
    
#!/usr/bin/python
#coding=utf-8
    
by openthings@163.com, 2016-04-23. License: GPL, MUST include this header.
Next steps:
- Save the data to other storage systems such as MongoDB/HBase/HDFS (see the sketch after the nodeDF schema cell below).
- Split the data into chunks and save them as DataFrame collections partitioned by region.
- Convert the DataFrame to a GeoPandas.DataFrame, then save it as shapefiles.
- Convert the DataFrame directly to a GIScript.Dataset, then save it as UDB files.
 
In [2]:
    
from pprint import pprint
import pyspark
from pyspark import SparkConf, SparkContext
sc = None   # placeholder so the context-creation guard below can test for it
print(pyspark.status)   # shows the pyspark.status module object, confirming the import
    
    
In [68]:
    
conf = (SparkConf()
         .setMaster("local")
         .setAppName("MyApp")
         .set("spark.executor.memory", "1g"))
# create the context only once, so re-running this cell is safe
if sc is None:
    sc = SparkContext(conf=conf)
print(type(sc))
print(sc)
print(sc.applicationId)
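
Note: only one SparkContext can be active per process, so if the configuration above needs to change, the existing context has to be stopped before a new one is created. A minimal sketch, not from the original notebook:

In [ ]:

# Sketch: rebuild the context with a new configuration.
# Stopping first is required -- Spark allows one active context per process.
if sc is not None:
    sc.stop()
sc = SparkContext(conf=conf)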
    
    
In [67]:
    
print(conf)
conf_kv = conf.getAll()   # list of (key, value) pairs for the effective config
pprint(conf_kv)
    
    
In [5]:
    
import json
fl = sc.textFile("../data/muenchen.osm_node.json")
# take(2) fetches only two records instead of collecting everything;
# eval() is unsafe and chokes on JSON literals such as true/false/null,
# so parse each line with json.loads instead
for node in fl.take(2):
    node_dict = json.loads(node)
    pprint(node_dict)
    
    
In [28]:
    
# keep only the lines containing the string "soemisch"
lines = fl.filter(lambda line: "soemisch" in line)
print(lines.count())
print(lines.first())
    
    
In [6]:
    
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
nodeDF = sqlContext.read.json("../data/muenchen.osm_node.json")
#print(nodeDF)
nodeDF.printSchema()
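
The header's first follow-up item (saving to HDFS and other stores) could look roughly like this for nodeDF. This is a hedged sketch; the hdfs:// URL and output path are placeholders, not paths from the original notebook:

In [ ]:

# Sketch: persist the node DataFrame to HDFS as Parquet.
# The HDFS URL and path below are hypothetical placeholders.
nodeDF.write.mode("overwrite").parquet("hdfs://localhost:9000/osm/muenchen_nodes.parquet")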
    
    
In [8]:
    
nodeDF.select("id","lat","lon","timestamp").show(10,True)
#help(nodeDF.show)
    
    
In [80]:
    
wayDF = sqlContext.read.json("../data/muenchen.osm_way.json")
wayDF.printSchema()
    
    
In [10]:
    
wayDF.select("id","tag","nd").show(10,True)
    
    
In [90]:
    
def separator():
    print("===============================================================")
#### Extract the node-ID list from the given way's "nd" objects and
#### build a filter string for the SQL IN clause.
def nodelist_way(way):
    print("WayID:", way["id"], "\tNode count:", len(way["nd"]))
    ndFilter = "(" + ",".join(nd["ref"] for nd in way["nd"]) + ")"
    print(ndFilter)
    return ndFilter
#### Look up node records (id, lat/lon coordinate fields, timestamp) in
#### nodeDF for the node IDs belonging to a way.
def nodecoord_way(nodeID_list):
    nodeDF.registerTempTable("nodeDF")   # (re-)register so SQL can see it
    nodeset = sqlContext.sql("select id,lat,lon,timestamp from nodeDF where nodeDF.id in " + nodeID_list)
    nodeset.show(10, True)
    
In [92]:
    
# demonstrate the two helpers on a small slice of the ways
for wayset in wayDF.select("id", "nd").collect()[4:6]:
    ndFilter = nodelist_way(wayset)
    nodecoord_way(ndFilter)
#pprint(wayset["nd"])
#separator()
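
Instead of issuing one SQL IN-query per way, the same lookup can be done in a single pass with a DataFrame join. This is a sketch, not the original notebook's method; it assumes Spark 1.4+ and the column names from the schemas printed above (a cast may be needed if nd.ref and nodeDF.id have different types):

In [ ]:

# Sketch: join-based alternative to the per-way IN-queries above.
# explode() produces one row per node reference in each way's nd array;
# joining against nodeDF then attaches the coordinates in one pass.
from pyspark.sql.functions import col, explode
way_nodes = wayDF.select(col("id").alias("way_id"), explode("nd").alias("nd")) \
                 .select("way_id", col("nd.ref").alias("node_id"))
way_nodes.join(nodeDF, way_nodes.node_id == nodeDF.id) \
         .select("way_id", "id", "lat", "lon", "timestamp") \
         .show(10, True)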
    
    
In [43]:
    
relationDF = sqlContext.read.json("../data/muenchen.osm_relation.json")
#print(relationDF)
relationDF.printSchema()
relationDF.show(10,True)
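
If the relation schema printed above contains the usual OSM "member" array (with type/ref/role fields), the members could be flattened the same way. This sketch assumes those field names; adjust them to whatever printSchema() actually reported:

In [ ]:

# Sketch, assuming each relation row has a "member" array of structs
# with type/ref/role fields (standard OSM data model) -- adjust names
# to the schema printed above if they differ.
from pyspark.sql.functions import explode
relationDF.select("id", explode("member").alias("m")) \
          .select("id", "m.type", "m.ref", "m.role") \
          .show(10, True)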
    
    
In [9]:
    
# count the words on each line of the node file
def myFunc(s):
    words = s.split()
    return len(words)
wc = fl.map(myFunc).collect()
wc
    
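
The per-line counts above can also be rolled up into the classic word-count aggregate; a small illustrative sketch:

In [ ]:

# Sketch: total word frequencies across the file with the classic
# flatMap / map / reduceByKey pipeline.
from operator import add
word_counts = fl.flatMap(lambda line: line.split()) \
                .map(lambda word: (word, 1)) \
                .reduceByKey(add)
pprint(word_counts.take(10))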
In [27]:
    
#df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "result").load("../data/muenchen.osm")
#df
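
The commented-out line above depends on the external spark-xml package, which is not bundled with Spark, so it must be put on the classpath before the context starts. A sketch; the version coordinates below are illustrative, pick the ones matching your Spark/Scala build:

In [ ]:

# Sketch: make the spark-xml package available before the
# SparkContext is created, e.g. by launching with
#   pyspark --packages com.databricks:spark-xml_2.10:0.3.3
# or by setting the equivalent configuration key (illustrative version):
conf.set("spark.jars.packages", "com.databricks:spark-xml_2.10:0.3.3")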
    