In [1]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Boost the view count of a CSDN blog
import urllib2
import re, random
from multiprocessing.dummy import Pool as ThreadPool

time_out = 10  # global timeout of 10 seconds
count = 0
proxies = [None]
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
    'Accept': 'text/html;q=0.9,*/*;q=0.8',
    'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
    #'Accept-Encoding': 'gzip',
    'Connection': 'close',
    'Referer': None  # note: if pages still cannot be fetched, set this to the target site's host
}
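
# The dict above is handed to urllib2.Request as the `headers` argument; a minimal
# usage sketch (the URL here is only an illustration, not one the script fetches):
#   req = urllib2.Request('http://example.com/', headers=headers)
#   resp = urllib2.urlopen(req, timeout=time_out)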
def get_proxy():
    # Update the module-level proxy list in place
    global proxies
    try:
        url = 'http://www.xicidaili.com/'
        #url = 'http://www.google.com/'
        req_header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
            'Accept': 'text/html;q=0.9,*/*;q=0.8',
            'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
            #'Accept-Encoding': 'gzip',
            'Connection': 'close',
            'Referer': None  # note: if the list still cannot be fetched, set this to the target site's host
        }
        req_timeout = 10
        req = urllib2.Request(url, None, req_header)
        response = urllib2.urlopen(req, None, req_timeout)
    except urllib2.URLError, e:
        print('Failed to fetch the proxy list!')
        print e.reason
        return
    print("===== content-type : ", response.headers['content-type'])
    print("===== headers.getparam('charset') : ", response.headers.getparam('charset'))
    result = response.read().decode('utf-8')
    '''
    Each proxy row on the page looks like this:
    <tr class="odd">
      <td class="country"><img src="http://fs.xicidaili.com/images/flag/cn.png" alt="Cn"></td>
      <td>218.18.61.173</td>
      <td>8118</td>
      <td>
        <a href="/2017-10-01/guangdong">广东深圳市福田区</a>
      </td>
      <td class="country">高匿</td>
      <td>HTTP</td>
      <td class="country">
        <div title="0.16秒" class="bar">
          <div class="bar_inner fast" style="width:97%">
          </div>
        </div>
      </td>
      <td class="country">
        <div title="0.032秒" class="bar">
          <div class="bar_inner fast" style="width:96%">
          </div>
        </div>
      </td>
      <td>3小时</td>
      <td>17-10-01 04:09</td>
    </tr>
    '''
    #print("===== get proxy page, type : ", result, type(result))
    # Pull the IP, port and protocol columns out of every table row
    p = re.compile(r'''<tr\sclass[^>]*>\s+
                       <td>.+</td>\s+
                       <td>(.*)?</td>\s+
                       <td>(.*)?</td>\s+
                       <td>(.*)?</td>\s+
                       <td>(.*)?</td>\s+
                       <td>(.*)?</td>\s+
                       <td>(.*)?</td>\s+
                       </tr>''', re.VERBOSE)
    proxy_list = p.findall(result)
    #proxy_list = re.findall(r'<tr class="odd"><</tr>', result, re.M | re.I | re.S)
    print("===== get proxy page proxy_list:, type ", proxy_list, type(proxy_list))
    # A few hard-coded fallback proxies
    proxies.append('221.211.193.51:80')
    proxies.append('120.7.84.59:8118')
    proxies.append('203.91.121.76:3128')
    proxies.append('180.168.179.193:8080')
    for each_proxy in proxy_list[1:]:
        # Column 4 is the protocol; only keep plain HTTP proxies
        if each_proxy[4] == 'HTTP':
            proxies.append(each_proxy[0] + ':' + each_proxy[1])
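
# After get_proxy() runs, `proxies` holds None (meaning "no proxy") plus the
# hard-coded fallbacks and any "ip:port" strings scraped from xicidaili, e.g.
# [None, '221.211.193.51:80', '120.7.84.59:8118', ..., '218.18.61.173:8118'].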
def change_proxy():
    # Pick a random entry from the proxy list
    proxy = random.choice(proxies)
    # None means "no proxy, use the local machine directly"
    if proxy == None:
        proxy_support = urllib2.ProxyHandler({})
    else:
        proxy_support = urllib2.ProxyHandler({'http': proxy})
    opener = urllib2.build_opener(proxy_support)
    opener.addheaders = [('User-Agent', headers['User-Agent'])]
    urllib2.install_opener(opener)
    print('Switched proxy to: %s' % ('local machine' if proxy == None else proxy))
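
# install_opener() replaces the global default opener, so every later
# urllib2.urlopen() call in look_blog()/get_blog_list() goes through the
# proxy chosen above until change_proxy() is called again.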
def get_req(url):
    # Forge the request headers with a dict first
    blog_header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.152 Safari/537.36',
        'Host': 'blog.csdn.net',
        'Referer': 'http://blog.csdn.net/',
        'GET': url
    }
    #req = urllib2.urlopen(url, headers=blog_header)
    req = urllib2.Request(url, headers=headers)
    return req
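
# get_req() only builds the Request object; nothing is sent over the network
# until urllib2.urlopen() is called on it in look_blog() or get_blog_list().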
# Visit the blog once
def look_blog(url):
    # Switch to a (possibly different) proxy first
    change_proxy()
    req = get_req(url)
    try:
        urllib2.urlopen(req, timeout=time_out)
    except:
        return
    else:
        print('Visit succeeded!')

# Visit the same URL repeatedly
def click_blog(url):
    for i in range(count):
        print('Visiting blog %s, attempt %d' % (url, i))
        look_blog(url)
# Get the list of article links on the blog
def get_blog_list(url):
    req = get_req(url)
    try:
        print("=== start to get blog list from url: ", req)
        response = urllib2.urlopen(req)
    except:
        print('Unrecoverable error')
        return None
    '''
    # CSDN already serves utf-8, so no re-encoding is needed
    html = response.read()
    # Compile the regex that matches article links
    regx = '<span class="link_title"><a href="(.+?)">'
    pat = re.compile(regx)
    # Writing blog_list = re.findall('<span class="link_title"><a href="(.+?)">', str(html)) gives the same result
    blog_list = re.findall(pat, str(html))
    return blog_list
    '''
    # The parsing above is disabled; return a one-element placeholder list instead
    return ["https://youtu.be/RysyB_Zcjlo"]
if __name__ == '__main__':
    # Basic initialisation: fetch the proxy list first
    get_proxy()
    print('Number of usable proxies: %d' % len(proxies))
    #blogurl = raw_input('Enter the blog URL: ')
    # Hard-coded default URL, used to save typing while testing
    blogurl = 'https://youtu.be/RysyB_Zcjlo'
    if len(blogurl) == 0:
        #blogurl = 'http://blog.csdn.net/bkxiaoc/'
        blogurl = 'https://youtu.be/RysyB_Zcjlo'
    print('URL is :', blogurl)
    try:
        count = int(raw_input('Enter the number of visits: '))
    except ValueError:
        print('Invalid input')
        quit()
    if count == 0 or count > 999:
        print('Count is too large or too small')
        quit()
    print('Count confirmed: %d' % count)
    # Get the article list; when this was tested the blog only had one page, so only one page's worth of links is fetched
    blog_list = get_blog_list("https://youtu.be/RysyB_Zcjlo")
    if len(blog_list) == 0:
        print('No blog list found')
        quit()
    print('Starting!!!!!!!!!!!!!!!!!!!!')
    # Iterate over the links, then hand them to a thread pool
    index = 0
    #maxConnection = 10
    for each_link in range(10):
        # Build the full link by appending a query suffix
        each_link = 'https://youtu.be/RysyB_Zcjlo?' + str(each_link)
        print("===== each_link", each_link)
        #blog_list[index] = each_link
        index += 1
    # Open half as many threads as there are posts (at least one), let's go
    pool = ThreadPool(max(1, len(blog_list) // 2))
    results = pool.map(click_blog, blog_list)
    pool.close()
    pool.join()
    print('Task finished!!!!!!!!!!!!!!!!!!!!')