In [1]:
#!/usr/bin/python
#coding=utf-8

Use Spark to process OpenStreetMap .osm files.

Notes:

  • Use sqlContext.read.json() to read the JSON files (converted from .osm by osm-all2json), producing Spark DataFrame objects.
  • Query the DataFrames created from the JSON files to derive new DataFrames.
  • Read each way's nd references (node IDs) and build the way's geometry object.

Next steps:

  • Save the data to MongoDB, HBase, HDFS, or other storage systems.
  • Split the data into blocks and save them as per-region DataFrame collections.
  • Convert the DataFrame to a GeoPandas GeoDataFrame, then save it as shapefiles (see the sketch after this list).
  • Convert the DataFrame directly to a GIScript.Dataset, then save it as UDB files.
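
A minimal sketch of the GeoPandas route, assuming the nodeDF built later in this notebook; it requires the geopandas and shapely packages, which are not used elsewhere here:

import geopandas
from shapely.geometry import Point

# Collect the Spark DataFrame into pandas (only viable for small extracts).
pdf = nodeDF.select("id", "lat", "lon").toPandas()

# Build point geometries from the lat/lon strings; note lon is x, lat is y.
geometry = [Point(float(lon), float(lat)) for lat, lon in zip(pdf["lat"], pdf["lon"])]
gdf = geopandas.GeoDataFrame(pdf, geometry=geometry)

# Save as a shapefile (for real data, also set a CRS; the keyword form
# differs across GeoPandas versions).
gdf.to_file("../data/muenchen_nodes.shp")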

In [2]:
from pprint import pprint
import pyspark
from pyspark import SparkConf, SparkContext

sc = None
print(pyspark.status)


<module 'pyspark.status' from '/home/supermap/anaconda3/envs/GISpark/lib/python3.5/site-packages/pyspark/status.py'>

Configure SparkConf and create the SparkContext runtime environment object.


In [68]:
conf = (SparkConf()
         .setMaster("local")
         .setAppName("MyApp")
         .set("spark.executor.memory", "1g"))

if sc is None:
    sc = SparkContext(conf=conf)

print(type(sc))
print(sc)
print(sc.applicationId)


<class 'pyspark.context.SparkContext'>
<pyspark.context.SparkContext object at 0x7f10bb8c46a0>
local-1461408071018

Display the Spark configuration.


In [67]:
print(conf)
conf_kv = conf.getAll()
pprint(conf_kv)


<pyspark.conf.SparkConf object at 0x7f10bb8bde80>
[('spark.master', 'local'),
 ('spark.app.name', 'MyApp'),
 ('spark.executor.memory', '1g'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.deployMode', 'client')]

Text-based RDD operations in Spark.

Read the OSM JSON file as plain text, then convert each JSON string into a dict object.


In [5]:
import json

fl = sc.textFile("../data/muenchen.osm_node.json")
for node in fl.collect()[0:2]:
    node_dict = json.loads(node)  # parse JSON safely rather than eval()
    pprint(node_dict)


{'changeset': '34651972',
 'id': '398692',
 'lat': '48.1452196',
 'lon': '11.5414971',
 'tag': {'k': 'tmc', 'v': 'DE:35375'},
 'timestamp': '2015-10-15T10:53:28Z',
 'uid': '2290263',
 'user': 'soemisch',
 'version': '20',
 'visible': 'true'}
{'changeset': '34904180',
 'id': '1956100',
 'lat': '48.1434822',
 'lon': '11.5487963',
 'tag': [{'k': 'tmc', 'v': 'DE:61453'},
         {'k': 'TMC:cid_58:tabcd_1:Class', 'v': 'Point'},
         {'k': 'TMC:cid_58:tabcd_1:Direction', 'v': 'positive'},
         {'k': 'TMC:cid_58:tabcd_1:LCLversion', 'v': '9.00'},
         {'k': 'TMC:cid_58:tabcd_1:LocationCode', 'v': '35356'},
         {'k': 'TMC:cid_58:tabcd_1:NextLocationCode', 'v': '35357'},
         {'k': 'TMC:cid_58:tabcd_1:PrevLocationCode', 'v': '35355'}],
 'timestamp': '2015-10-27T14:01:37Z',
 'uid': '2385132',
 'user': 'MENTZ_TU',
 'version': '43',
 'visible': 'true'}
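
Note that the tag field holds a single object for the first node but a list of objects for the second. A small helper to normalize it to a list (a hypothetical sketch, assuming the json.loads parsing above):

def normalize_tags(node_dict):
    # Return node_dict["tag"] as a list of {"k": ..., "v": ...} dicts,
    # whether the source JSON held one tag, many tags, or none.
    tag = node_dict.get("tag")
    if tag is None:
        return []
    return tag if isinstance(tag, list) else [tag]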

Keyword search over the RDD, treating each line as plain text.


In [28]:
lines = fl.filter(lambda line: "soemisch" in line)
print(lines.count())
print(lines.collect()[0])


27
{"id": "398692", "visible": "true", "version": "20", "changeset": "34651972", "timestamp": "2015-10-15T10:53:28Z", "user": "soemisch", "uid": "2290263", "lat": "48.1452196", "lon": "11.5414971", "tag": {"k": "tmc", "v": "DE:35375"}}

Spark DataFrame operations.

Use the SQL engine to create Spark DataFrame objects directly; these support queries and other operations.

Read the OSM node table.


In [6]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
nodeDF = sqlContext.read.json("../data/muenchen.osm_node.json")
#print(nodeDF)
nodeDF.printSchema()


root
 |-- changeset: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- user: string (nullable = true)
 |-- version: string (nullable = true)
 |-- visible: string (nullable = true)

The select() operation on a Spark DataFrame. The show() method accepts the maximum number of records to display.


In [8]:
nodeDF.select("id","lat","lon","timestamp").show(10,True)
#help(nodeDF.show)


+----------+----------+----------+--------------------+
|        id|       lat|       lon|           timestamp|
+----------+----------+----------+--------------------+
|    398692|48.1452196|11.5414971|2015-10-15T10:53:28Z|
|   1956100|48.1434822|11.5487963|2015-10-27T14:01:37Z|
|  21565151|48.1414994|11.5522715|2012-03-01T20:37:08Z|
|  21585828|48.1445431|11.5384205|2011-10-30T16:47:12Z|
|  60300474|48.1406915|11.5502820|2011-11-20T13:24:04Z|
| 256554156|48.1431978|11.5197388|2009-09-10T10:34:54Z|
| 256554158|48.1432360|11.5170168|2012-03-24T14:42:27Z|
| 256554152|48.1420008|11.5383182|2011-10-08T19:22:24Z|
|1423405650|48.1398728|11.5447444|2015-05-04T23:26:30Z|
|1423405651|48.1399051|11.5444005|2011-09-04T20:47:20Z|
+----------+----------+----------+--------------------+
only showing top 10 rows

Read the OSM way table.


In [80]:
wayDF = sqlContext.read.json("../data/muenchen.osm_way.json")
wayDF.printSchema()


root
 |-- changeset: string (nullable = true)
 |-- id: string (nullable = true)
 |-- nd: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ref: string (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- user: string (nullable = true)
 |-- version: string (nullable = true)
 |-- visible: string (nullable = true)

Inspect the data in the way table.


In [10]:
wayDF.select("id","tag","nd").show(10,True)


+--------+--------------------+--------------------+
|      id|                 tag|                  nd|
+--------+--------------------+--------------------+
|24665462|[{"k":"bicycle","...|[[21585827], [355...|
|24665463|[{"k":"highway","...|[[268098191], [24...|
|24665465|[{"k":"highway","...|[[268098203], [26...|
|24665467|[{"k":"bicycle","...|[[480314303], [73...|
|24699690|[{"k":"highway","...|[[268098194], [14...|
|26612843|[{"k":"highway","...|[[302715923], [21...|
|27571168|[{"k":"addr:postc...|[[302733437], [30...|
|27571239|[{"k":"addr:postc...|[[302734100], [30...|
|27571404|{"k":"building","...|[[302734471], [30...|
|27571623|                null|[[304021384], [30...|
+--------+--------------------+--------------------+
only showing top 10 rows

Build the geometry objects for ways.

From each way record, generate a string list of node IDs, which the next step uses to query the node coordinate table.


In [90]:
def separator():
    print("===============================================================")

#### Extract the node-ID list from a given way's nd objects and build a filter string for the query.
def nodelist_way(way_row):
    print("WayID:", way_row["id"], "\tNode count:", len(way_row["nd"]))
    ndFilter = "(" + ",".join(nd["ref"] for nd in way_row["nd"]) + ")"
    print(ndFilter)
    return ndFilter

#### Look up node records (including the lat/lon coordinate fields) in nodeDF by the way's node IDs.
def nodecoord_way(nodeID_list):
    nodeDF.registerTempTable("nodeDF")
    nodeset = sqlContext.sql("select id,lat,lon,timestamp from nodeDF where nodeDF.id in " + nodeID_list)
    nodeset.show(10, True)

Query the node information for multiple ways.


In [92]:
for wayset in wayDF.select("id","nd").collect()[4:6]:
    ndFilter = nodelist_way(wayset)
    nodecoord_way(ndFilter)
#pprint(wayset["nd"])
#separator()


WayID: 24699690 	Node count: 8
(268098194,1485915069,1485915055,1485915052,1485915050,268472655,268101008,268472657)
+----------+----------+----------+--------------------+
|        id|       lat|       lon|           timestamp|
+----------+----------+----------+--------------------+
| 268098194|48.1438349|11.5407441|2011-10-30T16:47:12Z|
| 268472655|48.1437336|11.5405677|2011-10-30T16:47:13Z|
| 268101008|48.1437746|11.5400268|2011-01-05T21:53:15Z|
| 268472657|48.1439011|11.5383099|2011-10-30T16:47:13Z|
|1485915050|48.1437336|11.5405815|2011-10-30T16:46:33Z|
|1485915052|48.1437346|11.5405921|2011-10-30T16:46:33Z|
|1485915055|48.1437370|11.5406022|2011-10-30T16:46:33Z|
|1485915069|48.1437394|11.5406086|2011-10-30T16:46:33Z|
+----------+----------+----------+--------------------+

WayID: 26612843 	Node count: 4
(302715923,21632177,480314301,268098211)
+---------+----------+----------+--------------------+
|       id|       lat|       lon|           timestamp|
+---------+----------+----------+--------------------+
|268098211|48.1442398|11.5424654|2013-09-28T20:57:42Z|
|480314301|48.1441518|11.5436803|2013-09-28T20:57:42Z|
| 21632177|48.1441086|11.5442555|2013-09-28T20:57:42Z|
|302715923|48.1440385|11.5442873|2013-09-28T20:57:43Z|
+---------+----------+----------+--------------------+

Convert the latitude/longitude coordinates into a GeoJSON geometry representation, then save it back into the way's geometry field.
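
This step is not implemented in the notebook; the sketch below shows one possible approach using the same API as above (way_geometry is a hypothetical helper). It collects a way's node coordinates with the filter string built earlier and assembles a GeoJSON LineString dict:

def way_geometry(nodeID_list):
    # Query the node coordinates for one way and assemble a GeoJSON LineString.
    # Note: the SQL result does not preserve the way's original nd order;
    # a real implementation would reorder the rows by the nd list.
    nodeDF.registerTempTable("nodeDF")
    rows = sqlContext.sql(
        "select id,lat,lon from nodeDF where nodeDF.id in " + nodeID_list).collect()
    coords = [[float(r["lon"]), float(r["lat"])] for r in rows]
    return {"type": "LineString", "coordinates": coords}

way_geometry(ndFilter) then yields a dict that json.dumps() can serialize into the way's geometry field.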


In [ ]:

nodeDF.filter(nodeDF.id == "268098191").show()
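
To match several IDs at once, Column.isin() can be used instead of an equality test (a usage sketch; the IDs are taken from the way output above):

nodeDF.filter(nodeDF.id.isin("268098191", "268098203")).show()

Next, read the OSM relation table.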

In [43]:
relationDF = sqlContext.read.json("../data/muenchen.osm_relation.json")
#print(relationDF)
relationDF.printSchema()
relationDF.show(10,True)


root
 |-- changeset: string (nullable = true)
 |-- id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- user: string (nullable = true)
 |-- version: string (nullable = true)
 |-- visible: string (nullable = true)

+---------+-------+--------------------+-------+-------------+-------+-------+
|changeset|     id|           timestamp|    uid|         user|version|visible|
+---------+-------+--------------------+-------+-------------+-------+-------+
| 29285581|  56955|2015-03-06T09:41:33Z|  52766|       sakudo|      4|   true|
| 22484051|  67194|2014-05-22T11:28:06Z|  14002|       Gehrke|      9|   true|
| 31961030|  54390|2015-06-14T12:19:12Z|  88164|          Med|     21|   true|
| 37014835|1785647|2016-02-05T08:39:05Z|2503913|         Cnny|     51|   true|
| 31607796|3005967|2015-05-31T13:53:17Z|  88164|          Med|      3|   true|
| 37979924|3005970|2016-03-21T16:15:23Z|3734915|       Bankel|      5|   true|
| 30556388|2316667|2015-04-27T18:59:50Z|1771836| degernfelder|      2|   true|
| 31455417|  63809|2015-05-25T19:32:40Z| 115042|Filius Martii|     46|   true|
| 20780441|1100799|2014-02-25T21:40:45Z|  72235|  Basstoelpel|     12|   true|
|  9212407|1739953|2011-09-04T20:47:32Z|  17085|     cfaerber|      1|   true|
+---------+-------+--------------------+-------+-------------+-------+-------+
only showing top 10 rows

Search for specified keywords.

Process with a user-defined function: here, count the words in each line.


In [9]:
def myFunc(s):
    words = s.split()
    return len(words)

wc = fl.map(myFunc).collect()
wc


Out[9]:
[4, 4, 4, 4, 4, 4]

In [27]:
#df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "result").load("../data/muenchen.osm")
#df

In [ ]: