Socket

打印设备名和 IPv4 地址


In [ ]:
import socket

def print_machine_info():
    """Print this machine's host name and its resolved address record.

    Output, in order: the bare host name, a labelled "Host name" line, and
    the tuple returned by ``socket.gethostbyname_ex`` for that name.
    """
    local_name = socket.gethostname()
    for line in (local_name, "Host name: %s" % local_name):
        print(line)
    # gethostbyname_ex() yields (hostname, aliaslist, ipaddrlist).
    print(socket.gethostbyname_ex(local_name))


print_machine_info()

获取远程设备的 IP 地址


In [ ]:
import socket

def get_remote_machine_info():
    """Resolve a fixed remote host name and print its address information.

    Prints the primary IPv4 address followed by the full
    ``gethostbyname_ex`` tuple. A ``socket.error`` raised by either lookup
    is reported as "<host>: <error>" instead of being propagated.
    """
    remote_host = 'www.qq.com'
    try:
        primary_ip = socket.gethostbyname(remote_host)
        print("IP address: %s" % primary_ip)
        print(socket.gethostbyname_ex(remote_host))
    except socket.error as err_msg:
        print("%s: %s" % (remote_host, err_msg))


get_remote_machine_info()

通过指定的端口和协议找到服务名


In [ ]:
import socket

def find_service_name():
    """Print the service name registered for a few (port, protocol) pairs.

    Looks up ports 80 and 25 over TCP, then port 53 over UDP, in that order.
    """
    lookups = [(80, 'tcp'), (25, 'tcp'), (53, 'udp')]
    for port, protocolname in lookups:
        service = socket.getservbyport(port, protocolname)
        print("Port: %s => service name: %s" % (port, service))


find_service_name()

IP地址转换


In [ ]:
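# Convert IPv4 addresses between their textual and packed forms.
# socket.inet_aton()    packs a dotted-quad string into 4 bytes (32 bits) in network byte order
# socket.inet_ntoa()    unpacks those 4 network-order bytes back into a dotted-quad string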

import socket
from binascii import hexlify

def convert_ipv4_address():
    """Round-trip sample IPv4 addresses through inet_aton/inet_ntoa.

    For each address prints the original text, the hex form of the packed
    bytes, and the text obtained by unpacking them again.
    """
    template = "IP address: %s => packed: %s, unpacked: %s"
    for dotted in ('127.0.0.1', '192.168.0.1'):
        packed = socket.inet_aton(dotted)
        print(template % (dotted, hexlify(packed), socket.inet_ntoa(packed)))


if __name__ == '__main__':
    convert_ipv4_address()

NTP


In [ ]:
# pip install ntplib
import ntplib
from time import ctime

def print_time():
    """Query time.nist.gov over NTP and print the reply's transmit time.

    The ``tx_time`` of the response is formatted with ``time.ctime``.
    """
    reply = ntplib.NTPClient().request('time.nist.gov')
    print(ctime(reply.tx_time))


print_time()

Socket

把套接字改成阻塞或非阻塞模式


In [ ]:
# 把套接字改成阻塞或非阻塞模式

import socket

def test_socket_modes():
    """Bind a TCP socket to an ephemeral loopback port and keep it listening.

    Demonstrates the blocking-mode and timeout setters on a socket, prints
    the address the kernel assigned, then idles forever with the socket in
    the listening state. No connection is ever accepted.

    This function does not return on its own; interrupt it to stop. The
    socket is closed on the way out.
    """
    import time  # local import: only this demo needs it

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # setblocking(1) selects blocking mode, setblocking(0) non-blocking.
        s.setblocking(1)

        # A non-zero timeout is applied on top of the mode chosen above.
        s.settimeout(0.5)
        s.bind(("127.0.0.1", 0))

        socket_address = s.getsockname()
        print("Trivial Server launched on socket: %s" % str(socket_address))

        # One listen() call is enough to enter the listening state; sleeping
        # afterwards keeps the process alive without spinning the CPU.
        s.listen(1)
        while True:
            time.sleep(0.5)
    finally:
        s.close()


test_socket_modes()

Echo Client


In [ ]:
import socket
import sys
import argparse

host = 'localhost'


def echo_client(port):
    """Send a short message to an echo server and print what comes back.

    Connects to ``(host, port)`` over TCP, sends the message "bye" and reads
    until as many bytes as were sent have been received, or until the server
    closes the connection.

    Args:
        port: TCP port of the echo server on ``host``.

    Raises:
        socket.error: if the connection cannot be established. Errors after
            the connection is up are printed, not raised.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_address = (host, port)
    print("Connecting to %s port %s" % server_address)
    try:
        sock.connect(server_address)
    except Exception:
        # Do not leak the socket when the connection attempt fails.
        sock.close()
        raise

    try:
        message = "bye"
        print("Sending: %s" % message)
        sock.sendall(message)

        # Keep reading until the whole message has been echoed back.
        amount_received = 0
        amount_expected = len(message)
        while amount_received < amount_expected:
            data = sock.recv(2048)
            if not data:
                # An empty read means the peer closed the connection; without
                # this check the loop would never terminate.
                print("Connection closed by the server before the full echo arrived")
                break
            amount_received += len(data)
            print("Received: %s" % data)
    except socket.error as e:
        print("Socket error: %s" % str(e))
    except Exception as e:
        print("Other exception: %s" % str(e))
    finally:
        print("Closing connection to the server")
        sock.close()


echo_client(9999)

文件传输


In [ ]:
import socket
# Send the contents of a local file to the TCP server on localhost:9999.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 9999)
print("Connecting to %s port %s" % server_address)
try:
    sock.connect(server_address)
    with open('data/ft_test_client.txt', 'r') as fp:
        sock.sendall(fp.read())
        print('sending file')
finally:
    # Close the socket whether or not connect/open/sendall succeeded, so the
    # descriptor is not left open in a long-lived interpreter session.
    sock.close()

TCPServer


In [ ]:
from SocketServer import TCPServer, BaseRequestHandler 
class MyBaseRequestHandlerr(BaseRequestHandler):
    """Request handler that echoes every received chunk back upper-cased.

    Subclasses BaseRequestHandler and overrides handle().
    """

    def handle(self):
        """Serve one client until it disconnects, sends 'y', or an error occurs.

        Each chunk (up to 1024 bytes) is stripped of surrounding whitespace,
        logged, and sent back in upper case.
        """
        while True:
            try:
                raw = self.request.recv(1024)
                if not raw:
                    # recv() returns an empty string once the peer has closed
                    # the connection; stop instead of looping on it forever.
                    break
                data = raw.strip()
                print("receive from (%r):%r" % (self.client_address, data))
                self.request.sendall(data.upper())
                if data == 'y':
                    print('server exit')
                    break
            except Exception as err:
                # Report the failure and drop this client. KeyboardInterrupt
                # and SystemExit are deliberately not caught here so the
                # server process can still be stopped.
                print("connection error from (%r): %s" % (self.client_address, err))
                break
# Listen on every interface, port 9997.
host = "0.0.0.0"
port = 9997
addr = (host, port)
server = TCPServer(addr, MyBaseRequestHandlerr)
print('server is started')
try:
    server.serve_forever()
finally:
    # Release the listening socket when serve_forever() is interrupted, so
    # the port can be bound again without restarting the interpreter.
    server.server_close()

ThreadingMixIn


In [ ]:
# 在套接字服务器程序中使用 ThreadingMixIn

import os
import socket
import threading
import SocketServer

# Host name the demo server binds to.
SERVER_HOST = 'localhost'
SERVER_PORT = 0    # Port 0 tells the kernel to pick a free port dynamically
# Maximum number of bytes read by a single recv() call.
BUF_SIZE = 1024

def client(ip, port, message):
    """Send *message* to the server at (ip, port) and print one reply chunk.

    Used to exercise the threading mix-in server. A single recv() of up to
    BUF_SIZE bytes is read from the server and printed. The socket is always
    closed, including when the connection attempt itself fails.

    Args:
        ip: address of the server.
        port: TCP port of the server.
        message: byte string to send.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect() is inside the try so a refused connection does not leak
        # the socket; the exception still propagates to the caller.
        sock.connect((ip, port))
        sock.sendall(message)
        response = sock.recv(BUF_SIZE)
        print("Client received: %s" % response)
    finally:
        sock.close()

class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
    """An example of a threaded TCP request handler.

    Replies to each connection with "<thread name>: <received data>".
    """

    def handle(self):
        """Read one chunk from the client and echo it back, tagged with the
        name of the thread that is serving the request."""
        data = self.request.recv(BUF_SIZE)
        response = '%s: %s' % (threading.current_thread().name, data)
        # sendall() keeps writing until the whole reply has been handed to
        # the socket; send() is allowed to transmit only part of it.
        self.request.sendall(response)

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server combined with the threading mix-in.

    Everything needed is inherited from the two base classes; nothing is
    overridden or added here.
    """

def main():
    """Start the threaded server, run three clients against it, then stop it.

    The server is bound to (SERVER_HOST, SERVER_PORT) and served from a
    daemon thread. The serving loop is stopped and the listening socket is
    closed even if one of the clients raises.
    """
    server = ThreadedTCPServer((SERVER_HOST, SERVER_PORT), ThreadedTCPRequestHandler)
    try:
        ip, port = server.server_address    # the port actually assigned
        print("%s %s server" % (ip, port))

        # Serve from a background thread; as a daemon it does not keep the
        # process alive once the main thread exits.
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()
        print('Server loop runing thread: %s' % server_thread.name)

        try:
            for number in (1, 2, 3):
                client(ip, port, "Hello from client %d" % number)
        finally:
            # shutdown() is only called once the serving thread has been
            # started, because it waits for serve_forever() to finish.
            server.shutdown()
    finally:
        # Release the listening socket.
        server.server_close()


if __name__ == '__main__':
    main()

ForkingMixIn


In [ ]:
# 在套接字服务器程序中使用 ForkingMixIn

import os
import socket
import threading
import SocketServer

# Host name the demo server binds to.
SERVER_HOST = 'localhost'
SERVER_PORT = 0    # Port 0 lets the kernel pick a free port dynamically
# Maximum number of bytes read by a single recv() call.
BUF_SIZE = 1024
# Payload the client sends to the echo server.
ECHO_MSG = 'Hello echo server!'

class ForkingClient(object):
    """TCP client used to exercise the ForkingMixIn server."""

    def __init__(self, ip, port):
        """Create a TCP socket and connect it to the server at (ip, port).

        The socket is closed again if the connection attempt fails; the
        exception is re-raised.
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((ip, port))
        except Exception:
            self.sock.close()
            raise

    def run(self):
        """Send ECHO_MSG to the server and print the echoed payload."""
        current_process_id = os.getpid()
        print('PID %s sending echo message to the server: %s' % (current_process_id, ECHO_MSG))
        sent_data_length = self.sock.send(ECHO_MSG)
        print("Sent: %d characters, so far..." % sent_data_length)

        # The server answers "<pid>: <data>". Cut the reply at the first
        # colon rather than at a fixed offset, so the PID prefix is removed
        # correctly whatever its number of digits.
        response = self.sock.recv(BUF_SIZE)
        _prefix, separator, remainder = response.partition(':')
        payload = remainder if separator else response
        print("PID %s received: %s" % (current_process_id, payload))

    def shutdown(self):
        """Close the client socket."""
        self.sock.close()

class ForkingServerRequestHandler(SocketServer.BaseRequestHandler):
    """Echo handler that prefixes the reply with the serving process id."""

    def handle(self):
        """Read one chunk from the client and send back "<pid>: <data>"."""
        data = self.request.recv(BUF_SIZE)
        current_process_id = os.getpid()
        response = '%s: %s' % (current_process_id, data)
        print("Server sending response [current_process_id: data] = %s" % response)
        # sendall() keeps writing until the whole reply has been handed to
        # the socket; send() is allowed to transmit only part of it.
        self.request.sendall(response)

class ForkingServer(SocketServer.ForkingMixIn, SocketServer.TCPServer):
    """TCP server combined with the forking mix-in.

    Everything needed is inherited from the two base classes; nothing is
    overridden or added here.
    """

def main():
    """Start the forking server, run two clients against it, then clean up.

    The server is served from a daemon thread. The serving loop is stopped
    and every socket that was opened is closed even if a client raises.
    """
    # Wrap the server
    server = ForkingServer((SERVER_HOST, SERVER_PORT), ForkingServerRequestHandler)
    clients = []
    try:
        ip, port = server.server_address    # the port actually assigned
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.setDaemon(True)   # Don't hang on exit
        server_thread.start()
        print('Server loop runing PID: %s' % os.getpid())

        try:
            # Launch the clients one after the other.
            for _ in range(2):
                forking_client = ForkingClient(ip, port)
                clients.append(forking_client)
                forking_client.run()
        finally:
            # shutdown() is only called once the serving thread has been
            # started, because it waits for serve_forever() to finish.
            server.shutdown()
    finally:
        for forking_client in clients:
            forking_client.shutdown()
        server.socket.close()


if __name__ == '__main__':
    main()

UDP Client


In [ ]:
import socket  
  
# Read lines from standard input and send each one as a UDP datagram.
address = ('127.0.0.1', 31500)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

try:
    # iter(raw_input, '') yields input lines and stops at the first empty one.
    for msg in iter(raw_input, ''):
        s.sendto(msg, address)
        if msg == 'exit':
            print('client exit')
            break
finally:
    # Close the socket even when input ends abruptly (EOF or interrupt).
    s.close()

HTTP

httplib


In [ ]:
import httplib
import urlparse
import re
import urllib

DEFAULT_URL = 'http://www.qq.com'
HTTP_GOOD_CODES = [httplib.OK, httplib.FOUND, httplib.MOVED_PERMANENTLY]


def get_server_status_code(url):
    """Issue a GET for *url* and return the HTTP status code of the reply.

    Only the network location and path of the URL are used. The parsed URL
    and the (host, path) pair are printed before the request is sent.

    Args:
        url: absolute HTTP URL.

    Returns:
        The integer status code, or None when a StandardError occurs (the
        error is printed).
    """
    parsed = urlparse.urlparse(url)
    print(parsed)
    host, path = parsed[1:3]
    print("%s %s" % (host, path))
    conn = None
    try:
        conn = httplib.HTTPConnection(host)
        conn.request('GET', path)
        return conn.getresponse().status
    except StandardError as err:
        print(err)
        return None
    finally:
        # Only the status is needed, so drop the connection right away
        # instead of leaving it open.
        if conn is not None:
            conn.close()


print(get_server_status_code(DEFAULT_URL))

网页下载数据


In [ ]:
import httplib

REMOTE_SERVER_HOST = 'www.baidu.com'
REMOTE_SERVER_PATH = '/'


class HTTPClient:
    """Small HTTP client built on the httplib.HTTP interface."""

    def __init__(self, host):
        """Remember the host every request will be sent to."""
        self.host = host

    def fetch(self, path):
        """GET *path* from the configured host and return the response body.

        Args:
            path: request path, e.g. '/'.

        Returns:
            The body as a string, or None when reading the reply fails (the
            failure is printed).
        """
        http = httplib.HTTP(self.host)
        try:
            # Prepare the request line and headers
            http.putrequest("GET", path)
            http.putheader("User-Agent", 'AGENT')
            http.putheader("Host", self.host)
            http.putheader("Accept", "*/*")
            http.endheaders()

            try:
                errcode, errmsg, headers = http.getreply()
            except Exception as e:
                # The reply fields are not bound when getreply() raises, so
                # report the exception itself.
                print("Client failed: %s" % e)
                return None
            print("Got homepage from %s" % self.host)

            return http.getfile().read()
        finally:
            http.close()


client = HTTPClient(REMOTE_SERVER_HOST)
print(client.fetch(REMOTE_SERVER_PATH))

Urllib2 Header


In [ ]:
import urllib2

BROWSER = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:49.0) Gecko/20100101 Firefox/49.0'
URL = 'http://www.163.com'


def simulate_firefox():
    """Open URL with a Firefox User-agent header and print the reply headers.

    Each raw header line of the response is printed prefixed with a tab.
    The response is closed once the headers have been printed.
    """
    opener = urllib2.build_opener()
    opener.addheaders = [('User-agent', BROWSER)]
    result = opener.open(URL)
    try:
        print("Response headers:")
        for header in result.headers.headers:
            print("\t%s" % header)
    finally:
        # Only the headers are of interest; release the connection.
        result.close()


simulate_firefox()

通过代理服务器发送 Web 请求


In [ ]:
# Get from http://www.xicidaili.com/
import urllib

# Target of the request and the proxy address handed to urlopen().
URL = 'https://www.github.com'
PROXY_ADDRESS = '1.164.144.107:8080'

if __name__ == '__main__':
    # NOTE(review): URL uses the https scheme while the proxy mapping only
    # has an "http" entry -- confirm the request really goes via the proxy.
    resp = urllib.urlopen(URL, proxies=dict(http=PROXY_ADDRESS))
    print("Proxy server returns response headers: %s" % resp.headers)

In [ ]:
import cookielib
import urllib
import urllib2

NORMAL_URL = 'http://www.baidu.com'


def extract_cookie_info():
    """Open NORMAL_URL with a cookie-aware opener and print the cookies.

    Every cookie stored in the jar after the request is printed as
    "<name> <value>". No login is performed. Returns None.
    """
    # Set up the cookie jar
    cj = cookielib.CookieJar()

    # Create a URL opener that records cookies into the jar
    opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
    response = opener.open(NORMAL_URL)
    # The body is not used; release the connection before reporting.
    response.close()

    for cookie in cj:
        print("%s %s" % (cookie.name, cookie.value))


extract_cookie_info()

HTTP Server


In [ ]:
import BaseHTTPServer

class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Serve the same static HTML page for every GET request."""

    # Static response body; the leading indentation is part of the payload.
    Page = '''\
        <html>
        <body>
        <p>Hello, World!</p>
        </body>
        </html>
    '''

    def do_GET(self):
        """Reply 200 with ``Page`` plus Content-Type and Content-Length headers."""
        body = self.Page
        self.send_response(200)
        for name, value in (("Content-Type", "text/html"),
                            ("Content-Length", str(len(body)))):
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(body)
        
# Serve RequestHandler on every interface, port 8080.
serverAddress = ('0.0.0.0', 8080)
server = BaseHTTPServer.HTTPServer(serverAddress, RequestHandler)
try:
    server.serve_forever()
finally:
    # Release the listening socket when serve_forever() is interrupted, so
    # the port can be bound again without restarting the interpreter.
    server.server_close()

Simple HTTP Server


In [ ]:
import sys
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer

# Address the HTTP server binds to.
DEFAULT_HOST = '0.0.0.0'
# Default port for the HTTP server.
DEFAULT_PORT = 8800

class RequestHandler(BaseHTTPRequestHandler):
    """Request handler that answers every GET with a fixed HTML snippet."""

    def do_GET(self):
        """Send a 200 text/html response made of a heading and an image tag."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        # The body fragments are written one at a time, in this order.
        fragments = (
            "<h1>Simple HTTP Server</h1>",
            '<img src="http://file06.16sucai.com/2016/0803/2fd023ed4f4d38facf52043d595a0af2.jpg" />',
        )
        for fragment in fragments:
            self.wfile.write(fragment)

class CustomHTTPServer(HTTPServer):
    """HTTPServer preconfigured to dispatch requests to RequestHandler."""

    def __init__(self, host, port):
        """Bind to (host, port) and install RequestHandler as handler class."""
        HTTPServer.__init__(self, (host, port), RequestHandler)

def run_server(port=DEFAULT_PORT):
    """Run the custom HTTP server on DEFAULT_HOST until it is interrupted.

    Args:
        port: TCP port to listen on; defaults to DEFAULT_PORT.

    Errors raised while creating or running the server are printed rather
    than propagated. The listening socket is released on every exit path.
    """
    server = None
    try:
        server = CustomHTTPServer(DEFAULT_HOST, port)
        print("Custom HTTP server started on port: %s" % port)
        server.serve_forever()
    except KeyboardInterrupt:
        print("Server interrupted and is shutting down...")
    except Exception as err:
        print("Error: %s" % err)
    finally:
        # server stays None when the constructor itself failed (for example
        # because the port is already in use).
        if server is not None:
            server.server_close()


run_server(8800)

http server compress data


In [ ]:
import string
import os
import sys
import gzip
import cStringIO
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer

# Address and port the HTTP server binds to.
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 8800
# Page body that the request handler compresses and serves.
HTML_CONTENT = """<html><body><h1>Compressed Hello World!</h1></body></html>"""

class RequestHandler(BaseHTTPRequestHandler):
    """Request handler that serves HTML_CONTENT gzip-compressed."""

    def do_GET(self):
        """Reply to a GET with the gzip-compressed page.

        The Content-Encoding and Content-Length values are also echoed to
        standard output for debugging.
        """
        # Compress once; the same bytes provide the Content-Length value and
        # the response body, so the two can never disagree.
        zbuf = self.compress_buffer(HTML_CONTENT)

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.send_header('Content-Encoding', 'gzip')
        self.send_header('Content-Length', str(len(zbuf)))
        self.end_headers()

        # Mirror the compression headers on stdout.
        sys.stdout.write("Content-Encoding: gzip\r\n")
        sys.stdout.write("Content-Length: %d\r\n" % len(zbuf))
        sys.stdout.write("\r\n")

        # Send the compressed page to the browser
        self.wfile.write(zbuf)

    def compress_buffer(self, buf):
        """Return *buf* gzip-compressed at level 6 as a byte string."""
        zbuf = cStringIO.StringIO()
        zfile = gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=6)
        try:
            zfile.write(buf)
        finally:
            # close() completes the gzip stream in zbuf before it is read.
            zfile.close()
        return zbuf.getvalue()
    
# Serve the gzip request handler on (DEFAULT_HOST, DEFAULT_PORT).
server_address = (DEFAULT_HOST, DEFAULT_PORT)
server = HTTPServer(server_address, RequestHandler)
try:
    server.serve_forever()
finally:
    # Release the listening socket when serve_forever() is interrupted, so
    # the port can be bound again without restarting the interpreter.
    server.server_close()

EMAIL


In [ ]:
# 3416230579
import smtplib
from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr

def send_email(from_addr, to_addr, subject, password):
    """Send a fixed HTML message through smtp.qq.com over SSL (port 465).

    Args:
        from_addr: sender address; also used as the SMTP login name.
        to_addr: recipient address.
        subject: value of the Subject header.
        password: secret passed to SMTP login.

    The connection is closed on every exit path; errors from login or
    sendmail propagate to the caller.
    """
    msg = MIMEText("邮件正文", 'html', 'utf-8')
    msg['From'] = u'<%s>' % from_addr
    msg['To'] = u'<%s>' % to_addr
    msg['Subject'] = subject

    smtp = smtplib.SMTP_SSL('smtp.qq.com', 465)
    try:
        smtp.set_debuglevel(1)
        smtp.ehlo("smtp.qq.com")
        smtp.login(from_addr, password)
        smtp.sendmail(from_addr, [to_addr], msg.as_string())
        # End the session politely once the message has been handed over.
        smtp.quit()
    finally:
        # Drop the connection even when one of the steps above failed.
        smtp.close()


# The password here is the client authorization code entered when the SMTP
# service was enabled, not the mailbox password.
# Many mail providers require SMTP to be switched on before mail can be sent
# this way.
send_email(u"8745010@qq.com", u"3416230579@qq.com", u"主题", "xxxxxx")

In [ ]:
import smtplib

from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
from email.mime.multipart import MIMEMultipart

def send_email(from_addr, to_addr, subject, password):
    """Send the local file test.txt as an attachment through smtp.qq.com.

    Args:
        from_addr: sender address; also used as the SMTP login name.
        to_addr: recipient address.
        subject: value of the Subject header.
        password: secret passed to SMTP login.

    The attachment file and the SMTP connection are both closed on every
    exit path; errors propagate to the caller.
    """
    msg = MIMEMultipart()
    msg['From'] = u'<%s>' % from_addr
    msg['To'] = u'<%s>' % to_addr
    msg['Subject'] = subject

    # Read the attachment through a context manager so the file handle is
    # released immediately.
    with open('test.txt', 'rb') as attachment_file:
        attachment_data = attachment_file.read()
    att1 = MIMEText(attachment_data, 'base64', 'gb2312')
    # NOTE(review): MIMEText already sets a Content-Type header and item
    # assignment on a message appends instead of replacing -- confirm that
    # a second Content-Type header is really intended here.
    att1["Content-Type"] = 'application/octet-stream'
    att1["Content-Disposition"] = 'attachment; filename="test.txt"'

    msg.attach(att1)

    smtp = smtplib.SMTP_SSL('smtp.qq.com', 465)
    try:
        smtp.set_debuglevel(1)
        smtp.ehlo("smtp.qq.com")
        smtp.login(from_addr, password)
        smtp.sendmail(from_addr, [to_addr], msg.as_string())
        # End the session politely once the message has been handed over.
        smtp.quit()
    finally:
        # Drop the connection even when one of the steps above failed.
        smtp.close()


# The password here is the client authorization code entered when the SMTP
# service was enabled, not the mailbox password.
# Many mail providers require SMTP to be switched on before mail can be sent
# this way.
send_email(u"8745010@qq.com", u"3416230579@qq.com", u"主题", "xxxxxx")

POP3


In [ ]:
import os, sys, string
import poplib

# Address of the POP3 server and the account credentials
host = "pop3.163.com"
username = "xxxxxx@163.com"
password = "xxxxxxx"
pp = poplib.POP3(host)
try:
    pp.set_debuglevel(1)
    pp.user(username)
    pp.pass_(password)
    # Mailbox summary: first item is the number of messages, second item the
    # total size in bytes.
    ret = pp.stat()
    print(ret)
    message_count = ret[0]
    # Fetch the headers of every message; message ids start at 1.
    for i in range(1, message_count + 1):
        # The line count given to top() is counted after the header: 0 returns
        # the header only, 1 returns the header plus one more line.
        mlist = pp.top(i, 0)
        print('line:  %d' % len(mlist[1]))
    # Per-message listing (id and size of each message), as opposed to the
    # overall totals reported by stat().
    ret = pp.list()
    print(ret)
    # retr(1) only makes sense when the mailbox holds at least one message.
    if message_count > 0:
        # Retrieve the first message in full. down[0] is the status line and
        # down[1] is the list of lines of the message.
        down = pp.retr(1)
        print('lines: %d' % len(down[1]))
        # Print the message
        for line in down[1]:
            print(line)
finally:
    # Sign off even when one of the commands above failed.
    pp.quit()

FTP

FTP Client


In [ ]:
from ftplib import FTP 
import os

def ftp_up(filename):
    """Upload the local file *filename* to the FTP server at 127.0.0.1:2121.

    The file is stored in the server directory ``ftp/`` under its base name.
    The local file and the FTP connection are released on every exit path;
    errors propagate to the caller.

    Args:
        filename: path of the local file to upload.
    """
    ftp = FTP()
    ftp.set_debuglevel(2)  # level 2 shows detailed output; 0 turns it off
    try:
        ftp.connect('127.0.0.1', '2121')
        ftp.login('user', '12345')  # use empty strings for anonymous login
        print(ftp.getwelcome())  # show the server's welcome message
        ftp.cwd('ftp/')  # directory to operate in
        bufsize = 1024  # size of the transfer buffer
        with open(filename, 'rb') as file_handler:
            ftp.storbinary('STOR %s' % os.path.basename(filename), file_handler, bufsize)
        ftp.set_debuglevel(0)
        ftp.quit()
    finally:
        # Make sure the connection is dropped when a step above failed;
        # after a successful quit() there is nothing left to close.
        ftp.close()
    print("ftp up OK")
 
def ftp_down(filename):
    """Download ``ftp_down.txt`` from the FTP server at 127.0.0.1:2121.

    The file is fetched from the server's root directory and written to a
    local file of the same name. The local file and the FTP connection are
    released on every exit path; errors propagate to the caller.

    Args:
        filename: currently ignored, see the note in the body.
    """
    ftp = FTP()
    ftp.set_debuglevel(2)
    try:
        ftp.connect('127.0.0.1', '2121')
        ftp.login('user', '12345')
        print(ftp.getwelcome())  # show the server's welcome message
        ftp.cwd('/')  # directory to operate in
        bufsize = 1024
        # NOTE(review): the argument is overwritten here, so the name passed
        # by the caller has no effect. Kept as is to preserve the current
        # behaviour -- confirm whether the parameter should be honoured.
        filename = "ftp_down.txt"
        # Receive the remote file and write it to the local file; the context
        # manager closes the local file once the transfer is over.
        with open(filename, 'wb') as local_file:
            ftp.retrbinary('RETR %s' % os.path.basename(filename), local_file.write, bufsize)
        ftp.set_debuglevel(0)
        ftp.quit()
    finally:
        # Make sure the connection is dropped when a step above failed;
        # after a successful quit() there is nothing left to close.
        ftp.close()
    print("ftp down OK")

In [ ]:
# Run the upload demo with the local file test.txt.
ftp_up(filename='test.txt')

In [ ]:
# Run the download demo, passing test.txt as the filename argument.
ftp_down(filename='test.txt')

XML RPC


In [ ]:
import xmlrpclib  
# Call is_even(3) on the XML-RPC server at localhost:8000 and print the reply.
proxy = xmlrpclib.ServerProxy("http://localhost:8000/")
candidate = proxy.is_even(3)
print("the even is %d" % candidate)

In [ ]:
import xmlrpclib  
# Fetch the value from ret_a() on the XML-RPC server at localhost:8002, pass
# it through the server's from_string() and print the result.
proxy = xmlrpclib.ServerProxy("http://localhost:8002/")

candidate = proxy.ret_a()
print("the variable 'a' in server is " + str(proxy.from_string(candidate)))

In [ ]:
import xmlrpclib
# Download the binary returned by logo() from the XML-RPC server at
# localhost:9999 and save it as xmlrpc_logo.jpg.
proxy = xmlrpclib.ServerProxy("http://localhost:9999/")
# Call the server first, so a failed request does not leave an empty file
# behind.
logo = proxy.logo()
with open("xmlrpc_logo.jpg", "wb") as handle:
    handle.write(logo.data)

In [ ]: