byte handling examples

  • bytes()
  • bytearray()

In [ ]:
'''
empty_bytes =bytes(4)
#print(type(empty_bytes))
#print(empty_bytes)

mutable_bytes = bytearray(empty_bytes)
#print(mutable_bytes)

mutable_bytes[0] = 255
mutable_bytes.append(255)
print(mutable_bytes)

immutable_bytes = bytes(mutable_bytes)
print(immutable_bytes)
'''
empty_bytes = bytes(4)
mutable_bytes = bytearray(empty_bytes)
mutable_bytes[0] = 1
mutable_bytes[1] = 2
mutable_bytes[2] = 3
mutable_bytes[3] = 4
pad_bytes = bytes(mutable_bytes)
bins = bytes(0)
print(bins)
print(empty_bytes + pad_bytes)
'''
print(type(empty_bytes))
print(empty_bytes[0:2])
print(empty_bytes[:2])
print(empty_bytes[2:])
print(empty_bytes + pad_bytes)
'''

Simple web api - I


In [ ]:
from flask import Flask, url_for
app = Flask(__name__)

@app.route('/')
def api_root():
    return 'Welcome'

@app.route('/articles')
def api_articles():
    return 'List of ' + url_for('api_articles')

@app.route('/articles/<articleid>')
def api_article(articleid):
    return 'you are reading ' + articleid

if __name__ == '__main__':
    app.run()

Simple Web Api II


In [ ]:
from flask import json, Flask, request
app = Flask(__name__)
@app.route('/messages', methods = ['POST'])
def api_message():

    if request.headers['Content-Type'] == 'text/plain':
        return "Text Message: " + request.data

    elif request.headers['Content-Type'] == 'application/json':
        return "JSON Message: " + json.dumps(request.json)

    elif request.headers['Content-Type'] == 'application/octet-stream':
        f = open('./binary', 'wb')
        f.write(request.data)
        f.close()
        return "Binary message written!"
    else:
        return "JSON Message: " + json.dumps(request.json)

if __name__ == '__main__':
    app.run()

More detailed Web Api


In [ ]:
from flask import Flask, jsonify, abort, make_response
from flask_restful import Resource, Api
from flask_restful import reqparse

app = Flask(__name__)

tasks = [
    {
        'id': 1,
        'title': u'Buy groceries',
        'description': u'Milk, Cheese, Pizza, Fruit, Tylenol',
        'done': False
    },
    {
        'id': 2,
        'title': u'Learn Python',
        'description': u'Need to find a good Python tutorial on the web',
        'done': False
    }
]

@app.route('/todo/api/v1.0/tasks', methods=['GET'])
def get_tasks():
    return jsonify({'tasks': tasks})

@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['GET'])
def get_task(task_id):
    task = [task for task in tasks if task['id'] == task_id]
    if len(task) == 0:
        abort(404)
    return jsonify({'task': task[0]})

@app.errorhandler(404)
def not_found(error):
    return make_response(jsonify({'error': 'Not found'}), 404)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=80, debug=True)

TCP Server Example


In [ ]:
# TCP server example
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("", 5000))
server_socket.listen(5)

print ("TCPServer Waiting for client on port 5000")

while 1:
	client_socket, address = server_socket.accept()
	print ("I got a connection from ", address)
	while 1:
		data = input('SEND( TYPE q or Q to Quit):')
		if(data == 'Q' or data == 'q'):
			client_socket.send (data.encode())
			client_socket.close()
			break
		else:
			client_socket.send(data.encode())
		
		data = client_socket.recv(512).decode()
		if(data == 'q' or data == 'Q'):
			client_socket.close()
			break;
		else:
			print ("RECEIVED:" , data)
	break;
server_socket.close()
print("SOCKET closed... END")

Message Class


