In [7]:
from pyspark.sql.functions import year, udf
import matplotlib.pyplot as plt
%matplotlib inline
In [1]:
dir()
Out[1]:
In [1]:
df = sqlContext.read.load("/guoda/data/idigbio-20190612T171757.parquet")
In [3]:
df.count()
Out[3]:
In [4]:
# Number of records per collection year (the year is extracted from the
# "datecollected" column). Persisted because the following cells query it
# repeatedly.
year_summary = (
    df
    .groupBy(year("datecollected").cast("integer").alias("yearcollected"))
    .count()
    .orderBy("yearcollected")
    .persist()
)
In [5]:
year_summary.count()
Out[5]:
In [6]:
year_summary.printSchema()
In [7]:
year_summary.describe().show()
In [8]:
year_summary.head(10)
Out[8]:
In [9]:
year_summary.orderBy("yearcollected", ascending=False).head(10)
Out[9]:
In [10]:
# Keep only collection years 1817-2017 (inclusive on both ends), sort them,
# and bring the result to the driver as a pandas DataFrame for plotting.
pandas_year_summary = (
    year_summary
    .filter(
        (year_summary["yearcollected"] >= 1817)
        & (year_summary["yearcollected"] <= 2017)
    )
    .orderBy("yearcollected")
    .toPandas()
)
In [11]:
pandas_year_summary.head()
Out[11]:
In [12]:
plt.bar(pandas_year_summary["yearcollected"], pandas_year_summary["count"])
Out[12]:
In [13]:
# Record counts per (collection year, raw "continent" value), ordered by
# year. Persisted so the inspection cell that follows does not recompute it.
yc_sum = (df
.groupBy(year("datecollected").cast("integer").alias("yearcollected"),
"continent")
.count()
.orderBy("yearcollected")
.persist()
)
In [14]:
yc_sum.head(10)
Out[14]:
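The raw `continent` values are messy, so we're going to have to fix some data! Any value that is not one of the six expected continent names gets bucketed as "other".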
In [11]:
# Continent names accepted as-is for the iDigBio data. They live under their
# own immutable name so that a later cell rebinding `continents` (the GBIF
# section does) cannot silently change what fix_continent() accepts.
IDIGBIO_CONTINENTS = frozenset(
    ["africa", "asia", "australia", "europe", "north america", "south america"]
)

# Still defined under the original name: the stacked-bar plotting cell builds
# its series list from `continents`.
continents = set(IDIGBIO_CONTINENTS)


def fix_continent(c):
    """Return `c` unchanged if it is a recognised continent, else "other".

    Matching is an exact membership test against IDIGBIO_CONTINENTS, so the
    comparison is case-sensitive and `None` (a missing value) maps to
    "other".

    Parameters
    ----------
    c : str or None
        Raw continent value from the data.

    Returns
    -------
    str
        Either `c` itself or the literal string "other".
    """
    return c if c in IDIGBIO_CONTINENTS else "other"


# Smoke test: one accepted value and one value that must fall through.
print(fix_continent("europe"))
print(fix_continent("oceana"))
In [13]:
fix_continent_udf = udf(fix_continent)
In [10]:
# Record counts per (collection year, cleaned continent). The cleaned label
# is produced by fix_continent_udf and stored in the "fixed" column.
yc_sum = (
    df
    .withColumn("fixed", fix_continent_udf(df["continent"]))
    .groupBy(
        year("datecollected").cast("integer").alias("yearcollected"),
        "fixed",
    )
    .count()
    .orderBy("yearcollected")
    .persist()
)
In [17]:
yc_sum.head(10)
Out[17]:
In [18]:
yc_sum.groupBy(yc_sum.fixed).count().show()
In [19]:
# Cross-tabulate collection year against the cleaned continent label: one
# row per year and one count column per distinct "fixed" value. The year
# column of the result is named "yearcollected_fixed" (used by later cells).
yc_cross = (
    df
    .select(
        year("datecollected").cast("integer").alias("yearcollected"),
        df["continent"],
    )
    .withColumn("fixed", fix_continent_udf(df["continent"]))
    .crosstab("yearcollected", "fixed")
)
In [20]:
yc_cross.head(5)
Out[20]:
In [21]:
# Restrict the cross-tab to years 1817-2017 (inclusive), sort by year, and
# collect it to the driver as a pandas DataFrame for plotting.
pandas_yc_cross = (
    yc_cross
    .filter(
        (yc_cross["yearcollected_fixed"] >= 1817)
        & (yc_cross["yearcollected_fixed"] <= 2017)
    )
    .orderBy(yc_cross["yearcollected_fixed"])
    .toPandas()
)
In [22]:
pandas_yc_cross.head(200)
Out[22]:
In [23]:
# Quick look at a single continent before building the stacked chart.
# Spark's crosstab() returns its key column as strings, so the years are
# converted to integers here; otherwise matplotlib would treat them as
# categories rather than positions on a numeric year axis.
p1 = plt.bar(
    pandas_yc_cross['yearcollected_fixed'].astype(int),
    pandas_yc_cross['asia'],
    color='#d62728',
    edgecolor='none',
)
In [27]:
import pandas as pd
import numpy as np

# Stacking / legend order: "other" at the bottom, then continents A-Z.
continents_list = ["other"] + sorted(continents)

# blue -> orange palette, one colour per entry of continents_list
colors = ["#a75902", "#00e5c8", "#1bcda7", "#37b686", "#539f65", "#51827c", "#bf6603"]

# Spark's crosstab() returns its key column as strings; convert to integers
# so the bars, the axvline markers and set_xlim all share a numeric year axis.
years = pandas_yc_cross['yearcollected_fixed'].astype(int)

plots = []
# Running top of the stack for each year (positional, same order as `years`).
bottoms = np.zeros(len(years))
for c in continents_list:
    # A continent with no records (e.g. when running on a small subset of the
    # data) has no column in the cross-tab. Draw zero-height bars for it
    # instead of failing with KeyError, so `plots` stays aligned with
    # `continents_list` for the legend.
    if c in pandas_yc_cross.columns:
        heights = np.asarray(pandas_yc_cross[c], dtype=float)
    else:
        heights = np.zeros(len(years))
    plots.append(
        plt.bar(years, heights,
                # wrap around rather than IndexError if series outnumber colours
                color=colors[len(plots) % len(colors)], edgecolor='none',
                width=1.0, bottom=bottoms)
    )
    # New array each time: the bars already drawn keep their own `bottom`.
    bottoms = bottoms + heights

# Start of WWI
plt.axvline(x=1914)
# Start of WWII
plt.axvline(x=1939)
# "1988 - October 31: President Reagan signs the NSF Authorization Act of 1988, thereby authorizing the doubling of the NSF budget over the next five years."
plt.axvline(x=1988)

plt.legend(plots, continents_list, loc=2)
plt.title("Specimens in iDigBio by Collection Year and Continent")
plt.ylabel("Number of Specimen Records")
plt.xlabel("Year")

axes = plt.gca()
axes.set_xlim([1815, 2020])
axes.set_ylim([0, 1200000])

fig = plt.gcf()
fig.set_size_inches(12, 4)
In [1]:
# NOTE: this rebinds `df`, which the earlier cells bound to the iDigBio
# parquet data. From here on `df` refers to the data under the GBIF path
# below, so re-running an earlier cell that uses `df` without reloading
# will operate on this dataset instead.
df = sqlContext.read.parquet("/guoda/data/gbif-idigbio.parquet/source=gbif/date=20160825")
In [17]:
# Register the DataFrame as the temp view "df" so it can be queried with SQL,
# then select the two columns needed downstream under short aliases
# (eventDate, continent), keeping only rows whose basisOfRecord contains
# "SPECIMEN". The result is persisted because several later cells reuse it.
df.createOrReplaceTempView("df")
renamed_cols = sqlContext.sql("""
SELECT `http://rs.tdwg.org/dwc/terms/eventDate` as eventDate,
`http://rs.tdwg.org/dwc/terms/continent` as continent
FROM df
WHERE `http://rs.tdwg.org/dwc/terms/basisOfRecord` LIKE "%SPECIMEN%"
""").persist()
In [18]:
renamed_cols.count()
Out[18]:
In [37]:
renamed_cols.groupBy("continent").count().orderBy("count", ascending=False).head(100)
Out[37]:
In [52]:
# Record counts per event year, most recent year first.
date_group = (
    renamed_cols
    .groupBy(year("eventDate").cast("integer").alias("year"))
    .count()
    .orderBy("year", ascending=False)
)
In [53]:
date_group.head(10)
Out[53]:
In [55]:
date_group.describe().show()
In [43]:
# need to do some harder-core data cleaning

# Continent names searched for inside the free-text GBIF continent field.
# An ordered, immutable tuple: the matching below must not depend on set
# iteration order, nor on other cells rebinding `continents`.
GBIF_CONTINENTS = ("africa", "asia", "europe", "north america", "oceania", "south america")

# Still defined under the original name: the stacked-bar plotting cell builds
# its series list from `continents`.
continents = set(GBIF_CONTINENTS)


def fix_continent_gbif(c):
    """Map a free-text continent value onto a canonical continent name.

    The value is lower-cased and underscores are turned into spaces, then
    searched for each name in GBIF_CONTINENTS as a substring. If several
    names occur, the one that appears earliest in the text wins, so the
    result is deterministic. Empty or missing values, and values containing
    no known name, map to "other".

    NOTE(review): matching is by substring, so e.g. "australasia" matches
    "asia". Tighten to word boundaries if that matters for the analysis.

    Parameters
    ----------
    c : str or None
        Raw continent value.

    Returns
    -------
    str
        One of GBIF_CONTINENTS, or "other".
    """
    if not c:
        return "other"
    # Normalise once instead of once per candidate name.
    text = c.lower().replace("_", " ")
    # (position, name) pairs; min() picks the earliest occurrence.
    hits = [(text.find(name), name) for name in GBIF_CONTINENTS if name in text]
    return min(hits)[1] if hits else "other"


print(fix_continent_gbif(None))
print(fix_continent_gbif("europe"))
print(fix_continent_gbif("oceania"))
print(fix_continent_gbif("NORTH_AMERICA"))
print(fix_continent_gbif("North America, Canada, Manitoba, Churchill"))
print(fix_continent_gbif("East Indies, Indonesia: Pulo Pandjang, off Sumatra"))
print(fix_continent_gbif("Asia; Thailand; Pathum Thani"))
In [44]:
from pyspark.sql.functions import udf
fix_continent_gbif_udf = udf(fix_continent_gbif)
In [45]:
# Cross-tabulate event year against the cleaned continent label for the GBIF
# records: one row per year, one count column per distinct "fixed" value.
# Persisted because the result is filtered and collected in the next cell.
yc_cross_gbif = (
    renamed_cols
    .select(
        year("eventDate").cast("integer").alias("yearcollected"),
        renamed_cols["continent"],
    )
    .withColumn("fixed", fix_continent_gbif_udf(renamed_cols["continent"]))
    .crosstab("yearcollected", "fixed")
    .persist()
)
In [46]:
# Restrict the GBIF cross-tab to years 1817-2017 (inclusive), sort by year,
# and collect it to the driver as a pandas DataFrame for plotting.
pandas_yc_cross_gbif = (
    yc_cross_gbif
    .filter(
        (yc_cross_gbif["yearcollected_fixed"] >= 1817)
        & (yc_cross_gbif["yearcollected_fixed"] <= 2017)
    )
    .orderBy(yc_cross_gbif["yearcollected_fixed"])
    .toPandas()
)
In [47]:
import pandas as pd
import numpy as np

# Stacking / legend order: "other" at the bottom, then continents A-Z.
continents_list = ["other"] + sorted(continents)

# blue -> orange palette, one colour per entry of continents_list
colors = ["#a75902", "#00e5c8", "#1bcda7", "#37b686", "#539f65", "#51827c", "#bf6603"]

# Spark's crosstab() returns its key column as strings; convert to integers
# so the bars, the axvline markers and set_xlim all share a numeric year axis.
years = pandas_yc_cross_gbif['yearcollected_fixed'].astype(int)

plots = []
# Running top of the stack for each year (positional, same order as `years`).
bottoms = np.zeros(len(years))
for c in continents_list:
    # A continent with no records has no column in the cross-tab. Draw
    # zero-height bars for it instead of failing with KeyError, so `plots`
    # stays aligned with `continents_list` for the legend.
    if c in pandas_yc_cross_gbif.columns:
        heights = np.asarray(pandas_yc_cross_gbif[c], dtype=float)
    else:
        heights = np.zeros(len(years))
    plots.append(
        plt.bar(years, heights,
                # wrap around rather than IndexError if series outnumber colours
                color=colors[len(plots) % len(colors)], edgecolor='none',
                width=1.0, bottom=bottoms)
    )
    # New array each time: the bars already drawn keep their own `bottom`.
    bottoms = bottoms + heights

# Start of WWI
plt.axvline(x=1914)
# Start of WWII
plt.axvline(x=1939)
# "1988 - October 31: President Reagan signs the NSF Authorization Act of 1988, thereby authorizing the doubling of the NSF budget over the next five years."
plt.axvline(x=1988)

plt.legend(plots, continents_list, loc=2)
plt.title("Specimens in GBIF by Collection Year and Continent")
plt.ylabel("Number of Specimen Records")
plt.xlabel("Year")

axes = plt.gca()
axes.set_xlim([1815, 2020])
axes.set_ylim([0, 1200000])

fig = plt.gcf()
fig.set_size_inches(12, 4)