In [1]:
from pkg_resources import resource_filename
import os
import warnings
import numpy as np
from cts_core.camera import Camera
from digicampipe.utils import geometry
from digicampipe.io.event_stream import event_stream, add_slow_data
# Locate the example run and the camera configuration that ship with the
# `digicampipe` package under tests/resources.
example_file_path = resource_filename(
    'digicampipe',
    os.path.join('tests', 'resources', 'example_100_evts.000.fits.fz'),
)
digicam_config_file = resource_filename(
    'digicampipe',
    os.path.join('tests', 'resources', 'camera_config.cfg'),
)
# Directory passed to `add_slow_data` as `basepath` below.
aux_basepath = resource_filename('digicampipe', 'tests/resources/')

# Build the camera description and derive the geometry from it.
digicam = Camera(_config_file=digicam_config_file)
digicam_geometry = geometry.generate_geometry_from_camera(camera=digicam)

# Create the event stream (max_events=100) and pass it through
# `add_slow_data`; the cells further down read `event.slow_data`.
data_stream = add_slow_data(
    event_stream(
        file_list=[example_file_path],
        camera_geometry=digicam_geometry,
        camera=digicam,
        max_events=100,
    ),
    basepath=aux_basepath,
)

# Iterate over the whole stream without doing any work: afterwards `event`
# is bound to the last item yielded and is inspected in the following cells.
for event in data_stream:
    pass
In [2]:
event
Out[2]:
In [3]:
# I do not show this, it is VERY long and ugly
# event.slow_data
In [4]:
# `slow_data` is a `collections.namedtuple` at the moment.
# Namedtuples tell you their fields with `._fields` (yes, it should be `keys()`, but I did not write it).
event.slow_data._fields
Out[4]:
In [5]:
# each field of `slow_data` is, at the moment, also a namedtuple
# I do not show this, it is long and ugly
# event.slow_data.DigicamSlowControl
In [6]:
event.slow_data.DigicamSlowControl._fields
Out[6]:
In [7]:
event.slow_data.DigicamSlowControl.trigger_status
Out[7]:
In [8]:
# I do not show this, it is long and ugly
# event.slow_data.DriveSystem
In [9]:
event.slow_data.DriveSystem._fields
Out[9]:
In [10]:
event.slow_data.DriveSystem.current_time
Out[10]:
In [11]:
event.slow_data.DriveSystem.current_position_az
Out[11]:
In [12]:
event.slow_data.DriveSystem.current_position_el
Out[12]:
In [13]:
event.slow_data.SafetyPLC.SPLC_CAM_Status
Out[13]:
In [14]:
from enum import Enum, IntEnum, auto
class LidStatus(Enum):
    """States of the camera lid, decoded from ``SafetyPLC.SPLC_CAM_Status``.

    The members are numbered 1..14 in declaration order, which are the same
    values ``auto()`` would assign; they are spelled out so the mapping
    between status bits and states is visible at a glance.
    """

    # NOTE(review): numbering starts at 1, so the masked status values 0 and
    # 15 match no member and make `from_CAM_Status` raise ValueError --
    # confirm against the SafetyPLC documentation that `Undefined` is
    # really encoded as 1 and not as 0.
    Undefined = 1
    Closed = 2
    D2_starting = 3
    D1_opening = 4
    D1_opened = 5
    D2_opening = 6
    Opened = 7
    D1_starting = 8
    D2_closing = 9
    D2_closed = 10
    D1_closing = 11
    OVC_error = 12
    OVC_error_reseting = 13
    Timeout_error = 14

    @classmethod
    def from_CAM_Status(cls, status):
        """Return the lid state stored in a ``SPLC_CAM_Status`` word.

        Only the 4 least significant bits of `status` are looked at; all
        higher bits are ignored.

        Raises
        ------
        ValueError
            If the masked value does not correspond to any member.
        """
        lid_bits = status & 0xF
        return cls(lid_bits)
In [15]:
LidStatus.from_CAM_Status(event.slow_data.SafetyPLC.SPLC_CAM_Status)
Out[15]:
So, inside a "processor" that wants to skip all events where the lid is not `Opened`, the code would read like this:
In [16]:
# NOTE(review): `data_stream` was already iterated to the end in the first
# cell; if it is a single-pass iterator this loop sees no events unless the
# stream is created again -- confirm before relying on this cell's output.
for event in data_stream:
    cam_status = event.slow_data.SafetyPLC.SPLC_CAM_Status
    lid_status = LidStatus.from_CAM_Status(cam_status)
    # Guard clause: skip every event whose lid is in any state but Opened.
    if lid_status is not LidStatus.Opened:
        continue
    # Lid is open, so we can analyze
    pass