RosbagInputFormat is an open-source, splittable Hadoop InputFormat for the rosbag file format.
Example data can be found, for instance, at https://github.com/udacity/self-driving-car/tree/master/datasets, published under the MIT License.
The code you cloned is located in /opt/ros_hadoop/master, while the latest release is in /opt/ros_hadoop/latest.
../lib/rosbaginputformat.jar is a symlink to a recent version. You can replace it with the version you would like to test.
java -jar ../lib/rosbaginputformat.jar --version -f /opt/ros_hadoop/master/dist/HMB_4.bag
The index is a very small configuration file containing a protobuf array that is passed in the job configuration.
Note that this operation will neither process nor parse the whole bag file; it simply seeks to the required offsets.
In [12]:
%%bash
echo -e "Current working directory: $(pwd)\n\n"
tree -d -L 2 /opt/ros_hadoop/
In [9]:
%%bash
# assuming you start the notebook in the doc/ folder of master (default Dockerfile build)
java -jar ../lib/rosbaginputformat.jar -f /opt/ros_hadoop/master/dist/HMB_4.bag
This will generate a very small file named HMB_4.bag.idx.bin in the same folder.
Using your favorite tool, put the bag file in your working HDFS folder.
Note: keep the index file as configuration for your jobs; do not put small files in HDFS.
For convenience we already provide an example file (/opt/ros_hadoop/master/dist/HMB_4.bag) in HDFS under /user/root/
hdfs dfs -put /opt/ros_hadoop/master/dist/HMB_4.bag
hdfs dfs -ls
In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sparkConf = SparkConf()
sparkConf.setMaster("local[*]")
sparkConf.setAppName("ros_hadoop")
sparkConf.set("spark.jars", "../lib/protobuf-java-3.3.0.jar,../lib/rosbaginputformat.jar,../lib/scala-library-2.11.8.jar")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
In [2]:
fin = sc.newAPIHadoopFile(
    path = "hdfs://127.0.0.1:9000/user/root/HMB_4.bag",
    inputFormatClass = "de.valtech.foss.RosbagMapInputFormat",
    keyClass = "org.apache.hadoop.io.LongWritable",
    valueClass = "org.apache.hadoop.io.MapWritable",
    conf = {"RosbagInputFormat.chunkIdx": "/opt/ros_hadoop/master/dist/HMB_4.bag.idx.bin"})
To interpret the messages we need the connections.
We could get the connections as configuration as well. For now we decided to collect the connections into the Spark driver in a dictionary and use it in the subsequent RDD actions. Note that alternative implementations will be provided in the next version of the RosbagInputFormat.
In [3]:
conn_a = fin.filter(lambda r: r[1]['header']['op'] == 7).map(lambda r: r[1]).collect()
conn_d = {str(k['header']['topic']):k for k in conn_a}
# see topic names
conn_d.keys()
Out[3]:
In [6]:
%run -i ../src/main/python/functions.py
The Python rosbag package (rosbag.bag) needs to be installed on all Spark workers. The msg_map function (from src/main/python/functions.py) takes three arguments: the raw record r from the RDD, an optional func that is applied to each deserialized message (default str), and the connection dictionary conn of the topic to extract.
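For orientation, a minimal sketch of such a map function is shown below. It is not the actual implementation from functions.py; it assumes the record layout used above (raw payload under r[1]['data'], rosbag op code under r[1]['header']['op']) and that the connection dictionary exposes the ROS type string under conn['data']['type'].

import roslib.message

def msg_map_sketch(r, func=str, conn={}):
    # rosbag message-data records carry op code 0x02; a fuller version would also
    # check that the record belongs to the given connection
    if r[1]['header']['op'] == 2:
        # resolve the ROS message class from the connection's type string, e.g. 'sensor_msgs/Imu'
        msg_class = roslib.message.get_message_class(str(conn['data']['type']))
        msg = msg_class()
        # deserialize the raw payload into a ROS message object
        msg.deserialize(r[1]['data'])
        # yield the result of func so the function composes with flatMap
        yield func(msg)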
In [7]:
%matplotlib nbagg
# use %matplotlib notebook in python3
from functools import partial
import pandas as pd
import numpy as np
In [8]:
# Take messages from '/imu/data' topic using default str func
rdd = fin.flatMap(
    partial(msg_map, conn=conn_d['/imu/data'])
)
print(rdd.take(1)[0])
In [10]:
from PIL import Image
from io import BytesIO
res = fin.flatMap(
    partial(msg_map, func=lambda r: r.data, conn=conn_d['/center_camera/image_color/compressed'])
).take(50)
Image.open(BytesIO(res[48]))
Out[10]:
In [11]:
def f(msg):
    return (msg.header.stamp.secs, msg.fuel_level)

d = fin.flatMap(
    partial(msg_map, func=f, conn=conn_d['/vehicle/fuel_level_report'])
).toDF().toPandas()

d.set_index('_1').plot()
Out[11]:
In [12]:
def f(msg):
    from keras.layers import dot, Dot, Input
    from keras.models import Model

    linear_acceleration = {
        'x': msg.linear_acceleration.x,
        'y': msg.linear_acceleration.y,
        'z': msg.linear_acceleration.z,
    }

    linear_acceleration_covariance = np.array(msg.linear_acceleration_covariance)

    i1 = Input(shape=(3,))
    i2 = Input(shape=(3,))
    o = dot([i1, i2], axes=1)
    model = Model([i1, i2], o)

    # return a tuple with (numpy dot product, keras dot "predict")
    return (
        np.dot(linear_acceleration_covariance.reshape(3, 3),
               [linear_acceleration['x'], linear_acceleration['y'], linear_acceleration['z']]),
        model.predict([
            np.array([[linear_acceleration['x'], linear_acceleration['y'], linear_acceleration['z']]]),
            linear_acceleration_covariance.reshape((3, 3))])
    )

fin.flatMap(partial(msg_map, func=f, conn=conn_d['/vehicle/imu/data_raw'])).take(5)
# tuple with (numpy dot product, keras dot "predict")
Out[12]:
One can of course sample and collect the data in the driver to train a model.
Note that a msg is the most granular unit, but you could of course replace the flatMap with a mapPartitions.
Another option would be to have a map/reduceByKey before the flatMap so that the function argument would be a whole interval instead of a single msg.
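To illustrate those two variants, here is a rough sketch. It assumes the conn_d dictionary and the msg_map function from the cells above; keying by header.stamp.secs is an arbitrary choice for the example.

def imu_per_partition(records):
    # the whole partition is available here, instead of one record at a time
    for r in records:
        for pair in msg_map(r,
                            func=lambda m: (m.header.stamp.secs, m.linear_acceleration.x),
                            conn=conn_d['/imu/data']):
            yield pair

# sum the x-axis linear acceleration per one-second interval
per_second = (
    fin.mapPartitions(imu_per_partition)
       .reduceByKey(lambda a, b: a + b)
)
per_second.take(3)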