In [2]:
%run "/Users/mwc@databricks.com/_helper.py"
In [3]:
ACCESS_KEY = "[REPLACE_WITH_ACCESS_KEY]"
SECRET_KEY = "[REPLACE_WITH_SECRET_KEY]"
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = get_bucket_name()
MOUNT_NAME = "mwc"
# Mount S3 bucket
try:
    dbutils.fs.ls("/mnt/%s" % MOUNT_NAME)
except:
    print "Mount not found. Attempting to mount..."
    dbutils.fs.mount("s3n://%s:%s@%s/" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)
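If the mount ever needs to be refreshed (for example, after rotating the keys), it can be dropped first and the cell above re-run; dbutils.fs.unmount is the counterpart to dbutils.fs.mount. This is an optional step, not something the walkthrough requires:

# Optional: remove an existing mount before re-mounting with new credentials
# dbutils.fs.unmount("/mnt/%s" % MOUNT_NAME)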
In [4]:
display(dbutils.fs.ls("/mnt/mwc"))
In [5]:
%fs head dbfs:/mnt/mwc/accesslog/databricks.com-access.log
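The same preview is available programmatically; as a small sketch, dbutils.fs.head takes a path and a maximum number of bytes to return:

# First ~1 KB of the access log, equivalent to the %fs head magic above
print dbutils.fs.head("dbfs:/mnt/mwc/accesslog/databricks.com-access.log", 1024)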
In [7]:
%sql
DROP TABLE IF EXISTS accesslog;
CREATE EXTERNAL TABLE accesslog (
ipaddress STRING,
clientidentd STRING,
userid STRING,
datetime STRING,
method STRING,
endpoint STRING,
protocol STRING,
responseCode INT,
contentSize BIGINT,
referrer STRING,
agent STRING,
duration STRING,
ip1 STRING,
ip2 STRING,
ip3 STRING,
ip4 STRING
)
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \\"(\\S+) (\\S+) (\\S+)\\" (\\d{3}) (\\d+) \\"(.*)\\" \\"(.*)\\" (\\S+) \\"(\\S+), (\\S+), (\\S+), (\\S+)\\"'
)
LOCATION
"/mnt/mwc/accesslog/"
In [8]:
%sql select ipaddress, datetime, method, endpoint, protocol, responsecode, agent from accesslog limit 10;
In [10]:
%sql
DROP TABLE IF EXISTS distinct_ips;
create table distinct_ips as select distinct ip1 from accesslog where ip1 is not null;
select count(*) from distinct_ips;
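The same count can also be read back from Python if that is more convenient later on; this is only a convenience check, nothing the SQL above does not already report:

# Number of distinct source IPs, via the DataFrame API
print sqlContext.table("distinct_ips").count()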
In [11]:
import urllib2
import json
api_key = get_ip_loc_api_key()
def getCCA2(ip):
    # Look up the IP with the db-ip.com address-info API and return its 2-letter country code
    url = 'http://api.db-ip.com/addrinfo?addr=' + ip + '&api_key=%s' % api_key
    response = json.loads(urllib2.urlopen(url).read())
    return response['country'].encode('utf-8')
sqlContext.udf.register("mapCCA2", getCCA2)
In [12]:
%sql
DROP TABLE IF EXISTS mapIps;
CREATE TABLE mapIps AS SELECT ip1 AS ip, mapCCA2(ip1) AS cca2 FROM distinct_ips;
In [13]:
%sql SELECT * FROM mapIps LIMIT 40
In [14]:
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
fields = sc.textFile("/mnt/mwc/countrycodes/").map(lambda l: l.split(","))
countrycodes = fields.map(lambda x: Row(cn=x[0], cca2=x[1], cca3=x[2]))
sqlContext.createDataFrame(countrycodes).registerTempTable("countryCodes")
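This assumes each line of the country-codes file is "name,2-letter code,3-letter code" (something like "United States,US,USA" - a hypothetical sample, adjust if the file differs). A quick peek at the parsed rows confirms the cn/cca2/cca3 ordering before joining:

# Inspect the first few parsed rows to confirm the column order
for row in fields.take(3):
    print row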
In [15]:
%sql
SELECT * FROM countryCodes LIMIT 20
In [16]:
%sql
SELECT ip, mapIps.cca2 AS cca2, countryCodes.cca3 AS cca3, cn FROM mapIps LEFT OUTER JOIN countryCodes ON mapIps.cca2 = countryCodes.cca2
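For reference, the same left outer join can be expressed with the DataFrame API. This is just a sketch of the equivalent; the %sql cell above is what the rest of the analysis uses:

# DataFrame version of the IP-to-country join
ip_df = sqlContext.table("mapIps")
cc_df = sqlContext.table("countryCodes")
joined = ip_df.join(cc_df, ip_df.cca2 == cc_df.cca2, "left_outer") \
              .select(ip_df.ip, ip_df.cca2, cc_df.cca3, cc_df.cn)
display(joined)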
In [18]:
from user_agents import parse
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
# Convert None to Empty String
def xstr(s):
    if s is None:
        return ''
    return str(s)

# Create UDFs to extract out Browser Family and OS Family information
def browserFamily(ua_string): return xstr(parse(xstr(ua_string)).browser.family)
def osFamily(ua_string): return xstr(parse(xstr(ua_string)).os.family)
sqlContext.udf.register("browserFamily", browserFamily)
sqlContext.udf.register("osFamily", osFamily)
In [19]:
%sql
DROP TABLE IF EXISTS userAgentTable;
DROP TABLE IF EXISTS userAgentInfo;
CREATE TABLE userAgentTable AS SELECT DISTINCT agent FROM accesslog;
CREATE TABLE userAgentInfo AS SELECT agent, osFamily(agent) as OSFamily, browserFamily(agent) as browserFamily FROM userAgentTable;
In [20]:
%sql
SELECT browserFamily, count(1) FROM UserAgentInfo group by browserFamily
To finish the basic preparation of these web logs, we will convert the access log timestamps (e.g. 04/Nov/2015:08:15:00 +0000) into a standard yyyy-MM-dd HH:mm:ss format:
In [22]:
from pyspark.sql.types import DateType
from pyspark.sql.functions import udf
import time
# weblog2Time function
# Input: 04/Nov/2015:08:15:00 +0000
# Output: 2015-11-04 08:15:00
def weblog2Time(weblog_timestr):
    # Parse the access-log timestamp and re-format it; formatting the parsed struct
    # directly keeps the time in UTC as logged, independent of the machine's timezone
    weblog_time = time.strptime(weblog_timestr, "%d/%b/%Y:%H:%M:%S +0000")
    return time.strftime("%Y-%m-%d %H:%M:%S", weblog_time)
# Register the UDF
sqlContext.udf.register("weblog2Time", weblog2Time)
From here I'll use the SQL notebook to continue the analysis, but refer back to this notebook for any user-defined functions.