Work work, money money

Combine job board and BLS data to find trends in job / industry growth in Chicago and elsewhere.

Data:

  • Job postings scraped from JobsAggregator.com (which aggregates Indeed, SimplyHired, CareerBuilder, Monster, and CareerJet)
  • BLS Occupational Employment Statistics (OES) data, by occupation and area

Technology:

  • Apache Spark on an Amazon EC2 (Elastic Compute Cloud) cluster; instructions below
  • Input data stored in Amazon S3 buckets; output written to persistent HDFS storage
  • Images rendered in D3 via a private Lightning server

Steps to launch Spark AWS EC2 cluster

Here is how to launch your own:

  1. Get an Amazon Web Services account
  2. Get the newest version of Spark, pre-built for Hadoop 2.4. It has to be pre-built so that the pyspark client has the proper jars.

     curl -O http://www.apache.org/dyn/closer.cgi/spark/spark-1.3.0/spark-1.3.0-bin-hadoop2.4.tgz
     tar -xzvf spark-1.3.0-bin-hadoop2.4.tgz
  3. Launch it on Amazon EC2 using the spark-ec2 script in the ec2 directory of the Spark distribution

     cd spark-1.3.0-bin-hadoop2.4
     export SPARK_HOME=`pwd`
     $SPARK_HOME/ec2/spark-ec2  --slaves 2  \
         --key-pair <Amazon_Keypair_Name>  \
         --identity-file <path/to/Amazon_Keypair.pem>  \
         --copy-aws-credentials  \
         --zone us-east-1b --instance-type=m1.medium  \
         launch spark_cluster
  4. At the end of the startup run, the script prints a URL for connecting to the master; alternatively, navigate to the EC2 dashboard in the AWS Console to find the master node's IP address. The Spark dashboard is on port 8080 by default: <ip address>:8080
    Mine is here: http://ec2-54-166-72-95.compute-1.amazonaws.com:8080/

Steps to launch an IPython notebook connecting to the AWS EC2 cluster

A handful of blogs describe how to set up IPython + Spark; they're helpful but outdated:

  1. Presentation of two distinct ways to do it
  2. Too much information
  3. Two separate ways presented as if they were one

IPython options are built into pyspark. Follow option 3 above to create a password-protected IPython notebook configuration file, but instead of setting c.NotebookApp.ip = '*' as it says, use the master's designated address (e.g. 'ec2-54-166-72-95.compute-1.amazonaws.com'), as in the sketch below.
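
A minimal sketch of such a configuration file, assuming a profile created with ipython profile create pyspark (the password hash is a placeholder; generate one with IPython.lib.passwd()):

# ~/.ipython/profile_pyspark/ipython_notebook_config.py
c = get_config()

# Listen on the master's public address rather than '*'
c.NotebookApp.ip = 'ec2-54-166-72-95.compute-1.amazonaws.com'
c.NotebookApp.open_browser = False
c.NotebookApp.port = 8888

# Hash generated with IPython.lib.passwd()
c.NotebookApp.password = u'sha1:<hashed password>'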

Configuration for Python 2.7

PySpark prefers Python 2.7, but Python 2.6 is the default Python on an Amazon EC2 instance. Below is a set of scripts that installs Python 2.7 plus some dependencies I need (requests, pymongo and lxml); all of the other packages are PySpark dependencies.

SSH in to the master node and execute:

xargs -L1 -a commands.txt ./go.sh


with commands.txt containing:

yes | yum install python27-devel
unlink /etc/alternatives/python
ln -s /usr/bin/python2.7 /etc/alternatives/python

wget https://bootstrap.pypa.io/ez_setup.py
python ez_setup.py
easy_install pip
rm ez_setup.py
rm setuptools-16.0.zip 

easy_install Cython
yes | yum install freetype-devel
yes | yum install libpng-devel
pip install numpy scipy
pip install matplotlib

yes | yum install libxml2-devel
yes | yum install libxslt-devel
pip install requests lxml pymongo
pip install ipython[notebook]


and go.sh containing:

#!/usr/bin/env bash
# Run the given command on the master, then on each worker listed in the
# Spark slaves file. The -n flag keeps ssh from swallowing the loop's stdin.

echo '------------------------------------------------'
echo $@;  $@

while read worker
do
  ssh -n ${worker} "echo 'machine ${worker}'; $@"
done < /root/spark/conf/slaves




Back to launching an IPython notebook

  1. Add a Custom TCP rule to the spark_cluster-master Amazon EC2 security group to allow the port for the IPython notebook (8888 by default; Cloudera says to watch for potential port assignment collisions, but 8888 worked fine). The same rule can also be added programmatically; see the boto sketch after this list.
     a. Navigate to the EC2 console
     b. Click on the master instance, and then on the security group assigned to that instance
     c. In the security group view, click Actions > Edit inbound rules and add a custom TCP rule with protocol TCP, port range 8888, and source Anywhere

  2. Set the environment variables that <spark-home>/bin/pyspark uses to launch a properly configured IPython notebook, ready to use pyspark. As of Spark 1.2:

     export PYSPARK_DRIVER_PYTHON=ipython
     export PYSPARK_DRIVER_PYTHON_OPTS='notebook --profile=pyspark'
  3. Launch pyspark. Explicitly designate the master node, or else pyspark will run as a local standalone Spark instance:

     /root/spark/bin/pyspark --master spark://ec2-54-166-72-95.compute-1.amazonaws.com:7077

To keep the notebook server running after logging out, the command above was wrapped in nohup <command> &.
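
As an aside, the security-group rule from step 1 can also be scripted. This is only a sketch, assuming boto (version 2) is installed and AWS credentials are configured; the console steps above are what I actually used:

import boto.ec2

# Open the IPython notebook port (8888) on the master's security group
conn = boto.ec2.connect_to_region('us-east-1')
master_sg = conn.get_all_security_groups(groupnames=['spark_cluster-master'])[0]
master_sg.authorize(ip_protocol='tcp', from_port=8888, to_port=8888, cidr_ip='0.0.0.0/0')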


In [4]:
###
# There is now a 'SparkContext' instance available as the named variable 'sc'
# and there is a HiveContext instance (for SQL-like queries) available as 'sqlCtx'
#
## Check that this simple code runs without error:
sc.parallelize([1,2,3,4,5]).take(2)


Out[4]:
[1, 2]

In [ ]:
###
# Inspect the SparkContext [sc] or the HiveContext [sqlCtx]
#help(sc)
help(sqlCtx)

Pi :: (parallel calculation)

${SPARK_HOME}/examples/src/main/python/pi.py


In [55]:
from random import random
from operator import add

def monte_carlo(_):
    """4 * area (1 quadrant of a unit circle)  pi"""
    x = random()
    y = random()
    return 4.0 if pow(x, 2) + pow(y, 2) < 1 else 0

N = 1000
parts = 2
sc.parallelize(xrange(N), parts).map(monte_carlo).reduce(add) / N


Out[55]:
3.140564

The data

We will merge a job postings dataset with the BLS Occupational Employment Statistics (OES) data, joining on location, occupation, and possibly industry category.
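
The join itself happens only after the tables below are built, but as a rough sketch of the intent (assuming the jobs, occ, and area temp tables that get registered later in this notebook):

# Hypothetical sketch of the eventual merge; the jobs, occ, and area temp
# tables are registered further down in this notebook, and the scraped
# occupation titles may need normalizing before an exact-match join.
merged = sqlCtx.sql("""
    SELECT j.job_title, j.employer, j.area_name, o.OCC_CODE, a.AREA
    FROM jobs j
    JOIN occ o ON j.occ_title = o.OCC_TITLE
    JOIN area a ON j.area_name = a.AREA_NAME
""")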

Amazon S3 buckets

Information on connecting to S3 from Spark is at the bottom of the Spark docs on EC2 scripts, and copied here:

You can specify a path in S3 as input through a URI of the form s3n://<bucket>/path. To provide AWS credentials for S3 access, launch the Spark cluster with the option --copy-aws-credentials. Full instructions on S3 access using the Hadoop input libraries can be found on the Hadoop S3 page.

In addition to using a single input file, you can also use a directory of files as input by simply giving the path to the directory.

Separate files listing the BLS categories were loaded to an Amazon S3 bucket: tts-wwmm/areas.txt, tts-wwmm/industry.txt, tts-wwmm/occupations.txt. All are two-column files with no headers, and a tab separating the variable code and the variable label.

The JobsAggregator data

JobsAggregator aggregates from Indeed, SimplyHired, CareerBuilder, Monster, and CareerJet, showing the most recent job posts on each site.

The function scrape in the file jobsaggregator_scraper.py iteratively scrapes the site and returns a generator that yields current job listings (as dictionaries) for a given state and occupation; a sketch of the interface follows.
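
The scraper itself isn't reproduced here; this stand-in only illustrates the interface and the fields each yielded dictionary carries (field names taken from the jobs RDD output further down; sample values are illustrative):

def scrape(state, occupation):
    """Yield one dict per current listing for the given state and occupation.

    Stand-in for jobsaggregator_scraper.scrape; the real version fetches and
    parses live listings from JobsAggregator.com.
    """
    yield {'area_name': state,
           'occ_title': occupation,
           'job_title': 'dishwasher',
           'employer': 'dennys',
           'date': '2015-5-25',
           'description': 'dishwasher: dennys is hiring ...'}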

The BLS data

The data are actually not best pulled via the BLS API: it has only the most recent year's statistics; the rest are archived at http://www.bls.gov/oes/tables.htm.

The contents of the archive files were loaded into a MongoDB database; it took some manual work to handle the different column names and file formats across years. Data are available by occupation at the state and national level, and at more aggregated levels for metropolitan and nonmetropolitan areas. Below is an example of one observation. The ANNUAL and OVERALL entries are lists of dictionaries with one entry per year, possibly with data from as far back as 2000.

{    "AREA": "3800003",
"AREA_NAME": "East Central North Dakota",
       "ST": "ND",
 "OCC_CODE": "39-1012",
"OCC_TITLE": "Slot key persons",
   "ANNUAL": [
        {
             "YEAR": 2009,
            "pct90": 34530,
            "pct75": 32730,
            "pct50": 30170,
            "pct25": 27610,
            "pct10": 19250
        }
    ],
    "OVERALL": [
        {
               "YEAR": 2009,
          "JOBS_1000": 0.923,
            "TOT_EMP": 40,
             "A_MEAN": 29100,
          "MEAN_PRSE": 5.6,
             "H_MEAN": 13.99,
           "EMP_PRSE": 30.2
        }
    ]
}
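
The load script isn't reproduced here. A hypothetical sketch of the per-year upsert (column renaming and file parsing omitted; the helper name and the row dict are illustrative only):

import os
from pymongo import MongoClient

client = MongoClient(os.getenv('MONGO_URI'))
oe = client.oe

def upsert_year(collection, row, year):
    """Fold one parsed archive row into the document structure shown above."""
    oe[collection].update_one(
        {'AREA': row['AREA'], 'OCC_CODE': row['OCC_CODE']},
        {'$set': {'AREA_NAME': row['AREA_NAME'], 'ST': row['ST'],
                  'OCC_TITLE': row['OCC_TITLE']},
         '$push': {'ANNUAL': {'YEAR': year, 'pct10': row['pct10'], 'pct25': row['pct25'],
                              'pct50': row['pct50'], 'pct75': row['pct75'], 'pct90': row['pct90']},
                   'OVERALL': {'YEAR': year, 'TOT_EMP': row['TOT_EMP'], 'A_MEAN': row['A_MEAN'],
                               'H_MEAN': row['H_MEAN'], 'JOBS_1000': row['JOBS_1000'],
                               'MEAN_PRSE': row['MEAN_PRSE'], 'EMP_PRSE': row['EMP_PRSE']}}},
        upsert=True)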

Load the lookup tables from Amazon S3


In [13]:
### ------------------------------------------------- AMAZON ----- ###
# ⇒ These files identify columns that will be common to the job
#    board data and the BLS datasets.
#
# To use S3 buckets add `--copy-aws-credentials` to the ec2 launch command.
#
# Create a Resilient Distributed Dataset with the
# list of occupations in the BLS dataset:
#     https://s3.amazonaws.com/tts-wwmm/occupations.txt
from pyspark.sql import Row

# Load the occupations lookups and convert each line to a Row.
lines = sc.textFile('s3n://tts-wwmm/occupations.txt')
Occupation = Row('OCC_CODE', 'OCC_TITLE')
occ = lines.map(lambda l: Occupation( *l.split('\t') ))

# Do the same for the areas lookups.
lines = sc.textFile('s3n://tts-wwmm/areas.txt')
Area = Row('AREA', 'AREA_NAME')
area = lines.map(lambda l: Area( *l.split('\t') ))
area_df = sqlCtx.createDataFrame(area)
area_df.registerTempTable('area')

# Just to show how sqlCtx.sql works
states = sqlCtx.sql("SELECT AREA_NAME, AREA FROM area WHERE AREA RLIKE '^S.*'")
print states.take(2)

# Same as above, but result is another Resilient Distributed Dataset
states = area.filter(lambda a: a.AREA.startswith('S'))

# Create every combination of occupation, state
occ_by_states = occ.cartesian(states)

# Broadcast makes a read-only copy of a variable available to all nodes, e.g.:
#     broadcast_states = sc.broadcast(states.collect())
#     print broadcast_states.value[:2]


[Row(AREA_NAME=u'Alabama', AREA=u'S0100000'), Row(AREA_NAME=u'Alaska', AREA=u'S0200000')]

Scrape JobsAggregator


In [ ]:
### ----------------------------------------- JOBS_AGGREGATOR ----- ###
#
# Make `jobs_aggregator_scraper.py` available on all nodes
# and iteratively get the top 5 jobs from each poster in each state for
# each occupation via JobsAggregator.com
sc.addPyFile('s3n://tts-wwmm/jobsaggregator_scraper.py')

def scrape_occupation(occ_state):
    from jobsaggregator_scraper import scrape
    occ_row, state_entry = occ_state
    return [Row(**job)
            for job in scrape(state=state_entry[1], occupation=occ_row.OCC_TITLE)]

jobs = occ_by_states.flatMap(scrape_occupation).distinct()
jobs_df = sqlCtx.inferSchema(jobs)
jobs_df.registerTempTable('jobs')

In [16]:
jobs_df.toJSON().saveAsTextFile('wwmm/jobsaggregator_json')

In [15]:
jobs.saveAsTextFile('wwmm/jobsaggregator_df')

In [19]:
jobs.take(2)


Out[19]:
[Row(area_name=u'Pennsylvania', date='2015-5-25', description='dishwasher: dennys is hiring in new kensington pa dishwasher responsibilities ensure all dishware is clean ', employer='dennys', job_title='dishwasher', occ_title=u'Dishwashers'),
 Row(area_name=u'California', date='2015-5-23', description='part time dog walker, pet sitter:  job is ideal for future vet techs dog moms i am looking people with a solid work ethic people who are mature organization with a steady contribution an ideal candidate would have a background in or human health care for dog ', employer='', job_title='part time dog walker, pet sitter', occ_title=u'Animal-Trainers')]

Load OES Data


In [40]:
### -------------------------------------------- BLS OES DATA ----- ###
#
# The OES data were loaded to a mongolabs database. Read the URI
# (which has a user name and password) from an environment variable
# and create a connection. The pymongo API is very simple.
#
# Datasets are stored one entry per Occupation ID (OCC_ID)
# per area (00-0000)
import os
from pymongo import MongoClient

MONGO_URI = os.getenv('MONGO_URI')
client = MongoClient(MONGO_URI)  # connection
oe = client.oe  # database

# Confirm we can get data from each collection
oo = oe['nat'].find(filter={'OCC_CODE':'00-0000'},
               projection={'_id':False,
                           'OCC_CODE':True, 'OCC_TITLE':True,
                           'ANNUAL':{'$slice':-5}, 'OVERALL':{'$slice':-2}})

for o in oo:
    print o


{u'ANNUAL': [{u'pct75': 54250.0, u'pct50': 33840.0, u'pct10': 17690.0, u'pct90': 83140.0, u'YEAR': 2010, u'pct25': 22150.0}, {u'pct75': 55470.0, u'pct50': 34460.0, u'pct10': 18000.0, u'pct90': 85280.0, u'YEAR': 2011, u'pct25': 22380.0}, {u'pct75': 56200.0, u'pct50': 34750.0, u'pct10': 18090.0, u'pct90': 86810.0, u'YEAR': 2012, u'pct25': 22480.0}, {u'pct75': 56860.0, u'pct50': 35080.0, u'pct10': 18190.0, u'pct90': 88330.0, u'YEAR': 2013, u'pct25': 22670.0}, {u'pct75': 57720.0, u'pct50': 35540.0, u'pct10': 18350.0, u'pct90': 90060.0, u'YEAR': 2014, u'pct25': 22950.0}], u'OCC_CODE': u'00-0000', u'OVERALL': [{u'TOT_EMP': 132588810.0, u'YEAR': 2013, u'A_MEAN': 46440.0, u'MEAN_PRSE': 0.1, u'H_MEAN': 22.33, u'EMP_PRSE': 0.1}, {u'TOT_EMP': 135128260.0, u'YEAR': 2014, u'A_MEAN': 47230.0, u'MEAN_PRSE': 0.1, u'H_MEAN': 22.71, u'EMP_PRSE': 0.1}], u'OCC_TITLE': u'All Occupations'}

In [68]:
# Which OCC contains software-type people?
occ_df = sqlCtx.createDataFrame(occ)
occ_df.registerTempTable('occ')

computer_jobs = sqlCtx.sql((
    "SELECT OCC_CODE, OCC_TITLE "
     "FROM occ "
     "WHERE OCC_TITLE RLIKE 'omputer'"
    )).collect()

In [67]:
for row in computer_jobs:
    print "{OCC_CODE}: {OCC_TITLE}".format(**row.asDict())


11-3021: Computer and Information Systems Managers
15-0000: Computer and Mathematical Occupations
15-1100: Computer Occupations
15-1111: Computer and Information Research Scientists
15-1120: Computer and Information Analysts
15-1121: Computer Systems Analysts
15-1131: Computer Programmers
15-1142: Network and Computer Systems Administrators
15-1143: Computer Network Architects
15-1150: Computer Support Specialists
15-1151: Computer User Support Specialists
15-1152: Computer Network Support Specialists
15-1199: Computer Occupations, All Other
17-2061: Computer Hardware Engineers
17-2072: Electronics Engineers, Except Computer
25-1020: Math and Computer Teachers, Postsecondary
25-1021: Computer Science Teachers, Postsecondary
43-9011: Computer Operators
43-9071: Office Machine Operators, Except Computer
49-2011: Computer, Automated Teller, and Office Machine Repairers
51-4010: Computer Control Programmers and Operators
51-4011: Computer-Controlled Machine Tool Operators, Metal and Plastic
51-4012: Computer Numerically Controlled Machine Tool Programmers, Metal and Plastic

In [72]:
# Want Chicago's area code

chicago = sqlCtx.sql((
    "SELECT AREA, AREA_NAME "
     "FROM area "
     "WHERE AREA_NAME RLIKE 'icago' or AREA_NAME RLIKE 'llinois'"
    )).collect()

print "\n".join("{}: {}".format(c.AREA, c.AREA_NAME) for c in chicago)


M0016974: Chicago-Joliet-Naperville, IL Metropolitan Division
M0016980: Chicago-Joliet-Naperville, IL-IN-WI
S1700000: Illinois
M1700001: Northwest Illinois nonmetropolitan area
M1700002: West Central Illinois nonmetropolitan area
M1700003: East Central Illinois nonmetropolitan area
M1700004: South Illinois nonmetropolitan area

In [80]:
# Now get the data:
## -------------------------------------- National

desired_data = {'_id':False,
                'ANNUAL':{'$slice':-5}, 'OVERALL':{'$slice':-2}}
nat = oe['nat'].find(filter={'OCC_CODE':'15-1131'},
               projection=desired_data)

nat = [n for n in nat]
len(nat)


Out[80]:
1

In [84]:
## -------------------------------------- State

il = oe['st'].find(filter={'OCC_CODE':'15-1131',
                           'AREA':'17'},
                  projection=desired_data)

il = [i for i in il]
len(il)


Out[84]:
1

In [90]:
## -------------------------------------- Municipal Areas
## The lookup for chicago didn't work...
## ... so I am looking through all of the municipal areas...
chi = oe['ma'].find(filter={'OCC_CODE':'15-1131'},
                   projection=desired_data)
chi = [c for c in chi if 'IL' in c['AREA_NAME']]
len(chi)


Out[90]:
6

In [100]:
# Put the ANNUAL percentile series into tablib Datasets
import tablib

nat_annual = tablib.Dataset()
nat_annual.dict = nat[0]['ANNUAL']
il_annual = tablib.Dataset()
il_annual.dict = il[0]['ANNUAL']
chi_annual = tablib.Dataset()
chi_annual.dict = chi[1]['ANNUAL']

Lightning-viz plots for inline D3.js in IPython

http://lightning-viz.org/


In [45]:
from lightning import Lightning

lgn = Lightning(host="https://tts-lightning.herokuapp.com",
                ipython=True,
                auth=("tanya@tickel.net", "password"))

In [110]:
# Median salaries
lgn.line(series=[nat_annual['pct50'], il_annual['pct50'], chi_annual['pct50']],
         index=nat_annual['YEAR'],
         color=[[0,0,0],[255,0,0],[0,155,0]],
         size=[5,2,2],
         xaxis="Year",
         yaxis="Median annual salary")


Out[110]:
[Lightning line plot: median annual salary by year for the US, Illinois, and Chicago]

In [112]:
# How about regionally?

all_states = oe['st'].find(filter={'OCC_CODE':'15-1131'},
                  projection={'_id': False,
                             'OVERALL':{'$slice':-2}})
all_states = [a for a in all_states]
len(all_states)


Out[112]:
53

In [114]:
state_abbrs = [a['ST'] for a in all_states]
mean_salaries = [a['OVERALL'][0]['A_MEAN'] for a in all_states]
num_employed = [a['OVERALL'][0]['TOT_EMP'] for a in all_states]

In [118]:
# Mean salaries
print "max average salary:", max(mean_salaries)
print "Illinois:", mean_salaries[state_abbrs.index('IL')]
lgn.map(regions=state_abbrs, values=mean_salaries)


max average salary: 111320.0
Illinois: 74620.0
Out[118]:
[Lightning map plot: mean salary by state]

In [121]:
# Employees
print "Most programmers:", max(num_employed)
print "Illinois:", num_employed[state_abbrs.index('IL')]
lgn.map(regions=state_abbrs, values=num_employed)


Most programmers: 38750.0
Illinois: 20620.0
Out[121]:
[Lightning map plot: number employed by state]

In [124]:
salaries = tablib.Dataset(*zip(state_abbrs, mean_salaries),
                          headers=('State', 'Salary'))
employees = tablib.Dataset(*zip(state_abbrs, num_employed),
                         headers=('State', 'Employees'))

In [138]:
salaries = salaries.sort("Salary", reverse=True)
print "\n".join("{s[0]}: {s[1]:0,.0f}".format(s=s) for s in salaries[:5])


WA: 111,320
NM: 96,190
MD: 94,100
DC: 91,700
CO: 90,500

In [139]:
employees = employees.sort("Employees", reverse=True)
print "\n".join("{e[0]}: {e[1]:0,.0f}".format(e=e) for e in employees[:5])


CA: 38,750
TX: 23,040
NY: 22,020
IL: 20,620
WA: 15,640