
Commit

Merge pull request #1 from vasco-da-gama/master
sbt dependencies and cleanup
Vasco-da-Gama authored Jun 23, 2017
2 parents dd5a36b + f851d3d commit 4302fce
Showing 11 changed files with 65 additions and 655 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -1,7 +1,6 @@
.ipynb_checkpoints/
metastore_db/
target/
-project/
derby.log
*bag
*bin
65 changes: 47 additions & 18 deletions README.md
**RosbagInputFormat** is an open source **splittable** Hadoop InputFormat for the rosbag file format.

# Usage from Spark (pyspark)

## Check that the rosbag file version is V2.0
```bash
java -jar lib/rosbaginputformat_2.11-0.9.0-SNAPSHOT.jar --version -f HMB_1.bag
```

## Extract the index as configuration
The index is a very small configuration file containing a protobuf array that will be given in the job configuration.

**Note** that the operation **will not** process and **will not** parse the whole bag file; it simply seeks to the required offset.
```bash
java -jar lib/rosbaginputformat_2.11-0.9.0-SNAPSHOT.jar -f HMB_1.bag
```

## Copy the bag file in HDFS

Using your favorite tool, put the bag file in your working HDFS folder.

**Note:** keep the index file as configuration for your jobs; **do not** put small files in HDFS.

Assuming your HDFS working folder is /user/spark/ (e.g. hdfs dfs -mkdir /user/spark/)
```bash
hdfs dfs -put data/HMB_1.bag
hdfs dfs -ls
```

## Process the ros bag file in Spark using the RosbagInputFormat

**Note:** your HDFS address might differ.
```python
fin = sc.newAPIHadoopFile(
    path = "hdfs://127.0.0.1:9000/user/spark/HMB_1.bag",
    inputFormatClass = "de.valtech.foss.RosbagMapInputFormat",
    keyClass = "org.apache.hadoop.io.LongWritable",
    valueClass = "org.apache.hadoop.io.MapWritable",
    conf = {"RosbagInputFormat.chunkIdx":"./HMB_1.bag.idx.bin"})
```
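As a quick sanity check (a minimal sketch, not part of the original walkthrough), take one record from the resulting RDD; each record is a (key, value) pair whose value holds the record fields converted from the MapWritable:

```python
# Inspect a single record to verify the input format is wired up.
# The exact layout of the value dict depends on the record type
# (connection, chunk info or message data).
rec = fin.take(1)[0]
print(rec[0])  # record key
print(rec[1])  # record value as a dict-like structure
```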

## Interpret the Messages
To interpret the messages we need the connections.

We could get the connections as configuration as well. At the moment we decided to collect the connections into the Spark driver in a dictionary and use it in the subsequent RDD actions. Note that the next version of the RosbagInputFormat will provide alternative implementations.

### Collect the connections from all Spark partitions of the bag file into the Spark driver

```python
conn_a = fin.filter(lambda r: r[1]['header']['op'] == 7).map(lambda r: r[1]).collect()
conn_d = {str(k['header']['topic']):k for k in conn_a}
# see topic names
conn_d.keys()
```
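Because conn_d is a plain driver-side dict, every closure that references it ships a copy with each task. An optional sketch (our assumption, not required by the walkthrough) is to broadcast it once instead:

```python
# Broadcast the connection dictionary so executors receive a single
# read-only copy instead of one serialized per task.
conn_b = sc.broadcast(conn_d)

# worker code would then look up conn_b.value['/imu/data']
```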

### Load the python map functions from src/main/python/functions.py
```python
%run -i src/main/python/functions.py
```

### Use of msg_map to apply a function on all messages
Python **rosbag.bag** needs to be installed on all Spark workers.
The msg_map function (from src/main/python/functions.py) takes three arguments:
1. r = the message or RDD record Tuple
2. func = a function (default str) to apply to the ROS message
3. conn = a connection to specify what topic to process

```python
from functools import partial

# Take 3 messages from '/imu/data' topic using default str func
fin.flatMap(
    partial(msg_map, conn=conn_d['/imu/data'])
).take(3)
```
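The default func=str simply stringifies each deserialized message. A hedged sketch of a custom func (assuming the '/imu/data' topic carries sensor_msgs/Imu messages; the linear_acceleration field is our assumption, not from the original README):

```python
from functools import partial

# Extract a single numeric field instead of the full string dump;
# m is the deserialized ROS message that msg_map hands to func.
fin.flatMap(
    partial(msg_map,
            func=lambda m: m.linear_acceleration.x,
            conn=conn_d['/imu/data'])
).take(3)
```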
Empty file added doc/ToDo
Empty file.
Binary file modified lib/rosbaginputformat_2.11-0.9.0-SNAPSHOT.jar
Binary file not shown.
5 changes: 5 additions & 0 deletions project/Dependencies.scala
@@ -0,0 +1,5 @@
import sbt._

object Dependencies {
lazy val scalaTest = "org.scalatest" %% "scalatest" % "3.0.1"
}
1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version=0.13.13
