In [1]:
from urllib.request import urlopen
from io import BytesIO
from PIL import Image
from imagehash import phash
from pyspark.sql import functions as F
from pyspark.sql import types as T
import numpy as np

In [2]:
df = sqlContext.read.parquet("/guoda/data/idigbio-media-20170709T013207-100k.parquet")
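
A quick, optional check that the parquet file loaded as expected (both calls are standard DataFrame methods; nothing here is specific to this dataset):

In [ ]:
# Inspect the available media columns and the number of rows.
df.printSchema()
df.count()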

In [3]:
df[["accessuri"]].show()


+--------------------+
|           accessuri|
+--------------------+
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
|http://mediaphoto...|
+--------------------+
only showing top 20 rows


In [4]:
def accessimage(v):
    # Fetch the thumbnail through the iDigBio media API and return the hex
    # string form of its perceptual hash; the integer form (hash()) can
    # exceed a 32-bit IntegerType, which Spark silently turns into null.
    try:
        data = urlopen("http://api.idigbio.org/v2/media?size=thumbnail&filereference=" + v).read()
        return str(phash(Image.open(BytesIO(data))))
    except Exception:
        return None  # a failed download or decode becomes a null in the column
ai_udf = F.udf(accessimage, T.StringType())
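
Before running the hash over the whole DataFrame, a driver-side sanity check on a single record can catch API or decoding problems early; this sketch simply pulls the first accessuri and calls the plain Python function directly:

In [ ]:
# Hash one thumbnail locally, without going through the Spark UDF.
sample_uri = df.select("accessuri").first()["accessuri"]
accessimage(sample_uri)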

In [5]:
image_df = df.select("accessuri", ai_udf("accessuri"))
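
Every action on image_df re-runs the UDF, which means re-downloading every thumbnail; if more than one query will touch the hashes, caching the result once is a cheap safeguard:

In [ ]:
# Persist the hashed results so later actions don't re-fetch the images.
image_df = image_df.cache()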

In [ ]:
image_df.groupby("accessimage(accessuri)").count().sort(F.col("count").desc()).show()
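
Failed fetches come back as nulls from the UDF and would otherwise show up as the single largest "duplicate" group; one way to keep them out of the counts is to filter on the generated column first:

In [ ]:
# Drop rows whose fetch or decode failed before counting duplicate hashes.
(image_df
 .where(F.col("accessimage(accessuri)").isNotNull())
 .groupby("accessimage(accessuri)")
 .count()
 .sort(F.col("count").desc())
 .show())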
