Catálogo de Componentes com Mensagens

Interface ITableProducer compartilhada


In [ ]:
public interface ITableProducer {
  String[] requestAttributes();
  String[][] requestInstances();
}

In [ ]:
public interface ITableReceptacle {
  public void connect(ITableProducer producer);
}

Interface IUpdate compartilhada


In [ ]:
public interface IUpdate {
  public void update();
}

In [ ]:
public interface IUpdateReceptacle {
  public void connect(IUpdate notify);
}

Console


In [ ]:
public interface IConsole extends ITableReceptacle, IUpdate {
}

In [ ]:
public class ConsoleComponent implements IConsole {
  private ITableProducer iProducer;
  
  public void connect(ITableProducer producer) {
    iProducer = producer;
  }
  
  public void update() {
    if (iProducer != null) {
        System.out.println("=== Attributes ===");
        String attributes[] = iProducer.requestAttributes();
        for (int a = 0; a < attributes.length-1; a++)
          System.out.print(attributes[a] + ", ");
        System.out.println(attributes[attributes.length-1]);

        System.out.println();
        System.out.println("=== Instances ===");
        String instances[][] = iProducer.requestInstances();
        for (int i = 0; i < instances.length; i++) {
          for (int a = 0; a < attributes.length-1; a ++)
            System.out.print(instances[i][a] + ", ");
          System.out.println(instances[i][attributes.length-1]);
        }
    }
  }
}

SensorReading e Message


In [ ]:
import java.util.Date;

public class SensorReading {
   private Date timestamp;
   private String dimension;
   private double value;
   private String unity;
   
   public SensorReading() {
      /* nothing */
   }
   
   public SensorReading(Date timestamp, String dimension, double value, String unity) {
      this.timestamp = timestamp;
      this.dimension = dimension;
      this.value = value;
      this.unity = unity;
   }

   public Date getTimestamp() {
      return timestamp;
   }
   
   public void setTimestamp(Date timestamp) {
      this.timestamp = timestamp;
   }
   
   public String getDimension() {
      return dimension;
   }
   
   public void setDimension(String dimension) {
      this.dimension = dimension;
   }
   
   public double getValue() {
      return value;
   }
   
   public void setValue(double value) {
      this.value = value;
   }
   
   public String getUnity() {
      return unity;
   }
   
   public void setUnity(String unity) {
      this.unity = unity;
   }
}

In [ ]:
public class Message {
   private String source;
   private String name;
   private String type;
   private SensorReading body;
   
   public Message() {
      /* nothing */
   }
   
   public Message(String source, String name, String type, SensorReading body) {
      super();
      this.source = source;
      this.name = name;
      this.type = type;
      this.body = body;
   }

   public String getSource() {
      return source;
   }
   
   public void setSource(String source) {
      this.source = source;
   }
   
   public String getName() {
      return name;
   }
   
   public void setName(String name) {
      this.name = name;
   }
   
   public String getType() {
      return type;
   }
   
   public void setType(String type) {
      this.type = type;
   }
   
   public SensorReading getBody() {
      return body;
   }
   
   public void setBody(SensorReading body) {
      this.body = body;
   }
}

BusConsumer


In [ ]:
public interface IBusProperties {
  public String getBusURI();
  public void setBusURI(String dataSource);
  public String getTopic();
  public void setTopic(String topic);
}

In [ ]:
public interface IBusConsumerProperties extends IBusProperties {
  public int getBlockSize();
  public void setBlockSize(int blockSize);
  public int getNumber();
  public void setNumber(int number);
  public int getVerbose();
  public void setVerbose(int verbose);
}

In [ ]:
public interface IBusConsumer extends ITableProducer, IBusConsumerProperties, IUpdateReceptacle {
}

In [ ]:
%classpath add jar ../../../../src/lib/mqttv3-1.1.2.jar
%classpath add jar ../../../../src/lib/mysql-connector-java-5.1.17-bin.jar
%classpath add jar ../../../../src/lib/genson-1.6.jar

