.. _battery-chargers:
Many modern "smart" battery chargers have data logging capabilities that you can use with Pipecat to view the state of a battery during charging. Typically, these chargers connect to your computer via a serial port or a serial-over-USB cable that acts like a traditional serial port. Data is then sent to the port during charging. For example, data from an iCharger 208B connected to a Mac computer using a USB cable could be read using the following:
In [1]:
# nbconvert: hide
from __future__ import absolute_import, division, print_function
import sys
sys.path.append("../features/steps")
import test
serial = test.mock_module("serial")
serial.serial_for_url.side_effect = test.read_file("../data/icharger208b-charging", stop=3)
In [2]:
import serial
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
for line in port:
print(line)
Here, we used the pySerial (http://pyserial.readthedocs.io) library to open a serial port and read data from the charger, which sends data to the port one line at a time. Note that the device name for the port - "/dev/cu.SLAB_USBtoUART" in this case - will vary depending on your device and operating system. For example, you might use "COM1" or "COM2" on Windows, or "/dev/ttyS0" on Linux.
Our first step in using Pipecat to decipher the raw device output is to turn the for-loop from the previous example into a pipe:
In [3]:
import pipecat.utility
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
for record in pipe:
print(record)
In this case, :func:pipecat.utility.readline
converts the raw data into :ref:records
that store each line of data for further processing. To decode the contents of each line, we add the appropriate Pipecat device to the end of the pipe:
In [4]:
import pipecat.device.charger
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
print(record)
As you can see, :func:pipecat.device.charger.icharger208b
has converted the raw data records into charger-specific records containing human-readable fields whose values have appropriate physical units. Let's use :func:pipecat.record.dump
to make the output a little more readable:
In [5]:
import pipecat.record
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
pipecat.record.dump(record)
Now, you can extract just the data you care about from a record:
In [6]:
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
print(record[("battery", "voltage")], record[("battery", "current")])
And you can convert units safely and explicitly:
In [7]:
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
print(record[("battery", "current")].to(pipecat.units.amps))
As an aside, you may be wondering at this point why it's necessary to explicitly create the serial port and connect it to readline
... why not code that functionality directly into icharger208b
? The answer is flexibility: by separating Pipecat's functionality into discrete, well-defined components, those components can be easily combined in new and unexpected ways. For example, you could use icharger208b
with a charger that communicated over a network socket instead of a serial port. Or you could "replay" data from a charger stored in a file:
In [8]:
fobj = open("../data/icharger208b-charging-sample")
pipe = pipecat.utility.readline(fobj)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
print(record[("battery", "charge")])
Let's explore other things we can do with our pipe. To begin, you might want to add additional metadata to the records returned from a device. For example, you might want to append timestamps:
In [9]:
# nbconvert: hide
serial.serial_for_url.side_effect = test.read_file("../data/icharger208b-charging", rate=pipecat.quantity(2, pipecat.units.seconds), stop=3)
In [10]:
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
for record in pipe:
pipecat.record.dump(record)
Note that :func:pipecat.utility.add_timestamp
has added a timestamp
field to each record. Timestamps in Pipecat are always recorded using UTC (universal) time, so you will likely want to convert them to your local timezone before formatting them for display:
In [11]:
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
for record in pipe:
print(
record["timestamp"].to("local").format("YYYY-MM-DD hh:mm:ss a"),
record[("battery", "voltage")],
)
You could also use :func:pipecat.utility.add_field
to append your own custom field to every record that passes through the pipe.
Now let's consider calculating some simple statistics, such as the minimum and maximum battery voltage while charging. When we iterate over the contents of a pipe using a for
loop, we receive one record at-a-time until the pipe is empty. We could keep track of a "running" minimum and maximum during iteration, and there are use-cases where that is the best way to solve the problem. However, for moderately-sized data, Pipecat provides a more convenient approach:
In [12]:
# nbconvert: hide
serial.serial_for_url.side_effect = test.read_file("../data/icharger208b-charging")
In [13]:
import pipecat.store
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.store.cache(pipe)
for record in pipe:
pass
print(len(pipe.table))
print(pipe.table[("battery", "voltage")])
Here, :func:pipecat.store.cache
creates an in-memory cache that stores every record it receives. We have a do-nothing for
loop that reads data from the charger to populate the cache. Once that's complete, we can use the cache table
attribute to retrieve data from the cache using the same keys and syntax we would use with a record. Unlike a record, the cache returns every value for a given key at once (using a Numpy array), which makes it easy to compute the statistics we're interested in:
In [14]:
import pipecat.store
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.store.cache(pipe)
for record in pipe:
pass
print("Min:", pipe.table[("battery", "voltage")].min())
print("Max:", pipe.table[("battery", "voltage")].max())
Consolidating fields using the cache is also perfect for generating plots with a library like Toyplot (http://toyplot.readthedocs.io):
In [15]:
import toyplot
canvas = toyplot.Canvas()
axes = canvas.cartesian(grid=(3, 1, 0), label="Battery", ylabel="Voltage (V)")
axes.plot(pipe.table[("battery", "voltage")].to(pipecat.units.volt))
axes = canvas.cartesian(grid=(3, 1, 1), ylabel="Current (A)")
axes.plot(pipe.table[("battery", "current")].to(pipecat.units.amp))
axes = canvas.cartesian(grid=(3, 1, 2), ylabel="Charge (mAH)")
axes.plot(pipe.table[("battery", "charge")].to(pipecat.units.milliamp * pipecat.units.hour));
Note that nothing prevents us from doing useful work in the for
loop that populates the cache, and nothing prevents us from accessing the cache within the loop. For example, we might want to display field values from individual records alongside a running average computed from the cache. Or we might want to update our plot periodically as the loop progresses.
Moving on, you will likely want to store records to disk for later access. Pipecat provides components to make this easy too. First, you can add :func:pipecat.store.pickle.write
to the end of a pipe, to write records to disk using Python's pickle format:
In [16]:
# nbconvert: hide
import os
serial.serial_for_url.side_effect = test.read_file("../data/icharger208b-charging", rate=pipecat.quantity(2, pipecat.units.seconds), stop=3)
if os.path.exists("charger.pickle"):
os.remove("charger.pickle")
In [17]:
import pipecat.store.pickle
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.store.pickle.write(pipe, "charger.pickle")
for record in pipe:
pass
In [18]:
pipe = pipecat.store.pickle.read("charger.pickle")
pipe = pipecat.store.cache(pipe)
for record in pipe:
pipecat.record.dump(record)
print("Average:", pipe.table[("battery", "voltage")].mean())
This is another example of the interchangeability of the Pipecat components: the pickle writer is a record consumer, and the pickle reader is a record generator. In essence, we "broke" our previous pipe into two separate pipes that communicate via the filesystem. While we won't go into detail here, a similar approach could be used to communicate between threads using a message queue or between processes over a socket.
In [19]:
# nbconvert: hide
serial.serial_for_url.side_effect = test.read_file("../data/icharger208b-charging", rate=pipecat.quantity(2, pipecat.units.seconds), stop=10, block=True)
There is one final issue that we've ignored so far: when to stop logging data. The :func:pipecat.utility.readline()
function will read data from the serial port as long as the port is open, blocking indefinitely if no data is arriving. That means that for all the preceeding examples the for
loop will never end unless the serial port is closed (i.e. the external device is turned-off or unplugged), or the code is interrupted using Control-C. While that's fine for prototyping at the command line, we need to have a way to stop collecting data and shutdown cleanly if we're going to automate data logging processes. Fortunately, Pipecat provides several easy-to-use functions to do just that:
In [20]:
import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.count(pipe, count=2)
for record in pipe:
pipecat.record.dump(record)
Here, :func:pipecat.limit.count
ends the loop when count
records have been received. This is often handy during development to limit the amount of data consumed from a device that produces output continuously. However, this approach is no good for devices like our charger that will produce a finite, indeterminate number of records - if the device stops sending records before the count
has been reached, the loop will still block. Instead, we could use :func:pipecat.limit.duration
to limit the total amount of time the loop is allowed to run instead:
In [21]:
import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.duration(pipe, duration=pipecat.quantity(4, pipecat.units.seconds))
for record in pipe:
pipecat.record.dump(record)
This approach is an improvement because it puts an upper-bound on the amount of time the loop will run, whether the device has stopped sending records or not. However, it's still error-prone, since we don't know in advance how long charging will take - if we set the duration too low, it may stop the loop before charging is complete. If we set the duration too high, we will capture all the records we want, but we will likely waste time waiting for records that will never come. Ideally, we would like to exit the loop as soon as the charger tells us it's finished. Fortunately, the charger provides a field - charger/mode
that can do just that:
In [22]:
# nbconvert: hide
serial.serial_for_url.side_effect = test.read_file("../data/icharger208b-charging", rate=pipecat.quantity(2, pipecat.units.seconds), start=1285, block=True)
In [23]:
import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.until(pipe, key=("charger", "mode"), value="finished")
for record in pipe:
pipecat.record.dump(record)
:func:pipecat.limit.until
terminates the loop as soon as it receives a record with the given key and value. This approach finally gets us our desired behavior (loop ends as soon as the charger is finished), but it could use just a little more work to make it robust. For example, what happens if the charger stops sending data before the mode changes? We could combine :func:pipecat.limit.until
with :func:pipecat.limit.duration
, but that would still suffer from the terminate-too-soon / waste-too-much-time problem. Fortunately, we know from testing that our charger sends records every two seconds (if at all), and Pipecat provides :func:pipecat.limit.timeout
, which can terminate the loop if it doesn't receive a record within a specified time interval:
In [24]:
import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.until(pipe, key=("charger", "mode"), value="finished")
pipe = pipecat.limit.timeout(pipe, timeout=pipecat.quantity(5, pipecat.units.seconds))
for record in pipe:
pipecat.record.dump(record)
This example still exits normally when the charger is finished, but it will also exit cleanly if a record isn't received within a five seconds. More importantly, you can see how easy it is to combine limit functions to control when the for
loop terminates.