REST API for IoT Back Brace - WebServiceToKafka.py

This script creates a Flask web API, running on port 5000, that listens for POST requests from the IoT Back Brace.

Once a POST is received, a Kafka message is created that is subsequently ingested by a Spark Streaming job.


In [1]:
from flask import Flask, Response, request, json, render_template
from kafka import KafkaProducer
import uuid
import datetime

app = Flask(__name__)

producer = KafkaProducer(bootstrap_servers='localhost:9092')

The default endpoint simply documents the API: when the root endpoint is called with a GET method, it shows the JSON objects expected by the POST methods.


In [2]:
# Default end point
@app.route('/', methods = ['GET'])
def api_root():
	data = {
		"title":"IOT Back Brace REST API",
		"sensorReading":{
					"deviceID":"5d681c54e66ff4a5654e55c6d5a5b54",
					"metricTypeID":6,
					"uomID":4,
					"actual":{"y":18,"p":17.50,"r":120},
					"setPoints":{"y":25,"p":45,"r":10}
				 },
		"trainingReading":{
					"deviceID":"5d681c54e66ff4a5654e55c6d5a5b54",
					"metricTypeID":6,
					"uomID":4,
					"currentPostureID":2,
					"actual":{"y":18,"p":17.50,"r":120},
					"setPoints":{"y":25,"p":45,"r":100}
				 }
	}
	
	try:
		print(request.headers)
		return render_template("index.html", data = data )
	except Exception as e:
		return str(e)
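
The documented shapes can also be used to check an incoming payload before it is forwarded. The sketch below is a minimal, standard-library-only illustration. The helper name `missing_fields` and the decision to treat every key in the example objects as required are assumptions, not part of the original script.

```python
# Hypothetical helper: which keys count as "required" is an assumption
# taken from the example objects returned by the root endpoint.
REQUIRED_KEYS = ("deviceID", "metricTypeID", "uomID", "actual", "setPoints")
AXIS_KEYS = ("y", "p", "r")


def missing_fields(reading, training=False):
    """Return a sorted list of dotted field names absent from `reading`."""
    required = list(REQUIRED_KEYS)
    if training:
        # Training readings additionally carry currentPostureID
        required.append("currentPostureID")

    missing = [key for key in required if key not in reading]

    # The actual and setPoints objects each hold y, p and r values
    for group in ("actual", "setPoints"):
        values = reading.get(group)
        if isinstance(values, dict):
            missing.extend(
                "%s.%s" % (group, axis) for axis in AXIS_KEYS if axis not in values
            )
    return sorted(missing)


sample = {
    "deviceID": "5d681c54e66ff4a5654e55c6d5a5b54",
    "metricTypeID": 6,
    "uomID": 4,
    "actual": {"y": 18, "p": 17.50, "r": 120},
    "setPoints": {"y": 25, "p": 45},  # "r" deliberately left out
}
print(missing_fields(sample))                 # ['setPoints.r']
print(missing_fields(sample, training=True))  # ['currentPostureID', 'setPoints.r']
```

An endpoint could call such a helper and reject the request when the returned list is non-empty; how strict that check should be depends on what the downstream Spark job tolerates.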

The /LumbarSensorReading endpoint expects a POST request with a JSON object as the payload. A unique reading identifier and the current timestamp are generated and added to the JSON packet before sending.

The JSON packet is then sent to Kafka using the "LumbarSensorReadings" topic.


In [3]:
#  End point for posting sensor readings.
@app.route('/LumbarSensorReading',  methods = ['POST'])
def post_readings():
	if request.headers.get('Content-Type') == 'application/json':
		
		# Create readingTime
		readingTime = datetime.datetime.now().isoformat()
		
		# Create readingID
		readingID = str(uuid.uuid4())
		
		# Add these to json object
		request.json['readingTime'] = readingTime
		request.json['readingID'] = readingID
		
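		# Send to Kafka producer (kafka-python expects bytes unless a value_serializer is configured)
		producer.send('LumbarSensorReadings', json.dumps(request.json).encode('utf-8'))
		return "JSON Message: " + json.dumps(request.json)
	else:
		# Set the HTTP status code to 415 as well as the message text
		return "415 Unsupported Media Type", 415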
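
The enrichment and serialization steps can be separated from Flask and Kafka so that they are easy to exercise in isolation. The following is a sketch under that assumption: `enrich_reading` and `to_kafka_bytes` are hypothetical names, and the function copies the payload rather than modifying `request.json` in place as the endpoint does.

```python
import datetime
import json
import uuid


def enrich_reading(payload):
    """Return a copy of `payload` with readingTime and readingID added."""
    enriched = dict(payload)  # shallow copy; the caller's dict is untouched
    enriched["readingTime"] = datetime.datetime.now().isoformat()
    enriched["readingID"] = str(uuid.uuid4())
    return enriched


def to_kafka_bytes(message):
    """Serialize a dict to UTF-8 encoded JSON bytes."""
    return json.dumps(message).encode("utf-8")


reading = {"deviceID": "5d681c54e66ff4a5654e55c6d5a5b54", "metricTypeID": 6, "uomID": 4}
enriched = enrich_reading(reading)
value = to_kafka_bytes(enriched)

print(sorted(enriched))      # the original keys plus readingID and readingTime
print(type(value).__name__)  # bytes
```

With kafka-python, a function like `to_kafka_bytes` could instead be passed as the `value_serializer` argument when constructing `KafkaProducer`, in which case `producer.send` would accept the dict directly.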

The /LumbarSensorTraining endpoint expects a POST request with a JSON object as the payload. A unique reading identifier and the current timestamp are generated and added to the JSON packet before sending.

The JSON packet is then sent to Kafka using the "LumbarSensorTraining" topic.


In [4]:
# End point for training the Machine Learning Model
@app.route('/LumbarSensorTraining',  methods = ['POST'])
def post_trainingData():
	if request.headers.get('Content-Type') == 'application/json':
	
		# Create readingTime
		readingTime = datetime.datetime.now().isoformat()
		
		# Create readingID
		readingID = str(uuid.uuid4())
		
		# Add these to json object
		request.json['readingTime'] = readingTime
		request.json['readingID'] = readingID
		
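		# Send to Kafka producer (kafka-python expects bytes unless a value_serializer is configured)
		producer.send('LumbarSensorTraining', json.dumps(request.json).encode('utf-8'))
		print(request.headers)

		return "JSON Message: " + json.dumps(request.json)
	else:
		# Set the HTTP status code to 415 as well as the message text
		return "415 Unsupported Media Type", 415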

In [ ]:
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=5000, debug=True)
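
For a quick manual check once the server is running, a reading can be posted from Python. This sketch uses only the standard library. The base URL `http://localhost:5000` assumes the server runs on the local machine, and `build_request` is a hypothetical helper. The request is only constructed here; the commented-out lines would send it.

```python
import json
import urllib.request

BASE_URL = "http://localhost:5000"  # assumption: server running locally


def build_request(reading, endpoint="/LumbarSensorReading"):
    """Build a POST request carrying `reading` as a JSON body."""
    body = json.dumps(reading).encode("utf-8")
    return urllib.request.Request(
        BASE_URL + endpoint,
        data=body,
        # The endpoints compare this header against 'application/json' exactly
        headers={"Content-Type": "application/json"},
        method="POST",
    )


reading = {
    "deviceID": "5d681c54e66ff4a5654e55c6d5a5b54",
    "metricTypeID": 6,
    "uomID": 4,
    "actual": {"y": 18, "p": 17.50, "r": 120},
    "setPoints": {"y": 25, "p": 45, "r": 10},
}
req = build_request(reading)
print(req.get_method(), req.full_url)

# Uncomment to send against a running server:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
```

A training reading would go to the other endpoint by passing `endpoint="/LumbarSensorTraining"` and adding `currentPostureID` to the payload.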