In [ ]:
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.UUID;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import com.owlike.genson.Genson;
import com.owlike.genson.GensonBuilder;

public class BusConsumerComponent implements IBusConsumer, MqttCallback {
  private String busURI = null;
  private String topic = null;
  private int blockSize = 5;
  private int number = 10;
  private int verbose = 2;
  
  private String[] attributes = {"message", "timestamp", "dimension", "value", "unit"};
  private ArrayList<String[]> instArray = new ArrayList<String[]>();
  private IUpdate notify = null;
  private int count = 0;
  private int total = 0;
  
  private Genson genson;

  private MqttClient client;
  private MqttConnectOptions connectionOptions;
  private String clientID;  
  
  public BusConsumerComponent() {
    /* nothing */
  }

  public String getBusURI() {
    return busURI;
  }

  public void setBusURI(String busURI) {
    this.busURI = busURI;
  }
  
  public String getTopic() {
     return topic;
  }
  
  public void setTopic(String topic) {
     this.topic = topic;
     readDS();
  }
  
  public int getBlockSize() {
     return blockSize;
  }
  
  public void setBlockSize(int blockSize) {
     this.blockSize = blockSize;
  }
  
  public int getNumber() {
     return number;
  }
  
  public void setNumber(int number) {
     this.number = number;
  }
  
  public int getVerbose() {
     return verbose;
  }
  
  public void setVerbose(int verbose) {
     this.verbose = verbose;
  }
  
  public void connect(IUpdate notify) {
     this.notify = notify;
  }
  
  public String[] requestAttributes() {
    return attributes;
  }
  
  public String[][] requestInstances() {
    String instances[][] = instArray.toArray(new String[0][]);
    instArray = new ArrayList<String[]>();
    return instances;
  }
  
  private void readDS() {
    genson = new GensonBuilder()
      .useDateFormat(new SimpleDateFormat("yyyy-MM-dd"))
      .useIndentation(true)
      .useConstructorWithArguments(false)
      .create();
     
    clientID = UUID.randomUUID().toString();      
    connectionOptions = new MqttConnectOptions();
    connectionOptions.setAutomaticReconnect(true);
    connectionOptions.setCleanSession(true);
    connectionOptions.setConnectionTimeout(20);
     
    try {
      client = new MqttClient(busURI, clientID);
      client.connect(connectionOptions);            
      client.setCallback(this);
      client.subscribe(topic);
      System.out.println("[MQTTConsumer] connected");            
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }
  
  @Override
  public void connectionLost(Throwable cause) {
    System.out.println("[BusConsumerComponent] Connection lost.  Reason: " + cause);
    System.exit(1);
  }

  @Override
  public void messageArrived(String topic, MqttMessage message) throws MqttException {
    if (total < number) {
       Message mess = genson.deserialize(new String(message.getPayload()), Message.class);
       
       SensorReading body = mess.getBody();
       
       count++;
       total++;
       String reading[] = {Integer.toString(count),
                           Long.toString(body.getTimestamp().getTime()), body.getDimension(),
                           Double.toString(body.getValue()), body.getUnity()}; 
       instArray.add(reading);
       
       switch (verbose) {
          case 1: System.out.println("message: " + count); break;
          case 2: System.out.println("message: " + count);
                  System.out.println("topic: " + topic);
                  String json = genson.serialize(mess);
                  System.out.println(json);
                  break;
       }
   
       if (count == blockSize && notify != null) {
          count = 0;
          notify.update();
       }
    }
    
  }


  @Override
  public void deliveryComplete(IMqttDeliveryToken token) {
  }
}

Conectando BusConsumer

Propriedades

