In [1]:
%matplotlib notebook
from IPython import display
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import time
import datetime
from collections import deque
import serial
import struct
import threading
NUMCHANNELS = 1
WINDOWSIZE = 1000
In [2]:
class EMG_Recording:
def __init__(self):
self.start_datetime = None
self.data = []
rec = EMG_Recording()
window = None
ser = None
def start_emg():
global ser
if ser is None:
ser = serial.Serial('COM3', 57600)
time.sleep(3) # because arduino auto-resets on serial connection
ser.write(b'start')
def read_packet():
global ser
fmt = '<' + ('L') + ('H'*NUMCHANNELS)
try:
while True:
if struct.unpack('<B',ser.read(1))[0] is 255:
break
return struct.unpack(fmt,ser.read(struct.calcsize(fmt)))
except:
return None
def _start_data_capture():
global rec, ser, window
rec.start_datetime = datetime.datetime.now()
window = deque(maxlen=WINDOWSIZE)
for i in range(WINDOWSIZE):
window.append((0,0))
while ser is not None:
sample = read_packet()
if sample is not None:
window.append(sample)
rec.data.append(sample)
def start_data_capture():
thread = threading.Thread(target = _start_data_capture)
thread.start()
def stop_emg():
global ser
ser.write(b'stop')
ser.close()
ser = None
In [3]:
start_emg()
start_data_capture()
In [4]:
def window_data():
return [i[1] for i in list(window)]
xdata = range(WINDOWSIZE)
ydata = window_data()
fig, ax = plt.subplots()
line, = ax.plot(xdata, ydata, lw=0.5)
ax.set_ylim(0, 700)
ax.grid()
def run(i):
ydata = window_data()
line.set_ydata(ydata)
return line,
ani = animation.FuncAnimation(fig, run, blit=True, interval=10, repeat=False)
plt.show()
In [5]:
stop_emg()