In [1]:
# import requests, StringIO, pandas as pd, json, re
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
# Build the SQLContext used by every Spark read below.
# NOTE(review): `sc` is never defined in this notebook; presumably the hosting
# notebook service injects a ready-made SparkContext under that name -- confirm.
sqlContext = SQLContext(sc)

In [2]:
#reference the tweet data from dashDB: three tables are pulled into Spark dataframes

#JDBC settings shared by every table read
dashdb_options = dict(
    url='jdbc:db2://awh-yp-small02.services.dal.bluemix.net:50000/BLUDB',
    user='dash111118',
    password='XXXXXXXXXX',
)

def _read_dashdb_table(table_name):
    """Load one dashDB table over JDBC and return it as a Spark dataframe."""
    jdbc_reader = sqlContext.read.format('jdbc')
    return jdbc_reader.options(dbtable=table_name, **dashdb_options).load()

tweetsdf = _read_dashdb_table('dash111118.HYATT_TWEETS')
sentimentdf = _read_dashdb_table('dash111118.HYATT_SENTIMENTS')
locationdf = _read_dashdb_table('dash111118.HYATT_LOCATIONS')

In [3]:
#import packages, instantiate dataframe, make cloudant connection, load cloudant data into dataframe
import requests, StringIO, pandas as pd, json, re

#Connect to a Cloudant repository containing dummy geographic data of Hyatt locations
#and load its "hyattlocations" database into a Spark dataframe
cloudant_reader = (
    sqlContext.read.format("com.cloudant.spark")
    .option("cloudant.host", "ajlevine.cloudant.com")
    .option("cloudant.username", "ajlevine")
    .option("cloudant.password", "XXXXXXXXXX")
)
hyattlocations_df = cloudant_reader.load("hyattlocations")

In [4]:
#test data for posterity
#print the column names, types and nullability of the tweets dataframe read from dashDB
tweetsdf.printSchema()


root
 |-- MESSAGE_ID: string (nullable = true)
 |-- MESSAGE_BODY: string (nullable = true)
 |-- MESSAGE_FAVORITES_COUNT: integer (nullable = true)
 |-- MESSAGE_INREPLYTO_URL: string (nullable = true)
 |-- MESSAGE_URL: string (nullable = true)
 |-- MESSAGE_POSTED_TIME: timestamp (nullable = true)
 |-- MESSAGE_RETWEET_COUNT: integer (nullable = true)
 |-- MESSAGE_LANGUAGE: string (nullable = true)
 |-- MESSAGE_ACTION: string (nullable = true)
 |-- MESSAGE_GENERATOR_DISPLAY_NAME: string (nullable = true)
 |-- MESSAGE_COUNTRY: string (nullable = true)
 |-- MESSAGE_LOCATION_DISPLAY_NAME: string (nullable = true)
 |-- MESSAGE_LOCATION: string (nullable = true)
 |-- MESSAGE_COUNTRY_CODE: string (nullable = true)
 |-- USER_ID: string (nullable = true)
 |-- USER_GENDER: string (nullable = true)
 |-- USER_DISPLAY_NAME: string (nullable = true)
 |-- USER_FAVORITES_COUNT: integer (nullable = true)
 |-- USER_FOLLOWERS_COUNT: integer (nullable = true)
 |-- USER_FRIENDS_COUNT: integer (nullable = true)
 |-- USER_IMAGE_URL: string (nullable = true)
 |-- USER_URL: string (nullable = true)
 |-- USER_LISTED_COUNT: integer (nullable = true)
 |-- USER_REGISTER_TIME: timestamp (nullable = true)
 |-- USER_SCREEN_NAME: string (nullable = true)
 |-- USER_STATUSES_COUNT: integer (nullable = true)
 |-- USER_SUMMARY: string (nullable = true)
 |-- USER_CITY: string (nullable = true)
 |-- USER_LOCATION_DISPLAY_NAME: string (nullable = true)
 |-- USER_STATE: string (nullable = true)
 |-- USER_COUNTRY: string (nullable = true)
 |-- USER_COUNTRY_CODE: string (nullable = true)
 |-- USER_SUB_REGION: string (nullable = true)


In [5]:
#print the column names, types and nullability of the Hyatt locations dataframe read from Cloudant
hyattlocations_df.printSchema()


root
 |-- _id: string (nullable = true)
 |-- _rev: string (nullable = true)
 |-- areaCode: long (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- hotelID: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- metroCode: long (nullable = true)
 |-- postalCode: long (nullable = true)
 |-- region: string (nullable = true)


In [6]:
#convert tweet table to Pandas frame
tweetPD = tweetsdf.toPandas()

#Remove the leading 'tag:search.twitter.com,' from every message ID.
#str.lstrip() treats its argument as a *set of characters*, not as a prefix, so
#it only gave the right answer while the remaining ID happened to start with a
#character outside that set. Slice the literal prefix off instead.
TWEET_ID_PREFIX = 'tag:search.twitter.com,'
tweetPD['MESSAGE_ID'] = tweetPD['MESSAGE_ID'].map(
    lambda message_id: message_id[len(TWEET_ID_PREFIX):]
    if message_id.startswith(TWEET_ID_PREFIX) else message_id)

#index by message ID; set_index with a column name moves the column into the
#index, so no separate drop is needed
tweetPD = tweetPD.set_index('MESSAGE_ID')
tweetPD.head(3)


Out[6]:
MESSAGE_BODY MESSAGE_FAVORITES_COUNT MESSAGE_INREPLYTO_URL MESSAGE_URL MESSAGE_POSTED_TIME MESSAGE_RETWEET_COUNT MESSAGE_LANGUAGE MESSAGE_ACTION MESSAGE_GENERATOR_DISPLAY_NAME MESSAGE_COUNTRY ... USER_REGISTER_TIME USER_SCREEN_NAME USER_STATUSES_COUNT USER_SUMMARY USER_CITY USER_LOCATION_DISPLAY_NAME USER_STATE USER_COUNTRY USER_COUNTRY_CODE USER_SUB_REGION
MESSAGE_ID
2005:629425321276755968 bricknosh hits Bricks By The Bay! 💥 #bricksby... 0 None http://twitter.com/bricknosh/statuses/62942532... 2015-08-06 22:54:25 0 en post Instagram None ... 2015-02-01 06:33:45 bricknosh 32 Kiera & Fisch: LEGO enthusiasts, AFOLs in love... None California California United States US None
2005:629514333190529025 Apply now to work for hyatt as #Pastry #Intern... 0 None http://twitter.com/NeuvooIntAtl/statuses/62951... 2015-08-07 04:48:07 0 en post seed.mytweetsys.app.t00 None ... 2015-04-01 14:35:10 NeuvooIntAtl 10540 Looking for a Internship job in Atlanta? Chec... Atlanta Atlanta, Georgia Georgia United States US Fulton County
2005:629586671940562944 Hilton Worldwide and Hyatt Hotels Bring 300,00... 0 None http://twitter.com/HotelGlove/statuses/6295866... 2015-08-07 09:35:34 0 en post IFTTT None ... 2013-03-03 18:37:29 HotelGlove 34815 Curated luxury hotels you can book right now. None In a Hotel None None None None

3 rows × 32 columns


In [7]:
#convert Sentiment table to Pandas frame
sentimentPD = sentimentdf.toPandas()

#Remove the leading 'tag:search.twitter.com,' from every message ID.
#str.lstrip() treats its argument as a *set of characters*, not as a prefix, so
#it only gave the right answer while the remaining ID happened to start with a
#character outside that set. Slice the literal prefix off instead.
TWEET_ID_PREFIX = 'tag:search.twitter.com,'
sentimentPD['MESSAGE_ID'] = sentimentPD['MESSAGE_ID'].map(
    lambda message_id: message_id[len(TWEET_ID_PREFIX):]
    if message_id.startswith(TWEET_ID_PREFIX) else message_id)

#index by message ID; set_index with a column name moves the column into the
#index, so no separate drop is needed
sentimentPD = sentimentPD.set_index('MESSAGE_ID')
sentimentPD.head(3)


Out[7]:
SENTIMENT_POLARITY SENTIMENT_TERM
MESSAGE_ID
2005:729347842368782338 POSITIVE fan
2005:736228502039461888 POSITIVE win
2005:736834560495734789 POSITIVE Perfect

In [8]:
#convert location table to Pandas frame

#drop rows whose USER_LOCATION is the literal string "None" before collecting
locationdf = locationdf.filter(locationdf['USER_LOCATION'] != "None")
locationPD = locationdf.toPandas()

#strip out unnecessary text
#str.lstrip()/rstrip() take a *set of characters*, not a prefix/suffix, so the
#previous calls only worked while the payload happened to start and end with
#characters outside those sets. Remove the exact wrappers instead.
TWEET_ID_PREFIX = 'tag:search.twitter.com,'
WKT_POINT_PATTERN = re.compile(r'^\s*POINT\s*\(\s*(.*?)\s*\)\s*$')

def _strip_tweet_id_prefix(message_id):
    """Return message_id without the leading 'tag:search.twitter.com,' if present."""
    if message_id.startswith(TWEET_ID_PREFIX):
        return message_id[len(TWEET_ID_PREFIX):]
    return message_id

def _point_coordinates(point_text):
    """Return the text inside 'POINT (...)'; values of any other shape pass through unchanged."""
    matched = WKT_POINT_PATTERN.match(point_text)
    return matched.group(1) if matched else point_text

locationPD['MESSAGE_ID'] = locationPD['MESSAGE_ID'].map(_strip_tweet_id_prefix)
locationPD['USER_LOCATION'] = locationPD['USER_LOCATION'].map(_point_coordinates)

#set message ID as the index (this also removes it from the columns) and drop
#the MESSAGE_LOCATION column, leaving only USER_LOCATION
locationPD = locationPD.set_index('MESSAGE_ID').drop(['MESSAGE_LOCATION'], axis=1)
locationPD.head(3)


Out[8]:
USER_LOCATION
MESSAGE_ID
2005:549429828186017793 39.2191700 21.5169400
2005:549404785234169857 -87.6500500 41.8500300
2005:549498773735735296 2.0000000 46.0000000

In [9]:
#convert location data from cloudant to Pandas frame
#(toPandas() turns the Spark dataframe into a local pandas DataFrame, which the plotting cells read)
hyattlocationsPD = hyattlocations_df.toPandas()

In [10]:
#preview the first three Hyatt location rows
hyattlocationsPD.head(3)


Out[10]:
_id _rev areaCode city country hotelID latitude longitude metroCode postalCode region
0 f2665587412215b10f94b1982c7a5468 1-3f9899d0fe1e1152fd8a053357719b19 914 Valhalla US 605 41.0877 -73.7768 501 10595 NY
1 f2665587412215b10f94b1982c7a63f8 1-98757615ae4bcda07eb34175113e6128 412 Pittsburgh US 606 40.4406 -79.9959 508 15222 PA
2 f2665587412215b10f94b1982c7a6c05 1-9b03407922f1a9be10baba59661af62f 314 Bridgeton US 607 38.7667 -90.4201 609 63044 MO

In [11]:
#split user location column into latitude and longitude columns
def _split_point(point_text):
    """Turn a 'longitude latitude' string into a Series ordered [latitude, longitude]."""
    return pd.Series(point_text.split(' ')[::-1])

coordinates = locationPD['USER_LOCATION'].apply(_split_point)
coordinates = coordinates.rename(columns={0: 'Latitude', 1: 'Longitude'})
coordinates = coordinates[['Latitude', 'Longitude']]

#`coordinates` lines up with locationPD row for row, so attach the new columns
#by position. The previous index join of the frame with a frame derived from
#itself would multiply rows whenever a MESSAGE_ID occurs more than once.
locationPD = locationPD.drop(['USER_LOCATION'], axis=1)
locationPD['Latitude'] = coordinates['Latitude'].values
locationPD['Longitude'] = coordinates['Longitude'].values

#inner-join the sentiment rows onto the tweets; with no `on` column given the
#join is made index to index, and both frames are indexed by message ID
sentimentJoin = tweetPD.join(sentimentPD, how='inner')
sentimentJoin.head(3)


Out[11]:
MESSAGE_BODY MESSAGE_FAVORITES_COUNT MESSAGE_INREPLYTO_URL MESSAGE_URL MESSAGE_POSTED_TIME MESSAGE_RETWEET_COUNT MESSAGE_LANGUAGE MESSAGE_ACTION MESSAGE_GENERATOR_DISPLAY_NAME MESSAGE_COUNTRY ... USER_STATUSES_COUNT USER_SUMMARY USER_CITY USER_LOCATION_DISPLAY_NAME USER_STATE USER_COUNTRY USER_COUNTRY_CODE USER_SUB_REGION SENTIMENT_POLARITY SENTIMENT_TERM
MESSAGE_ID
2005:517465393057648642 Hyatt Regency Pier 66 Amid lush tropical acres... 0 None http://twitter.com/Hotelsescape/statuses/51746... 2014-10-02 00:05:38 0 en post Sprout Social None ... 1756 More hotels. More rates. More availability. Ho... Orlando Orlando Florida United States US Orange County POSITIVE lush
2005:517480253074452480 @MAliceTurner Thanks for joining us at the Gra... 0 http://twitter.com/MAliceTurner/statuses/51747... http://twitter.com/HyattConcierge/statuses/517... 2014-10-02 01:04:41 0 en post Spredfast app None ... 100200 HyattConcierge. Striving to make a difference ... None 535+ properties. 47 countries None None None None POSITIVE Thanks
2005:517480253074452480 @MAliceTurner Thanks for joining us at the Gra... 0 http://twitter.com/MAliceTurner/statuses/51747... http://twitter.com/HyattConcierge/statuses/517... 2014-10-02 01:04:41 0 en post Spredfast app None ... 100200 HyattConcierge. Striving to make a difference ... None 535+ properties. 47 countries None None None None POSITIVE Enjoy

3 rows × 34 columns


In [12]:
#delete any records that contain Hyatt in the name
#This strips out any tweets originating from Hyatt, and any tweets by someone whose name contains "Hyatt" that might skew the results

#One case-insensitive test replaces the separate 'hyatt' / 'Hyatt' passes, so
#spellings such as "HYATT" are caught as well. na=False makes a missing display
#name count as "not Hyatt" instead of yielding NaN, which cannot be negated with ~.
is_hyatt_account = sentimentJoin.USER_DISPLAY_NAME.str.contains('hyatt', case=False, na=False)
sentimentJoin = sentimentJoin[~is_hyatt_account]

#attach the Latitude/Longitude columns; the inner join keeps only tweets that have a location row
sentimentJoin = sentimentJoin.join(locationPD, on=None, how='inner')

In [13]:
#drop the columns that the remaining cells never read
unused_columns = [
    'MESSAGE_FAVORITES_COUNT',
    'MESSAGE_INREPLYTO_URL',
    'MESSAGE_URL',
    'MESSAGE_LANGUAGE',
    'MESSAGE_GENERATOR_DISPLAY_NAME',
    'USER_STATUSES_COUNT',
    'USER_SUMMARY',
    'MESSAGE_ACTION',
    'MESSAGE_COUNTRY_CODE',
    'USER_LISTED_COUNT',
    'USER_SUB_REGION',
    'USER_REGISTER_TIME',
    'MESSAGE_RETWEET_COUNT',
    'USER_GENDER',
    'USER_FAVORITES_COUNT',
    'USER_IMAGE_URL',
    'USER_URL',
    'MESSAGE_COUNTRY',
    'MESSAGE_LOCATION_DISPLAY_NAME',
    'MESSAGE_LOCATION',
    'USER_FOLLOWERS_COUNT',
    'USER_FRIENDS_COUNT',
    'USER_COUNTRY_CODE',
]

#remove them one at a time, in place, in the order listed
for column_name in unused_columns:
    del sentimentJoin[column_name]

In [14]:
#install the Seaborn statistical visualization package for the current user (--user);
#it is imported as `sns` further down
!pip install --user seaborn


Requirement already satisfied (use --upgrade to upgrade): seaborn in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s808-1a68bb07328c75-5b7eca8c935f/.local/lib/python2.7/site-packages

In [15]:
%matplotlib inline

import matplotlib.pyplot as plt
# matplotlib.patches allows us create colored patches, we can use for legends in plots
import matplotlib.patches as mpatches
# seaborn also builds on matplotlib and adds graphical features and new plot types
import seaborn as sns

In [16]:
#cast the coordinate columns of both frames to float so they can be plotted
coordinate_columns = (
    (sentimentJoin, ('Latitude', 'Longitude')),
    (hyattlocationsPD, ('latitude', 'longitude')),
)
for frame, column_names in coordinate_columns:
    for column_name in column_names:
        frame[column_name] = frame[column_name].astype(float)

In [17]:
#adjust settings
sns.set_style("darkgrid")
plt.figure(figsize=(15,10))

#create scatterplots: tweet locations in green, Hyatt locations in purple
plt.scatter(sentimentJoin.Longitude, sentimentJoin.Latitude, alpha=.9, s=4, color='darkseagreen')
plt.scatter(hyattlocationsPD.longitude, hyattlocationsPD.latitude, alpha=.9, s=4, color='purple')


#adjust more settings
plt.title('Locations of Tweets Across the World', size=25)
plt.ylim((-45,65))
plt.xlim((-150,150))
#axis labels use size 20 like the other maps in this notebook; the previous
#size=2 rendered them too small to read
plt.xlabel('Longitude',size=20)
plt.ylabel('Latitude',size=20)

plt.show()



In [18]:
#darkgrid theme and a 15x10 inch canvas, using an explicit figure/axes pair
sns.set_style("darkgrid")
fig, ax = plt.subplots(figsize=(15,10))

#tweet locations in green, Hyatt locations in purple
ax.scatter(sentimentJoin.Longitude, sentimentJoin.Latitude, alpha=.9, s=4, color='darkseagreen')
ax.scatter(hyattlocationsPD.longitude, hyattlocationsPD.latitude, alpha=.9, s=4, color='purple')

#title, then restrict the view to latitude 20..55 and longitude -150..-50
ax.set_title('Locations of Tweets in America', size=25)
ax.set_ylim((20,55))
ax.set_xlim((-150,-50))
ax.set_xlabel('Longitude', size=20)
ax.set_ylabel('Latitude', size=20)

plt.show()



In [19]:
#break up into positive and negative tweets
positiveTweets = sentimentJoin[sentimentJoin.SENTIMENT_POLARITY.str.contains('POSITIVE')]
negativeTweets = sentimentJoin[sentimentJoin.SENTIMENT_POLARITY.str.contains('NEGATIVE')]

#no explicit dpi: the previous dpi=0.1 asks for a 1.5 x 1 pixel canvas
#(15in x 10in at 0.1 dots per inch) wherever the figure dpi is honoured
plt.figure(figsize=(15,10))

#create scatterplots
plt.scatter(negativeTweets.Longitude, negativeTweets.Latitude, color='red', s=20, marker ='.')
plt.scatter(positiveTweets.Longitude, positiveTweets.Latitude, s=10, color='green', marker ='.')
plt.scatter(hyattlocationsPD.longitude, hyattlocationsPD.latitude, alpha=.9, s=1, color='purple')


#create legend
red_patch = mpatches.Patch(color='red', label='Negative Tweets')
green_patch = mpatches.Patch(color='green', label='Positive Tweets')
purple_patch = mpatches.Patch(color='purple', label='Hyatt Locations')
#the patches were previously built but never attached to the plot
plt.legend(handles=[red_patch, green_patch, purple_patch])


#adjust more settings
plt.title('Positive and Negative Tweets', size=20)
plt.ylim((20,55))
plt.xlim((-150,-50))
plt.xlabel('Longitude',size=20)
plt.ylabel('Latitude',size=20)
plt.show()



In [20]:
#break up into positive and negative tweets
positiveTweets = sentimentJoin[sentimentJoin.SENTIMENT_POLARITY.str.contains('POSITIVE')]
negativeTweets = sentimentJoin[sentimentJoin.SENTIMENT_POLARITY.str.contains('NEGATIVE')]

#no explicit dpi: the previous dpi=0.1 asks for a 1.5 x 1 pixel canvas
#(15in x 10in at 0.1 dots per inch) wherever the figure dpi is honoured
plt.figure(figsize=(15,10))

#create scatterplots
plt.scatter(negativeTweets.Longitude, negativeTweets.Latitude, color='red', s=20, marker ='.')
plt.scatter(positiveTweets.Longitude, positiveTweets.Latitude, s=20, color='green', marker ='.')
plt.scatter(hyattlocationsPD.longitude, hyattlocationsPD.latitude, alpha=.9, s=10, color='purple')


#create legend
red_patch = mpatches.Patch(color='red', label='Negative Tweets')
green_patch = mpatches.Patch(color='green', label='Positive Tweets')
purple_patch = mpatches.Patch(color='purple', label='Hyatt Locations')
#the patches were previously built but never attached to the plot
plt.legend(handles=[red_patch, green_patch, purple_patch])


#adjust more settings
plt.title('Positive and Negative Tweets on the East Coast', size=20)
plt.ylim((37,42))
plt.xlim((-80,-65))
plt.xlabel('Longitude',size=20)
plt.ylabel('Latitude',size=20)
plt.show()



In [21]:
#install tweepy for the current user (--user); it is imported below to send Twitter direct messages
!pip install --user tweepy


Requirement already satisfied (use --upgrade to upgrade): tweepy in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s808-1a68bb07328c75-5b7eca8c935f/.local/lib/python2.7/site-packages
Requirement already satisfied (use --upgrade to upgrade): requests>=2.4.3 in /usr/local/src/bluemix_jupyter_bundle.v7/notebook/lib/python2.7/site-packages (from tweepy)
Requirement already satisfied (use --upgrade to upgrade): six>=1.7.3 in /usr/local/src/bluemix_jupyter_bundle.v7/notebook/lib/python2.7/site-packages (from tweepy)
Requirement already satisfied (use --upgrade to upgrade): requests-oauthlib>=0.4.1 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s808-1a68bb07328c75-5b7eca8c935f/.local/lib/python2.7/site-packages (from tweepy)
Requirement already satisfied (use --upgrade to upgrade): oauthlib>=0.6.2 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s808-1a68bb07328c75-5b7eca8c935f/.local/lib/python2.7/site-packages (from requests-oauthlib>=0.4.1->tweepy)

In [22]:
#method to send private twitter message to defined user. 

import tweepy

def get_api(cfg):
  """Build a tweepy API client from a credentials mapping.

  cfg must provide the keys 'consumer_key', 'consumer_secret',
  'access_token' and 'access_token_secret'.
  """
  # application (consumer) credentials first ...
  oauth = tweepy.OAuthHandler(cfg['consumer_key'], cfg['consumer_secret'])
  # ... then the access token pair
  token, token_secret = cfg['access_token'], cfg['access_token_secret']
  oauth.set_access_token(token, token_secret)
  client = tweepy.API(oauth)
  return client

def sendDM(user,tweet):
  """Send a private Twitter direct message.

  Args:
    user: screen name of the recipient.
    tweet: text of the message.

  Returns:
    Whatever api.send_direct_message returns. It used to be assigned to a
    local and discarded, so the caller could not inspect the outcome.

  Each credential is read from an environment variable when it is set
  (TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN,
  TWITTER_ACCESS_TOKEN_SECRET); otherwise the placeholder below is used.
  """
  import os  # local import so this cell does not depend on an import elsewhere

  # Fill in the values noted in previous step here, or export the matching
  # environment variables so the secrets stay out of the notebook
  cfg = {
    "consumer_key"        : os.environ.get("TWITTER_CONSUMER_KEY", "XXXXXXXXXXXXXXXXXXXXXXX"),
    "consumer_secret"     : os.environ.get("TWITTER_CONSUMER_SECRET", "XXXXXXXXXXXXXXXXXXXXXXX"),
    "access_token"        : os.environ.get("TWITTER_ACCESS_TOKEN", "XXXXXXXXXXXXXXXXXXXXXXX"),
    "access_token_secret" : os.environ.get("TWITTER_ACCESS_TOKEN_SECRET", "XXXXXXXXXXXXXXXXXXXXXXX")
    }

  api = get_api(cfg)
  return api.send_direct_message(screen_name=user,text=tweet)

In [23]:
#send the apology / discount-code direct message to the screen name "ajlevine4e"
sendDM("ajlevine4e","We're Sorry you had a bad experience at a Hyatt Brand Hotel, Please take 10% off your next stay by using checkout code WERESORRY upon your next booking. For additional help, please contact us 24hours/day at 555-555-5555")

In [ ]: