Combine job board and BLS data to find trends in job / industry growth in Chicago and elsewhere.
Here is how to do it yourself:
Get the newest version of Spark, pre-built for Hadoop 2.4. It has to be pre-built so that the pyspark client will have the proper JARs.
curl -O http://www.apache.org/dyn/closer.cgi/spark/spark-1.3.0/spark-1.3.0-bin-hadoop2.4.tgz
tar -xzvf spark-1.3.0-bin-hadoop2.4.tgz
Launch it on Amazon EC2 using the spark-ec2 script shipped in the ec2 directory of the distribution:
cd spark-1.3.0-bin-hadoop2.4
export SPARK_HOME=`pwd`
$SPARK_HOME/ec2/spark-ec2 --slaves 2 \
--key-pair <Amazon_Keypair_Name> \
--identity-file <path/to/Amazon_Keypair.pem> \
--copy-aws-credentials \
--zone us-east-1b --instance-type=m1.medium \
launch spark_cluster
At the end of the startup run, the script prints a URL we can use to connect; otherwise, navigate to the EC2 dashboard through the Amazon Web Services Console to find the IP address of the master node. The Spark dashboard is on port 8080 by default: <ip address>:8080
Mine is here: http://ec2-54-166-72-95.compute-1.amazonaws.com:8080/
A handful of blogs describe how to set up IPython + Spark; they're helpful but outdated, since IPython support is now built into pyspark. Follow option 3 from those posts to create a password-protected IPython notebook configuration file, but instead of setting c.NotebookApp.ip = '*' as they suggest, use the master's public address (e.g. 'ec2-54-166-72-95.compute-1.amazonaws.com').
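For reference, here is a minimal sketch of the relevant lines in the notebook profile's ipython_notebook_config.py (the profile name pyspark and the hostname are assumptions; generate the password hash yourself with IPython.lib.passwd and paste it in):
# ~/.ipython/profile_pyspark/ipython_notebook_config.py -- sketch only
c = get_config()
c.NotebookApp.ip = 'ec2-54-166-72-95.compute-1.amazonaws.com'  # the master's address, not '*'
c.NotebookApp.open_browser = False
c.NotebookApp.port = 8888
c.NotebookApp.password = u'sha1:<hash from IPython.lib.passwd()>'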
PySpark prefers Python 2.7, but Python 2.6 is the default on an Amazon EC2 instance. Below is a pair of scripts that installs Python 2.7 along with some dependencies I need (requests, pymongo, and lxml). All of the other packages are PySpark dependencies.
SSH in to the master node and execute:
xargs -L1 -a commands.txt ./go.sh
commands.txt containing:
yes | yum install python27-devel
unlink /etc/alternatives/python
ln -s /usr/bin/python2.7 /etc/alternatives/python
wget https://bootstrap.pypa.io/ez_setup.py
python ez_setup.py
easy_install pip
rm ez_setup.py
rm setuptools-16.0.zip
easy_install Cython
yes | yum install freetype-devel
yes | yum install libpng-devel
pip install numpy scipy
pip install matplotlib
yes | yum install libxml2-devel
yes | yum install libxslt-devel
pip install requests lxml pymongo
pip install ipython[notebook]
go.sh containing:
#!/usr/bin/env bash
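# Run the given command here on the master, then on every worker listed in /root/spark/conf/slaves.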
echo '------------------------------------------------'
echo $@; $@
while read worker
do
ssh ${worker} "echo 'machine ${worker}'; $@"
done < /root/spark/conf/slaves
Add a custom TCP rule to the spark_cluster-master Amazon EC2 security group to allow the port for the IPython notebook (8888 by default; Cloudera warns about potential port assignment collisions, but 8888 worked fine):
a. Navigate to the EC2 console
b. Click on the master instance, and then on the security group assigned to that instance
c. In the security group view that opens, click Actions → Edit inbound rules and add a custom TCP rule with protocol TCP, port range 8888, and source Anywhere
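If you would rather script this step than click through the console, something like the following works with the boto library (a sketch: the group name spark_cluster-master comes from the launch step above, and region and AWS credentials are assumed to be configured already):
import boto.ec2

# Open the notebook port (8888/tcp) to the world on the master's security group.
conn = boto.ec2.connect_to_region('us-east-1')
conn.authorize_security_group(group_name='spark_cluster-master',
                              ip_protocol='tcp',
                              from_port=8888, to_port=8888,
                              cidr_ip='0.0.0.0/0')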
Set environment variables so that <spark-home>/bin/pyspark launches a properly configured IPython notebook, ready to use pyspark. As of Spark 1.2:
export PYSPARK_DRIVER_PYTHON=ipython
export PYSPARK_DRIVER_PYTHON_OPTS='notebook --profile=pyspark'
Launch pyspark, explicitly designating the master node; otherwise pyspark will run as a local standalone Spark instance:
/root/spark/bin/pyspark --master spark://ec2-54-166-72-95.compute-1.amazonaws.com:7077
To keep the notebook running after logging out, the command above was wrapped in nohup <command> &.
In [4]:
###
# There is now a 'SparkContext' instance available as the named variable 'sc'
# and there is a HiveContext instance (for SQL-like queries) available as 'sqlCtx'
#
## Check that this simple code runs without error:
sc.parallelize([1,2,3,4,5]).take(2)
Out[4]:
In [ ]:
###
# Inspect the SparkContext [sc] or the HiveContext [sqlCtx]
#help(sc)
help(sqlCtx)
In [55]:
from random import random
from operator import add
def monte_carlo(_):
    """4 * (area of one quadrant of the unit circle) approximates pi."""
    x = random()
    y = random()
    return 4.0 if pow(x, 2) + pow(y, 2) < 1 else 0

N = 1000
parts = 2
sc.parallelize(xrange(N), parts).map(monte_carlo).reduce(add) / N
Out[55]:
We will merge a job postings dataset and the BLS Occupations and Earnings data together, using location, occupation, and possibly industry category.
Information on connecting to S3 from Spark is at the bottom of the Spark docs on EC2 scripts, and copied here:
You can specify a path in S3 as input through a URI of the form s3n://<bucket>/path. To provide AWS credentials for S3 access, launch the Spark cluster with the option --copy-aws-credentials. Full instructions on S3 access using the Hadoop input libraries can be found on the Hadoop S3 page. In addition to using a single input file, you can also use a directory of files as input by simply giving the path to the directory.
Separate files listing the BLS categories were loaded to an Amazon S3 bucket: tts-wwmm/areas.txt, tts-wwmm/industry.txt, and tts-wwmm/occupations.txt. All are two-column files with no headers and a tab separating the variable code and the variable label.
JobsAggregator aggregates from Indeed, SimplyHired, CareerBuilder, Monster, and CareerJet, showing the most recent job posts on each site. The function scrape in the file jobsaggregator_scraper.py iteratively scrapes the site and returns a generator that yields current job listings (each as a dictionary) for a given state and occupation.
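The scraper itself isn't reproduced here; as a rough sketch of the contract the Spark code below relies on (the dictionary keys are illustrative placeholders, not the scraper's actual fields):
def scrape(state, occupation):
    """Yield one dict per current listing for the given state and occupation (sketch only)."""
    listings = []  # in the real module this comes from the scraped result pages
    for listing in listings:
        yield {'title': listing, 'company': None,
               'state': state, 'occupation': occupation,
               'source': 'JobsAggregator'}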
The data are not best pulled via the BLS API: it has only the most recent year's statistics, and the rest are archived at http://www.bls.gov/oes/tables.htm. The contents of the archive files were loaded to a MongoDB database; this took some manual work to handle the different column names and file formats used in different years. Data are available by occupation at the state and national level, and at more aggregated levels for municipal areas. Below is an example of one observation. The ANNUAL and OVERALL entries are lists of dictionaries with one entry per year, possibly with data from as far back as 2000.
{ "AREA": "3800003",
"AREA_NAME": "East Central North Dakota",
"ST": "ND",
"OCC_CODE": "39-1012",
"OCC_TITLE": "Slot key persons",
"ANNUAL": [
{
"YEAR": 2009,
"pct90": 34530,
"pct75": 32730,
"pct50": 30170,
"pct25": 27610,
"pct10": 19250
}
],
"OVERALL": [
{
"YEAR": 2009,
"JOBS_1000": 0.923,
"TOT_EMP": 40,
"A_MEAN": 29100,
"MEAN_PRSE": 5.6,
"H_MEAN": 13.99,
"EMP_PRSE": 30.2
}
]
}
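The loading of the archive spreadsheets isn't shown in this notebook; the sketch below illustrates the general shape of that step, assuming a hypothetical iterator rows_from_archive() that yields per-year records already mapped to the common column names:
import os
from pymongo import MongoClient

oe = MongoClient(os.getenv('MONGO_URI')).oe   # same database used below

def load_row(collection, row):
    # Upsert one (area, occupation) document and push this year's numbers onto
    # its ANNUAL and OVERALL arrays (shape shown above). Sketch only; the
    # 'annual' and 'overall' keys are hypothetical outputs of the parser.
    oe[collection].update_one(
        {'AREA': row['AREA'], 'OCC_CODE': row['OCC_CODE']},
        {'$set': {'AREA_NAME': row['AREA_NAME'], 'ST': row['ST'],
                  'OCC_TITLE': row['OCC_TITLE']},
         '$push': {'ANNUAL': row['annual'], 'OVERALL': row['overall']}},
        upsert=True)

# e.g.  for row in rows_from_archive():  load_row('st', row)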
In [13]:
### ------------------------------------------------- AMAZON ----- ###
# ⇒ These files identify columns that will be common to the job
# board data and the BLS datasets.
#
# To use S3 buckets add `--copy-aws-credentials` to the ec2 launch command.
#
# Create a Resilient Distributed Dataset with the
# list of occupations in the BLS dataset:
# https://s3.amazonaws.com/tts-wwmm/occupations.txt
from pyspark.sql import Row
# Load the occupations lookups and convert each line to a Row.
lines = sc.textFile('s3n://tts-wwmm/occupations.txt')
Occupation = Row('OCC_CODE', 'OCC_TITLE')
occ = lines.map(lambda l: Occupation( *l.split('\t') ))
# Do the same for the areas lookups.
lines = sc.textFile('s3n://tts-wwmm/areas.txt')
Area = Row('AREA', 'AREA_NAME')
area = lines.map(lambda l: Area( *l.split('\t') ))
area_df = sqlCtx.createDataFrame(area)
area_df.registerTempTable('area')
# Just to show how sqlCtx.sql works
states = sqlCtx.sql("SELECT AREA_NAME, AREA FROM area WHERE AREA RLIKE '^S.*'")
print states.take(2)
# Same as above, but result is another Resilient Distributed Dataset
states = area.filter(lambda a: a.AREA.startswith('S'))
# Create every combination of occupation, state
occ_by_states = occ.cartesian(states)
# Broadcast makes a static copy of a variable available to all nodes
#broadcast_state_names = sc.broadcast(states.collect())
#
#print broadcast_state_names.value[:2]
In [ ]:
### ----------------------------------------- JOBS_AGGREGATOR ----- ###
#
# Make `jobsaggregator_scraper.py` available on all nodes
# and iteratively get the top 5 jobs from each poster in each state for
# each occupation via JobsAggregator.com
sc.addPyFile('s3n://tts-wwmm/jobsaggregator_scraper.py')
def scrape_occupation(occ_state):
    from jobsaggregator_scraper import scrape
    occ_row, state_entry = occ_state
    return [Row(**job)
            for job in scrape(state=state_entry[1], occupation=occ_row.OCC_TITLE)]
jobs = occ_by_states.flatMap(scrape_occupation).distinct()
jobs_df = sqlCtx.inferSchema(jobs)
jobs_df.registerTempTable('jobs')
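With jobs registered as a temp table, the merge described at the top can be sketched as a SQL join. This assumes the scraped rows carry an occupation column (a hypothetical field name) whose values match the BLS titles; occ is registered as a temp table here, as it also is further down:
occ_df = sqlCtx.createDataFrame(occ)
occ_df.registerTempTable('occ')
# Hypothetical join of scraped postings to BLS occupation codes.
merged = sqlCtx.sql((
    "SELECT j.*, o.OCC_CODE "
    "FROM jobs j "
    "JOIN occ o ON j.occupation = o.OCC_TITLE"
))
merged.take(2)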
In [16]:
jobs_df.toJSON().saveAsTextFile('wwmm/jobsaggregator_json')
In [15]:
jobs.saveAsTextFile('wwmm/jobsaggregator_df')
In [19]:
jobs.take(2)
Out[19]:
In [40]:
### -------------------------------------------- BLS OES DATA ----- ###
#
# The OES data were loaded to a MongoLab database. Read the URI
# (which includes a user name and password) from an environment variable
# and create a connection. The pymongo API is very simple.
#
# Documents are stored one per occupation code (OCC_CODE, e.g. 00-0000)
# per area.
import os
from pymongo import MongoClient
MONGO_URI = os.getenv('MONGO_URI')
client = MongoClient(MONGO_URI)   # connection
oe = client.oe                    # database
# Confirm we can get data from each collection
oo = oe['nat'].find(filter={'OCC_CODE':'00-0000'},
                    projection={'_id':False,
                                'OCC_CODE':True, 'OCC_TITLE':True,
                                'ANNUAL':{'$slice':-5}, 'OVERALL':{'$slice':-2}})
for o in oo:
    print o
In [68]:
# Which OCC contains software-type people?
occ_df = sqlCtx.createDataFrame(occ)
occ_df.registerTempTable('occ')
computer_jobs = sqlCtx.sql((
    "SELECT OCC_CODE, OCC_TITLE "
    "FROM occ "
    "WHERE OCC_TITLE RLIKE 'omputer'"
)).collect()
In [67]:
for row in computer_jobs:
    print "{OCC_CODE}: {OCC_TITLE}".format(**row.asDict())
In [72]:
# Want Chicago's area code
chicago = sqlCtx.sql((
    "SELECT AREA, AREA_NAME "
    "FROM area "
    "WHERE AREA_NAME RLIKE 'icago' or AREA_NAME RLIKE 'llinois'"
)).collect()
print "\n".join("{}: {}".format(c.AREA, c.AREA_NAME) for c in chicago)
In [80]:
# Now get the data:
## -------------------------------------- National
desired_data = {'_id':False,
                'ANNUAL':{'$slice':-5}, 'OVERALL':{'$slice':-2}}
nat = oe['nat'].find(filter={'OCC_CODE':'15-1131'},
                     projection=desired_data)
nat = [n for n in nat]
len(nat)
Out[80]:
In [84]:
## -------------------------------------- State
il = oe['st'].find(filter={'OCC_CODE':'15-1131',
                           'AREA':'17'},
                   projection=desired_data)
il = [i for i in il]
len(il)
Out[84]:
In [90]:
## -------------------------------------- Municipal Areas
## The lookup for chicago didn't work...
## ... so I am looking through all of the municipal areas...
chi = oe['ma'].find(filter={'OCC_CODE':'15-1131'},
                    projection=desired_data)
chi = [c for c in chi if 'IL' in c['AREA_NAME']]
len(chi)
Out[90]:
In [100]:
# Pull the annual salary percentiles into tablib Datasets
import tablib
nat_annual = tablib.Dataset()
nat_annual.dict = nat[0]['ANNUAL']
il_annual = tablib.Dataset()
il_annual.dict = il[0]['ANNUAL']
chi_annual = tablib.Dataset()
chi_annual.dict = chi[1]['ANNUAL']
In [45]:
from lightning import Lightning
lgn = Lightning(host="https://tts-lightning.herokuapp.com",
                ipython=True,
                auth=("tanya@tickel.net", "password"))
In [110]:
# Median salaries
lgn.line(series=[nat_annual['pct50'], il_annual['pct50'], chi_annual['pct50']],
         index=nat_annual['YEAR'],
         color=[[0,0,0],[255,0,0],[0,155,0]],
         size=[5,2,2],
         xaxis="Year",
         yaxis="Median annual salary")
Out[110]:
In [112]:
# How about regionally?
all_states = oe['st'].find(filter={'OCC_CODE':'15-1131'},
                           projection={'_id': False,
                                       'OVERALL':{'$slice':-2}})
all_states = [a for a in all_states]
len(all_states)
Out[112]:
In [114]:
state_abbrs = [a['ST'] for a in all_states]
mean_salaries = [a['OVERALL'][0]['A_MEAN'] for a in all_states]
num_employed = [a['OVERALL'][0]['TOT_EMP'] for a in all_states]
In [118]:
# Mean salaries
print "max average salary:", max(mean_salaries)
print "Illinois:", mean_salaries[state_abbrs.index('IL')]
lgn.map(regions=state_abbrs, values=mean_salaries)
Out[118]:
In [121]:
# Employees
print "Most programmers:", max(num_employed)
print "Illinois:", num_employed[state_abbrs.index('IL')]
lgn.map(regions=state_abbrs, values=num_employed)
Out[121]:
In [124]:
salaries = tablib.Dataset(*zip(state_abbrs, mean_salaries),
                          headers=('State', 'Salary'))
employees = tablib.Dataset(*zip(state_abbrs, num_employed),
                           headers=('State', 'Employees'))
In [138]:
salaries = salaries.sort("Salary", reverse=True)
print "\n".join("{s[0]}: {s[1]:0,.0f}".format(s=s) for s in salaries[:5])
In [139]:
employees = employees.sort("Employees", reverse=True)
print "\n".join("{e[0]}: {e[1]:0,.0f}".format(e=e) for e in employees[:5])