Parse Json


In [1]:
def parseRaw(json_map):
    url = json_map['url']
    content = json_map['html']
    return (url,content)

載入原始 RAW Data


In [2]:
import json
import pprint
pp = pprint.PrettyPrinter(indent=2)
path = "./pixnet.txt"
all_content = sc.textFile(path).map(json.loads).map(parseRaw)

利用 LXML Parser 來分析文章結構

  • lxml.html urlparse 需在涵式內被import,以供RDD運算時使用
  • 其他import python package的方法 Submitting Applications
    • Use spark-submit --py-files to add .py, .zip or .egg files to be distributed with your application.
  • lxml.html.fromstring 的input為HTML string,回傳為可供 xpath 處理的物件
  • XPath syntax Ref_1, Ref_2
    • / Selects from the root node
    • // Selects all nodes in the document from the current node
    • @ Selects attributes
    • //@lang Selects all attributes that are named lang
    • //title[@lang] Selects all the title elements that have an attribute named lang
  • XPath usful Chrome plugin XPath Helper

In [3]:
def parseImgSrc(x):
    try:
        urls = list()
        import lxml.html
        from urlparse import urlparse
        node = lxml.html.fromstring(x)
        root =  node.getroottree()
        for src in root.xpath('//img/@src'):
            try :
                host = urlparse(src).netloc
                if '.' not in host : continue
                if host.count('.') == 1 : 
                    pass
                else: 
                    host = host[host.index('.')+1:]
                urls.append('imgsrc_'+host)
            except :
                print "Error Parse At:" , src
            
        for src in root.xpath('//input[@src]/@src'):
            try :
                host = urlparse(src).netloc
                if '.' not in host : continue
                if host.count('.') == 1 : 
                    pass
                else: 
                    host = host[host.index('.')+1:]
                urls.append('imgsrc_'+host)
            except :
                print "Error parseImgSrc At:" , src
        
    except :
        print "Unexpected error:", sys.exc_info()
    return  urls

In [4]:
all_content.map(lambda x: x[1]).first()[:100]


Out[4]:
u'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd'

取出 Image Src 的列表


In [5]:
image_list = all_content.map(lambda x :parseImgSrc(x[1]))
pp.pprint(image_list.first()[:10])


[ 'imgsrc_cloudfront.net',
  'imgsrc_pimg.tw',
  'imgsrc_pimg.tw',
  'imgsrc_pimg.tw',
  'imgsrc_pimg.tw',
  'imgsrc_pimg.tw',
  'imgsrc_pimg.tw',
  'imgsrc_pimg.tw',
  'imgsrc_pimg.tw',
  'imgsrc_pimg.tw']

統計 Image Src 的列表


In [6]:
img_src_count = all_content.map(
    lambda x :parseImgSrc(x[1])).flatMap(
    lambda x: x).countByValue()
for i in img_src_count:
    print i , ':' , img_src_count[i]


imgsrc_maxcdn.com : 83
imgsrc_sopili.net : 5
imgsrc_conn.tw : 5
imgsrc_visitkorea.or.kr : 1
imgsrc_cloudfront.net : 7
imgsrc_pimg.tw : 728
imgsrc_vbtrax.com : 5
imgsrc_ipeen.com.tw : 4
imgsrc_oeya.com : 30
imgsrc_pixfs.net : 267
imgsrc_histats.com : 5
imgsrc_facebook.com : 10

請使用 reduceByKey , sortBy 來計算出 img src 排行榜

請參照以下文件 [http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD]

幾種RDD sorting的方式

  • 針對key值排序
    • 使用 sortByKey
      • sc.parallelize(tmp).sortByKey(True, 1).collect()
    • 使用 sortBy
      • sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
    • 使用 takeOrdered
      • sc.parallelize(tmp).takeOrdered(10, lambda s: -1 * s[0])
  • 針對value排序
    • 使用 sortBy
      • sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
    • 使用 takeOrdered
      • sc.parallelize(tmp).takeOrdered(10, lambda s: -1 * s[1])

takeOrdered()使用方式

  • sort by keys (ascending): RDD.takeOrdered(num, key = lambda x: x[0])
  • sort by keys (descending): RDD.takeOrdered(num, key = lambda x: -x[0])
  • sort by values (ascending): RDD.takeOrdered(num, key = lambda x: x[1])
  • sort by values (descending): RDD.takeOrdered(num, key = lambda x: -x[1])

In [13]:
from operator import add
all_content.map(
    lambda x :parseImgSrc(x[1])).flatMap(lambda x: x).map(lambda x: (x,1)).reduceByKey(add).sortBy(
    lambda x: x[1], ascending=False).collect()


Out[13]:
[('imgsrc_pimg.tw', 728),
 ('imgsrc_pixfs.net', 267),
 ('imgsrc_maxcdn.com', 83),
 ('imgsrc_oeya.com', 30),
 ('imgsrc_facebook.com', 10),
 ('imgsrc_cloudfront.net', 7),
 ('imgsrc_sopili.net', 5),
 ('imgsrc_conn.tw', 5),
 ('imgsrc_vbtrax.com', 5),
 ('imgsrc_histats.com', 5),
 ('imgsrc_ipeen.com.tw', 4),
 ('imgsrc_visitkorea.or.kr', 1)]

正確的排行如下:

[說明] 由於是實際網頁資料,結果多少會有變動出入,大致上符合或無明顯異常即可。

[('imgsrc_pixfs.net', 219), ('imgsrc_agoda.net', 103), ('imgsrc_static.flickr.com', 53), ('imgsrc_staticflickr.com', 28), ('imgsrc_pimg.tw', 19), ('imgsrc_facebook.com', 12), ('imgsrc_sitebro.com', 10), ('imgsrc_linkwithin.com', 5), ('imgsrc_cloudfront.net', 5), ('imgsrc_prchecker.info', 5), ('imgsrc_visit-japan.jp', 5), ('imgsrc_yimg.com', 2), ('imgsrc_zenfs.com', 2), ('imgsrc_googleusercontent.com', 1)]


In [ ]: