Muchos accidentes en carreteras son causados directa o indirectamente por vehículos pesados conducidos con sobrepeso. Estos causan daños en el pavimento y también sufren más efectos dinámicos durante las curvas.
Para inhibir el exceso de peso de estos vehículos es necesario fiscalizar estas infracciones y, cuando necesario, aplicar las medidas establecidas por ley, como multas y aprehensiones. Un método que está siendo investigado en muchas partes del mundo es el pesaje en movimiento. Este método tiene como ventajas la economía en espacio físico y operación, ya que sus sensores son instalados en la propia carretera y no implica en atrasos en el viaje de los usuarios de la vía, pues puede pesar los vehículos pesados transitando en la velocidad directriz de la vía.
En este trabajo serán presentados tecnologías útiles para desarrollar un sistema computacional para apoyo al pesaje de vehículos en movimiento. La experiencia para desarrollar este trabajo fue obtenida a través del proyecto desarrollado en el laboratorio de transportes (LabTrans) de la Universidade Federal de Santa Catarina (UFSC). El objetivo de este trabajo es servir como base inicial para futuros investigadores del tema.
El lenguaje utilizado aquí será el Python y las librerías principales utilizadas serán: numpy, scipy, pandas, sqlalchemy, statsmodels, numba, scikit-learn, pydaqmx, bokeh.
Un sistema computacional de pesaje de vehículos en movimiento está compuesto, básicamente, de:
El sistema debe ser rápido y robusto para procesar todas estas informaciones en el menor tiempo posible. Python no es un lenguaje reconocido por tener un alto desempeño, por eso, es necesario utilizar librerías y métodos para potenciar su capacidad de procesamiento.
Con base en los resultados del pesaje, clasificación y reconocimiento de la matrícula vehicular es posible saber si el vehículo cometió alguna infracción y, en caso positivo, es posible vincular la infracción a la identificación del vehículo infractor.
In [1]:
from IPython.display import display
from matplotlib import pyplot as plt
from scipy import signal
from scipy import constants
from scipy.signal import argrelextrema
from collections import defaultdict
from sklearn import metrics
import statsmodels.api as sm
import numpy as np
import pandas as pd
import numba as nb
import sqlalchemy
import os
import sys
import datetime
import peakutils
# local
sys.path.insert(
0, os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd())))
)
from pywim.utils.dsp.synthetic_data.sensor_data import gen_truck_raw_data
from pywim.estimation.vehicular_classification import dww
%matplotlib inline
In [2]:
def plot_signals(df: pd.DataFrame, ax=None):
kwargs = {}
if ax is not None:
kwargs['ax'] = ax
df.plot(**kwargs)
plt.title("Datos de los sensores")
plt.xlabel('Segundos (s)')
plt.ylabel('Tensión (V)')
plt.grid(True)
if ax is None:
plt.show()
La adquisición de datos es hecha por una placa de adquisición de datos que está conectada con el sensor y la computadora que almacenará estos datos.
Para eso, por ejemplo, puede ser hecha por medio de placas de adquisición DAQmx de la empresa National Instruments (NI). Para comunicarse con estas puede ser utilizada la librería PyDAQmx, un wrap hecho en Python para los controladores del hardware fornecidos por la empresa. Esta librería es una interfaz completa para los controladores NIDAQmx ANSI C e importa todas las funciones del controlador e importa todas las constantes predefinidas. Como resultado, la librería retorna un objeto numpy.array.
Después de adquirir la señal de los sensores, los datos pueden ser almacenados en un buffer circular en memoria que, dentro un proceso paralelo, busca una señal completa de un vehículo (segmento). El segmento de datos puede ser hecho por medio de un bucle inductivo.
Para iniciar la adquisición de datos es necesario definir parámetros como el tipo de adquisición (ej. CreateAIVoltageChan), la cantidad de canales, especificación de los canales acesados, el mostreo total por lectura, tasa de mostreo, modo de agrupación de los datos (ej. DAQmx_Val_GroupByChannel)
In [3]:
df = pd.DataFrame()
sample_rate = 2000
total_seconds = 3.0
# analog channel
df = gen_truck_raw_data(
sample_rate=sample_rate,
speed=15,
vehicle_layout='-o--o---o--',
sensors_distance=[1, 1],
p_signal_noise=10
)
plot_signals(df)
Después de segmentados, los datos brutos son almacenados en la base de datos. Eso posibilita cambiar los métodos de cálculos o parámetros de calibración, posibilitando analizar los métodos utilizados.
Una tecnología que puede ser muy útil para el almacenamiento de datos brutos es HDF5. Así es posible definir un patrón para el almacenamiento de los datos y se mantiene integridad de estos datos (https://support.hdfgroup.org/HDF5/).
Para la realización de los cálculos, la señal necesita ser tratada y, para eso, es necesario aplicar un filtrado de señal y corrección de baseline. Para la aplicación del filtrado, en el ejemplo, será utilizado la recomendación de (KistlerInstrumente, 2004), la fabricante de los sensores Lineas: filtrado del tipo pasa baja de orden 1, a 600 Hz.
Para hacer la corrección de baseline pode ser utilizado el método que sea más apropiado para las características eléctricas de la señal del sensor. En la librería nmrglue (Helmus and Jaroniec, 2013) tiene el módulo proc_bl que contiene muchas funciones que pueden ayudar a hacer la corrección de baseline. En el ejemplo abajo, la corrección será hecha sustrayendo de la señal el valor mínimo encontrado en los primeros 100 puntos de la señal.
In [4]:
df_filt = df.copy()
for s in df_filt.keys():
df_filt[s] -= df_filt[s][:100].min()
# ploteo
plot_signals(df_filt)
El filtro utilizado será de tipo basa baja, de orden 1, con la frecuencia de corte de 600Hz. Para eso, fue utilizado los métodos filtfilt y butterworth de la librería scipy.
In [5]:
order = 1
freq = 600 # Mz
lower_cut = freq/sample_rate
b, a = signal.butter(order, lower_cut)
for k in df_filt.keys():
df_filt[k] = signal.filtfilt(b, a, df_filt[k])
# ploteo
plot_signals(df_filt)
Para la detección de picos, puede ser utilizada la librería peakutils (https://pypi.python.org/pypi/PeakUtils)
In [6]:
peaks = {}
for k in df_filt.keys():
index = peakutils.indexes(df_filt[k].values)
peaks[k] = index
In [7]:
# ploteo
ax = plt.figure().gca()
plot_signals(df_filt, ax=ax)
for k in df_filt.keys():
ax.plot(df_filt.index[peaks[k]], df_filt[k].iloc[peaks[k]], 'ro')
plt.show()
Para el recorte de la curva para el cálculo de peso para los sensores, usando como base los sensores Lineas de Kistler, puede ser utilizado el concepto descrito en (Kistler Instrumente, 2004). La figura abajo (Kistler Instrumente, 2004) ilustra cómo debe ser hecho el recorte.
Para hacerlo con los datos de ejemplo, puede ser adoptado un threshold de 0,2 y un $\Delta{t}$ de 20. Para facilitar el ejemplo, el corte será hecho desde 400 puntos antes del pico hasta 400 puntos después del pico.
In [8]:
sensor_curve = defaultdict(dict)
for k in df_filt.keys():
# k => sensor
ax = plt.figure().gca()
for i, peak in enumerate(peaks[k]):
# i => axle
sensor_curve[k][i] = df_filt[[k]].iloc[peak-25:peak+25]
plot_signals(sensor_curve[k][i], ax=ax)
ax.set_xlim([0, 1])
plt.show()
A partir de las informaciones de los picos de la señal y su curva, es posible empezar los cálculos para determinar la distancia entre ejes, velocidad y peso. A continuación, serán presentados estos cálculos utilizando los datos de ejemplo generados en las secciones anteriores.
Para calcular la velocidad es necesario, primeramente, saber la distancia entre los sensores. Para este ejemplo, será adoptada la distancia de 1 metro. La velocidad se da a través de la fórmula: $v = \frac{\Delta{s}}{\Delta{t}}$
In [9]:
distance_sensors = 1 # metro
vehicle_speed = defaultdict(list)
speed_index = []
for i in range(1, df_filt.shape[1]):
# i => sensor
for j in range(len(peaks['a%s' % i])):
# j => axis
time_points = peaks['a%s' % i][j]-peaks['a%s' % (i-1)][j]
d_time = time_points*(1/sample_rate)
vehicle_speed['axle_%s' % j].append(distance_sensors/d_time) # m/s
speed_index.append('speed_sensor_%s_%s' % (i-1, i))
df_speed = pd.DataFrame(
vehicle_speed, index=speed_index
)
vehicle_speed_mean = df_speed.mean().mean()
display(df_speed*3.6) # km
print('Velocidad media:', vehicle_speed_mean * 3.6, 'km/h') # km/h
Para calcular la distancia entre ejes es necesario haber calculado la velocidad. La fórmula para el cálculo de la distancia entre ejes es: $\Delta{s} = v*\Delta{t}$. En este ejemplo será utilizada la velocidad media, pero también podría ser utilizada la velocidad encontrada por eje.
In [10]:
axles_distance = defaultdict(dict)
for i in range(df_filt.shape[1]):
# i => sensor
for j in range(1, len(peaks['a%s' % i])):
iid = 'a%s' % i
time_points = peaks[iid][j]-peaks[iid][j-1]
d_time = time_points*(1/sample_rate)
axles_distance[iid]['axle%s-axle%s' % (j-1, j)] = (
d_time*vehicle_speed_mean
)
df_distance_axles = pd.DataFrame(axles_distance)
print(df_distance_axles)
Otra información necesaria para la realización de los cálculos de pesaje es el área bajo la curva identificada. Para realizar este cálculo es necesario hacer la integral de curva o, en este caso, la suma de los puntos de la curva.
In [11]:
df_area = pd.DataFrame()
time_interval = 1/sample_rate
print('intervalo de tiempo:', time_interval)
for s in sensor_curve:
area = {}
for axle, v in sensor_curve[s].items():
# sumatorio con corrección de baseline
result = float((v-v.min()).sum()*time_interval)
area.update({'axle_%s' % axle: result})
df_area[s] = pd.Series(area)
df_area = df_area.T
print(df_area)
Para calcular el peso del vehículo serán utilizadas las informaciones de velocidad, la curva de cada eje, . Para los sensores Lineas de Kistler, debe ser seguida la siguiente formula (KistlerInstrumente, 2004): $W = ( V / L_s ) * A * C$, donde W es la variable de peso, V es la velocidad, $L_s$ es el ancho del sensor, A es la integral de la curva y C es una constante de calibración. Para otros tipos de sensores, la fórmula es similar. Para sensores del tipo piezoeléctrico polímero y cerámicos es necesario considerar un método para corrección de los resultados debido a la sensibilidad a la temperatura (Burnos, 2008), (Gajda et al., 2012). Para los datos de ejemplo, serán calculados los pesos sobre los ejes y el peso bruto total utilizando como parámetro: el ancho del sensor con el valor de 0.53 metros y la constante de calibración igual a 1 para todos los sensores.
In [12]:
amp_sensibility = 0.15*10**-3 # 1.8 pC/N*5V/60000pC
sensors_number = df.shape[1]
C = pd.Series([1]*sensors_number)
Ls = pd.Series([0.53]*sensors_number)
V = df_speed.reset_index(drop=True)
A = df_area.reset_index(drop=True)
W = pd.DataFrame()
for i, axle in enumerate(V.keys()):
W[axle] = ((V[axle]/Ls)*A[axle]*C)/amp_sensibility/constants.g
print(W)
print('\nPromedio por eje:')
print(W.mean())
print('\nPeso Bruto Total:', W.mean().sum(), 'kg')
Aquí será presentado un método para clasificación vehicular basado en los trabajos de (vanBoxel and vanLieshout, 2003) y (Oommen and Loke, 1997)
En este método, es utilizado un conjunto de layouts de referencias, definido por un conjunto de símbolos, que representa el diseño del vehículo, como puede ser visto en la figura abajo (vanBoxel and vanLieshout, 2003).
Para clasificar el vehículo, el sistema crea un layout para el vehículo medido, lo compara con layouts de referencias y clasifica el vehículo que con el layout de referencia que resulta más próximo.
Este método presenta bajo desempeño en el lenguaje Python. Para solucionar esto, fue utilizada la librería numba, llegando a ser cerca de 100 veces más rápido. Fue necesária una adaptación en el algoritmo donde, ante de hacer las comparaciones, el layout del veículo y el layout de la clase de referencia son convertidos en números, así la función de comparación puede ser marcada para ser compilada en modo nopython. Cuanto más cerca de 0 más cerca el layout del vehículo está del layout de referencia.
In [13]:
layout_s = dww.layout((7, 2, 0.5, 2))
layout = dww.layout_to_int(layout_s)
layout_ref_s = '-O----O-O----O--'
layout_ref = dww.layout_to_int(layout_ref_s)
z = np.zeros((len(layout), len(layout_ref)), dtype=int)
%time resultado = dww.D(layout, layout_ref, z)
print('truck layout: ', layout_s)
print('truck layout reference:', layout_ref_s)
print(resultado)
La calibración periódica en sistemas de pesaje es muy importante para mantener a un bajo margen de errores los pesos calculados. Para apoyar esta etapa puede ser utilizado el método de regresión lineal por mínimos cuadrados (OLS) de la librería statsmodels que, por ejemplo, posibilita saber informaciones como el coeficiente de determinación (R²) de la regresión lineal realizada. La librería scikit-learn también puede ser usada en esta etapa con finalidad de apoyo en los análisis de los resultados. Para probar estas funcionalidades, serán utilizados dados de pesaje sintéticos con ruidos, para simular los errores de medición con 100 pasadas de dos camiones con peso conocido.
In [14]:
# datos sintéticos
df_weight = pd.DataFrame({
'a1': np.ones(200), 'a2': np.ones(200), 'target': np.ones(200)
})
df_weight.loc[:100, ['a1', 'a2']] = 8000
df_weight.loc[100:, ['a1', 'a2']] = 10000
df_weight['a1'] += np.random.random(200)*1000
df_weight['a2'] += np.random.random(200)*1000
df_weight.loc[:100, ['target']] = 8000
df_weight.loc[100:, ['target']] = 10000
r2 = {}
c = {}
for s in ['a1', 'a2']:
sm.add_constant(df_weight[s]) # Adds a constant term to the predictor
model = sm.OLS(df_weight['target'], df_weight[s])
predict = model.fit()
r2[s] = [predict._results.rsquared]
c[s] = predict.params[s]
In [15]:
# ploteo
x = df_weight['a1']
y = df_weight['target']
x_lim_max = df_weight['a1'].max()
x_lim_max *= 1.2
x_lim_min = df_weight['a1'].min()
x_lim_min *= 0.8
line_base = np.linspace(x_lim_min, x_lim_max, x_lim_max)
for i, s in enumerate(['a1', 'a2']):
f = plt.figure()
plt.title('Valores de pesaje sensor %s' % s)
plt.xlabel('Valor calculado')
plt.ylabel('Valor Target')
plt.plot(df_weight[s], df_weight['target'], 'ro')
plt.plot(line_base, line_base*c[s])
f.show()
print('R2', r2)
print('CC', c)
In [16]:
def score_95_calc(metric_score, y, y_pred):
if y.shape[0] < 1:
print('size calc 0')
return 0.0
y_true = np.array([True] * y.shape[0])
lb = y - y * 0.05
ub = y + y * 0.05
y_pred_95 = (lb < y_pred) == (y_pred < ub)
y_pred_95 = y_pred_95 == True
return metric_score(y_true, y_pred_95)
def score_95_base(metric_score, estimator, X_test, y_test):
if y_test.shape[0] < 1:
print('size base 0')
return 0.0
y_pred = estimator.predict(X_test)
return score_95_calc(metric_score, y_test, y_pred)
def score_95_accuracy(estimator, X, y):
return score_95_base(metrics.accuracy_score, estimator, X, y)
def score_95_precision(estimator, X, y):
return score_95_base(metrics.precision_score, estimator, X, y)
def score_95_recall(estimator, X, y):
return score_95_base(metrics.recall_score, estimator, X, y)
def score_95_f1_score(estimator, X, y):
return score_95_base(metrics.f1_score, estimator, X, y)
In [17]:
df_weight_cc = df_weight[['a1', 'a2']].copy()
for s in ['a1', 'a2']:
df_weight_cc[s] *= c[s]
df_gross_weight = df_weight_cc.mean(axis=1)
for _m_name, _metric in [
('accuracy', metrics.accuracy_score),
('precision', metrics.precision_score),
('recall', metrics.recall_score),
('f1 score', metrics.f1_score),
]:
print(
('%s:' % _m_name).ljust(22, ' '),
score_95_calc(_metric, df_weight['target'], df_gross_weight)
)
El reconocimiento de matrículas vehiculares puede ser realizado a través de una cámara ALPR. También es posible hacer este procesamiento separado, sin utilizar la cámara ALPR. Un material muy interesantes sobre el tema es el trabajo (Sajjad, 2010) que provee informaciones y ejemplos de cómo hacer el reconocimiento de matrículas vehiculares utilizando el lenguaje Python junto con la librería OpenCV. La matrícula vehicular es muy importante para los sistemas de pesaje de vehículos pesados pues con esta información es posible penalizar los infractores, principalmente los que huyen después de recibir el aviso de detención en las estaciones de control de pesaje.
Este es un trabajo inicial con propósito educativo de cómo crear un sistema computacional de apoyo al pesaje de vehículos pesados en movimiento. Los próximos pasos para evolucionar este trabajo es ampliar los temas de:
2017-02-02: Atualización del contenido para la nueva estructura del proyecto PyWIM.
[^](#ref-1) [^](#ref-4) [^](#ref-5) [^](#ref-6) Kistler Instrumente, AG. 2004. Installation Instructions: Lineas\textregistered Sensors for Weigh-in-Motion Type 9195E.
[^](#ref-2) Helmus, Jonathan J and Jaroniec, Christopher P. 2013. Nmrglue: an open source Python package for the analysis of multidimensional NMR data.
[^](#ref-3) Billauer, Eli. 2008. peakdet: Peak detection using MATLAB.
[^](#ref-7) Burnos, Piotr. 2008. Auto-calibration and temperature correction of WIM systems.
[^](#ref-8) Gajda, Janusz and Sroka, Ryszard and Stencel, Marek and Zeglen, Tadeusz and Piwowar, Piotr and Burnos, Piotr. 2012. Analysis of the temperature influences on the metrological properties of polymer piezoelectric load sensors applied in Weigh-in-Motion systems.
[^](#ref-9) [^](#ref-11) van Boxel, DW and van Lieshout, RA. 2003. Optimization Vehicle Classification.
[^](#ref-10) Oommen, B John and Loke, Richard KS. 1997. Pattern recognition of strings with substitutions, insertions, deletions and generalized transpositions.
[^](#ref-12) Sajjad, K.M.. 2010. Automatic License Plate Recognition using Python and OpenCV.