Kafka Producer Test Script

Creates mock readings for testing the back-end processing when an IoTBB device is not available.

This script allows you to test the functionality of the Spark Streaming and machine learning components of the project.


In [1]:
import time
from kafka import KafkaProducer
import json
import random
import csv
import uuid
import datetime
"""
Usage:  bin/spark-submit ~/spark/kafkaProducrerTest.py
"""
producer = KafkaProducer(bootstrap_servers='localhost:9092')
pitch = 0
position = 0
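
The functions below encode each JSON payload to bytes before calling send(). As an alternative, kafka-python can handle the serialization itself if the producer is created with a value_serializer; the sketch below is illustrative only and is not used by the rest of the script.


In [ ]:
# Sketch: a producer that serializes dicts to UTF-8 JSON automatically.
# Assumes the same local broker as above; not used elsewhere in this script.
json_producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# With this producer, send() accepts a plain dict:
# json_producer.send('LumbarSensorReadings', {"example": True})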

The "getRandomPitch" function generates a mock pitch "reading" between a minimum and maximum value and maps it to a position. The position thresholds can be defined arbitrarily or, if you want to be more accurate, taken from the debug output of the machine learning model.


In [2]:
def getRandomPitch(min_pitch, max_pitch):
    # Generate a random pitch within the requested range and classify it
    # into one of three position buckets.
    pitch = random.uniform(min_pitch, max_pitch)
    if pitch < -25:
        position = 2
    elif pitch < -15:
        position = 1
    else:
        position = 0
    return pitch, position
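
For reference, the thresholds above map a pitch to a position as follows; the call below is illustrative, since the pitch itself is random.


In [ ]:
# pitch <  -25          -> position 2
# -25 <= pitch < -15    -> position 1
# pitch >= -15          -> position 0
pitch, position = getRandomPitch(-50, 25)
print(pitch, position)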

The "sendSensorReadings" function creates mock "actual" readings. For each iteration, a random reading is generated, packaged as a JSON document, and sent to Kafka on the "LumbarSensorReadings" topic.


In [3]:
def sendSensorReadings(iterations):
    for i in range(iterations):
        readingID = str(uuid.uuid4())
        pitch, position = getRandomPitch(-50, 25)
        packet = {
            "readingID": readingID,
            "deviceID": "5d681c54e66ff4a5654e55c6d5a5b54",
            "readingTime": datetime.datetime.now().isoformat(),
            "metricTypeID": 6,
            "uomID": 4,
            "actual": {"y": -30, "p": pitch, "r": 120},
            "setPoints": {"y": 25, "p": 45, "r": 100},
            "prevAvg": {"y": 15, "p": 40, "r": 88}
        }
        print(packet)
        # Kafka expects the message value as bytes, so encode the JSON string
        producer.send('LumbarSensorReadings', json.dumps(packet).encode('utf-8'))
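
To confirm that readings are arriving on the topic, they can be read back with kafka-python's consumer. A minimal sketch, assuming the same local broker and that the cell above has already produced some messages:


In [ ]:
# Sketch: read back messages from the readings topic to verify the producer
# is working. Assumes the local broker at localhost:9092; interrupt the cell
# to stop consuming.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'LumbarSensorReadings',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')))

for message in consumer:
    print(message.value["readingID"], message.value["actual"]["p"])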

The "sendSensorTrainingReadings" function creates mock training readings that are subsequently used to generate the machine learning model. A random reading with a known posture (positionID) is generated, packaged as a JSON document, and sent to Kafka on the "LumbarSensorTrainingReadings" topic.


In [4]:
def sendSensorTrainingReadings(iterations):
    for i in range(iterations):
        pitch, position = getRandomPitch(-50, 25)
        # Position label for training: 2.0 for pitch <= -25, 1.0 for -25 < pitch <= -15
        if pitch <= -25:
            position = 2.0
        elif pitch <= -15:
            position = 1.0
        packet = {
            "deviceID": "5d681c54e66ff4a5654e55c6d5a5b54",
            "positionID": position,
            "readingTime": "2016-07-25T15:45:07.12",
            "metricTypeID": 6,
            "uomID": 4,
            "actual": {"y": 18, "p": pitch, "r": 120},
            "setPoints": {"y": 25, "p": 45, "r": 100},
            "prevAvg": {"y": 15, "p": 40, "r": 88}
        }
        producer.send('LumbarSensorTrainingReadings', json.dumps(packet).encode('utf-8'))
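
The Spark Streaming job that consumes these topics lives outside this notebook, but for reference, a minimal sketch of reading the training topic is shown below. It assumes the older spark-streaming-kafka-0-8 integration and the local broker; the project's actual streaming job may be configured differently.


In [ ]:
# Sketch only: preview the training topic from a Spark Streaming job.
# Assumes the spark-streaming-kafka-0-8 integration (Spark 1.x/2.x) and the
# local broker; submit with bin/spark-submit rather than running it here.
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="LumbarTrainingPreview")
ssc = StreamingContext(sc, 10)  # 10-second micro-batches (illustrative)

stream = KafkaUtils.createDirectStream(
    ssc,
    ['LumbarSensorTrainingReadings'],
    {"metadata.broker.list": "localhost:9092"})

# Each record is a (key, value) pair; parse the JSON value and print a sample
stream.map(lambda kv: json.loads(kv[1])).pprint()

ssc.start()
ssc.awaitTermination()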

The main function calls both send functions with a fixed number of iterations and then flushes the producer so that buffered messages are delivered before the script exits.


In [ ]:
def main():
    sendSensorReadings(50)
    sendSensorTrainingReadings(50)
    # Block until all buffered messages have actually been sent to the broker
    producer.flush()


if __name__ == "__main__":
    main()
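
Each run sends 50 messages to each topic. Increase the iteration counts if the downstream Spark Streaming job needs a longer stream of test data, then re-run the cell to generate a fresh batch.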