In [3]:
import glob
import os
import sys

# Locate the Spark installation; nothing below can work without it.
spark_home = os.environ.get('SPARK_HOME')
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# Discover the py4j source archive bundled with this Spark build instead of
# pinning one release: the archive name changes between Spark versions.
# If nothing matches, fall back to the path this notebook originally used.
py4j_archives = sorted(
    glob.glob(os.path.join(spark_home, 'python', 'lib', 'py4j-*-src.zip')))
if py4j_archives:
    py4j_zip = py4j_archives[-1]
else:
    py4j_zip = os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip')

# Put Spark's Python sources and py4j at the front of the import path
# (py4j ends up first, pyspark second).
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, py4j_zip)

print(sys.path)

# Run the PySpark interactive shell bootstrap inside this kernel; per its
# banner it makes `sc` (SparkContext) and `sqlContext` available.
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))


['/mnt/opt/spark/python/lib/py4j-0.8.2.1-src.zip', '/mnt/opt/spark/python', '', '/usr/local/lib/python2.7/dist-packages/setuptools-18.2-py2.7.egg', '/usr/lib/python2.7', '/usr/lib/python2.7/plat-x86_64-linux-gnu', '/usr/lib/python2.7/lib-tk', '/usr/lib/python2.7/lib-old', '/usr/lib/python2.7/lib-dynload', '/usr/local/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages/PILcompat', '/usr/lib/python2.7/dist-packages/gtk-2.0', '/usr/lib/pymodules/python2.7', '/usr/local/lib/python2.7/dist-packages/IPython/extensions']
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
SparkContext available as sc, SQLContext available as sqlContext.

Parse JSON


In [13]:
def parseRaw(json_map):
    """Reduce one decoded JSON record to a ``(url, html)`` pair.

    ``json_map`` must provide the keys ``'url'`` and ``'html'``; a missing
    key raises ``KeyError``. Any other keys are ignored.
    """
    return (json_map['url'], json_map['html'])

載入原始 RAW Data


In [38]:
import json
import pprint
# Pretty-printer shared by the later preview cells.
pp = pprint.PrettyPrinter(indent=2)

# Each line of the input file is decoded as one JSON document.
path = "./pixnet.txt"
raw_lines = sc.textFile(path)
json_records = raw_lines.map(json.loads)

# RDD of (url, html) pairs built by parseRaw.
all_content = json_records.map(parseRaw)

利用 LXML Parser 來分析文章結構


In [39]:
def parseImgSrc(x):
    """Extract image-host tags from one HTML document.

    Collects the ``src`` attribute of every ``<img>`` element, then of every
    ``<input>`` element that carries a ``src``, and turns each URL's host
    into a tag of the form ``'imgsrc_<host>'``:

    * hosts without a dot (relative URLs, bare names) are skipped;
    * hosts with exactly one dot are kept unchanged;
    * hosts with more dots lose their first label
      (``'a.b.example.com'`` -> ``'b.example.com'``).

    Parameters
    ----------
    x : str
        HTML text of the document.

    Returns
    -------
    list of str
        Tags in document order, ``<img>`` sources first; duplicates are kept.
        If the document cannot be processed, whatever was collected up to
        that point is returned (an empty list when parsing itself fails).

    Raises
    ------
    ImportError
        If ``lxml`` is not installed. This is deliberately not swallowed:
        a missing dependency would otherwise look like "no images anywhere".
    """
    # Function-scope imports are kept from the original; they now sit outside
    # the best-effort guard so that a missing dependency fails loudly.
    import lxml.html
    try:
        from urlparse import urlparse  # Python 2
    except ImportError:
        from urllib.parse import urlparse  # Python 3

    # (XPath query, message prefix printed when a src cannot be parsed)
    queries = (
        ('//img/@src', "Error Parse At:"),
        ('//input[@src]/@src', "Error parseImgSrc At:"),
    )

    urls = []
    try:
        root = lxml.html.fromstring(x)
        for query, label in queries:
            for src in root.xpath(query):
                try:
                    host = urlparse(src).netloc
                except Exception:
                    # One bad URL must not discard the rest of the document.
                    print("%s %s" % (label, src))
                    continue
                if '.' not in host:
                    continue
                if host.count('.') > 1:
                    # Drop the left-most label of multi-label hosts.
                    host = host[host.index('.') + 1:]
                urls.append('imgsrc_' + host)
    except Exception:
        # Best-effort per document: unparseable input yields what was
        # collected so far. Unlike a bare ``except`` this does not trap
        # KeyboardInterrupt / SystemExit.
        pass
    return urls

取出 Image Src 的列表


In [41]:
# One list of image-host tags per document (element 1 of each pair is the HTML).
image_list = all_content.map(lambda record: parseImgSrc(record[1]))

# Preview: the first ten tags of the first document.
first_doc_tags = image_list.first()
pp.pprint(first_doc_tags[:10])


[ 'imgsrc_cloudfront.net',
  'imgsrc_static.flickr.com',
  'imgsrc_agoda.net',
  'imgsrc_pixfs.net',
  'imgsrc_pixfs.net',
  'imgsrc_pixfs.net',
  'imgsrc_pixfs.net',
  'imgsrc_pimg.tw',
  'imgsrc_pixfs.net',
  'imgsrc_pixfs.net']

統計 Image Src 的列表


In [47]:
img_src_count = all_content.map(
    lambda x :parseImgSrc(x[1])).flatMap(
    lambda x: x).countByValue()
for i in img_src_count:
    print i , ':' , img_src_count[i]


imgsrc_linkwithin.com : 5
imgsrc_sitebro.com : 10
imgsrc_cloudfront.net : 5
imgsrc_prchecker.info : 5
imgsrc_pimg.tw : 19
imgsrc_agoda.net : 103
imgsrc_googleusercontent.com : 1
imgsrc_visit-japan.jp : 5
imgsrc_yimg.com : 2
imgsrc_pixfs.net : 219
imgsrc_zenfs.com : 2
imgsrc_static.flickr.com : 53
imgsrc_staticflickr.com : 28
imgsrc_facebook.com : 12

請使用 reduceByKey , sortBy 來計算出 img src 排行榜

請參照以下文件：[pyspark.RDD API](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)


In [64]:
from operator import add
# Emit a (tag, 1) pair for every image-host tag found in any document.
tag_ones = all_content.flatMap(
    lambda record: parseImgSrc(record[1])).map(
    lambda tag: (tag, 1))

# Sum the ones per tag, then rank by total, largest first. The final
# expression is left bare so the notebook displays the collected ranking.
tag_totals = tag_ones.reduceByKey(add)
tag_totals.sortBy(lambda pair: pair[1], ascending=False).collect()


Out[64]:
[('imgsrc_pixfs.net', 219),
 ('imgsrc_agoda.net', 103),
 ('imgsrc_static.flickr.com', 53),
 ('imgsrc_staticflickr.com', 28),
 ('imgsrc_pimg.tw', 19),
 ('imgsrc_facebook.com', 12),
 ('imgsrc_sitebro.com', 10),
 ('imgsrc_linkwithin.com', 5),
 ('imgsrc_cloudfront.net', 5),
 ('imgsrc_prchecker.info', 5),
 ('imgsrc_visit-japan.jp', 5),
 ('imgsrc_yimg.com', 2),
 ('imgsrc_zenfs.com', 2),
 ('imgsrc_googleusercontent.com', 1)]

正確的排行如下:

[('imgsrc_pixfs.net', 219), ('imgsrc_agoda.net', 103), ('imgsrc_static.flickr.com', 53), ('imgsrc_staticflickr.com', 28), ('imgsrc_pimg.tw', 19), ('imgsrc_facebook.com', 12), ('imgsrc_sitebro.com', 10), ('imgsrc_linkwithin.com', 5), ('imgsrc_cloudfront.net', 5), ('imgsrc_prchecker.info', 5), ('imgsrc_visit-japan.jp', 5), ('imgsrc_yimg.com', 2), ('imgsrc_zenfs.com', 2), ('imgsrc_googleusercontent.com', 1)]