In [ ]:
# Scratch cell exploring bytes vs. bytearray. The triple-quoted sections are
# disabled experiments kept for reference; they are never executed.
'''
empty_bytes =bytes(4)
#print(type(empty_bytes))
#print(empty_bytes)
mutable_bytes = bytearray(empty_bytes)
#print(mutable_bytes)
mutable_bytes[0] = 255
mutable_bytes.append(255)
print(mutable_bytes)
immutable_bytes = bytes(mutable_bytes)
print(immutable_bytes)
'''
# Four zero bytes (immutable).
empty_bytes = bytes(4)

# Mutable copy of the zero buffer, then overwritten in place with 1..4.
mutable_bytes = bytearray(empty_bytes)
mutable_bytes[:] = (1, 2, 3, 4)

# Freeze the edited buffer back into an immutable bytes object.
pad_bytes = bytes(mutable_bytes)

# Zero-length bytes object.
bins = bytes()
print(bins)

# Concatenation yields a new 8-byte object: four zeros followed by 1..4.
print(empty_bytes + pad_bytes)
'''
print(type(empty_bytes))
print(empty_bytes[0:2])
print(empty_bytes[:2])
print(empty_bytes[2:])
print(empty_bytes + pad_bytes)
'''
In [ ]:
from flask import Flask, url_for
# Application object for this cell; the @app.route decorators that follow
# register their view functions on it.
app = Flask(__name__)
@app.route('/')
def api_root():
    """Handle requests to ``/`` and return a fixed greeting string."""
    greeting = 'Welcome'
    return greeting
@app.route('/articles')
def api_articles():
    """Handle ``/articles`` and return a string containing this view's own URL."""
    # url_for resolves the endpoint name back to its registered URL path.
    listing_url = url_for('api_articles')
    return 'List of ' + listing_url
@app.route('/articles/<articleid>')
def api_article(articleid):
    """Handle ``/articles/<articleid>`` and echo the requested id back.

    Args:
        articleid: The path segment captured by the route.

    Returns:
        A short message naming the article id.
    """
    return ''.join(('you are reading ', articleid))
# Start Flask's built-in server with its default settings, but only when this
# code runs as the main module.
if '__main__' == __name__:
    app.run()
In [ ]:
from flask import json, Flask, request
# Application object for this cell; the /messages route below is registered on it.
app = Flask(__name__)
@app.route('/messages', methods=['POST'])
def api_message():
    """Handle ``POST /messages`` and dispatch on the request's media type.

    * ``text/plain`` - echo the body back as text.
    * ``application/json`` - echo the parsed JSON document back, re-serialised.
    * ``application/octet-stream`` - write the raw body to ``./binary``.
    * anything else (or no Content-Type at all) - respond with HTTP 415.

    Returns:
        The response body string, or a ``(body, status)`` tuple for the
        unsupported-media-type case.
    """
    # mimetype is the Content-Type without parameters, lower-cased, so
    # "text/plain; charset=utf-8" is still recognised as text/plain.
    media_type = request.mimetype

    if media_type == 'text/plain':
        # The raw body is bytes on Python 3; decode it before concatenating
        # with a str, otherwise the "+" raises TypeError.
        return "Text Message: " + request.get_data(as_text=True)

    if media_type == 'application/json':
        return "JSON Message: " + json.dumps(request.json)

    if media_type == 'application/octet-stream':
        # Context manager guarantees the file is closed even if write() fails.
        with open('./binary', 'wb') as f:
            f.write(request.data)
        return "Binary message written!"

    # The body is not JSON here, so do not try to parse it as such; report
    # the unsupported media type explicitly.
    return "415 Unsupported Media Type", 415
# Start Flask's built-in server with its default settings, but only when this
# code runs as the main module.
if '__main__' == __name__:
    app.run()
In [ ]:
from flask import Flask, jsonify, abort, make_response
from flask_restful import Resource, Api
from flask_restful import reqparse
# Application object for the to-do API; the routes and the 404 error handler
# below are registered on it.
app = Flask(__name__)
# In-memory task list served by the to-do API routes. Ids are assigned
# sequentially starting at 1 and every task starts out not done.
tasks = [
    {
        'id': task_id,
        'title': title,
        'description': description,
        'done': False,
    }
    for task_id, (title, description) in enumerate(
        (
            ('Buy groceries', 'Milk, Cheese, Pizza, Fruit, Tylenol'),
            ('Learn Python', 'Need to find a good Python tutorial on the web'),
        ),
        start=1,
    )
]
@app.route('/todo/api/v1.0/tasks', methods=['GET'])
def get_tasks():
    """Return every task in the module-level ``tasks`` list as JSON."""
    payload = {'tasks': tasks}
    return jsonify(payload)
@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['GET'])
def get_task(task_id):
    """Return the task whose ``id`` equals ``task_id`` as JSON.

    Args:
        task_id: Integer id taken from the URL.

    Returns:
        A JSON response wrapping the first matching task.

    Aborts with HTTP 404 when no task has that id.
    """
    for candidate in tasks:
        if candidate['id'] == task_id:
            return jsonify({'task': candidate})
    # No task matched: abort() raises, so nothing is returned from here.
    abort(404)
@app.errorhandler(404)
def not_found(error):
    """Render 404 errors as a JSON body instead of the default error page.

    Args:
        error: The error passed in by Flask; not used.

    Returns:
        A response with a JSON error document and status code 404.
    """
    body = jsonify({'error': 'Not found'})
    return make_response(body, 404)
# Start the built-in server on all interfaces, port 80, only when this code
# runs as the main module.
if __name__ == '__main__':
    # Debug mode is switched off on purpose: with debug=True the interactive
    # debugger lets anyone who can reach the port execute arbitrary Python,
    # and host='0.0.0.0' makes the port reachable from other machines.
    # Re-enable it only while bound to 127.0.0.1.
    app.run(host='0.0.0.0', port=80, debug=False)
In [ ]:
# TCP server example
import socket
# Interactive TCP chat server: accepts one client on port 5000, then alternates
# between sending a line typed on the console and printing the client's reply.
# Either side ends the session by sending "q" or "Q".
#
# NOTE(review): the original indentation was lost; this layout assumes the
# server handles exactly one client and then shuts down - confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
    server_socket.bind(("", 5000))
    server_socket.listen(5)
    print("TCPServer Waiting for client on port 5000")

    client_socket, address = server_socket.accept()
    print("I got a connection from ", address)

    # Both sockets are context-managed so they are closed even if the
    # conversation ends with an exception.
    with client_socket:
        while True:
            data = input('SEND( TYPE q or Q to Quit):')
            # sendall keeps writing until the whole message is out; plain
            # send() may transmit only part of it.
            client_socket.sendall(data.encode())
            if data in ('Q', 'q'):
                break

            data = client_socket.recv(512).decode()
            # recv() returns empty data once the peer has closed the
            # connection; treat that like a quit instead of looping on a
            # dead socket.
            if data in ('', 'q', 'Q'):
                break
            print("RECEIVED:", data)

print("SOCKET closed... END")
In [ ]:
class message():
    """Encode and decode the fixed-layout binary frame exchanged with an ETN device.

    Frame layout (one byte per field):

    ====== ================ ==============================================
    Offset Field            Values
    ====== ================ ==============================================
    0      message_type     0x57 'W', 0x52 'R', 0x41 'A'
    1      sound_group      0x00 WS, 0x01 WP, 0x02 WM, 0x03 WA, 0x04 WB
    2      red_lamp         0x00 off, 0x01 on, 0x02 blink
    3      yellow_lamp      same as red_lamp
    4      green_lamp       same as red_lamp
    5      blue_lamp        same as red_lamp
    6      white_lamp       same as red_lamp
    7      sound_channel    integer 0-255
    8-9    (unused)         written as zero by generate, ignored by parse
    ====== ================ ==============================================

    message_type:
        * W - the PC sends data to the ETN device.
        * R - the PC asks the ETN device to send its status.
        * A - the ETN device acknowledges a request from the PC.

    sound_group (the currently configured group):
        * WS - 5 warning sounds (ETNB product, built-in buzzer)
        * WP - special 5 warning sounds
        * WM - 5 sounds of melody
        * WA - 5 sounds of alarm
        * WB - play buzzer sound

    NOTE(review): the original in-code notes listed 0x01 as BLINK and 0x02 as
    ON, while the code has always used 0x01 = on and 0x02 = blink. The code's
    mapping is kept here - confirm against the device specification.
    """

    # Wire value -> name tables used by parse().
    _MESSAGE_TYPES = {0x57: "W", 0x52: "R", 0x41: "A"}
    _SOUND_GROUPS = {0x00: 'WS', 0x01: 'WP', 0x02: 'WM', 0x03: 'WA', 0x04: 'WB'}
    _LAMP_STATES = {0x00: "off", 0x01: "on", 0x02: "blink"}

    # Name -> wire value tables used by generate() (inverse of the above).
    _MESSAGE_CODES = {name: code for code, name in _MESSAGE_TYPES.items()}
    _SOUND_CODES = {name: code for code, name in _SOUND_GROUPS.items()}
    _LAMP_CODES = {name: code for code, name in _LAMP_STATES.items()}

    # Lamp fields in frame order; the first one sits at byte offset 2.
    _LAMP_FIELDS = (
        "red_lamp",
        "yellow_lamp",
        "green_lamp",
        "blue_lamp",
        "white_lamp",
    )
    _FIRST_LAMP_OFFSET = 2
    _SOUND_CHANNEL_OFFSET = 7
    _FRAME_LENGTH = 10

    def parse(self, bins):
        """Decode a binary frame into a dict of named fields.

        Args:
            bins: Bytes-like object holding at least 8 bytes.

        Returns:
            A dict with the keys ``message_type``, ``sound_group``,
            ``red_lamp``, ``yellow_lamp``, ``green_lamp``, ``blue_lamp``,
            ``white_lamp`` (strings; ``"Unknown"`` for unrecognised byte
            values) and ``sound_channel`` (int).

        Raises:
            IndexError: If ``bins`` is shorter than 8 bytes.
        """
        frame = bytearray(bins)
        decoded = {}

        decoded["message_type"] = self._MESSAGE_TYPES.get(frame[0], "Unknown")
        decoded['sound_group'] = self._SOUND_GROUPS.get(frame[1], 'Unknown')

        # 0x00 decodes to "off" so that it mirrors generate(), which writes
        # 0x00 for every lamp that is neither "on" nor "blink".
        for offset, field in enumerate(self._LAMP_FIELDS,
                                       start=self._FIRST_LAMP_OFFSET):
            decoded[field] = self._LAMP_STATES.get(frame[offset], "Unknown")

        decoded["sound_channel"] = int(frame[self._SOUND_CHANNEL_OFFSET])
        return decoded

    def generate(self, json):
        """Encode a dict of named fields into a 10-byte binary frame.

        Args:
            json: Mapping with the keys ``message_type``, ``sound_group``,
                the five ``*_lamp`` keys and ``sound_channel`` (int 0-255).

        Returns:
            A 10-byte ``bytes`` object; bytes 8 and 9 are always zero.

        Raises:
            KeyError: If a required key is missing from ``json``.
            ValueError: If ``sound_channel`` is outside 0-255.

        An unrecognised ``message_type`` prints ``error`` and leaves byte 0 as
        zero. An unrecognised ``sound_group`` or lamp state is encoded as 0x00.
        """
        frame = bytearray(self._FRAME_LENGTH)

        type_code = self._MESSAGE_CODES.get(json['message_type'])
        if type_code is None:
            print('error')
        else:
            frame[0] = type_code

        frame[1] = self._SOUND_CODES.get(json['sound_group'], 0x00)

        # Anything other than "on" or "blink" switches the lamp off.
        for offset, field in enumerate(self._LAMP_FIELDS,
                                       start=self._FIRST_LAMP_OFFSET):
            frame[offset] = self._LAMP_CODES.get(json[field], 0x00)

        frame[self._SOUND_CHANNEL_OFFSET] = json["sound_channel"]
        return bytes(frame)
In [ ]:
import socket
import message
# Client side of the ETN message demo: build a frame from a field dict, send
# it to the server and, for a status request ("R"), wait for the 10-byte reply.
json_list = []

json = {
    "message_type": "W",
    "sound_group": "WP",
    "red_lamp": 'blink',
    "yellow_lamp": 'blink',
    "green_lamp": 'on',
    "blue_lamp": 'blink',
    "white_lamp": 'on',
    "sound_channel": 1
}
json_list.append(json)

json = {
    'message_type': 'W',
    'sound_group': 'WA',
    'red_lamp': 'blink',
    'yellow_lamp': 'blink',
    'green_lamp': 'on',
    'blue_lamp': 'blink',
    'white_lamp': 'on',
    'sound_channel': 1
}
json_list.append(json)

# Only the most recently built dict is encoded and sent.
msg = message.message()
bin_msg = msg.generate(json)

ip = "127.0.0.1"
port = 20000

# The with-block closes the socket on exit, so no explicit close() is needed.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((ip, port))
    # sendall keeps writing until the whole frame is out; send() may not.
    s.sendall(bin_msg)

    if json["message_type"] == "R":
        # TCP may deliver the reply in pieces; accumulate until a full
        # 10-byte frame has arrived.
        bins = bytes(0)
        while len(bins) < 10:
            chunk = s.recv(512)
            if not chunk:
                # Empty data means the peer closed the connection; stop
                # instead of spinning on recv() forever.
                raise ConnectionError(
                    "connection closed before a full 10-byte reply arrived"
                )
            bins += chunk
        json = msg.parse(bins[:10])
        print(json)
In [ ]:
import socket
import message
# Server side of the ETN message demo: accept clients one at a time, read one
# 10-byte frame from each, print it decoded, and answer status requests ("R")
# with the same fields re-encoded as an acknowledgement ("A").
msg = message.message()
host = '127.0.0.1'
port = 20000

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((host, port))
    s.listen(5)
    while True:
        client, address = s.accept()
        # Close each accepted connection when done with it; otherwise one
        # socket leaks per client.
        with client:
            # TCP may deliver the frame in pieces; accumulate until a full
            # 10-byte frame has arrived.
            bins = bytes(0)
            while len(bins) < 10:
                chunk = client.recv(512)
                if not chunk:
                    # Peer closed early; stop rather than spin on recv().
                    break
                bins += chunk

            if len(bins) < 10:
                # Incomplete frame: drop this client and wait for the next.
                continue

            json = msg.parse(bins[:10])
            print(json)
            if json["message_type"] == "R":
                json["message_type"] = "A"
                # sendall keeps writing until the whole frame is out.
                client.sendall(msg.generate(json))