In [1]:
import time
from kafka import KafkaProducer
import json
import random
import csv
import uuid
import datetime
"""
Usage: bin/spark-submit ~/spark/kafkaProducrerTest.py
"""
producer = KafkaProducer(bootstrap_servers='localhost:9092')
pitch = 0
position = 0
The "getRandomPitch" method will generate a mock "reading" between a min and max range. The position can be arbitrarily defined or if you want to be very accurate, the debug output of the machine learning model can be used here.
In [2]:
def getRandomPitch(min,max):
pitch = random.uniform(min,max)
if pitch < -25:
position=2
elif pitch < -15:
position=1
else:
position=0
return pitch, position
The "sendSensorReadings" method creates mock "actual" readings. A random reading is generated and a JSON packet is created and sent to Kafka using the "LumbarSensorReadings" topic.
In [3]:
def sendSensorReadings(iterations):
for i in range(iterations):
readingID = str(uuid.uuid4())
pitch, position = getRandomPitch(-50,25)
if pitch <= -15:
if pitch <= -25:
position = 2.0
elif pitch > -25:
if pitch <= -15:
position = 1.0
packet = {
"readingID":readingID,
"deviceID":"5d681c54e66ff4a5654e55c6d5a5b54",
"readingTime":datetime.datetime.now().isoformat(),
"metricTypeID":6,
"uomID":4,
"actual":{"y":-30,"p":pitch,"r":120},
"setPoints":{"y":25,"p":45,"r":100},
"prevAvg":{"y":15,"p":40,"r":88}
}
print(packet)
producer.send('LumbarSensorReadings', json.dumps(packet))
The "sendSensorTrainingReadings" method creates mock Training readings that are subesquently used in the machine learning model generation. A random reading is generated with a known posture and a JSON packet is created and sent to Kafka using the "LumbarSensorTrainingReadings" topic.
In [4]:
def sendSensorTrainingReadings(iterations):
for i in range(iterations):
pitch, position = getRandomPitch(-50,25)
if pitch <= -15:
if pitch <= -25:
position = 2.0
elif pitch > -25:
if pitch <= -15:
position = 1.0
packet = {
"deviceID":"5d681c54e66ff4a5654e55c6d5a5b54",
"positionID":position,
"readingTime":"2016-07-25T15:45:07.12",
"metricTypeID":6,
"uomID":4,
"actual":{"y":18,"p":pitch,"r":120},
"setPoints":{"y":25,"p":45,"r":100},
"prevAvg":{"y":15,"p":40,"r":88}
}
producer.send('LumbarSensorTrainingReadings', json.dumps(packet))
The main method calls the methods with a defined number of iterations.
In [ ]:
def main():
sendSensorReadings(50)
sendSensorTrainingReadings(50)
if __name__ == "__main__":
main()