In [2]:
import pyspark.sql.functions as sql
import pyspark.sql.types as types
import nltk
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords

In [3]:
# run nltk.download("stopwords") once first if the stopwords corpus is not already installed
tokenizer = RegexpTokenizer(r'\w+')
stop_words = set(stopwords.words("english"))

def tokenize(document):
    # Spark passes SQL NULLs to Python UDFs as None; treat those as empty documents
    if document is None:
        return []
    tokens = tokenizer.tokenize(document.lower())
    return [w for w in tokens if w not in stop_words]

tokenize_udf = sql.udf(tokenize, types.ArrayType(types.StringType()))

tokenize("The quickish brown fox'es jumped over the lazy-dog.")


Out[3]:
['quickish', 'brown', 'fox', 'es', 'jumped', 'lazy', 'dog']
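
To confirm the UDF behaves the same way inside Spark, here is a small hedged smoke test; the one-row demo_df DataFrame and its text column are invented for illustration:

In [ ]:
# hypothetical smoke test: apply the UDF to a one-row DataFrame
demo_df = sqlContext.createDataFrame([("The quick brown fox",)], ["text"])
demo_df.withColumn("tokens", tokenize_udf(demo_df["text"])).show(truncate=False)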

In [4]:
bhl_df_version = "20160516"
bhl_df = sqlContext.read.load("/guoda/data/bhl-{0}.parquet"
                              .format(bhl_df_version))

In [6]:
bhl_df.printSchema()


root
 |-- itemid: integer (nullable = true)
 |-- titleid: integer (nullable = true)
 |-- thumbnailpageid: integer (nullable = true)
 |-- barcode: string (nullable = true)
 |-- marcitemid: string (nullable = true)
 |-- callnumber: string (nullable = true)
 |-- volumeinfo: string (nullable = true)
 |-- itemurl: string (nullable = true)
 |-- localid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- institutionname: string (nullable = true)
 |-- zquery: string (nullable = true)
 |-- creationdate: date (nullable = true)
 |-- ocrtext: string (nullable = true)
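
Only itemid and ocrtext are needed for tokenization; a hedged option (an assumption about which columns later steps actually use) is to prune the wide table before any heavy processing:

In [ ]:
# hypothetical column pruning to cut I/O on the wide OCR table
text_df = bhl_df.select("itemid", "ocrtext")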


In [5]:
count = bhl_df.count()

In [11]:
# sampling by fraction crashes on this dataset, so it is left commented out;
# a fixed-size alternative is sketched below, and the next cell uses a barcode filter instead
#sample_bhl_df = bhl_df.sample(withReplacement=False, fraction=100.0/count)
#sample_bhl_df.count()
#sample_bhl_df.write.mode("overwrite").parquet("/guoda/data/bhl-1k-{}.parquet".format(bhl_df_version))
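
A hedged alternative, untested in this run, is to take a fixed number of rows with limit() instead of a random fraction; the head_bhl_df name is hypothetical:

In [ ]:
# hypothetical fixed-size sample: first 100 rows rather than a random fraction
head_bhl_df = bhl_df.limit(100)
head_bhl_df.count()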

In [5]:
(bhl_df
 # keep only items whose barcode reads '11' at characters 6-7 (1-based),
 # a cheap, deterministic way to carve out a small, repeatable subset
 .filter(sql.substring(bhl_df["barcode"], 6, 2) == '11')
 .write
 .mode("overwrite")
 .parquet("/guoda/data/bhl-tiny-{}.parquet".format(bhl_df_version))
 )


---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-5-19a8a984d1f7> in <module>()
      3  .write
      4  .mode("overwrite")
----> 5  .parquet("/guoda/data/bhl-tiny-{}.parquet".format(bhl_df_version))
      6  )

/opt/spark/latest/python/pyspark/sql/readwriter.py in parquet(self, path, mode, partitionBy, compression)
    639             self.partitionBy(partitionBy)
    640         self._set_opts(compression=compression)
--> 641         self._jwrite.parquet(path)
    642 
    643     @since(1.6)

/opt/spark/latest/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1129             proto.END_COMMAND_PART
   1130 
-> 1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
   1133             answer, self.gateway_client, self.target_id, self.name)

/opt/spark/latest/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in send_command(self, command, retry, binary)
    881         connection = self._get_connection()
    882         try:
--> 883             response = connection.send_command(command)
    884             if binary:
    885                 return response, self._create_connection_guard(connection)

/opt/spark/latest/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in send_command(self, command)
   1026 
   1027         try:
-> 1028             answer = smart_decode(self.stream.readline()[:-1])
   1029             logger.debug("Answer received: {0}".format(answer))
   1030             if answer.startswith(proto.RETURN_MESSAGE):

/usr/lib/python3.5/socket.py in readinto(self, b)
    573         while True:
    574             try:
--> 575                 return self._sock.recv_into(b)
    576             except timeout:
    577                 self._timeout_occurred = True

KeyboardInterrupt: 
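
Before retrying the interrupted write, a hedged pre-check (an extra step, not part of the original run) is to count how many rows the barcode filter matches, to gauge the size of the pending parquet write:

In [ ]:
# hypothetical pre-check: size of the barcode-filtered subset
bhl_df.filter(sql.substring(bhl_df["barcode"], 6, 2) == '11').count()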

In [ ]:
sample_bhl_df = sqlContext.read.load("/guoda/data/bhl-tiny-{}.parquet"
                                     .format(bhl_df_version))
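
With the tiny subset loaded, the UDF defined above can be applied to the OCR text; a minimal sketch of that next step, where the tokens column name is an assumption:

In [ ]:
# hypothetical application of the tokenizer UDF to the sample's OCR text
tokens_df = sample_bhl_df.withColumn("tokens", tokenize_udf(sample_bhl_df["ocrtext"]))
tokens_df.select("itemid", "tokens").show(5)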

In [ ]:
idb_df_version = "20160516"