In [ ]:
class message():

    def parse(self, bins):
        json = {}
        _bytes = bytearray(bins)

        '''
        message_type :        
            1. W : PC 에서 ETN DEVICE로 Data를 보내는 의미로 'W'=0x57을 보낸다.
            2. R : PC 에서 ETN DEVICE로 STATUS 정보를 보내라는 의미로 'R'=0x52을 보낸다.
            3. A : ETN DEVICE에서 PC로 REQ에 대한 ACK를 보낸다는 의미로 'A'=0x41을 보낸다.
        '''        
        if _bytes[0] == 0x57:
            json["message_type"] = "W"        
        elif _bytes[0] == 0x52:
            json["message_type"] = "R"
        elif _bytes[0] == 0x41:
            json["message_type"] = "A"
        else:
            json["message_type"] = "Unknown"

        '''
        sound_group : 
            1. 현재 설정된 Group 정보가 들어 있다
                1) 0x00 : WS(5 Warning Sounds) *ETNB Product (built-in buzzer)
                2) 0x01 : WP(Special 5 Warning Sounds)
                3) 0x02 : WM(5 sounds of melody)
                4) 0x03 : WA(5 sounds of alarm)
                5) 0x04 : WB(Play buzzer sound)
        '''
        if _bytes[1] == 0x00:
            json['sound_group'] = 'WS'
        elif _bytes[1] == 0x01:
            json['sound_group'] = 'WP'
        elif _bytes[1] == 0x02:
            json['sound_group'] = 'WM'
        elif _bytes[1] == 0x03:
            json['sound_group'] = 'WA'
        elif _bytes[1] == 0x04:
            json['sound_group'] = 'WB'
        else:
            json['sound_group'] = 'Unknown'

        '''
        [lamp common setting]
            1) 0x00: LAMP OFF
            2) 0x01: LAMP BLINK(ON / OFF)
            3) 0x02: LAMP ON
            4) Else: N / A
        '''

        '''
        red_lamp
        '''           
        if _bytes[2] == 0x00:
            json["red_lamp"] = "blink"
        elif _bytes[2] == 0x01:
            json["red_lamp"] = "on"
        elif _bytes[2] == 0x02:
            json["red_lamp"] = "blink"
        else:
            json["red_lamp"] = "Unknown"

        '''
        yellow_lamp
        '''
        if _bytes[3] == 0x00:
            json["yellow_lamp"] = "blink"
        elif _bytes[3] == 0x01:
            json["yellow_lamp"] = "on"
        elif _bytes[3] == 0x02:
            json["yellow_lamp"] = "blink"
        else:
            json["yellow_lamp"] = "Unknown"

        '''
        green_lamp
        '''
        if _bytes[4] == 0x00:
            json["green_lamp"] = "blink"
        elif _bytes[4] == 0x01:
            json["green_lamp"] = "on"
        elif _bytes[4] == 0x02:
            json["green_lamp"] = "blink"
        else:
            json["green_lamp"] = "Unknown"
        
        '''
        blue_lamp
        '''
        if _bytes[5] == 0x00:
            json["blue_lamp"] = "blink"
        elif _bytes[5] == 0x01:
            json["blue_lamp"] = "on"
        elif _bytes[5] == 0x02:
            json["blue_lamp"] = "blink"
        else:
            json["blue_lamp"] = "Unknown"

        '''
        white_lamp
        '''
        if _bytes[6] == 0x00:
            json["white_lamp"] = "blink"
        elif _bytes[6] == 0x01:
            json["white_lamp"] = "on"
        elif _bytes[6] == 0x02:
            json["white_lamp"] = "blink"
        else:
            json["white_lamp"] = "Unknown"

        json["sound_channel"] = int(_bytes[7])

        return json

    def generate(self, json):
        _bytes = bytearray(bytes(10))

        '''
        message_type :        
            1. W : PC 에서 ETN DEVICE로 Data를 보내는 의미로 'W'=0x57을 보낸다.
            2. R : PC 에서 ETN DEVICE로 STATUS 정보를 보내라는 의미로 'R'=0x52을 보낸다.
            3. A : ETN DEVICE에서 PC로 REQ에 대한 ACK를 보낸다는 의미로 'A'=0x41을 보낸다.
        '''
        if json['message_type'] == 'W':
            _bytes[0] = 0x57
        elif json['message_type'] == 'R':
            _bytes[0] = 0x52
        elif json['message_type'] == 'A':
            _bytes[0] = 0x41
        else:
            print('error')

        '''
        sound_group : 
            1. 현재 설정된 Group 정보가 들어 있다
                1) 0x00 : WS(5 Warning Sounds) *ETNB Product (built-in buzzer)
                2) 0x01 : WP(Special 5 Warning Sounds)
                3) 0x02 : WM(5 sounds of melody)
                4) 0x03 : WA(5 sounds of alarm)
                5) 0x04 : WB(Play buzzer sound)
        '''
        if json['sound_group'] == 'WS':
            _bytes[1] = 0x00
        elif json['sound_group'] == 'WP':    
            _bytes[1] = 0x01
        elif json['sound_group'] == 'WM':    
            _bytes[1] = 0x02
        elif json['sound_group'] == 'WA':    
            _bytes[1] = 0x03
        elif json['sound_group'] == 'WB':    
            _bytes[1] = 0x04

        '''
        [lamp common setting]
            1) 0x00: LAMP OFF
            2) 0x01: LAMP BLINK(ON / OFF)
            3) 0x02: LAMP ON
            4) Else: N / A
        '''

        '''
        red_lamp
        '''           
        if json["red_lamp"] == "on":
            _bytes[2] = 0x01
        elif json["red_lamp"] == "blink":
            _bytes[2] = 0x02
        else :
            _bytes[2] = 0x00

        '''
        yellow_lamp
        '''
        if json["yellow_lamp"] == "on":
            _bytes[3] = 0x01
        elif json["yellow_lamp"] == "blink":
            _bytes[3] = 0x02
        else :
            _bytes[3] = 0x00

        '''
        green_lamp
        '''
        if json["green_lamp"] == "on":
            _bytes[4] = 0x01
        elif json["green_lamp"] == "blink":
            _bytes[4] = 0x02
        else:
            _bytes[4] = 0x00

        '''
        blue_lamp
        '''
        if json["blue_lamp"] == "on":
            _bytes[5] = 0x01
        elif json["blue_lamp"] == "blink":
            _bytes[5] = 0x02
        else:
            _bytes[5] = 0x00

        '''
        white_lamp
        '''
        if json["white_lamp"] == 'on':
            _bytes[6] = 0x01
        elif json["white_lamp"] == "blink":
            _bytes[6] = 0x02
        else:
            _bytes[6] = 0x00

        _bytes[7] = json["sound_channel"]

        return bytes(_bytes)

Socket Client Example


In [ ]:
import socket
import message

json_list = []

json = {
    "message_type": "W",
    "sound_group": "WP",
    "red_lamp": 'blink',
    "yellow_lamp": 'blink',
    "green_lamp": 'on',
    "blue_lamp": 'blink',
    "white_lamp": 'on',
    "sound_channel": 1
}

json_list.append(json)


json = { 
    'message_type': 'W',
    'sound_group': 'WA',
    'red_lamp': 'blink',
    'yellow_lamp': 'blink',
    'green_lamp': 'on', 
    'blue_lamp': 'blink', 
    'white_lamp': 'on', 
    'sound_channel': 1
}

json_list.append(json)

#message.generate(json)
msg = message.message()
bin_msg = msg.generate(json)
ip = "127.0.0.1"
port = 20000
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((ip, port))   
    s.send(bin_msg)
    if json["message_type"] == "R" :
        bins = bytes(0)
        while 1:
            bins += s.recv(512)
            if len(bins) > 9:
                json = msg.parse(bins[:10])  
                print(json)              
                break
    s.close()

Socker Server Example


In [ ]:
import socket
import message

msg = message.message()
host = '127.0.0.1'
port = 20000

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((host, port))
    s.listen(5)
    while 1:
        client, address = s.accept()
        bins = bytes(0)
        while 1:        
            bins += client.recv(512)            
            if len(bins) > 9:
                json = msg.parse(bins[:10])
                print(json)
                if json["message_type"] == "R":
                    json["message_type"] = "A"
                    
                    client.send(msg.generate(json))                
                break