  • verbose - define se mostrará na tela as mensagens recebidas:
    • 0 - não mostra
    • 1 - mostra somente contagem
    • 2 - mostra contagem e mensagem

In [ ]:
try {
    IBusConsumer bc = new BusConsumerComponent();
    bc.setBusURI("tcp://localhost:1883");
    bc.setTopic("sensor/+/+");
    bc.setNumber(3);
    bc.setVerbose(2);
  } catch (Exception e) {
    e.printStackTrace();
  }

Conectando com o Console


In [ ]:
try {
     IBusConsumer bc = new BusConsumerComponent();
     bc.setBusURI("tcp://localhost:1883");
     bc.setTopic("sensor/+/+");
     bc.setBlockSize(5);
     bc.setNumber(2);
     bc.setVerbose(2);
 
     IConsole console = new ConsoleComponent();
     console.connect(bc);

     bc.connect(console);
  } catch (Exception e) {
    e.printStackTrace();
  }

Projeção


In [ ]:
public interface IProjectionProperties {
  String[] getAttributes();
  void setAttributes(String attribute[]);
}

In [ ]:
public interface IProjection extends IProjectionProperties, ITableProducer, ITableReceptacle {
}

In [ ]:
public class ProjectionComponent implements IProjection {
  private ITableProducer provider;
  
  private String attributes[] = null;
  
  public String[] getAttributes() {
    return attributes;
  }

  public void setAttributes(String attributes[]) {
    this.attributes = attributes;
  }
  
  public void connect(ITableProducer provider) {
    this.provider = provider;
  }
  
  public String[] requestAttributes() {
    return attributes;
  }
  
  public String[][] requestInstances() {
    String[][] instances = null;
    
    if (provider != null) {
      String[][] allInstances = provider.requestInstances();
      if (allInstances != null  && attributes != null) {
        instances = new String[allInstances.length][];
        
        // busca a posicao dos atributos selecionados na tabela original
        String[] allAttributes = provider.requestAttributes();
        int attrPos[] = new int[attributes.length];
        for (int as = 0; as < attributes.length; as++) {
          int aa;
          for (aa = 0; aa < allAttributes.length &&
               !attributes[as].equalsIgnoreCase(allAttributes[aa]); aa++)
            /* nothing */;
          if (aa < allAttributes.length)
            attrPos[as] = aa;
          else
            attrPos[as] = -1;
        }
        
        // filtra atributos selecionados
        for (int i = 0; i < allInstances.length; i++) {
          instances[i] = new String[attributes.length];
          for (int as = 0; as < attributes.length; as++)
            if (attrPos[as] > -1)
              instances[i][as] = allInstances[i][attrPos[as]];
        }
      }
    }
    
    return instances;
  }
}

In [ ]:
try {
    IBusConsumer bc = new BusConsumerComponent();
    bc.setBusURI("tcp://localhost:1883");
    bc.setTopic("sensor/+/+");
    bc.setBlockSize(3);
    bc.setNumber(6);
    bc.setVerbose(1);

    IProjection projection = new ProjectionComponent();
    String[] attributes = {"dimension", "value"};
    projection.setAttributes(attributes);
    projection.connect(bc);

    IConsole console = new ConsoleComponent();
    console.connect(projection);

    bc.connect(console);
  } catch (Exception e) {
    e.printStackTrace();
  }

Seleção


In [ ]:
public interface ISelectionProperties {
  String getAttribute();
  void setAttribute(String attributeA);
  public String getOperator();
  public void setOperator(String operator);
  public String getValue();
  public void setValue(String value);
  boolean isNominalComparison();
  void setNominalComparison(boolean nominalComparison);
}

In [ ]:
public interface ISelection extends ISelectionProperties, ITableProducer, ITableReceptacle {
}

In [ ]:
import java.util.ArrayList;

public class SelectionComponent implements ISelection {
  private ITableProducer provider;
  
  private String attribute = null,
                 operator = null,
                 value = null;
  private boolean nominalComparison = true;
  
  public String getAttribute() {
    return attribute;
  }

  public void setAttribute(String attribute) {
    this.attribute = attribute;
  }
  
  public String getOperator() {
    return operator;
  }
  
  public void setOperator(String operator) {
    this.operator = operator;
  }
  
  public String getValue() {
    return value;
  }
  
  public void setValue(String value) {
    this.value = value;
  }
  
  public boolean isNominalComparison() {
    return nominalComparison;
  }

  public void setNominalComparison(boolean nominalComparison) {
    this.nominalComparison = nominalComparison;
  }

  public void connect(ITableProducer provider) {
    this.provider = provider;
  }
  
  public String[] requestAttributes() {
    return (provider == null) ? null : provider.requestAttributes();
  }
  
  public String[][] requestInstances() {
    ArrayList<String[]> instances = null;
    
    if (provider != null) {
      String[][] allInstances = provider.requestInstances();
      
      if (allInstances != null) {
        // busca a posicao dos atributos selecionados na tabela original
        String[] allAttributes = provider.requestAttributes();
        int atrPos;
        for (atrPos = 0; atrPos < allAttributes.length &&
             !attribute.equalsIgnoreCase(allAttributes[atrPos]); atrPos++)
          /* nothing */;
        if (atrPos < allAttributes.length) {
          instances = new ArrayList<String[]>();
          
          for (String[] ai: allInstances) {
            boolean match = false;
            if (nominalComparison) {
              switch (operator.charAt(0)) {
                case '=': if (ai[atrPos].equalsIgnoreCase(value))
                       match = true;
                     break;
                case '<': if (ai[atrPos].compareTo(value) < 0)
                       match = true;
                     break;
                case '>': if (ai[atrPos].compareTo(value) > 0)
                       match = true;
                     break;
                case '!': if (!ai[atrPos].equalsIgnoreCase(value))
                     match = true;
                     break;
              }
            } else {
              switch (operator.charAt(0)) {
                case '=': if (Double.parseDouble(ai[atrPos]) == Double.parseDouble(value))
                            match = true;
                          break;
                case '<': if (Double.parseDouble(ai[atrPos]) < Double.parseDouble(value))
                            match = true;
                          break;
                case '>': if (Double.parseDouble(ai[atrPos]) > Double.parseDouble(value))
                            match = true;
                          break;
                case '!': if (Double.parseDouble(ai[atrPos]) != Double.parseDouble(value))
                            match = true;
                          break;
              }
            }
            if (match)
              instances.add(ai);
                
          }
        }
      }
    }
    
    return (instances == null) ? null : instances.toArray(new String[1][]);
  }
}

In [ ]:
try {
    IBusConsumer bc = new BusConsumerComponent();
    bc.setBusURI("tcp://localhost:1883");
    bc.setTopic("sensor/+/+");
    bc.setBlockSize(3);
    bc.setNumber(6);
    bc.setVerbose(1);

    ISelection selection = new SelectionComponent();
    selection.connect(bc);
    selection.setAttribute("value");
    selection.setOperator(">");
    selection.setValue("22");
    selection.setNominalComparison(false);

    IConsole console = new ConsoleComponent();
    console.connect(selection);

    bc.connect(console);
  } catch (Exception e) {
    e.printStackTrace();
  }

Producer Component - Média


In [ ]:
public interface IBusProducerProperties extends IBusProperties {
  public int getVerbose();
  public void setVerbose(int verbose);
}

In [ ]:
public interface IBusProducer extends ITableReceptacle, IBusProducerProperties, IUpdate {
}

In [ ]:
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.UUID;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import com.owlike.genson.Genson;
import com.owlike.genson.GensonBuilder;

public class BusProducerComponent implements IBusProducer, MqttCallback {
  private String busURI = null;
  private String topic = null;
  private int verbose = 2;
  
  private int count = 0;
  
  private ITableProducer iProducer;
  
  private Genson genson;

  private MqttClient client;
  private MqttConnectOptions connectionOptions;
  private String clientID;
  private final int qos = 1; 
  
