网络编程和异步IO都有非常丰富的内容。这里只不过是非常简单的介绍。python的socket模块可以用来进行TCP/UDP协议的简单处理

1. TCP - 可靠连接

- TCP协议负责在两台计算机之间建立可靠的链接,保证数据按顺序到达

2. UDP - 面向无连接的协议

- UDP协议不需要建立连接,不能保证数据到达性和顺序性

3. 协程(coroutine)

- 协程在执行过程中,子程序内部中断,然后转而去执行别的子程序,在适当的时候再返回来接着执行。类似CPU的中断
- 协程不是函数调用
- 协程通过yield和send函数进行协作
- 材料[A Curious Course on Coroutines and Concurrency](http://www.dabeaz.com/coroutines/index.html

4. asyncio

- 对异步IO的支持
- @asyncio.coroutine装饰器
- yield from调用另一个coroutine
- get_event_loop()/run_until_complete()/await()

5. async/await新语法

- @asyncio.coroutine换成asyncio
- yield from换成await

In [1]:
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('www.sina.com.cn', 80))
s.send(b'GET / HTTP/1.1\r\nHost: www.sina.com.cn\r\nConnection: close\r\n\r\n')
buffer = [] 
while True:
    d = s.recv(1024)
    if d:
        buffer.append(d)
    else:
        break
data = b''.join(buffer)
s.close()
header, html = data.split(b'\r\n\r\n', 1)
print(header.decode('utf-8'))

#Server
def tcplink(sock, addr):
    print('Accept new connection from %s:%s...' %)


HTTP/1.1 200 OK
Server: nginx
Date: Wed, 21 Jun 2017 15:48:25 GMT
Content-Type: text/html
Last-Modified: Wed, 21 Jun 2017 15:47:05 GMT
Vary: Accept-Encoding
Expires: Wed, 21 Jun 2017 15:49:25 GMT
Cache-Control: max-age=60
X-Powered-By: shci_v1.03
Age: 18
Content-Length: 598447
X-Cache: HIT from cnc.xidan.sinacache.87.nb.sinaedge.com
Connection: close

In [ ]:
# server
 
import socket
import threading
import time
 
def tcplink(sock, addr):
    print('Accept new connection from %s:%s...' %addr)
    sock.send(b'Welcome!')
    while True:
        data = sock.recv(1024)
        time.sleep(1)
        if not data or data.decode('utf-8') == 'exit':
            break
        sock.send(('Hello, %s!' %data.decode('utf-8')).encode('utf-8'))
    sock.close()
    print('Connection from %s:%s closed.' %addr)

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('127.0.0.1', 9999))
s.listen(5)
print('Waiting for connection...')
while True:
    sock, addr = s.accept()
    t = threading.Thread(target=tcplink, args=(sock, addr))
    t.start()

# client
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 9999))
print(s.recv(1024).decode('utf-8'))
for data in [b'Michael', b'Tracy', b'Sarah']:
    s.send(data)
    print(s.recv(1024).decode('utf-8'))
s.send(b'exit')
s.close()

In [ ]:
# UDP
# server
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(('127.0.0.1', 9998))

print('Bind UDP on 9998')

while True:
    data, addr = s.recvfrom(1024)
    print('Received from %s:%s' %addr)
    s.sendto(b'Hello, %s!' %data, addr)

import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 
for data in [b'Michael', b'Tracy', b'Sarah']:
    s.sendto(data, ('127.0.0.1', 9998))
    print(s.recv(1024).decode('utf-8'))
s.close()

In [1]:
def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' %n)
        r = '200 0k'
        
def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' %n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' %r)
    c.close()

c = consumer()
produce(c)


[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 0k
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 0k
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 0k
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 0k
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 0k

In [1]:
import asyncio
import threading

@asyncio.coroutine
def hello():
    print('Hello world!')
    r = yield from asyncio.sleep(10)
    print('Hello again!')

loop = asyncio.get_event_loop()
loop.run_until_complete(hello())


Hello world!
Hello again!
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-1-7645a6507625> in <module>()
     20 loop = asyncio.get_event_loop()
     21 tasks = [hello(), hello()]
---> 22 loop.run_until_complete(tasks)

/usr/lib/python3.5/asyncio/base_events.py in run_until_complete(self, future)
    365 
    366         new_task = not isinstance(future, futures.Future)
--> 367         future = tasks.ensure_future(future, loop=self)
    368         if new_task:
    369             # An exception is raised if the future didn't complete, so there

/usr/lib/python3.5/asyncio/tasks.py in ensure_future(coro_or_future, loop)
    552         return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    553     else:
--> 554         raise TypeError('A Future, a coroutine or an awaitable is required')
    555 
    556 

TypeError: A Future, a coroutine or an awaitable is required

In [2]:
import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! %s' %threading.currentThread())
    yield from asyncio.sleep(5)
    print('Hello world! %s' %threading.currentThread())
    
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))


Hello world! <_MainThread(MainThread, started 139851585234688)>
Hello world! <_MainThread(MainThread, started 139851585234688)>
Hello world! <_MainThread(MainThread, started 139851585234688)>
Hello world! <_MainThread(MainThread, started 139851585234688)>
Out[2]:
({<Task finished coro=<hello() done, defined at <ipython-input-2-4a75ec31209a>:4> result=None>,
  <Task finished coro=<hello() done, defined at <ipython-input-2-4a75ec31209a>:4> result=None>},
 set())

In [5]:
import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' %host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0/\r\nHost:%s\r\n\r\n' %host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' %(host, line.decode('utf-8').rstrip()))
    writer.close()
    
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))


wget www.sina.com.cn...
wget www.163.com...
wget www.sohu.com...
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
www.163.com header > Date: Sat, 24 Jun 2017 16:14:17 GMT
www.163.com header > Content-Length: 0
www.163.com header > Location: http://www.163.com/special/0077jt/error_isp.html
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Server: nginx
www.sina.com.cn header > Date: Sat, 24 Jun 2017 16:14:07 GMT
www.sina.com.cn header > Content-Type: text/html
www.sina.com.cn header > Last-Modified: Sat, 24 Jun 2017 16:11:02 GMT
www.sina.com.cn header > Vary: Accept-Encoding
www.sina.com.cn header > Expires: Sat, 24 Jun 2017 16:15:07 GMT
www.sina.com.cn header > Cache-Control: max-age=60
www.sina.com.cn header > X-Powered-By: shci_v1.03
www.sina.com.cn header > Age: 10
www.sina.com.cn header > Content-Length: 598395
www.sina.com.cn header > X-Cache: HIT from cnc.xidan.sinacache.86.nb.sinaedge.com
www.sina.com.cn header > Connection: close
www.sohu.com header > HTTP/1.1 400 Bad Request
www.sohu.com header > Server: nginx
www.sohu.com header > Date: Sat, 24 Jun 2017 16:14:17 GMT
www.sohu.com header > Content-Type: text/html
www.sohu.com header > Content-Length: 166
www.sohu.com header > Connection: close
Out[5]:
({<Task finished coro=<wget() done, defined at <ipython-input-5-55f9e54626fd>:3> result=None>,
  <Task finished coro=<wget() done, defined at <ipython-input-5-55f9e54626fd>:3> result=None>,
  <Task finished coro=<wget() done, defined at <ipython-input-5-55f9e54626fd>:3> result=None>},
 set())

In [ ]: