In [ ]:
import socket
def print_machine_info():
    """Print the local host name and what the resolver knows about it.

    Output, in order: the bare host name, a labelled ``Host name:`` line,
    and the value returned by ``socket.gethostbyname_ex`` for that name.
    """
    local_name = socket.gethostname()
    print(local_name)
    print("Host name: %s" % local_name)
    print(socket.gethostbyname_ex(local_name))


print_machine_info()
In [ ]:
import socket
def get_remote_machine_info():
    """Resolve a fixed remote host name and print the results.

    A ``socket.error`` raised by either lookup is caught and printed next
    to the host name instead of propagating.
    """
    remote_host = 'www.qq.com'
    try:
        address = socket.gethostbyname(remote_host)
        print("IP address: %s" % address)
        print(socket.gethostbyname_ex(remote_host))
    except socket.error as err_msg:
        print("%s: %s" % (remote_host, err_msg))


get_remote_machine_info()
In [ ]:
import socket
def find_service_name():
    """Print the service names registered for a few well-known ports.

    TCP ports 80 and 25 are looked up first, then UDP port 53, all through
    ``socket.getservbyport``.
    """
    line_format = "Port: %s => service name: %s"
    protocolname = 'tcp'
    for tcp_port in (80, 25):
        service = socket.getservbyport(tcp_port, protocolname)
        print(line_format % (tcp_port, service))
    # Port 53 is queried once, over UDP, after the TCP lookups.
    print(line_format % (53, socket.getservbyport(53, 'udp')))


find_service_name()
In [ ]:
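# Convert IPv4 addresses between different formats.
# socket.inet_aton() converts a dotted-quad IP string into its 32-bit packed form (network byte order).
# socket.inet_ntoa() converts a 32-bit packed address (network byte order) back into a dotted-quad string.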
import socket
from binascii import hexlify
def convert_ipv4_address():
    """Round-trip sample IPv4 addresses through packed form and print each step.

    For every sample address the packed bytes (shown as hex) and the
    address recovered from those bytes are printed on one line.
    """
    for dotted in ('127.0.0.1', '192.168.0.1'):
        packed = socket.inet_aton(dotted)
        restored = socket.inet_ntoa(packed)
        hex_form = hexlify(packed)
        print("IP address: %s => packed: %s, unpacked: %s" % (dotted, hex_form, restored))


if __name__ == '__main__':
    convert_ipv4_address()
In [ ]:
# pip install ntplib
import ntplib
from time import ctime
def print_time():
    """Query ``time.nist.gov`` over NTP and print the reported time."""
    reply = ntplib.NTPClient().request('time.nist.gov')
    # ctime() turns the numeric timestamp into a readable string.
    print(ctime(reply.tx_time))


print_time()
In [ ]:
# Switch a socket between blocking and non-blocking mode
import socket
def test_socket_modes():
    """Create a TCP socket, set its blocking mode and timeout, bind and listen.

    This function never returns: the final loop calls ``listen``
    indefinitely.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # setblocking(1) selects blocking mode; setblocking(0) would select
    # non-blocking mode.
    s.setblocking(1)
    s.settimeout(0.5)
    s.bind(("127.0.0.1", 0))
    print("Trivial Server launched on socket: %s" % str(s.getsockname()))
    while True:
        s.listen(1)


test_socket_modes()
In [ ]:
import socket
import sys
import argparse
host = 'localhost'
def echo_client(port):
    """A simple echo client.

    Connects to the module-level ``host`` on ``port``, sends a short test
    message and prints every chunk the server sends back.  Reading stops
    once as many bytes as were sent have been received, or as soon as the
    server closes the connection.

    Socket errors and other exceptions raised after the connection is
    established are printed rather than propagated; the socket is always
    closed before returning.
    """
    # Create a TCP socket and connect it to the server.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_address = (host, port)
    print("Connecting to %s port %s" % server_address)
    sock.connect(server_address)
    try:
        # Send data
        message = "bye"
        print("Sending: %s" % message)
        sock.sendall(message)
        # Look for the response
        amount_received = 0
        amount_expected = len(message)
        while amount_received < amount_expected:
            data = sock.recv(2048)
            if not data:
                # An empty read means the server closed the connection.
                # Without this check the loop would never terminate,
                # because the byte count could no longer grow.
                break
            amount_received += len(data)
            print("Received: %s" % data)
    except socket.error as e:
        print("Socket error: %s" % str(e))
    except Exception as e:
        print("Other exception: %s" % str(e))
    finally:
        print("Closing connection to the server")
        sock.close()


echo_client(9999)
In [ ]:
import socket
# Send the contents of a local text file to the server on localhost:9999.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 9999)
print("Connecting to %s port %s" % server_address)
try:
    sock.connect(server_address)
    with open('data/ft_test_client.txt', 'r') as fp:
        sock.sendall(fp.read())
    print('sending file')
finally:
    # Always release the socket, whether or not the transfer succeeded;
    # previously it was left open after the file had been sent.
    sock.close()
In [ ]:
from SocketServer import TCPServer, BaseRequestHandler
class MyBaseRequestHandlerr(BaseRequestHandler):
    """Upper-casing echo handler.

    Subclasses BaseRequestHandler and overrides handle().
    """

    def handle(self):
        """Echo each received chunk back in upper case until told to stop.

        The loop ends when the client sends ``y``, when the client closes
        the connection, or when an operation on the connection fails.
        """
        while True:
            try:
                chunk = self.request.recv(1024)
                if not chunk:
                    # An empty read means the peer closed the connection.
                    # Without this check the loop would spin forever on
                    # empty reads after the client disconnects.
                    break
                data = chunk.strip()
                print("receive from (%r):%r" % (self.client_address, data))
                self.request.sendall(data.upper())
                if data == 'y':
                    print('server exit')
                    break
            except Exception as exc:
                # Report the failure instead of hiding it.  Catching
                # Exception rather than using a bare except also lets
                # KeyboardInterrupt reach the server loop.
                print("connection to (%r) failed: %s" % (self.client_address, exc))
                break
# Create the TCP server on 0.0.0.0:9997 and serve requests indefinitely.
host, port = "0.0.0.0", 9997
addr = (host, port)
server = TCPServer(addr, MyBaseRequestHandlerr)
print('server is started')
server.serve_forever()
In [ ]:
# Using ThreadingMixIn in a socket server program
import os
import socket
import threading
import SocketServer
SERVER_HOST = 'localhost'
SERVER_PORT = 0 # Tells the kernel to pick up a port dynamically
BUF_SIZE = 1024
def client(ip, port, message):
    """Send ``message`` to the server at ``(ip, port)`` and print one reply.

    Used to exercise the threading-mixin server.  The socket is closed
    even if sending or receiving fails.
    """
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn.connect((ip, port))
    try:
        conn.sendall(message)
        print("Client received: %s" % conn.recv(BUF_SIZE))
    finally:
        conn.close()
class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
    """Request handler that replies with the name of the handling thread."""

    def handle(self):
        """Read one chunk and send it back prefixed with the thread name."""
        payload = self.request.recv(BUF_SIZE)
        worker_name = threading.current_thread().name
        self.request.send('%s: %s' % (worker_name, payload))
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server combined with ThreadingMixIn; adds nothing of its own."""
def main():
    """Start the threaded server, run three test clients, then stop it.

    The server runs in a daemon thread.  It is shut down and its listening
    socket is released even when one of the client calls raises.
    """
    # Launch the server
    server = ThreadedTCPServer((SERVER_HOST, SERVER_PORT), ThreadedTCPRequestHandler)
    try:
        ip, port = server.server_address  # Retrieve the port number
        print("%s %s %s" % (ip, port, 'server'))
        # Start a thread with the server -- one thread per request
        server_thread = threading.Thread(target=server.serve_forever)
        # Exit the server thread when the main thread exits
        server_thread.daemon = True
        server_thread.start()
        print('Server loop runing thread: %s' % server_thread.name)
        try:
            # Run clients
            client(ip, port, "Hello from client 1")
            client(ip, port, "Hello from client 2")
            client(ip, port, "Hello from client 3")
        finally:
            # shutdown() is only called once serve_forever is running,
            # which is why it lives in this inner block.
            server.shutdown()
    finally:
        # Release the listening socket, which shutdown() alone leaves open.
        server.server_close()


if __name__ == '__main__':
    main()
In [ ]:
# Using ForkingMixIn in a socket server program
import os
import socket
import threading
import SocketServer
SERVER_HOST = 'localhost'
SERVER_PORT = 0 # Pick up a port dynamically
BUF_SIZE = 1024
ECHO_MSG = 'Hello echo server!'
class ForkingClient:
    """TCP client used to exercise the forking server."""

    def __init__(self, ip, port):
        """Create a socket and connect it to the server at ``(ip, port)``."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((ip, port))

    def run(self):
        """Send the echo message and print the server's reply."""
        pid = os.getpid()
        print('PID %s sending echo message to the server: %s' % (pid, ECHO_MSG))
        sent = self.sock.send(ECHO_MSG)
        print("Sent: %d characters, so far..." % sent)
        reply = self.sock.recv(BUF_SIZE)
        # The first five characters of the reply are skipped when printing.
        print("PID %s received: %s" % (pid, reply[5:]))

    def shutdown(self):
        """Close the client socket."""
        self.sock.close()
class ForkingServerRequestHandler(SocketServer.BaseRequestHandler):
    """Handler that echoes the request back, tagged with the serving PID."""

    def handle(self):
        """Read one chunk, prefix it with this process's PID and send it back."""
        received = self.request.recv(BUF_SIZE)
        reply = '%s: %s' % (os.getpid(), received)
        print("Server sending response [current_process_id: data] = %s" % reply)
        self.request.send(reply)
class ForkingServer(SocketServer.ForkingMixIn, SocketServer.TCPServer):
    """TCP server combined with ForkingMixIn; adds nothing of its own."""
def main():
    """Start the forking server, run two clients against it, then clean up."""
    # Set up the server
    server = ForkingServer((SERVER_HOST, SERVER_PORT), ForkingServerRequestHandler)
    ip, port = server.server_address  # Retrieve the port number
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True  # Don't hang on exit
    server_thread.start()
    print('Server loop runing PID: %s' % os.getpid())
    # Launch the clients one after another; each runs before the next connects.
    clients = []
    for _ in range(2):
        current = ForkingClient(ip, port)
        current.run()
        clients.append(current)
    # Clean up: stop the server loop, close the clients, then the server socket.
    server.shutdown()
    for finished in clients:
        finished.shutdown()
    server.socket.close()


if __name__ == '__main__':
    main()
In [ ]:
import socket
# UDP client: forward each line typed by the user to 127.0.0.1:31500.
# An empty line stops the loop silently; the line "exit" is sent first and
# then stops the loop with a message.
address = ('127.0.0.1', 31500)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
    msg = raw_input()
    if msg:
        s.sendto(msg, address)
        if msg != 'exit':
            continue
        print('client exit')
    break
s.close()
In [ ]:
import httplib
import urlparse
import re
import urllib
DEFAULT_URL = 'http://www.qq.com'
HTTP_GOOD_CODES = [httplib.OK, httplib.FOUND, httplib.MOVED_PERMANENTLY]
def get_server_status_code(url):
    """Send a GET request for ``url`` and return the server's status code.

    The parsed URL and its host/path parts are printed for inspection.
    The response body is not read by this function.

    Returns:
        The integer HTTP status, or None when a ``StandardError`` is
        raised while connecting or requesting (the error is printed).
    """
    parts = urlparse.urlparse(url)
    print(parts)
    host, path = parts[1:3]
    print("%s %s" % (host, path))
    conn = None
    try:
        conn = httplib.HTTPConnection(host)
        conn.request('GET', path)
        return conn.getresponse().status
    except StandardError as err:
        print(err)
        return None
    finally:
        # The connection used to be left open on every path.
        if conn is not None:
            conn.close()


print(get_server_status_code(DEFAULT_URL))
In [ ]:
import httplib
REMOTE_SERVER_HOST = 'www.baidu.com'
REMOTE_SERVER_PATH = '/'
class HTTPClient:
    """Minimal HTTP client built on the ``httplib.HTTP`` interface."""

    def __init__(self, host):
        """Remember the host that later ``fetch`` calls will contact."""
        self.host = host

    def fetch(self, path):
        """GET ``path`` from the host and return the response body.

        Returns:
            The body as read from the response file object, or None when
            reading the reply raises (the error is printed).
        """
        http = httplib.HTTP(self.host)
        # Prepare header
        http.putrequest("GET", path)
        http.putheader("User-Agent", 'AGENT')
        http.putheader("Host", self.host)
        http.putheader("Accept", "*/*")
        http.endheaders()
        try:
            http.getreply()
        except Exception as e:
            # The old handler formatted errcode/errmsg/headers, which are
            # never assigned when getreply() raises, so it failed with a
            # NameError.  Report the exception itself instead.
            print("Client failed: %s" % e)
            return None
        print("Got homepage from %s" % self.host)
        body_file = http.getfile()
        return body_file.read()


client = HTTPClient(REMOTE_SERVER_HOST)
print(client.fetch(REMOTE_SERVER_PATH))
In [ ]:
import urllib2
BROWSER = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:49.0) Gecko/20100101 Firefox/49.0'
URL = 'http://www.163.com'
def simulate_firefox():
    """Open ``URL`` with a browser-like User-agent and print the response headers."""
    opener = urllib2.build_opener()
    opener.addheaders = [('User-agent', BROWSER)]
    response = opener.open(URL)
    print("Response headers:")
    for raw_header in response.headers.headers:
        # Each header line is printed behind a single tab.
        print("\t" + raw_header)


simulate_firefox()
In [ ]:
# Get from http://www.xicidaili.com/
import urllib
URL = 'https://www.github.com'
PROXY_ADDRESS = '1.164.144.107:8080'
if __name__ == '__main__':
    # NOTE(review): URL uses the https scheme while the proxy mapping only
    # has an "http" entry -- confirm the request really goes through the proxy.
    resp = urllib.urlopen(URL, proxies=dict(http=PROXY_ADDRESS))
    print("Proxy server returns response headers: %s" % resp.headers)
In [ ]:
import cookielib
import urllib
import urllib2
NORMAL_URL = 'http://www.baidu.com'
def extract_cookie_info():
    """Open ``NORMAL_URL`` with a cookie jar attached and print each cookie.

    Returns ``(False, "No csrftoken")`` if an IndexError is raised while
    the cookies are being printed; otherwise returns None.
    """
    jar = cookielib.CookieJar()
    opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(jar))
    opener.open(NORMAL_URL)
    try:
        for cookie in jar:
            print("%s %s" % (cookie.name, cookie.value))
    except IndexError:
        return False, "No csrftoken"


extract_cookie_info()
In [ ]:
import BaseHTTPServer
class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Serve one fixed HTML page in response to every GET request."""

    # The page body sent to every client.
    Page = '''\
<html>
<body>
<p>Hello, World!</p>
</body>
</html>
'''

    def do_GET(self):
        """Reply with status 200, the content headers and the fixed page."""
        page = self.Page
        self.send_response(200)
        headers = (
            ("Content-Type", "text/html"),
            ("Content-Length", str(len(page))),
        )
        for header_name, header_value in headers:
            self.send_header(header_name, header_value)
        self.end_headers()
        self.wfile.write(page)


# Listen on every interface at port 8080 and serve requests indefinitely.
serverAddress = ('0.0.0.0', 8080)
server = BaseHTTPServer.HTTPServer(serverAddress, RequestHandler)
server.serve_forever()
In [ ]:
import sys
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 8800
class RequestHandler(BaseHTTPRequestHandler):
    """Custom request handler"""

    def do_GET(self):
        """Answer a GET request with a heading followed by an image tag."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        # The body is written to the browser as two separate fragments.
        fragments = (
            "<h1>Simple HTTP Server</h1>",
            '<img src="http://file06.16sucai.com/2016/0803/2fd023ed4f4d38facf52043d595a0af2.jpg" />',
        )
        for fragment in fragments:
            self.wfile.write(fragment)
class CustomHTTPServer(HTTPServer):
    """HTTPServer that takes host and port separately and uses RequestHandler."""

    def __init__(self, host, port):
        """Bind to ``(host, port)`` with RequestHandler as the handler class."""
        HTTPServer.__init__(self, (host, port), RequestHandler)
def run_server(port):
    """Run a CustomHTTPServer on ``DEFAULT_HOST`` and ``port`` until stopped.

    A keyboard interrupt closes the server socket; any other exception is
    printed.
    """
    try:
        server = CustomHTTPServer(DEFAULT_HOST, port)
        print("Custom HTTP server started on port: %s" % port)
        server.serve_forever()
    except KeyboardInterrupt:
        print("Server interrupted and is shutting down...")
        server.socket.close()
    except Exception as err:
        print("Error: %s" % err)


run_server(8800)
In [ ]:
import string
import os
import sys
import gzip
import cStringIO
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 8800
HTML_CONTENT = """<html><body><h1>Compressed Hello World!</h1></body></html>"""
class RequestHandler(BaseHTTPRequestHandler):
    """Custom request handler that serves a gzip-compressed HTML page."""

    def do_GET(self):
        """Handler for the GET requests.

        Sends ``HTML_CONTENT`` gzip-compressed and echoes the
        compression-related headers to stdout.
        """
        # Compress exactly once, before anything is sent.  The payload used
        # to be compressed a second time just before being written.
        zbuf = self.compress_buffer(HTML_CONTENT)
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.send_header('Content-Encoding', 'gzip')
        self.send_header('Content-Length', str(len(zbuf)))
        self.end_headers()
        # Console echo of the headers (the first line used to appear twice).
        sys.stdout.write("Content-Encoding: gzip\r\n")
        sys.stdout.write("Content-Length: %d\r\n" % len(zbuf))
        sys.stdout.write("\r\n")
        # Send the message to browser
        self.wfile.write(zbuf)

    def compress_buffer(self, buf):
        """Return ``buf`` gzip-compressed (level 6) as a byte string."""
        zbuf = cStringIO.StringIO()
        zfile = gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=6)
        try:
            zfile.write(buf)
        finally:
            # close() finishes the gzip stream before its value is read.
            zfile.close()
        return zbuf.getvalue()


server_address = (DEFAULT_HOST, DEFAULT_PORT)
server = HTTPServer(server_address, RequestHandler)
server.serve_forever()
In [ ]:
# 3416230579
import smtplib
from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
def send_email(from_addr, to_addr, subject, password):
    """Send a fixed HTML message through smtp.qq.com over SSL.

    Args:
        from_addr: sender address, also used as the SMTP login name.
        to_addr: recipient address.
        subject: value for the Subject header.
        password: the client authorisation code entered when SMTP access
            was enabled for the mailbox, not the mailbox password.  Many
            providers require SMTP access to be enabled before mail can be
            sent this way.
    """
    msg = MIMEText("邮件正文", 'html', 'utf-8')
    msg['From'] = u'<%s>' % from_addr
    msg['To'] = u'<%s>' % to_addr
    msg['Subject'] = subject
    smtp = smtplib.SMTP_SSL('smtp.qq.com', 465)
    try:
        smtp.set_debuglevel(1)
        smtp.ehlo("smtp.qq.com")
        smtp.login(from_addr, password)
        smtp.sendmail(from_addr, [to_addr], msg.as_string())
    finally:
        # End the session on every path; it used to be left open.
        try:
            smtp.quit()
        except smtplib.SMTPServerDisconnected:
            # The connection is already gone; nothing left to close.
            pass


send_email(u"8745010@qq.com", u"3416230579@qq.com", u"主题", "xxxxxx")
In [ ]:
import smtplib
from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
from email.mime.multipart import MIMEMultipart
def send_email(from_addr, to_addr, subject, password):
    """Send ``test.txt`` as an attachment through smtp.qq.com over SSL.

    Args:
        from_addr: sender address, also used as the SMTP login name.
        to_addr: recipient address.
        subject: value for the Subject header.
        password: the client authorisation code entered when SMTP access
            was enabled for the mailbox, not the mailbox password.  Many
            providers require SMTP access to be enabled before mail can be
            sent this way.
    """
    msg = MIMEMultipart()
    msg['From'] = u'<%s>' % from_addr
    msg['To'] = u'<%s>' % to_addr
    msg['Subject'] = subject
    # Read the attachment inside a with-block so the file handle is
    # released; it used to be opened and never closed.
    with open('test.txt', 'rb') as attachment_file:
        payload = attachment_file.read()
    att1 = MIMEText(payload, 'base64', 'gb2312')
    att1["Content-Type"] = 'application/octet-stream'
    att1["Content-Disposition"] = 'attachment; filename="test.txt"'
    msg.attach(att1)
    smtp = smtplib.SMTP_SSL('smtp.qq.com', 465)
    try:
        smtp.set_debuglevel(1)
        smtp.ehlo("smtp.qq.com")
        smtp.login(from_addr, password)
        smtp.sendmail(from_addr, [to_addr], msg.as_string())
    finally:
        # End the session on every path; it used to be left open.
        try:
            smtp.quit()
        except smtplib.SMTPServerDisconnected:
            # The connection is already gone; nothing left to close.
            pass


send_email(u"8745010@qq.com", u"3416230579@qq.com", u"主题", "xxxxxx")
In [ ]:
import os, sys, string
import poplib
# POP3 server address
host = "pop3.163.com"
username = "xxxxxx@163.com"
password = "xxxxxxx"
pp = poplib.POP3(host)
try:
    pp.set_debuglevel(1)
    pp.user(username)
    pp.pass_(password)
    # Mailbox summary: the first item is the number of messages, the second
    # the total size in bytes.
    ret = pp.stat()
    print(ret)
    # Fetch the header of every message; message ids start at 1.
    for i in range(1, ret[0] + 1):
        # top(i, 0) asks for zero lines beyond the header, i.e. the header
        # only; top(i, 1) would return the header plus one more line.
        mlist = pp.top(i, 0)
        print("%s %s" % ('line: ', len(mlist[1])))
    # Per-message listing (id and size for each message), unlike stat(),
    # which only gives totals.
    ret = pp.list()
    print(ret)
    # Retrieve the first message in full.  down[0] is the status response
    # and down[1] holds the message, one list entry per line.
    down = pp.retr(1)
    # Count the message lines; len(down) only counted the fields of the
    # returned tuple, so it never reflected the message length.
    print('lines: %d' % len(down[1]))
    # Print the message
    for line in down[1]:
        print(line)
finally:
    # Sign off on every path, including after an error.
    pp.quit()
In [ ]:
from ftplib import FTP
import os
def ftp_up(filename):
    """Upload the local file ``filename`` into the server's ``ftp/`` directory.

    The remote name is the base name of ``filename``.  The local file and
    the FTP connection are released even when the transfer fails.
    """
    ftp = FTP()
    # Debug level 2 shows detailed output; 0 turns debug output off.
    ftp.set_debuglevel(2)
    ftp.connect('127.0.0.1', '2121')
    try:
        # For an anonymous login, pass empty strings instead.
        ftp.login('user', '12345')
        print(ftp.getwelcome())  # show the server's welcome message
        ftp.cwd('ftp/')  # choose the working directory
        bufsize = 1024  # transfer block size
        # Open the local file for reading; the with-block closes it even
        # if storbinary raises.
        with open(filename, 'rb') as file_handler:
            ftp.storbinary('STOR %s' % os.path.basename(filename), file_handler, bufsize)
        ftp.set_debuglevel(0)
        ftp.quit()
    finally:
        # Drops the connection if an error prevented quit() from running.
        ftp.close()
    print("ftp up OK")
def ftp_down(filename):
    """Download ``ftp_down.txt`` from the server's root into the current directory.

    NOTE(review): the ``filename`` argument is overwritten with
    ``"ftp_down.txt"`` before it is used, so callers cannot choose the
    file -- confirm whether that is intended.
    """
    ftp = FTP()
    ftp.set_debuglevel(2)
    ftp.connect('127.0.0.1', '2121')
    try:
        ftp.login('user', '12345')
        print(ftp.getwelcome())  # show the server's welcome message
        ftp.cwd('/')  # choose the working directory
        bufsize = 1024
        filename = "ftp_down.txt"
        # The local file used to be opened without ever being closed; the
        # with-block closes it once the transfer ends or fails.
        with open(filename, 'wb') as local_file:
            # Receive the remote file and write each block to the local file.
            ftp.retrbinary('RETR %s' % os.path.basename(filename), local_file.write, bufsize)
        ftp.set_debuglevel(0)
        ftp.quit()
    finally:
        # Drops the connection if an error prevented quit() from running.
        ftp.close()
    print("ftp down OK")
In [ ]:
ftp_up(filename='test.txt')
In [ ]:
ftp_down(filename='test.txt')
In [ ]:
import xmlrpclib
# Call the remote is_even method with 3 on the XML-RPC server at
# localhost:8000 and print the result.
proxy = xmlrpclib.ServerProxy("http://localhost:8000/")
candidate = proxy.is_even(3)
print("the even is %d" % candidate)
In [ ]:
import xmlrpclib
# Fetch a value through ret_a() on the XML-RPC server at localhost:8002,
# pass it back through from_string() and print the result.
proxy = xmlrpclib.ServerProxy("http://localhost:8002/")
candidate = proxy.ret_a()
print("the variable 'a' in server is " + str(proxy.from_string(candidate)))
In [ ]:
import xmlrpclib
# Download the binary returned by the server's logo() method and save it
# as xmlrpc_logo.jpg.
proxy = xmlrpclib.ServerProxy("http://localhost:9999/")
# The with-block closes the file even when the remote call fails; the
# explicit close() used to be skipped in that case.
with open("xmlrpc_logo.jpg", "wb") as handle:
    handle.write(proxy.logo().data)
In [ ]: