Esse é o código utilizado no módulo ESP8266 versão 01, utilizada na Campainha IoT.
Existe uma pequena diferença no código original ao item exibido em seguida, acabei realizado pequenas correções em nomeclaturas de variáveis do programa.
Ambiente de desenvolvimento pode ser baixado e editar o código fonte a seguir, segue links para instalar um SDK Lua[1] seguindo pelo link[2], caso tenha mais curiosidade sobre a linguagem Lua[3].
Descrição sobre o programa da Camapinha
-- Campainha IoT - LHC - v1.1
-- e na resposta de um "Tocou" coloca o ESP em modo DeepSleep para economizar bateria.
-- Se nenhuma resposta for recebida em 15 segundos coloca o ESP em DeepSleep.
In [1]:
led_pin = 3
status_led = gpio.LOW
ip_servidor = "X.X.X.X."
ip_campainha = "X.X.X.X."
tensao=3333
function desliga_circuito()
print("Colocando ESP em Deep Sleep")
node.dsleep(0)
end
function read_voltage() -- Desconecta do wifi para poder ler a tensao de alimentação do ESP.
wifi.sta.disconnect()
tensao = adc.readvdd33()
print("Tensão: "..Tensao)
-- Inicializa o Wifi e conecta no servidor
print("Inicializando WiFi")
init_wifi()
end
function pisca_led()
gpio.write(led_pin, status_led)
if status_led == gpio.LOW then
status_led = gpio.HIGH
else
status_led = gpio.LOW
end
end
function init_pins()
gpio.mode(led_pin, gpio.OUTPUT)
gpio.write(led_pin, status_led)
end
function init_wifi()
wifi.setmode(wifi.STATION)
wifi.sta.config("SSID", "password")
wifi.sta.connect()
wifi.sta.setip({ip=ip_campainha,netmask="255.255.255.0",gateway="X.X.X.X"})
-- Aguarda conexão com Wifi antes de enviar o request.
function try_connect()
if (wifi.sta.status() == 5) then
tmr.stop(0)
print("Conectado, mandando request")
manda_request() -- Se nenhuma confirmação for recebida em 15 segundos, desliga o ESP.
tmr.alarm(2,15000,0, desliga_circuito)
else
print("Conectando...")
end
end
tmr.alarm(0,1000,1, function() try_connect() end )
end
function manda_request()
tmr.alarm(1, 200, 1, pisca_led)
print("Request enviado") -- Cria a conexão TCP
conn=net.createConnection(net.TCP,false) -- Envia o toque da campainha e tensão para o servidor
conn:on("connection", function(conn)
conn:send("GET /?bateria=" ..tensao.. " HTTP/1.0\r\n\r\n")
end)
-- Se receber "Tocou" do servidor, desliga o ESP.
conn:on("receive", function(conn, data)
if data:find("Tocou") ~= nil then
desliga_circuito()
end
end)
-- Conectar no servidor
conn:connect(9999,ip_servidor)
end
print("Inicializando pinos")
init_pins()
print ("Lendo tensao")
read_voltage()
-- Servidor local, roda o programa em python para receber comunicação do dispositivo, executar uma ação local de audio e enviar informações para plataforma online.
-- Verificando caso utilize Linux o import deve conter bibliotecas do sistema.
-- O código fonte está localizado no diretório /scr/servidor.py
In [ ]:
#!/usr/bin/python2
import time
import BaseHTTPServer
import os
import random
import string
import requests
from urlparse import parse_qs, urlparse
HOST_NAME = '0.0.0.0'
PORT_NUMBER = 9999
# A variável MP3_DIR será construida tendo como base o diretório HOME do usuário + Music/Campainha
# (e.g: /home/usuario/Music/Campainha)
MP3_DIR = os.path.join(os.getenv('HOME'), 'Music', 'Campainha')
VALID_CHARS = set(string.ascii_letters + string.digits + '_.')
CHAVE_THINGSPEAK = 'YOURKEY'
# Salva o arquivo de log no diretório do usuário (e.g: /home/usuário/campainha.log)
ARQUIVO_LOG = os.path.join(os.getenv('HOME'), 'campainha.log')
def filtra(mp3):
if not mp3.endswith('.mp3'):
return False
for c in mp3:
if not c in VALID_CHARS:
return False
return True
def log(msg, output_file=None):
if output_file is None:
output_file = open(ARQUIVO_LOG, 'a')
output_file.write('%s: %s\n' % (time.asctime(), msg))
output_file.flush()
class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_GET(s):
s.send_header("Content-type", "text/plain")
query = urlparse(s.path).query
if not query:
s.send_response(404)
s.end_headers()
s.wfile.write('Not found')
return
components = dict(qc.split('=') for qc in query.split('&'))
if not 'bateria' in components:
s.send_response(404)
s.end_headers()
s.wfile.write('Not found')
return
s.send_response(200)
s.end_headers()
s.wfile.write('Tocou')
s.wfile.flush()
log("Atualizando thingspeak")
r = requests.post('https://api.thingspeak.com/update',
data={'api_key': CHAVE_THINGSPEAK, 'field1': components['bateria']})
log("Thingspeak retornou: %d" % r.status_code)
log("Tocando MP3")
mp3s = [f for f in os.listdir(MP3_DIR) if filtra(f)]
mp3 = random.choice(mp3s)
os.system("mpv " + os.path.join(MP3_DIR, mp3))
if __name__ == '__main__':
server_class = BaseHTTPServer.HTTPServer
httpd = server_class((HOST_NAME, PORT_NUMBER), MyHandler)
log("Server Starts - %s:%s" % (HOST_NAME, PORT_NUMBER))
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
log("Server Stops - %s:%s" % (HOST_NAME, PORT_NUMBER))