Web-site Demo

http://myweatherproject.s3-website-us-east-1.amazonaws.com/

If viewing on GitHub, here are samples of the index web-page and the Chicago city web-page:

index.html
chicago.html

Architecture

EC2

Weather API

  • Obtains data from Weather Underground
  • Creates a list of (city, current_weather) tuples for the speed layer
  • Ships raw data to Firehose
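
A minimal sketch of this step, assuming boto3 and requests; the stream name, city list, and endpoint shown are illustrative rather than the project's actual values:

    import json
    import boto3
    import requests

    STREAM_NAME = "weather-raw"        # hypothetical Firehose delivery stream
    CITIES = ["chicago", "new_york"]   # subset of the cities polled hourly

    firehose = boto3.client("firehose", region_name="us-east-1")

    def fetch_and_ship(api_key):
        speed_layer_input = []
        for city in CITIES:
            # Weather Underground-style conditions call (endpoint is illustrative)
            resp = requests.get(
                f"http://api.wunderground.com/api/{api_key}/conditions/q/{city}.json")
            raw = resp.json()
            # (city, current_weather) tuples feed the speed layer
            speed_layer_input.append((city, raw.get("current_observation", {})))
            # Raw record goes to Firehose, newline-delimited so S3 files batch cleanly
            firehose.put_record(
                DeliveryStreamName=STREAM_NAME,
                Record={"Data": (json.dumps(raw) + "\n").encode("utf-8")})
        return speed_layer_input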

Speed Layer

  • Pulls in the web-site HTML as a string
  • Updates the current weather for each city using regular expressions
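
A sketch of that in-place update, assuming the pages live in the S3 web-host bucket and the temperature sits inside a marker span; the bucket name, key pattern, and markers are assumptions:

    import re
    import boto3

    s3 = boto3.client("s3")
    BUCKET = "myweatherproject"   # hypothetical web-host bucket

    def update_current_weather(city, current):
        key = f"{city}.html"
        html = s3.get_object(Bucket=BUCKET, Key=key)["Body"].read().decode("utf-8")
        # Swap whatever sits between the assumed marker tags for the new reading
        html = re.sub(r'(<span id="current-temp">)[^<]*(</span>)',
                      rf'\g<1>{current["temp_f"]}\g<2>',
                      html)
        s3.put_object(Bucket=BUCKET, Key=key, Body=html.encode("utf-8"),
                      ContentType="text/html")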

E-mail

  • Sends a completion e-mail confirming that the Weather API and Speed Layer steps are complete
  • Indicates number of cities updated (expecting all)
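
A minimal sketch of how such a notification could be sent with SES through boto3; the sender, recipient, and subject are placeholders, and the project's actual mailing mechanism may differ:

    import boto3

    ses = boto3.client("ses", region_name="us-east-1")

    def send_completion_email(cities_updated, cities_expected):
        body = (f"Weather API and Speed Layer complete. "
                f"{cities_updated} of {cities_expected} cities updated.")
        ses.send_email(
            Source="alerts@example.com",
            Destination={"ToAddresses": ["me@example.com"]},
            Message={"Subject": {"Data": "Weather API / Speed Layer complete"},
                     "Body": {"Text": {"Data": body}}})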

Kinesis

Firehose

  • Packages raw data from all cities together into a single file
  • Ships raw data to S3

S3

Raw Data

  • Stores raw data

Web Host

  • Hosts web-site

EMR Spark Cluster

Normalize

  • city: Data about the city
  • nearby: Nearby locations for each city
  • cityDay: Data about the city on the given date
  • weather: Weather data for the given city, date, and time
  • forecast: Forecasted weather for a future date and time, retrieved for the city at the given date and time
  • path: All S3 paths that have been loaded into the tables
  • stats: Output from analyze job discussed below

Normalize Process

  • Hourly cronjob following EC2 API job
  • weather_normalize.py
  • Load each table from parquet*
  • Check S3 for any/all new files that are not in "path" table
  • For each new file:
    • Normalize the file's data
    • Add filepath "source" data for each record (track lineage)
    • Append new data to full tables
  • Enforce keys (see below)
  • Write back to parquet
  • Send Job Completion E-mail

* Self-healing mechanism recreates tables from raw data if issues encountered with parquet files. This was used during development but hasn't been encountered in production.
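
A minimal sketch of the normalize loop above, showing just one of the seven tables; the bucket names, prefixes, and raw column names are assumptions, and the self-healing rebuild and e-mail steps are omitted:

    import boto3
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("weather_normalize").getOrCreate()
    s3 = boto3.client("s3")
    RAW_BUCKET, RAW_PREFIX = "myweatherproject-raw", "firehose/"  # assumed locations
    TABLES = "s3://myweatherproject-tables"                       # assumed location

    # Load existing tables from parquet (only two shown here)
    weather = spark.read.parquet(f"{TABLES}/weather")
    path = spark.read.parquet(f"{TABLES}/path")

    # Find raw S3 files that are not yet recorded in the "path" table
    loaded = {r.path for r in path.select("path").collect()}
    objs = s3.list_objects_v2(Bucket=RAW_BUCKET, Prefix=RAW_PREFIX).get("Contents", [])
    new_paths = [f"s3://{RAW_BUCKET}/{o['Key']}" for o in objs
                 if f"s3://{RAW_BUCKET}/{o['Key']}" not in loaded]

    for source in new_paths:
        raw = spark.read.json(source)
        # Reshape into the weather table's columns (column names are assumptions)
        # and tag every record with its source file to track lineage
        new_weather = (raw.select("city", "date", "time", "temp", "humidity")
                          .withColumn("source", F.lit(source)))
        weather = weather.unionByName(new_weather)
        path = path.union(spark.createDataFrame([(source,)], ["path"]))

    # enforce_keys (see Enforce Keys below) runs before the write-back to parquet
    weather.write.mode("overwrite").parquet(f"{TABLES}/weather_new")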

Forecast Problem/Solution

Problem - a DataFrame can't explode multiple array columns at once
Solution - switch to an RDD

DataFrame:
City, Date, Time, [forecast date/times], [forecast temperatures], [forecast humidity], [ ]...

RDD:
Zip:
City, Date, Time, zip(forecast date/times, forecast temps, hum etc.)
City, Date, Time, [(dt, temp, hum, ...), (dt, temp, hum, ...), (dt, temp, hum, ...), ...]

Reshape:
[(city, date, time, dt, temp, hum, ...), (city, date, time, dt, temp, hum, ...), ...]

FlatMap:
(city, date, time, dt, temp, hum, ...)

Switch Back to DF
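
A sketch of this zip-and-flatMap reshaping, assuming a forecast_wide DataFrame with parallel array columns (column names are illustrative; the real table carries more features):

    from pyspark.sql import Row

    def unpack(row):
        # Zip the parallel forecast arrays, then emit one record per forecast slot
        return [Row(city=row.city, date=row.date, time=row.time,
                    fc_datetime=dt, fc_temp=temp, fc_humidity=hum)
                for dt, temp, hum in zip(row.fc_datetimes, row.fc_temps, row.fc_humidity)]

    # Array columns in, one flat row per forecast out, then back to a DataFrame
    forecast_long = forecast_wide.rdd.flatMap(unpack).toDF()

Newer Spark versions can also do this with F.arrays_zip followed by a single explode, but the RDD route matches the approach described above.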

Enforce Keys

  • I noticed that Weather Underground shipped me 2 different historical temperatures for the same city/day (they were different by 1 degree).
  • If I simply append the new data, Weather Underground may not respect my keys (duplicate key rows could appear).
  • To enforce my keys, I will use the most recent data provided by Weather Underground for each key.
  • Because I tracked the data lineage (source) of each piece of information, I can accomplish this as follows:
    spark.sql("""
        select *
        from
            (select *
            ,row_number() over(partition by city, date order by source desc) as rk
            from cityDay2V)
        where rk = 1
    """).drop('rk')

  • I enforce keys for every table
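
A generic version of this step using the DataFrame API, keeping only the most recent source per key (the function and argument names here are illustrative):

    from pyspark.sql import Window, functions as F

    def enforce_keys(df, key_cols):
        # Latest source file wins when the external API sends duplicate keys
        w = Window.partitionBy(*key_cols).orderBy(F.col("source").desc())
        return (df.withColumn("rk", F.row_number().over(w))
                  .filter(F.col("rk") == 1)
                  .drop("rk"))

    # e.g. cityDay = enforce_keys(cityDay, ["city", "date"])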

Analyze

  • Hourly cronjob following the Web Update job (discussed first here because the previous run's stats are used in the Web Update)
  • weather_analyze.py
  • Load tables from Parquet
  • Join the Actual Weather that occurred back onto the Previous Forecasts that were made
  • I truncated minutes and joined to the nearest hour (reasonable since most data was between xx:00 and xx:02)
  • Calculate the number of hours between forecast and actual weather (call it "forecast hours")
    • For example, if at 11:00 we forecast the weather for 2:00 (three hours ahead), the forecast hours value is 3
  • Calculate the difference between the forecast weather features and the actual weather features
  • Calculate counts, means, and standard deviations for the differences by "forecast hours"
  • Write Stats to Parquet
  • Send Job Completion E-mail
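
A rough shape of that join and aggregation, with column names assumed from the description above (the real job also handles humidity and the other features):

    from pyspark.sql import functions as F

    # Truncate both sides to the hour before joining forecasts to actuals
    fc = (forecast
          .withColumn("for_hour", F.date_trunc("hour", "fc_datetime"))
          .withColumn("made_hour", F.date_trunc("hour", "made_datetime")))
    obs = weather.withColumn("obs_hour", F.date_trunc("hour", "obs_datetime"))

    joined = fc.join(obs, (fc.city == obs.city) & (fc.for_hour == obs.obs_hour))

    stats = (joined
             # "forecast hours": lead time between making the forecast and the
             # hour it was made for
             .withColumn("forecast_hours",
                         (F.unix_timestamp("for_hour") -
                          F.unix_timestamp("made_hour")) / 3600)
             .withColumn("temp_diff", F.col("fc_temp") - F.col("temp"))
             .groupBy("forecast_hours")
             .agg(F.count("*").alias("n"),
                  F.mean("temp_diff").alias("mean_diff"),
                  F.stddev("temp_diff").alias("sd_diff")))

    stats.write.mode("overwrite").parquet("s3://myweatherproject-tables/stats")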

Web Update

  • Hourly cronjob following Normalize process
  • weather_report.py
  • Load tables from Parquet
  • Phase 1: Preprocess DataFrames to filter down to the current data needed and cache the smaller tables
  • Phase 2: For each city:
    • Query all the different tables for the current data for each section of the html report
    • Create the city web-page HTML using both strings and pandas DataFrame.to_html()
    • Create plots by joining stats with forecast, calculating confidence intervals, using DataFrame.plot(), and saving each image to S3
  • Create index and error web-pages.
  • Send Job Completion E-mail
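
A condensed sketch of the per-city publishing step; the bucket, keys, column names, and the two-standard-deviation band are assumptions made for illustration:

    import io
    import boto3
    import matplotlib
    matplotlib.use("Agg")            # headless plotting on the cluster

    s3 = boto3.client("s3")
    BUCKET = "myweatherproject"      # hypothetical web-host bucket

    def publish_city(city, forecast_pdf, stats_pdf):
        # Build the city page from strings plus a pandas-rendered table
        table_html = forecast_pdf.to_html(index=False)
        page = f"<html><body><h1>{city}</h1>{table_html}</body></html>"
        s3.put_object(Bucket=BUCKET, Key=f"{city}.html",
                      Body=page.encode("utf-8"), ContentType="text/html")

        # Join forecasts with the analyze stats and plot a rough confidence band
        merged = forecast_pdf.merge(stats_pdf, on="forecast_hours")
        merged["lo"] = merged["fc_temp"] + merged["mean_diff"] - 2 * merged["sd_diff"]
        merged["hi"] = merged["fc_temp"] + merged["mean_diff"] + 2 * merged["sd_diff"]
        ax = merged.plot(x="fc_datetime", y=["fc_temp", "lo", "hi"])

        buf = io.BytesIO()
        ax.get_figure().savefig(buf, format="png")
        s3.put_object(Bucket=BUCKET, Key=f"images/{city}_forecast.png",
                      Body=buf.getvalue(), ContentType="image/png")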

Hourly E-mails

Appendix - Big Data System Properties

Robustness and fault tolerance

How does my system have this property?

  • The system is broken down into 4 self-contained parts with e-mail notifications for the successful completion of each part. If one of these parts breaks, I will immediately know which one and can run its code directly to identify the exact error.
  • The normalize job incorporates self-healing code where if it encounters issues loading the needed parquet files, it will rebuild them from scratch from the source data.
  • Everything downstream from the S3 raw data can be easily reconstructed or moved to another server should any part of the system go down.
  • The enforce_keys function in the normalize process ensures that the keys for each table are respected by using the most recent data if duplicates are accidentally sent from the external API.

How does my system fall short and how could it be improved?

  • The system is dependent on the EC2 machine connecting to the data API and on Firehose streaming new data into S3. If this part of the system goes down, the weather for that timeframe would be lost forever.
    • The successful job completion e-mail serves as one measure to limit the amount of time that would pass before this situation would be caught.
  • The system is dependent on the Weather Underground API being available.
    • Potentially, I could source weather data from a second provider in case Weather Underground ever failed, but the development and maintenance cost for this may be prohibitive.

Low latency reads and updates

How does my system have this property?

  • The speed layer pushes the current weather to the web-site immediately when it is obtained.
  • The rest of the data is not urgent and is updated 9 minutes later (which is plenty soon enough).
  • The web-site is hosted on S3 and loads immediately upon request.

How does my system fall short and how could it be improved?

  • The weather conditions are updated hourly. This is a constraint of the number of API calls we are allowed to make on our budget.
    • The system could be improved by reading new weather data more frequently into our system.
  • The system could also get all of the data to the web-site with a quicker turnaround if we piped the stream directly into Spark Streaming and cut out the time delay from Firehose.

Scalability

How does my system have this property?

  • The EC2 API is only performing 15 requests per hour. There is a ton of room to scale if we paid Weather Underground for more requests.
  • Firehose will automatically scale to handle an increased load.
  • S3 is infinitely scalable for both raw data and web hosting of html pages.
  • Spark can also be scaled by easily adding more machines to increase capacity.

How does my system fall short and how could it be improved?

  • The self-healing recreation of parquet files from scratch would become more expensive if our data volume increased.
    • Instead, I would probably store the parquet files in a backup location and load from the backup if the primary load failed.

Generalization

How does my system have this property?

  • We store all raw data in S3, so any downstream change we want to make can be re-run against all of the data we have accumulated. This makes our system quite flexible and adaptable.

How does my system fall short and how could it be improved?

  • There is a short lag (a few minutes) between the API call and Firehose packaging and shipping the data to S3. This limits our ability to serve all the data downstream with no delay. The speed layer overcomes this for the current temperature, but not for the other pieces of data, and some future applications might want real-time access to data we aren't serving in real time today.
    • As mentioned above, we could improve this by using Spark Streaming instead of Firehose to essentially cut out the time delay.

Extensibility

How does my system have this property?

  • It would be very easy to update the Weather Underground API to add a new weather feature that we want to collect and the data would automatically flow into S3.
  • The normalize process stores the data in 3NF, so the new feature would only need to be added to the single appropriate table. There is some development required to pull in a new feature, but the system is extensible.
  • The web-site could also be extended to display new features.
  • If, for example, we wanted to add additional cities, we would only have to update the EC2 weather API. The Spark Cluster would automatically incorporate the new data. The new city would be auto-detected and a new web-page for the city would automatically be built and incorporated!

Ad hoc Queries

How does my system have this property?

  • The data tables are stored in Parquet on the cluster. It is super easy to open a new Jupyter Notebook, point to the tables, and begin querying. The data is already in 3NF, so it is easy and obvious to join tables and create exactly what is needed.

How does my system fall short and how could it be improved?

  • There were some data elements in the Raw Data json structure that didn't seem useful, and that I didn't bring into the normalized format. I could normalize those additional features as well so they would also be available for querying in case of future use cases.

Minimal maintenance

How does my system have this property?

  • Every part of the system scales very easily, with many parts that will scale automatically. The cluster size we are using is more than sufficient to last months before we would need to add additional nodes.
  • The data system has already run from end-to-end (producing all 4 job complete e-mails) hundreds of times without ever encountering a single error.

How does my system fall short and how could it be improved?

  • The size of the Spark cluster would need to be updated periodically. Maybe I could create a job that measures utilization and notifies me when we reach a certain capacity and it is time to scale up.

Debuggability

How does my system have this property?

  • As discussed above, the 4 discrete phases with completion e-mails make it easy to find where an error started. From there, it is easy to run the single failing script ad hoc and retrieve the exact line number where the error is occurring.
  • S3 stores all raw data and the normalize process has a function that can be swapped in/out on demand to re-build from scratch whenever needed or desired.
    • The analyze and report jobs run on all of the data, so they will continue to work regardless of whether we reload from scratch.

How does my system fall short and how could it be improved?

  • I could write in additional checks throughout each of the 4 phases and report on each, so that failures would be isolated to specific portions of the code within each phase.