  public BusProducerComponent() {
     genson = new GensonBuilder()
           .useDateFormat(new SimpleDateFormat("yyyy-MM-dd"))
           .useIndentation(true)
           .useConstructorWithArguments(false)
           .create();
  }

  public String getBusURI() {
    return busURI;
  }

  public void setBusURI(String busURI) {
    this.busURI = busURI;
    clientID = UUID.randomUUID().toString();      
    connectionOptions = new MqttConnectOptions();
    connectionOptions.setAutomaticReconnect(true);
    connectionOptions.setCleanSession(true);
    connectionOptions.setConnectionTimeout(20);
     
    try {
      client = new MqttClient(busURI, clientID);
      client.connect(connectionOptions);            
      client.setCallback(this);
      System.out.println("[MQTTProducer] connected");            
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }
  
  public String getTopic() {
     return topic;
  }
  
  public void setTopic(String topic) {
     this.topic = topic;
  }
  
  public int getVerbose() {
     return verbose;
  }
  
  public void setVerbose(int verbose) {
     this.verbose = verbose;
  }
  
  public void connect(ITableProducer producer) {
     iProducer = producer;
  }
   
  public void update() {
     if (iProducer != null) {
        String instances[][] = iProducer.requestInstances();
        double av = 0;
        for (int i = 0; i < instances.length; i++)
           av += Double.parseDouble(instances[i][3]);
        av = (av == 0) ? 0 : av / instances.length;
        
        Message mess = new Message("statistics", "avg", "calculus",
              new SensorReading(Calendar.getInstance().getTime(), "temperature", av, instances[0][4])
        );
        String json = genson.serialize(mess);
        MqttMessage message = new MqttMessage(json.getBytes());
        message.setQos(qos);
        
         try {
            client.publish(topic, message);
          } catch (MqttException e) {
            e.printStackTrace();
          }
        count++;
        switch (verbose) {
        case 1: System.out.println("message: " + count); break;
        case 2: System.out.println("message: " + count);
                System.out.println(json);
                break;
        }        
     }
   }
  
  @Override
  public void connectionLost(Throwable cause) {
    System.out.println("[BusProducerComponent] Connection lost.  Reason: " + cause);
    System.exit(1);
  }

  @Override
  public void messageArrived(String topic, MqttMessage message) throws MqttException {
  }

  @Override
  public void deliveryComplete(IMqttDeliveryToken token) {
  }
}

Publicando a Média


In [ ]:
try {
     IBusConsumer bc1 = new BusConsumerComponent();
     bc1.setBusURI("tcp://localhost:1883");
     bc1.setTopic("sensor/+/temperature");
     bc1.setBlockSize(3);
     bc1.setNumber(6);
     bc1.setVerbose(2);

     IBusProducer bp = new BusProducerComponent();
     bp.setBusURI("tcp://localhost:1883");
     bp.setTopic("sensor/*/temperature/avg");
     bp.connect(bc1);

     bc1.connect(bp);

     IBusConsumer bc2 = new BusConsumerComponent();
     bc2.setBusURI("tcp://localhost:1883");
     bc2.setTopic("sensor/*/temperature/avg");
     bc2.setVerbose(2);
  } catch (Exception e) {
    e.printStackTrace();
  }

Exercícios

Exercício 1

Conecte os componentes que você achar necessário para mostrar no console temperaturas que estão acima de 23graus. Mostre apenas as temperaturas. Para este caso faça 10 leituras de sensor.


In [ ]:

Exercício 2

Conecte os componentes que você achar necessário para mostrar no console todas as leituras de sensor cuja temperatura está entre 22 e 24 graus. Para este caso faça 20 leituras de sensor.


In [ ]:

Exercício 3

Conecte os componentes que você achar necessário para gerar mensagens no barramento a média de todas as leituras de sensor cuja temperatura está entre 22 e 24 graus. Para este caso faça 20 leituras de sensor.


In [ ]: