In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
This imports the col function, which is used later to get the column object for something that is calculated as part of the pipeline and does not yet have a name that can be referred to directly.
In [2]:
from pyspark.sql.functions import col
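As a quick illustration of what col() is for (this is a sketch, not one of the cells in this notebook, and some_df is a hypothetical dataframe):

# The "count" column created by .count() has no attribute on some_df yet,
# so it has to be referenced by name with col("count")
some_df.groupBy("name").count().orderBy(col("count"), ascending=False)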
There is a Parquet file stored on the HDFS file system used by the Spark cluster that already contains a copy of iDigBio data. This is a testing export, so it does not have all of the columns from iDigBio, but it does have many of the commonly used ones.
In [3]:
# Confirm we've started SPARK and created a SQL context that can be used
sqlContext
Out[3]:
In [15]:
idb_df = sqlContext.read.parquet("/guoda/data/idigbio-20190612T171757.parquet")
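Since this export does not include every iDigBio column, it can be worth checking what is actually available before building a pipeline. For example (a sketch using the standard dataframe method, not a cell that was run here):

idb_df.printSchema()  # prints the column names and types in this export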
In [16]:
idb_df.count()
Out[16]:
In [17]:
checklists = (idb_df
    .select(idb_df.countrycode, idb_df.scientificname)
    .filter(idb_df.countrycode != "")
    .groupBy(idb_df.countrycode, idb_df.scientificname)
    .count()
)
In [18]:
print(checklists.count())
#checklists.head(10)
In [19]:
checklists.head(10)
Out[19]:
All the iDigBio data is first filtered to only those records related to arthropods. Notice that there are two syntaxes for the filter function and each line uses one of them. The SQL-style IN clause is an easier way of expressing the country codes than a chain of equality comparisons joined with OR operations (a sketch of that alternative follows this cell).
In [20]:
arthropods = (idb_df
    .filter(idb_df.phylum == "arthropoda")
    .filter("countrycode IN ('usa', 'can', 'mex')")
)
arthropods.count()
Out[20]:
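For comparison, the country filter could also be written entirely with column expressions instead of the SQL-style string. This is only a sketch of the alternative syntax; it is not run in this notebook:

arthropods_alt = (idb_df
    .filter(idb_df.phylum == "arthropoda")
    .filter((idb_df.countrycode == "usa") |
            (idb_df.countrycode == "can") |
            (idb_df.countrycode == "mex"))
)
# or, more compactly, with the isin() column method:
# .filter(idb_df.countrycode.isin("usa", "can", "mex"))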
The syntax for summarizing data also resembles SQL. (In fact, all of this can be done with an actual SQL statement using registerTempTable() and sqlContext.sql() if you wanted to work only with SQL syntax; see the sketch below.) Notice the use of the col() function to get the newly calculated column of counts. This is needed because there is no column arthropods.count; the column only exists in the context of the calculation, before the assignment to species_counts happens.
In [21]:
species_counts = (arthropods
    .groupBy(arthropods.scientificname)
    .count()
    .orderBy(col("count"), ascending=False)
)
In [22]:
print(species_counts.count())
species_counts.head(10)
Out[22]:
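As mentioned above, the same summary could be produced with plain SQL by registering the dataframe as a temporary table. A sketch of that approach (the table name arthropods_table is only illustrative, and this is not run here):

arthropods.registerTempTable("arthropods_table")
species_counts_sql = sqlContext.sql("""
    SELECT scientificname, COUNT(*) AS count
    FROM arthropods_table
    GROUP BY scientificname
    ORDER BY count DESC
""")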
To make a quick graph, the Spark dataframe needs to be converted to a structure that graphing libraries like matplotlib understand. The toPandas() method of a dataframe will convert the data to a Pandas dataframe, but this is a single object in the memory of the notebook server; it is not on the Spark cluster. Additionally, graphing more than 10,000 things is unreadable, so be very sure to cut down the size of the dataframe before converting to Pandas. Here only species with more than 500 occurrences are graphed.
In [23]:
top_species_for_graph = (species_counts
    .filter("count > 500")
    .toPandas()
)
print(top_species_for_graph.shape)
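Another simple way to cap how much data comes back to the notebook server (a sketch, not run here) is to take only the first rows of the already-sorted dataframe:

top_species_sketch = species_counts.limit(1000).toPandas()  # at most 1,000 rows held locally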
In [24]:
ax = plt.bar(left=top_species_for_graph.index,
             height=top_species_for_graph["count"])
In [14]:
(species_counts
    .write
    .format("com.databricks.spark.csv")
    .mode("overwrite")
    .option("header", "true")
    .save("/outputs/species_counts.csv")
)