In [ ]:
/**
 * Provided interface of any component that can supply a table.
 * A table is exposed as an array of attribute (column) names plus a
 * matrix of instances (rows) whose cells are strings.
 */
public interface ITableProducer {
    /** Returns the names of the table attributes (columns). */
    public String[] requestAttributes();

    /** Returns the table instances (rows), one string array per row. */
    public String[][] requestInstances();
}
In [ ]:
/**
 * Required interface of a component that consumes a table: it is
 * connected to the {@link ITableProducer} it will pull data from.
 */
public interface ITableReceptacle {
    /** Connects this component to the producer that supplies its table. */
    void connect(ITableProducer producer);
}
In [ ]:
/**
 * Notification interface: a component implementing it can be told that
 * new data is available and that it should refresh itself.
 */
public interface IUpdate {
    /** Signals that the component should refresh itself. */
    void update();
}
In [ ]:
/**
 * Required interface of a component that emits update notifications:
 * it is connected to the {@link IUpdate} listener it will notify.
 */
public interface IUpdateReceptacle {
    /** Registers the listener to be notified. */
    void connect(IUpdate notify);
}
In [ ]:
/**
 * Contract of a console component: it receives a table producer
 * ({@link ITableReceptacle}) and can be notified to refresh ({@link IUpdate}).
 */
public interface IConsole extends ITableReceptacle, IUpdate {}
In [ ]:
/**
 * Console sink: every time it is notified through {@link #update()} it pulls
 * the table from the connected producer and prints the attribute names
 * followed by all instances, one comma-separated row per line.
 */
public class ConsoleComponent implements IConsole {
    private ITableProducer iProducer;

    /** Sets the producer whose table is printed on {@link #update()}. */
    public void connect(ITableProducer producer) {
        iProducer = producer;
    }

    /**
     * Prints the current table of the connected producer to standard output.
     * Does nothing when no producer is connected. A producer that returns
     * {@code null} or empty attributes/instances yields empty sections instead
     * of an exception (upstream components may return {@code null} tables).
     */
    public void update() {
        if (iProducer == null)
            return;

        System.out.println("=== Attributes ===");
        String[] attributes = iProducer.requestAttributes();
        System.out.println(attributes == null ? "" : joinCells(attributes, attributes.length));
        System.out.println();

        System.out.println("=== Instances ===");
        String[][] instances = iProducer.requestInstances();
        if (instances == null)
            return;
        for (int i = 0; i < instances.length; i++) {
            String[] row = instances[i];
            if (row == null)
                continue;
            // Print as many columns as there are attributes, but never read past the row.
            int width = (attributes == null) ? row.length : Math.min(attributes.length, row.length);
            System.out.println(joinCells(row, width));
        }
    }

    /** Joins the first {@code limit} cells with ", " (a null cell is rendered as "null"). */
    private static String joinCells(String[] cells, int limit) {
        StringBuilder line = new StringBuilder();
        for (int c = 0; c < limit; c++) {
            if (c > 0)
                line.append(", ");
            line.append(cells[c]);
        }
        return line.toString();
    }
}
In [ ]:
import java.util.Date;
/**
 * Mutable bean describing one sensor reading: when it was taken, which
 * dimension was measured, the measured value and the unit ("unity") of
 * that value. The no-argument constructor plus getters/setters allow the
 * bean to be populated property by property.
 */
public class SensorReading {
    private Date timestamp;
    private String dimension;
    private double value;
    private String unity;

    /** Creates an empty reading; every field keeps its Java default value. */
    public SensorReading() {
    }

    /** Creates a fully populated reading. */
    public SensorReading(Date timestamp, String dimension, double value, String unity) {
        this.timestamp = timestamp;
        this.dimension = dimension;
        this.value = value;
        this.unity = unity;
    }

    /** Returns the instant associated with the reading. */
    public Date getTimestamp() {
        return this.timestamp;
    }

    public void setTimestamp(Date timestamp) {
        this.timestamp = timestamp;
    }

    /** Returns the name of the measured dimension. */
    public String getDimension() {
        return this.dimension;
    }

    public void setDimension(String dimension) {
        this.dimension = dimension;
    }

    /** Returns the measured value. */
    public double getValue() {
        return this.value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    /** Returns the unit in which the value is expressed. */
    public String getUnity() {
        return this.unity;
    }

    public void setUnity(String unity) {
        this.unity = unity;
    }
}
In [ ]:
/**
 * Mutable bean for a message exchanged on the bus: three descriptive
 * strings (source, name, type) and a {@link SensorReading} body.
 */
public class Message {
    private String source;
    private String name;
    private String type;
    private SensorReading body;

    /** Creates an empty message; every field is {@code null} until set. */
    public Message() {
    }

    /** Creates a fully populated message. */
    public Message(String source, String name, String type, SensorReading body) {
        this.source = source;
        this.name = name;
        this.type = type;
        this.body = body;
    }

    public String getSource() {
        return this.source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getType() {
        return this.type;
    }

    public void setType(String type) {
        this.type = type;
    }

    /** Returns the sensor reading carried by this message. */
    public SensorReading getBody() {
        return this.body;
    }

    public void setBody(SensorReading body) {
        this.body = body;
    }
}
In [ ]:
/**
 * Configuration properties shared by every component attached to a
 * message bus: the bus address and the topic used on it.
 */
public interface IBusProperties {
    /** Returns the URI of the bus. */
    String getBusURI();

    /** Sets the URI of the bus. */
    void setBusURI(String dataSource);

    /** Returns the topic used on the bus. */
    String getTopic();

    /** Sets the topic used on the bus. */
    void setTopic(String topic);
}
In [ ]:
/**
 * Configuration properties of a bus consumer, in addition to the common
 * bus properties: block size, number of messages and verbosity level.
 */
public interface IBusConsumerProperties extends IBusProperties {
    /** Returns the block size. */
    int getBlockSize();

    /** Sets the block size. */
    void setBlockSize(int blockSize);

    /** Returns the configured number of messages. */
    int getNumber();

    /** Sets the number of messages. */
    void setNumber(int number);

    /** Returns the verbosity level. */
    int getVerbose();

    /** Sets the verbosity level. */
    void setVerbose(int verbose);
}
In [ ]:
/**
 * Contract of a bus consumer component: it supplies a table
 * ({@link ITableProducer}), is configurable ({@link IBusConsumerProperties})
 * and notifies a connected listener ({@link IUpdateReceptacle}).
 */
public interface IBusConsumer extends ITableProducer, IBusConsumerProperties, IUpdateReceptacle {}
In [ ]:
%classpath add jar ../../../../src/lib/mqttv3-1.1.2.jar
%classpath add jar ../../../../src/lib/mysql-connector-java-5.1.17-bin.jar
%classpath add jar ../../../../src/lib/genson-1.6.jar
In [ ]:
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.owlike.genson.Genson;
import com.owlike.genson.GensonBuilder;
/**
 * MQTT consumer component. It subscribes to a topic on the configured bus,
 * turns each received JSON {@link Message} into a table row and notifies the
 * connected {@link IUpdate} listener whenever a block of rows is available.
 *
 * <p>Note that {@link #setTopic(String)} is what opens the connection and
 * subscribes, so the bus URI and every other property should be configured
 * before calling it.
 */
public class BusConsumerComponent implements IBusConsumer, MqttCallback {
    private String busURI = null;
    private String topic = null;
    /** Number of rows accumulated before the listener is notified. */
    private int blockSize = 5;
    /** Maximum number of messages processed; later messages are ignored. */
    private int number = 10;
    /** 1 = print message counter; 2 = counter, topic and JSON; other values print nothing. */
    private int verbose = 2;

    private String[] attributes = {"message", "timestamp", "dimension", "value", "unit"};
    private ArrayList<String[]> instArray = new ArrayList<String[]>();

    private IUpdate notify = null;
    /** Messages received since the last notification. */
    private int count = 0;
    /** Messages processed since the component was created. */
    private int total = 0;

    private Genson genson;
    private MqttClient client;
    private MqttConnectOptions connectionOptions;
    private String clientID;

    public BusConsumerComponent() {
        /* nothing */
    }

    public String getBusURI() {
        return busURI;
    }

    public void setBusURI(String busURI) {
        this.busURI = busURI;
    }

    public String getTopic() {
        return topic;
    }

    /**
     * Stores the topic and immediately connects to the bus and subscribes to
     * it; messages may start arriving as soon as this method returns.
     */
    public void setTopic(String topic) {
        this.topic = topic;
        readDS();
    }

    public int getBlockSize() {
        return blockSize;
    }

    public void setBlockSize(int blockSize) {
        this.blockSize = blockSize;
    }

    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public int getVerbose() {
        return verbose;
    }

    public void setVerbose(int verbose) {
        this.verbose = verbose;
    }

    /** Registers the listener notified each time a block of rows is complete. */
    public void connect(IUpdate notify) {
        this.notify = notify;
    }

    public String[] requestAttributes() {
        return attributes;
    }

    /**
     * Returns the rows accumulated since the previous call and empties the
     * internal buffer, so each row is handed out only once.
     */
    public String[][] requestInstances() {
        String instances[][] = instArray.toArray(new String[0][]);
        instArray = new ArrayList<String[]>();
        return instances;
    }

    /** Builds the JSON mapper, connects to the bus and subscribes to the topic. */
    private void readDS() {
        genson = new GensonBuilder()
            .useDateFormat(new SimpleDateFormat("yyyy-MM-dd"))
            .useIndentation(true)
            .useConstructorWithArguments(false)
            .create();

        clientID = UUID.randomUUID().toString();
        connectionOptions = new MqttConnectOptions();
        connectionOptions.setAutomaticReconnect(true);
        connectionOptions.setCleanSession(true);
        connectionOptions.setConnectionTimeout(20);
        try {
            client = new MqttClient(busURI, clientID);
            // Register the callback before connecting so no client event is missed.
            client.setCallback(this);
            client.connect(connectionOptions);
            client.subscribe(topic);
            System.out.println("[MQTTConsumer] connected");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("[BusConsumerComponent] Connection lost. Reason: " + cause);
        // NOTE(review): this terminates the JVM although automatic reconnect is
        // enabled in readDS() -- confirm that exiting is really intended.
        System.exit(1);
    }

    /**
     * Converts an incoming message into a table row, logs it according to the
     * verbosity level and notifies the listener once a block is complete.
     * Messages beyond the configured number, and messages that cannot be
     * decoded into a reading with a timestamp, are ignored: an exception
     * escaping this callback would make the MQTT client shut down.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        if (total >= number)
            return;

        // JSON payloads are decoded as UTF-8 rather than with the platform default charset.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        Message mess;
        try {
            mess = genson.deserialize(payload, Message.class);
        } catch (RuntimeException e) {
            System.out.println("[BusConsumerComponent] Ignoring malformed message on topic "
                               + topic + ": " + e);
            return;
        }
        SensorReading body = (mess == null) ? null : mess.getBody();
        if (body == null || body.getTimestamp() == null) {
            System.out.println("[BusConsumerComponent] Ignoring message without sensor reading on topic "
                               + topic);
            return;
        }

        count++;
        total++;
        String reading[] = {Integer.toString(count),
                            Long.toString(body.getTimestamp().getTime()), body.getDimension(),
                            Double.toString(body.getValue()), body.getUnity()};
        instArray.add(reading);

        switch (verbose) {
            case 1: System.out.println("message: " + count); break;
            case 2: System.out.println("message: " + count);
                    System.out.println("topic: " + topic);
                    String json = genson.serialize(mess);
                    System.out.println(json);
                    break;
        }

        // ">=" instead of "==": if the listener is connected late or the block
        // size is lowered, the counter may already be past the block size.
        if (count >= blockSize && notify != null) {
            count = 0;
            notify.update();
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        /* this component does not publish */
    }
}
In [ ]:
// Demo: consume up to 3 sensor messages and print them verbosely.
try {
    IBusConsumer bc = new BusConsumerComponent();
    bc.setBusURI("tcp://localhost:1883");
    // Configure the consumer BEFORE setTopic(): setTopic() connects and
    // subscribes, so messages may arrive right after it returns.
    bc.setNumber(3);
    bc.setVerbose(2);
    bc.setTopic("sensor/+/+");
} catch (Exception e) {
    e.printStackTrace();
}
In [ ]:
// Demo: bus consumer connected to a console.
try {
    IBusConsumer bc = new BusConsumerComponent();
    bc.setBusURI("tcp://localhost:1883");
    bc.setBlockSize(5);
    // NOTE(review): the consumer stops after `number` messages and only
    // notifies the console when a block of `blockSize` messages is complete,
    // so with number (2) < blockSize (5) the console is never updated --
    // confirm these values are intended.
    bc.setNumber(2);
    bc.setVerbose(2);

    IConsole console = new ConsoleComponent();
    console.connect(bc);
    bc.connect(console);

    // Subscribe last: setTopic() connects and subscribes, so everything must
    // already be configured and wired when the first message arrives.
    bc.setTopic("sensor/+/+");
} catch (Exception e) {
    e.printStackTrace();
}
In [ ]:
/**
 * Configuration properties of a projection: the names of the attributes
 * (columns) that are kept.
 */
public interface IProjectionProperties {
    /** Returns the names of the selected attributes. */
    public String[] getAttributes();

    /** Sets the names of the selected attributes. */
    public void setAttributes(String attribute[]);
}
In [ ]:
/**
 * Contract of a projection component: it is configurable
 * ({@link IProjectionProperties}), supplies a table ({@link ITableProducer})
 * and pulls its input from another producer ({@link ITableReceptacle}).
 */
public interface IProjection extends IProjectionProperties, ITableProducer, ITableReceptacle {}
In [ ]:
/**
 * Projection over a table: exposes only the configured attributes
 * (columns) of the table supplied by the connected provider.
 */
public class ProjectionComponent implements IProjection {
    private ITableProducer provider;
    private String attributes[] = null;

    public String[] getAttributes() {
        return attributes;
    }

    public void setAttributes(String attributes[]) {
        this.attributes = attributes;
    }

    /** Sets the producer whose table is projected. */
    public void connect(ITableProducer provider) {
        this.provider = provider;
    }

    /** Returns the configured attribute names (the columns of the projected table). */
    public String[] requestAttributes() {
        return attributes;
    }

    /**
     * Pulls the provider's instances and keeps, for each row, only the cells
     * of the configured attributes, in the configured order. Attribute names
     * are matched ignoring case; a configured attribute that the provider
     * does not have yields a {@code null} cell.
     *
     * @return the projected rows, or {@code null} when no provider is
     *         connected, the provider returns {@code null}, or no attributes
     *         are configured
     */
    public String[][] requestInstances() {
        if (provider == null)
            return null;

        String[][] source = provider.requestInstances();
        if (source == null || attributes == null)
            return null;

        String[][] projected = new String[source.length][];

        // locate each selected attribute in the original table
        String[] sourceNames = provider.requestAttributes();
        int[] columns = new int[attributes.length];
        for (int k = 0; k < columns.length; k++)
            columns[k] = positionOf(attributes[k], sourceNames);

        // copy only the selected columns of every row
        for (int r = 0; r < source.length; r++) {
            String[] row = new String[columns.length];
            for (int k = 0; k < columns.length; k++) {
                if (columns[k] >= 0)
                    row[k] = source[r][columns[k]];
            }
            projected[r] = row;
        }
        return projected;
    }

    /** Returns the index of {@code name} in {@code names} (case-insensitive), or -1. */
    private static int positionOf(String name, String[] names) {
        for (int p = 0; p < names.length; p++) {
            if (name.equalsIgnoreCase(names[p]))
                return p;
        }
        return -1;
    }
}
In [ ]:
// Demo: bus consumer -> projection (dimension, value) -> console.
try {
    IBusConsumer bc = new BusConsumerComponent();
    bc.setBusURI("tcp://localhost:1883");
    bc.setBlockSize(3);
    bc.setNumber(6);
    bc.setVerbose(1);

    IProjection projection = new ProjectionComponent();
    String[] attributes = {"dimension", "value"};
    projection.setAttributes(attributes);
    projection.connect(bc);

    IConsole console = new ConsoleComponent();
    console.connect(projection);
    bc.connect(console);

    // Subscribe last: setTopic() connects and subscribes, so the whole
    // pipeline must already be wired when the first message arrives.
    bc.setTopic("sensor/+/+");
} catch (Exception e) {
    e.printStackTrace();
}
In [ ]:
/**
 * Configuration properties of a selection (row filter): the attribute that
 * is tested, the comparison operator, the value compared against and
 * whether the comparison is nominal or not.
 */
public interface ISelectionProperties {
    /** Returns the name of the tested attribute. */
    String getAttribute();

    /** Sets the name of the tested attribute. */
    void setAttribute(String attributeA);

    /** Returns the comparison operator. */
    String getOperator();

    /** Sets the comparison operator. */
    void setOperator(String operator);

    /** Returns the value the attribute is compared against. */
    String getValue();

    /** Sets the value the attribute is compared against. */
    void setValue(String value);

    /** Tells whether the comparison is nominal. */
    boolean isNominalComparison();

    /** Chooses whether the comparison is nominal. */
    void setNominalComparison(boolean nominalComparison);
}
In [ ]:
/**
 * Contract of a selection component: it is configurable
 * ({@link ISelectionProperties}), supplies a table ({@link ITableProducer})
 * and pulls its input from another producer ({@link ITableReceptacle}).
 */
public interface ISelection extends ISelectionProperties, ITableProducer, ITableReceptacle {}
In [ ]:
import java.util.ArrayList;
/**
 * Selection over a table: exposes only the rows of the connected provider
 * whose configured attribute satisfies {@code <attribute> <operator> <value>}.
 * Only the first character of the operator is considered: '=', '<', '>'
 * or '!' (different).
 */
public class SelectionComponent implements ISelection {
    private ITableProducer provider;

    private String attribute = null,
                   operator = null,
                   value = null;
    private boolean nominalComparison = true;

    public String getAttribute() {
        return attribute;
    }

    public void setAttribute(String attribute) {
        this.attribute = attribute;
    }

    public String getOperator() {
        return operator;
    }

    public void setOperator(String operator) {
        this.operator = operator;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public boolean isNominalComparison() {
        return nominalComparison;
    }

    public void setNominalComparison(boolean nominalComparison) {
        this.nominalComparison = nominalComparison;
    }

    /** Sets the producer whose rows are filtered. */
    public void connect(ITableProducer provider) {
        this.provider = provider;
    }

    /** Returns the provider's attributes unchanged, or {@code null} without a provider. */
    public String[] requestAttributes() {
        return (provider == null) ? null : provider.requestAttributes();
    }

    /**
     * Pulls the provider's instances and returns those that satisfy the
     * configured condition. Rows that are {@code null}, too short, or whose
     * tested cell is {@code null} never match.
     *
     * @return the matching rows (an empty array when none match), or
     *         {@code null} when no provider is connected, the provider
     *         returns {@code null}, or the configured attribute is unset or
     *         not among the provider's attributes
     * @throws NumberFormatException in a non-nominal comparison when a tested
     *         cell or the configured value is not a number
     */
    public String[][] requestInstances() {
        if (provider == null)
            return null;

        String[][] allInstances = provider.requestInstances();
        if (allInstances == null)
            return null;

        // locate the tested attribute in the original table
        int column = findColumn(provider.requestAttributes());
        if (column < 0)
            return null;

        ArrayList<String[]> selected = new ArrayList<String[]>();
        for (String[] row : allInstances) {
            if (row != null && column < row.length && row[column] != null
                    && matches(row[column]))
                selected.add(row);
        }
        // A zero-length seed array is required here: with a longer seed,
        // toArray() pads an empty result with null, producing a bogus row.
        return selected.toArray(new String[0][]);
    }

    /** Returns the index of the configured attribute (case-insensitive), or -1. */
    private int findColumn(String[] names) {
        if (names == null || attribute == null)
            return -1;
        for (int p = 0; p < names.length; p++) {
            if (attribute.equalsIgnoreCase(names[p]))
                return p;
        }
        return -1;
    }

    /** Tells whether a non-null cell satisfies the configured condition. */
    private boolean matches(String cell) {
        char op = operator.charAt(0);
        if ("=<>!".indexOf(op) < 0)
            return false; // unknown operators select nothing

        if (nominalComparison) {
            switch (op) {
                case '=': return cell.equalsIgnoreCase(value);
                case '<': return cell.compareTo(value) < 0;
                case '>': return cell.compareTo(value) > 0;
                default:  return !cell.equalsIgnoreCase(value); // '!'
            }
        }

        double left = Double.parseDouble(cell);
        double right = Double.parseDouble(value);
        switch (op) {
            case '=': return left == right;
            case '<': return left < right;
            case '>': return left > right;
            default:  return left != right; // '!'
        }
    }
}
In [ ]:
// Demo: bus consumer -> selection (value > 22, numeric) -> console.
try {
    IBusConsumer bc = new BusConsumerComponent();
    bc.setBusURI("tcp://localhost:1883");
    bc.setBlockSize(3);
    bc.setNumber(6);
    bc.setVerbose(1);

    ISelection selection = new SelectionComponent();
    selection.connect(bc);
    selection.setAttribute("value");
    selection.setOperator(">");
    selection.setValue("22");
    selection.setNominalComparison(false);

    IConsole console = new ConsoleComponent();
    console.connect(selection);
    bc.connect(console);

    // Subscribe last: setTopic() connects and subscribes, so the whole
    // pipeline must already be wired when the first message arrives.
    bc.setTopic("sensor/+/+");
} catch (Exception e) {
    e.printStackTrace();
}
In [ ]:
/**
 * Configuration properties of a bus producer, in addition to the common
 * bus properties: the verbosity level.
 */
public interface IBusProducerProperties extends IBusProperties {
    /** Returns the verbosity level. */
    int getVerbose();

    /** Sets the verbosity level. */
    void setVerbose(int verbose);
}
In [ ]:
/**
 * Contract of a bus producer component: it pulls a table from a producer
 * ({@link ITableReceptacle}), is configurable ({@link IBusProducerProperties})
 * and can be notified to publish ({@link IUpdate}).
 */
public interface IBusProducer extends ITableReceptacle, IBusProducerProperties, IUpdate {}
In [ ]:
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.owlike.genson.Genson;
import com.owlike.genson.GensonBuilder;
/**
 * MQTT producer component. Each time it is notified through
 * {@link #update()} it pulls the table from the connected producer, averages
 * the "value" column and publishes the average on the configured topic as a
 * JSON {@link Message}.
 *
 * <p>Note that {@link #setBusURI(String)} is what opens the connection.
 */
public class BusProducerComponent implements IBusProducer, MqttCallback {
    /** Position of the reading value in the rows supplied by the producer. */
    private static final int VALUE_COLUMN = 3;
    /** Position of the reading unit in the rows supplied by the producer. */
    private static final int UNIT_COLUMN = 4;

    private String busURI = null;
    private String topic = null;
    /** 1 = print message counter; 2 = counter and JSON; other values print nothing. */
    private int verbose = 2;
    /** Number of update() calls that attempted to publish. */
    private int count = 0;

    private ITableProducer iProducer;

    private Genson genson;
    private MqttClient client;
    private MqttConnectOptions connectionOptions;
    private String clientID;
    private final int qos = 1;

    public BusProducerComponent() {
        genson = new GensonBuilder()
            .useDateFormat(new SimpleDateFormat("yyyy-MM-dd"))
            .useIndentation(true)
            .useConstructorWithArguments(false)
            .create();
    }

    public String getBusURI() {
        return busURI;
    }

    /** Stores the bus URI and immediately connects to the bus. */
    public void setBusURI(String busURI) {
        this.busURI = busURI;

        clientID = UUID.randomUUID().toString();
        connectionOptions = new MqttConnectOptions();
        connectionOptions.setAutomaticReconnect(true);
        connectionOptions.setCleanSession(true);
        connectionOptions.setConnectionTimeout(20);
        try {
            client = new MqttClient(busURI, clientID);
            // Register the callback before connecting so no client event is missed.
            client.setCallback(this);
            client.connect(connectionOptions);
            System.out.println("[MQTTProducer] connected");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getVerbose() {
        return verbose;
    }

    public void setVerbose(int verbose) {
        this.verbose = verbose;
    }

    /** Sets the producer whose rows are averaged on {@link #update()}. */
    public void connect(ITableProducer producer) {
        iProducer = producer;
    }

    /**
     * Publishes the average of the value column of the producer's current
     * rows. Nothing is published when no producer is connected, when the
     * producer supplies no rows, or when the bus URI / topic have not been
     * set yet.
     *
     * @throws NumberFormatException when a value cell is not a number
     */
    public void update() {
        if (iProducer == null)
            return;
        if (client == null || topic == null) {
            System.out.println("[BusProducerComponent] Not publishing: bus URI or topic not set");
            return;
        }

        String instances[][] = iProducer.requestInstances();
        if (instances == null)
            return;

        double sum = 0;
        int rows = 0;
        String unit = null;
        for (String[] row : instances) {
            if (row == null)
                continue;
            sum += Double.parseDouble(row[VALUE_COLUMN]);
            if (rows == 0)
                unit = row[UNIT_COLUMN]; // the unit of the first row labels the average
            rows++;
        }
        if (rows == 0)
            return; // empty block: there is no average to publish

        double av = sum / rows;
        Message mess = new Message("statistics", "avg", "calculus",
            new SensorReading(Calendar.getInstance().getTime(), "temperature", av, unit)
        );
        String json = genson.serialize(mess);
        // JSON payloads are encoded as UTF-8 rather than with the platform default charset.
        MqttMessage message = new MqttMessage(json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        message.setQos(qos);
        try {
            client.publish(topic, message);
        } catch (MqttException e) {
            e.printStackTrace();
        }

        count++;
        switch (verbose) {
            case 1: System.out.println("message: " + count); break;
            case 2: System.out.println("message: " + count);
                    System.out.println(json);
                    break;
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("[BusProducerComponent] Connection lost. Reason: " + cause);
        // NOTE(review): this terminates the JVM although automatic reconnect is
        // enabled in setBusURI() -- confirm that exiting is really intended.
        System.exit(1);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        /* this component does not subscribe */
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        /* nothing to do once delivery is confirmed */
    }
}
In [ ]:
// Demo: temperature consumer -> average producer, plus a second consumer
// that prints the published averages.
try {
    // Listener for the averages; subscribed first so that no average
    // published below is missed.
    IBusConsumer bc2 = new BusConsumerComponent();
    bc2.setBusURI("tcp://localhost:1883");
    bc2.setVerbose(2);
    bc2.setTopic("sensor/*/temperature/avg");

    IBusProducer bp = new BusProducerComponent();
    bp.setBusURI("tcp://localhost:1883");
    bp.setTopic("sensor/*/temperature/avg");

    IBusConsumer bc1 = new BusConsumerComponent();
    bc1.setBusURI("tcp://localhost:1883");
    bc1.setBlockSize(3);
    bc1.setNumber(6);
    bc1.setVerbose(2);
    bp.connect(bc1);
    bc1.connect(bp);

    // Subscribe last: setTopic() connects and subscribes, so the producer
    // must already be wired when the first reading arrives.
    bc1.setTopic("sensor/+/temperature");
} catch (Exception e) {
    e.printStackTrace();
}
Conecte os componentes que você achar necessários para mostrar no console as temperaturas que estão acima de 23 graus. Mostre apenas as temperaturas. Para este caso, faça 10 leituras de sensor.
In [ ]:
Conecte os componentes que você achar necessários para mostrar no console todas as leituras de sensor cuja temperatura está entre 22 e 24 graus. Para este caso, faça 20 leituras de sensor.
In [ ]:
Conecte os componentes que você achar necessários para publicar no barramento mensagens com a média de todas as leituras de sensor cuja temperatura está entre 22 e 24 graus. Para este caso, faça 20 leituras de sensor.
In [ ]: