In [1]:
#!/usr/bin/python
#coding=utf-8
# by openthings@163.com, 2016-4-23. License: GPL, MUST include this header.
# Next steps:
# - Save the data to MongoDB/HBase/HDFS or other storage systems.
# - Split the data into chunks and save as region-partitioned DataFrame collections.
# - Convert the DataFrame to a GeoPandas.DataFrame, then save as shapefiles (see the sketch below).
# - Convert the DataFrame directly to a GIScript.Dataset, then save as UDB files.
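As a taste of the GeoPandas step, here is a minimal sketch. It assumes the nodeDF DataFrame built later in this notebook, that geopandas and shapely are installed, and a made-up output path:
In [ ]:
# Sketch only: pull a small node DataFrame to the driver and write a shapefile.
# Only safe when the selected data fits in driver memory after .toPandas().
import geopandas as gpd
from shapely.geometry import Point
pdf = nodeDF.select("id", "lat", "lon").toPandas()
geometry = [Point(float(lon), float(lat)) for lon, lat in zip(pdf["lon"], pdf["lat"])]
gdf = gpd.GeoDataFrame(pdf, geometry=geometry)
gdf.to_file("../data/muenchen_nodes.shp")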
In [2]:
from pprint import pprint
import json
import pyspark
from pyspark import SparkConf, SparkContext
sc = None
print(pyspark.status)  # module object; just confirms pyspark imported cleanly
In [68]:
conf = (SparkConf()
        .setMaster("local")
        .setAppName("MyApp")
        .set("spark.executor.memory", "1g"))
if sc is None:
    sc = SparkContext(conf=conf)
print(type(sc))
print(sc)
print(sc.applicationId)
In [67]:
print(conf)
conf_kv = conf.getAll()
pprint(conf_kv)
In [5]:
fl = sc.textFile("../data/muenchen.osm_node.json")
# Each line is one JSON document; inspect the first two.
for node in fl.take(2):
    node_dict = json.loads(node)
    pprint(node_dict)
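To parse the whole file instead of a sample, map json.loads over the RDD. A sketch: parsing stays distributed on the executors, and take(3) ships only three records back to the driver.
In [ ]:
import json
# Distributed parse of every line into a dict.
nodes_rdd = fl.map(json.loads)
pprint(nodes_rdd.take(3))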
In [28]:
# Filter the raw lines by a contributor name appearing in this extract.
lines = fl.filter(lambda line: "soemisch" in line)
print(lines.count())
print(lines.first())
In [6]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
nodeDF = sqlContext.read.json("../data/muenchen.osm_node.json")
#print(nodeDF)
nodeDF.printSchema()
In [8]:
nodeDF.select("id","lat","lon","timestamp").show(10,True)
#help(nodeDF.show)
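The same columns can also be filtered with DataFrame expressions. A sketch of a bounding-box query; the bounds are made-up values near Munich, and the cast guards against lat/lon having been inferred as strings:
In [ ]:
from pyspark.sql.functions import col
# Hypothetical bounding box around the extract.
box = nodeDF.filter(col("lat").cast("double").between(48.13, 48.16) &
                    col("lon").cast("double").between(11.55, 11.60))
box.select("id", "lat", "lon").show(5, True)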
In [80]:
wayDF = sqlContext.read.json("../data/muenchen.osm_way.json")
wayDF.printSchema()
In [10]:
wayDF.select("id","tag","nd").show(10,True)
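Each way stores its node references as an array of nd structs; explode flattens that into one row per reference, which is usually easier to work with than the nested form. A sketch:
In [ ]:
from pyspark.sql.functions import explode
# One row per (way id, node reference) pair.
way_nodes = wayDF.select("id", explode("nd").alias("nd"))
way_nodes.select("id", "nd.ref").show(10, True)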
In [90]:
def separator():
    print("===============================================================")

#### Extract the node ID list from a given way's nd objects and build
#### an "(id1,id2,...)" filter string for the query below.
def nodelist_way(nd_list):
    print("WayID:", nd_list["id"], "\tNode count:", len(nd_list["nd"]))
    ndFilter = "("
    for nd in nd_list["nd"]:
        ndFilter = ndFilter + nd["ref"] + ","
    ndFilter = ndFilter.strip(',') + ")"
    print(ndFilter)
    return ndFilter

#### Fetch the way's nodes from nodeDF by ID, including the lat/lon coordinate fields.
def nodecoord_way(nodeID_list):
    nodeDF.registerTempTable("nodeDF")
    nodeset = sqlContext.sql("select id,lat,lon,timestamp from nodeDF where nodeDF.id in " + nodeID_list)
    nodeset.show(10, True)
In [92]:
for wayset in wayDF.select("id","nd").collect()[4:6]:
    ndFilter = nodelist_way(wayset)
    nodecoord_way(ndFilter)
    #pprint(wayset["nd"])
    #separator()
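Building an IN (...) string issues one SQL query per way; for more than a handful of ways, a single distributed join between the exploded way/node pairs and nodeDF scales better. A sketch:
In [ ]:
from pyspark.sql.functions import explode
# Flatten ways to (way_id, ref) pairs, then join against the node table in one pass.
way_nodes = wayDF.select(wayDF.id.alias("way_id"), explode("nd").alias("nd")) \
                 .select("way_id", "nd.ref")
joined = way_nodes.join(nodeDF, way_nodes.ref == nodeDF.id)
joined.select("way_id", "id", "lat", "lon").show(10, True)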
In [ ]:
In [43]:
relationDF = sqlContext.read.json("../data/muenchen.osm_relation.json")
#print(relationDF)
relationDF.printSchema()
relationDF.show(10,True)
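If the printed schema shows a member array (the usual OSM relation structure), it can be flattened the same way as a way's nd field. A sketch; the member/type/ref/role field names are an assumption to be checked against the schema above:
In [ ]:
from pyspark.sql.functions import explode
# Assumes a "member" array with type/ref/role fields, as in standard OSM exports.
members = relationDF.select("id", explode("member").alias("m"))
members.select("id", "m.type", "m.ref", "m.role").show(10, True)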
In [9]:
def myFunc(s):
    words = s.split()
    return len(words)

# Token count per line of the raw file.
wc = fl.map(myFunc).collect()
wc
Out[9]:
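myFunc gives a token count per line; the classic aggregate word count uses flatMap plus reduceByKey instead. A sketch on the same RDD:
In [ ]:
# Total occurrences per token across the whole file.
counts = (fl.flatMap(lambda line: line.split())
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b))
pprint(counts.take(10))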
In [27]:
#df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "result").load("../data/muenchen.osm")
#df
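The commented cell above only runs when the spark-xml package is on the classpath. One way is to request it at context creation via spark.jars.packages; the Maven coordinates here are an assumption, so match them to your Spark/Scala version:
In [ ]:
# Assumed coordinates; check the spark-xml releases for the right artifact.
# A new SparkContext would have to be created from this conf (stop sc first).
xml_conf = (SparkConf()
            .setMaster("local")
            .setAppName("MyApp")
            .set("spark.jars.packages", "com.databricks:spark-xml_2.10:0.3.3"))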
In [ ]: