In [2]:
%load_ext coconut
# Coconut introduces nice functional macros and better map, filter, reduce, etc.
import PyCmdMessenger
from toolz.curried import *
op = operator
from copy import copy, deepcopy
import sys
from typing import *
In [6]:
# Open the connection to the board. Baud rate and serial timeout are chosen
# here. On a non-ATmega328 board the data sizes (bytes for integers, longs,
# floats, and doubles) may also have to be set.
arduino = PyCmdMessenger.ArduinoBoard(
    "COM10",
    baud_rate=115200,
    enable_dtr=False,
)

# Command table: one [name, argument format] pair per command. The order must
# be the same as in the sketch.
COMMANDS = [
    ["kMotorOn", "ssi"],
    ["kMotorStayOn", "ss"],
    ["kMotorOff", "s"],
    ["kStatus", "s*"],
    ["kAck", "s*"],
    ["kError", "s*"],
    ["kLogging", "s*"],
]

# Bind the command table to the board.
c = PyCmdMessenger.CmdMessenger(arduino, COMMANDS)
In [22]:
from typing import *
from toolz.sandbox.core import unzip
from toolz.curried import *
cmd = PyCmdMessenger
def listen(
    Messenger: "cmd.PyCmdMessenger.CmdMessenger",
    messageIdentifier: Text,
    *rest,
    arg_format: Optional[Text] = None,
    tries: int = 250,
) -> Any:
    """Wait for a specific type of response message.

    Calls ``Messenger.receive`` repeatedly and discards every message whose
    first element is not ``messageIdentifier``.

    Args:
        Messenger: Object exposing ``commands`` (a sequence of
            ``[name, format]`` pairs) and ``receive(arg_formats=...)``.
        messageIdentifier: Name of the command to wait for. It must be the
            name (first element) of one of the entries in
            ``Messenger.commands``.
        *rest: If given and ``arg_format`` is not, the first extra positional
            argument is used as the argument format, so
            ``listen(m, "kAck", "s*")`` behaves like
            ``listen(m, "kAck", arg_format="s*")``.
        arg_format: Argument format forwarded to ``receive``. When ``None``,
            ``receive`` is called without arguments.
        tries: Maximum number of ``receive`` calls before giving up.

    Returns:
        The first received message whose first element equals
        ``messageIdentifier``, or ``None`` if no such message arrived within
        ``tries`` attempts.

    Raises:
        ValueError: If ``messageIdentifier`` is not a known command name.
    """
    # Compare against the command *names* only; a membership test on the whole
    # [name, format] pair would also accept format strings such as "ssi".
    known_names = [command[0] for command in Messenger.commands]
    if messageIdentifier not in known_names:
        raise ValueError("Message identifier must be a valid command identifier for the Messenger")

    # A positionally supplied format would otherwise be silently dropped.
    if arg_format is None and rest:
        arg_format = rest[0]

    # Bounded polling: without the limit a silent board hangs the kernel.
    for _attempt in range(tries):
        if arg_format is not None:
            message = Messenger.receive(arg_formats=arg_format)
        else:
            message = Messenger.receive()
        # Skip "nothing received" results and empty sequences before indexing.
        if isinstance(message, (list, tuple)) and message and message[0] == messageIdentifier:
            return message
    return None
# def getLogs(Messenger: cmd.PyCmdMessenger.CmdMessenger) -> Text:
# """Yields the logs from the CmdMessenger"""
# while True:
# yield from listen(Messenger, "kLogging")
def sendCommand(Messenger: "cmd.PyCmdMessenger.CmdMessenger", messageIdentifier: Text, *args) -> Any:
    """Send a command and return the response.

    Args:
        Messenger: Object exposing ``send(name, *args)`` and ``commands`` (a
            sequence of ``[name, format]`` pairs), and accepted by ``listen``.
        messageIdentifier: Name of the command to send.
        *args: Arguments forwarded unchanged to ``Messenger.send``.

    Returns:
        Whatever ``listen`` returns: the "kAck" message when
        ``messageIdentifier`` is one of the messenger's command names,
        otherwise the "kError" message.
    """
    Messenger.send(messageIdentifier, *args)
    # Consult the messenger's own command table (the same one ``listen``
    # validates against) rather than a notebook-level global.
    known_names = [command[0] for command in Messenger.commands]
    reply_identifier = "kAck" if messageIdentifier in known_names else "kError"
    # The format must be passed by keyword: ``arg_format`` is keyword-only in
    # ``listen``, so a positional "s*" would not reach it.
    return listen(Messenger, reply_identifier, arg_format="s*")
In [11]:
# Ask the board for a status report ("kStatus" is sent with no arguments).
# Earlier experiment, kept for reference (note that COMMANDS registers the
# name "kMotorOn", not "MotorOn"):
# c.send("MotorOn","A", "F", 9999)
c.send("kStatus")
# Read one incoming message, parsing its arguments with the "s*" format.
# NOTE(review): presumably this yields a (command name, args, time received)
# triple, or None if nothing arrived -- confirm against the PyCmdMessenger docs.
msg = c.receive(arg_formats="s*")
print(msg)
In [12]:
# Send "kMotorStayOn" with two string arguments (format "ss" in COMMANDS); the
# reply is not read here.
# NOTE(review): "A" / "F" presumably select the motor and its direction --
# confirm against the Arduino sketch.
c.send("kMotorStayOn", "A", "F")
In [13]:
# Send "kMotorOff" with one string argument (format "s" in COMMANDS); the
# reply is not read here.
# NOTE(review): "A" presumably names the motor to stop -- confirm against the
# Arduino sketch.
c.send("kMotorOff","A")
In [25]:
# Same command as the raw send, but through sendCommand, which waits for the
# board's "kAck" message and returns it.
msg = sendCommand(c, "kMotorStayOn", "A", "F")
In [34]:
# Debug aid: list the attributes and methods available on the board object.
dir(arduino)
Out[34]:
In [36]:
# Debug aid: show the concrete class of the board object.
type(arduino)
Out[36]:
In [37]:
# Close the connection to the board when finished.
arduino.close()
In [ ]: