In [2]:
#ACCESSY_KEY_ID = "xxxx"
#SECERET_ACCESS_KEY = "xxxxx"
#mounts_list = [
#{'bucket':'dannyfyp', 'mount_folder':'/mnt/apacheLog'}
#]
# Setup AWS configuration
import urllib
ACCESS_KEY = "xxx"
SECRET_KEY = "xxxx"
ENCODED_SECRET_KEY = urllib.quote(SECRET_KEY, "")
AWS_BUCKET_NAME = "dannyfyp"
MOUNT_NAME = "apacheLogs"
# Mount S3 bucket
try:
dbutils.fs.mount("s3n://%s:%s@%s/" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)
#dbutils.fs.unmount("/mnt/apacheLogs")
except Exception:
pass
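# Note: urllib.quote exists only in Python 2 (this notebook uses Python 2 APIs throughout).
# A minimal sketch of the same encoding on a Python 3 cluster, if one were used, would be:
#   from urllib.parse import quote
#   ENCODED_SECRET_KEY = quote(SECRET_KEY, safe="")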
In [3]:
1+1
In [4]:
inputPath= "/mnt/apacheLogs"
In [5]:
display(dbutils.fs.ls("/mnt/apacheLogs"))
In [6]:
spark
In [7]:
llk = spark.read.text(inputPath)
llk.count()
In [8]:
#llk.show()
display(llk.limit(10))
In [9]:
from pyspark.sql.functions import split, regexp_extract
from pyspark.sql.types import *
split_df = llk.select(
    # Anchor the IP at the start of the line so the full address is captured;
    # a leading greedy .* would truncate it to the shortest trailing match.
    regexp_extract('value', r'^(\d+\.\d+\.\d+\.\d+)', 1).alias('host'),
    regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('timestamp'),
    regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
    regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
    regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
split_df.show(truncate=False)
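# A quick, optional sanity check of the extraction patterns with Python's re module,
# using a hypothetical access-log line (illustrative only, not taken from the dataset):
import re
sample = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'
print(re.search(r'^(\d+\.\d+\.\d+\.\d+)', sample).group(1))                              # host: 199.72.81.55
print(re.search(r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', sample).group(1))  # timestamp
print(re.search(r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', sample).group(1))                       # path: /history/apollo/
print(re.search(r'^.*"\s+([^\s]+)', sample).group(1))                                    # status: 200
print(re.search(r'^.*\s+(\d+)$', sample).group(1))                                       # content_size: 6245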
In [10]:
display(split_df.limit(10))
In [11]:
bad_rows_df = split_df.filter(split_df['host'].isNull() |
                              split_df['timestamp'].isNull() |
                              split_df['path'].isNull() |
                              split_df['status'].isNull() |
                              split_df['content_size'].isNull())
bad_rows_df.count()
In [12]:
from pyspark.sql.functions import col, sum
def count_null(col_name):
    return sum(col(col_name).isNull().cast('integer')).alias(col_name)
# Build up a list of column expressions, one per column.
#
# This could be done in one line with a Python list comprehension, but we're keeping
# it simple for those who don't know Python very well.
exprs = []
for col_name in split_df.columns:
    exprs.append(count_null(col_name))
# Run the aggregation. The *exprs converts the list of expressions into
# variable function arguments.
split_df.agg(*exprs).show()
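# The same aggregation written as a one-line list comprehension, equivalent to the loop above:
split_df.agg(*[count_null(c) for c in split_df.columns]).show()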
In [13]:
# Replace all null content_size values with 0.
cleaned_df = split_df.na.fill({'content_size': 0})
In [14]:
exprs = []
for col_name in cleaned_df.columns:
    exprs.append(count_null(col_name))
cleaned_df.agg(*exprs).show()
In [15]:
month_map = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7,
    'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}

def parse_clf_time(s):
    """ Convert Common Log time format into a timestamp string
    Args:
        s (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
    Returns:
        a string suitable for passing to CAST('timestamp')
    """
    # NOTE: We're ignoring the time zone here. In a production application, you'd want to handle that.
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
        int(s[7:11]),
        month_map[s[3:6]],
        int(s[0:2]),
        int(s[12:14]),
        int(s[15:17]),
        int(s[18:20])
    )

from pyspark.sql.functions import udf
u_parse_time = udf(parse_clf_time)
logs_df = cleaned_df.select('*', u_parse_time(split_df['timestamp']).cast('timestamp').alias('time')).drop('timestamp')
total_log_entries = logs_df.count()
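# A sketch of an alternative that avoids the per-row Python UDF, assuming Spark's built-in
# unix_timestamp can be applied here; it parses the Common Log Format string directly
# (time zone included) and is usually faster. logs_df_alt is just an illustrative name.
from pyspark.sql.functions import unix_timestamp
logs_df_alt = cleaned_df.select(
    '*',
    unix_timestamp('timestamp', 'dd/MMM/yyyy:HH:mm:ss Z').cast('timestamp').alias('time')
).drop('timestamp')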
In [16]:
display(logs_df.limit(10))
In [17]:
logs_df.cache()
In [18]:
status_to_count_df = (logs_df
                      .groupBy('status')
                      .count()
                      .sort('status')
                      .cache())
In [19]:
display(status_to_count_df)
In [20]:
host_sum_df = (logs_df
               .groupBy('host')
               .count()
               .sort('count', ascending=False))
display(host_sum_df)
In [21]:
# limit() keeps this a DataFrame so display() can render it (take() would return a plain list)
paths_df = (logs_df
            .groupBy('path')
            .count()
            .sort('count', ascending=False)
            .limit(2))
display(paths_df)
In [22]:
from pyspark.sql.functions import dayofmonth
day_to_host_pair_df = logs_df.select('host', dayofmonth('time').alias('day'))
day_group_hosts_df = day_to_host_pair_df.dropDuplicates()
daily_hosts_df = (day_group_hosts_df
                  .groupBy('day')
                  .count()
                  .withColumnRenamed('count', 'uniqueHost')
                  .sort('uniqueHost', ascending=False)  # sort by the renamed column
                  .cache())
print('Unique hosts per day:')
daily_hosts_df.sort('day', ascending=True).show(30, False)
days_with_hosts, hosts = [list(r) for r in zip(*daily_hosts_df.collect())]
#0.27s
print(days_with_hosts)
print(hosts)
In [23]:
display(daily_hosts_df)
In [24]:
total_req_per_day_df = logs_df.select('host', dayofmonth('time').alias('day')).groupBy('day').count()
avg_daily_req_per_host_df = (
total_req_per_day_df
.join(daily_hosts_df, 'day', 'left_outer')
.select('day', (col('count') / col('uniqueHost'))
.cast('integer')
.alias('avg_reqs_per_host_per_day'))
.cache()
)
In [25]:
print("Average daily hosts are given by:")
avg_daily_req_per_host_df.show()
In [26]:
from pyspark.sql.functions import *
rrStream = (
    spark
    .readStream
    .option("maxFilesPerTrigger", 1)  # Treat the files as a stream by picking up one file at a time
    .text(inputPath)
)
spark.conf.set("spark.sql.shuffle.partitions", "1")
In [27]:
rrStream.isStreaming
In [28]:
spark.conf.set("spark.sql.shuffle.partitions", "1")
In [29]:
display(rrStream)
In [30]:
logStream = (
    rrStream.select(
        # Same extraction as for the batch DataFrame above, with the IP anchored at the start of the line
        regexp_extract('value', r'^(\d+\.\d+\.\d+\.\d+)', 1).alias('host'),
        regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('time_stamp'),
        regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
        regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
        regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
)
In [31]:
logStream.dtypes
In [32]:
display(logStream)
In [33]:
status_to_count_df = (logStream
                      .groupBy('status')
                      .count()
                      .sort('count', ascending=False)
)
In [34]:
display(status_to_count_df)
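# display() on a streaming DataFrame is Databricks-specific. A minimal sketch of running the
# same aggregation as an explicit streaming query with an in-memory sink (the query name
# "status_counts" is an arbitrary choice here); sorting an aggregate requires complete mode:
statusQuery = (status_to_count_df
               .writeStream
               .outputMode("complete")
               .format("memory")
               .queryName("status_counts")
               .start())
#spark.sql("select * from status_counts").show()  # inspect the sink
#statusQuery.stop()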
In [35]:
host_sum_dfz = (logStream
                .groupBy('host')
                .count()
                .sort('count', ascending=False))
display(host_sum_dfz)
In [36]:
#split_df['timestamp']).cast('timestamp').alias('time')).drop('timestamp')
#fireServiceCallsTsDF = fireServiceCallsDF \
#.withColumn('CallDateTS', unix_timestamp(fireServiceCallsDF['CallDate'], from_pattern1).cast("timestamp")) \
#.drop('CallDate')
from_pattern = 'dd/MMM/yyyy:HH:mm:ss Z'  # matches the extracted form, e.g. 01/Jul/1995:00:00:01 -0400
#to_pattern2 = 'yyyy-MMM-dd hh:mm:ss aa'
to_pattern = 'yyyy-MM-dd HH:mm'
#logStream= logStream.withColumn('Timestamp', unix_timestamp(logStream['time_stamp'], from_pattern2).cast("timestamp"))
logStreamz = logStream.withColumn('Timestamp', from_unixtime(unix_timestamp(logStream['time_stamp'], from_pattern), to_pattern))
In [37]:
# Check the reformatted Timestamp column derived from time_stamp
display(logStreamz)
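# A sketch of deriving a true TimestampType column (rather than a reformatted string), which
# windowed aggregations need; event_time is just a column name chosen here:
logStreamTs = logStream.withColumn(
    'event_time',
    unix_timestamp(logStream['time_stamp'], from_pattern).cast('timestamp'))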
In [38]:
"""
streamingCountsDF = (
rrStream
.groupBy(
"host",
window(rrStream.timestamp, "1 hour"))
.count().select("host", date_format("window.end", "MMM-dd HH:mm").alias("time"), "count").orderBy("time", "host")
)
"""
In [39]:
"""import urllib2
for r in ip_add:
url = 'http://freegeoip.net/json/' + 'r'
k= urllib2.urlopen(url).read()
print(k)
"""
ipz_df= logs_df.select('host').dropDuplicates()
ipz_df.show()
In [40]:
import urllib2
from pyspark.sql import SQLContext, Row

ipz = logs_df.select('host').dropDuplicates().rdd

def getIP(ip):
    # Look up the two-letter country code (CCA2) for an IP address via FreeGeoIP's CSV endpoint
    url = 'http://freegeoip.net/csv/' + ip
    response = urllib2.urlopen(url).read()
    cca2 = response.split(",")[1]
    country = response.split(",")[2]
    Lat = response.split(",")[8]
    Long = response.split(",")[9]
    #deeds = [cca2, country, Lat, Long]
    #deeds = {'cca2': cca2, 'country': country, 'latitude': Lat, 'longitude': Long}
    return cca2

mappedIPs = ipz.map(lambda x: (x[0], getIP(x[0])))
mappedIPs.take(10)
#ipz.map(lambda x: x[0])
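# A slightly more defensive wrapper (a sketch): a single failed lookup would otherwise abort
# the whole RDD map. Note that the freegeoip.net service may no longer be available, so the
# endpoint itself is effectively an assumption here.
def getIPSafe(ip):
    try:
        return getIP(ip)
    except Exception:
        return None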
In [41]:
# mappedIP2: contains the IP address and CCA2 codes
# The schema is encoded in a string.
schemaString = "ip cca2"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Create DataFrame with schema
mappedIP2 = sqlContext.createDataFrame(mappedIPs, schema)
mappedIP2.show()
In [42]:
mappedIP2.groupBy("ip", "cca2").count().show()
In [43]:
mappedIP2.registerTempTable("jayz")
In [44]:
%sql select * from jayz limit 10;
In [45]:
import urllib2
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
# Obtain the distinct IP addresses from the accesslog table (assumed to have been registered elsewhere)
ipaddresses = sqlContext.sql("select distinct ip1 from accesslog where ip1 is not null").rdd

# Convert None to Empty String
def xstr(s):
    if s is None:
        return ''
    return str(s)

# getCCA2: Obtains two letter country code based on IP address
def getCCA2(ip):
    # Obtain CCA2 code from FreeGeoIP
    url = 'http://freegeoip.net/csv/' + ip
    response = urllib2.urlopen(url).read()
    cca2 = response.split(",")[1]
    # return
    return cca2
# Loop through distinct IP addresses and obtain two-letter country codes
mappedIPs = ipaddresses.map(lambda x: (x[0], getCCA2(x[0])))
# mappedIP2: contains the IP address and CCA2 codes
# The schema is encoded in a string.
schemaString = "ip cca2"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Create DataFrame with schema
mappedIP2 = sqlContext.createDataFrame(mappedIPs, schema)
# Obtain the Country Codes (CSV files assumed to be mounted at /mnt/tardis6/countrycodes/)
fields = sc.textFile("/mnt/tardis6/countrycodes/").map(lambda l: l.split(","))
countrycodes = fields.map(lambda x: Row(cn=x[0], cca2=x[1], cca3=x[2]))
# Country Codes DataFrame:
# Create DataFrame (inferring schema using reflection)
countryCodesDF = sqlContext.createDataFrame(countrycodes)
# Join countrycodes with mappedIPsDF so we can have IP address and three-letter ISO country codes
mappedIP3 = mappedIP2 \
.join(countryCodesDF, mappedIP2.cca2 == countryCodesDF.cca2, "left_outer") \
.select(mappedIP2.ip, mappedIP2.cca2, countryCodesDF.cca3, countryCodesDF.cn)
# Register the mapping table
mappedIP3.registerTempTable("mappedIP3")
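# Example query against the registered mapping table (illustrative): distinct IPs per country
sqlContext.sql("select cn, count(*) as ips from mappedIP3 group by cn order by ips desc").show()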
Spark integration with Kafka 0.10 (Structured Streaming). See: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
In [48]:
"""
val kafka = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "topic")
.option("startingOffsets", "latest")
.load()
# Subscribe to 1 topic
ds1 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to multiple topics
ds2 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern
ds3 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
"""