In [2]:
#ACCESSY_KEY_ID = "xxxx"
#SECERET_ACCESS_KEY = "xxxxx" 

#mounts_list = [
#{'bucket':'dannyfyp', 'mount_folder':'/mnt/apacheLog'}
#]

# Setup AWS configuration
import urllib
ACCESS_KEY = "xxx"
SECRET_KEY = "xxxx"
ENCODED_SECRET_KEY = urllib.quote(SECRET_KEY, "")
AWS_BUCKET_NAME = "dannyfyp"
MOUNT_NAME = "apacheLogs"


# Mount S3 bucket
try:
  dbutils.fs.mount("s3n://%s:%s@%s/" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)
  #dbutils.fs.unmount("/mnt/apacheLogs")
  
except Exception:
  pass

In [3]:
1+1

In [4]:
inputPath= "/mnt/apacheLogs"

In [5]:
display(dbutils.fs.ls("/mnt/apacheLogs"))

In [6]:
spark

In [7]:
llk= spark.read.text("/mnt/apacheLogs")
llk.count()

In [8]:
#llk.show()
display(llk.limit(10))

In [9]:
from pyspark.sql.functions import split, regexp_extract
from pyspark.sql.types import *

split_df =llk.select(#regexp_extract('value', r'^([^\s]+\s)', 1).alias('a'),
                     #regexp_extract('value', r'^(\w+)', 1).alias('b'),
                     regexp_extract('value', r'^.*(\d+\.\d+\.\d+\.\d+)', 1).alias('host'),
                     #regexp_extract('value', r'.*^([^\s]+\s)', 1).alias('host'),
                     regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('timestamp'),
                     regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
                     regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
                     regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
split_df.show(truncate=False)

In [10]:
display(split_df.limit(10))

In [11]:
bad_rows_df = split_df.filter(split_df['host'].isNull() |
                              split_df['timestamp'].isNull() |
                              split_df['path'].isNull() |
                              split_df['status'].isNull() |
                             split_df['content_size'].isNull())
bad_rows_df.count()

In [12]:
from pyspark.sql.functions import col, sum

def count_null(col_name):
  return sum(col(col_name).isNull().cast('integer')).alias(col_name)

# Build up a list of column expressions, one per column.
#
# This could be done in one line with a Python list comprehension, but we're keeping
# it simple for those who don't know Python very well.
exprs = []
for col_name in split_df.columns:
  exprs.append(count_null(col_name))

# Run the aggregation. The *exprs converts the list of expressions into
# variable function arguments.
split_df.agg(*exprs).show()

In [13]:
# Replace all null content_size values with 0.
cleaned_df = split_df.na.fill({'content_size': 0})

In [14]:
exprs = []
for col_name in cleaned_df.columns:
  exprs.append(count_null(col_name))

cleaned_df.agg(*exprs).show()

In [15]:
month_map = {
  'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
  'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
}

def parse_clf_time(s):
    """ Convert Common Log time format into a Python datetime object
    Args:
        s (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
    Returns:
        a string suitable for passing to CAST('timestamp')
    """
    # NOTE: We're ignoring time zone here. In a production application, you'd want to handle that.
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
      int(s[7:11]),
      month_map[s[3:6]],
      int(s[0:2]),
      int(s[12:14]),
      int(s[15:17]),
      int(s[18:20])
    )

u_parse_time = udf(parse_clf_time)

logs_df = cleaned_df.select('*', u_parse_time(split_df['timestamp']).cast('timestamp').alias('time')).drop('timestamp')
total_log_entries = logs_df.count()

In [16]:
display(logs_df.limit(10))

In [17]:
logs_df.cache()

In [18]:
status_to_count_df =(logs_df
                     .groupBy('status')
                     .count()
                     .sort('status')
                     .cache())

In [19]:
display(status_to_count_df)

In [20]:
host_sum_df =(logs_df
              .groupBy('host')
              .count()
             .sort('count', ascending=False))

display(host_sum_df)

In [21]:
paths_df = (logs_df
            .groupBy('path')
            .count()
            .sort('count', ascending=False)).take(2)
display(paths_df)

In [22]:
from pyspark.sql.functions import dayofmonth

day_to_host_pair_df = logs_df.select('host', dayofmonth('time').alias('day'))
day_group_hosts_df = day_to_host_pair_df.dropDuplicates()

daily_hosts_df = day_group_hosts_df.groupBy('day').count().withColumnRenamed('count', 'uniqueHost').sort('count', ascending= False).cache()

print 'Unique hosts per day:'
daily_hosts_df.sort('day', ascending= True).show(30, False)

days_with_hosts, hosts = [list(r) for r in zip(*daily_hosts_df.collect())]
#0.27s


  
print(days_with_hosts)
print(hosts)

In [23]:
display(daily_hosts_df)

In [24]:
total_req_per_day_df = logs_df.select('host', dayofmonth('time').alias('day')).groupBy('day').count()

avg_daily_req_per_host_df = (
  total_req_per_day_df
  .join(daily_hosts_df, 'day', 'left_outer')
  .select('day', (col('count') / col('uniqueHost'))
          .cast('integer')
          .alias('avg_reqs_per_host_per_day'))
  .cache()
)

In [25]:
print("Average daily hosts are given by:")
avg_daily_req_per_host_df.show()

In [26]:
from pyspark.sql.functions import *
rrStream= (
  spark    
    .readStream                       
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .text(inputPath)
)
spark.conf.set("spark.sql.shuffle.partitions", "1")

In [27]:
rrStream.isStreaming

In [28]:
spark.conf.set("spark.sql.shuffle.partitions", "1")

In [29]:
display(rrStream)

In [30]:
logStream= (
rrStream.select(#regexp_extract('value', r'^([^\s]+\s)', 1).alias('a'),
                     #regexp_extract('value', r'^(\w+)', 1).alias('b'),
                     regexp_extract('value', r'^.*(\d+\.\d+\.\d+\.\d+)', 1).alias('host'),
                     #regexp_extract('value', r'.*^([^\s]+\s)', 1).alias('host'),
                     regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('time_stamp'),
                     regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
                     regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
                     regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
)

In [31]:
logStream.dtypes

In [32]:
display(logStream)

In [33]:
status_to_count_df =(logStream
                     .groupBy('status')
                     .count()
                     .sort('count', ascending= False)
                   )

In [34]:
display(status_to_count_df)

In [35]:
host_sum_dfz =(logStream
              .groupBy('host')
              .count()
             .sort('count', ascending=False))

display(host_sum_dfz)

In [36]:
#split_df['timestamp']).cast('timestamp').alias('time')).drop('timestamp')
#fireServiceCallsTsDF = fireServiceCallsDF \
  #.withColumn('CallDateTS', unix_timestamp(fireServiceCallsDF['CallDate'], from_pattern1).cast("timestamp")) \
  #.drop('CallDate')

from_pattern = 'dd/MMM/yyyy hh:mm:ss z'
#to_pattern2 = 'yyyy-MMM-dd hh:mm:ss aa'
to_pattern = 'yyyy-MM-dd hh:mm'


#logStream= logStream.withColumn('Timestamp', unix_timestamp(logStream['time_stamp'], from_pattern2).cast("timestamp"))
logStreamz= logStream.withColumn('Timestamp', from_unixtime(unix_timestamp(logStream['time_stamp'], from_pattern), to_pattern))

In [37]:
#trying to fix time_stamp to unix timestamp
display(logStreamz)

In [38]:
"""
streamingCountsDF = (                 
  rrStream
    .groupBy(
      "host", 
      window(rrStream.timestamp, "1 hour"))
    .count().select("host", date_format("window.end", "MMM-dd HH:mm").alias("time"), "count").orderBy("time", "host")
  
  )
  """

In [39]:
"""import urllib2

for r in ip_add:
  url = 'http://freegeoip.net/json/' + 'r'

  k= urllib2.urlopen(url).read()
  print(k)

"""

ipz_df= logs_df.select('host').dropDuplicates()

ipz_df.show()

In [40]:
import urllib2
from pyspark.sql import SQLContext, Row
ipz= logs_df.select('host').dropDuplicates().rdd

def getIP(ip):
  url = 'http://freegeoip.net/csv/' + ip
  str = urllib2.urlopen(url).read()
  cca2 = str.split(",")[1]
  country= str.split(",")[2]
  Lat= str.split(",")[8]
  Long= str.split(",")[9]
  
  #deeds= [cca2, country, Lat, Long]
  #deeds= {'cca2': cca2, 'country':country, 'latitude': Lat, 'longitude': Long}
  return cca2

mappedIPs = ipz.map(lambda x: (x[0], getIP(x[0])))

mappedIPs.take(10)

#ipz.map(lambda x: x[0])

In [41]:
# mappedIP2: contains the IP address and CCA2 codes
# The schema is encoded in a string.
schemaString = "ip cca2"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Create DataFrame with schema
mappedIP2 = sqlContext.createDataFrame(mappedIPs, schema)

mappedIP2.show()

In [42]:
mappedIP2.groupBy("ip", "cca2").count().show()

In [43]:
mappedIP2.registerTempTable("jayz")

In [44]:
%sql select * from jayz limit 10;

In [45]:
import urllib2
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *

# Obtain the unique agents from the accesslog table
ipaddresses = sqlContext.sql("select distinct ip1 from accesslog where ip1 is not null").rdd

# Convert None to Empty String
def xstr(s): 
  if s is None: 
    return '' 
  return str(s)

# getCCA2: Obtains two letter country code based on IP address
def getCCA2(ip):
  # Obtain CCA2 code from FreeGeoIP
  url = 'http://freegeoip.net/csv/' + ip
  str = urllib2.urlopen(url).read()
  cca2 = str.split(",")[1]
  
  # return
  return cca2

# Loop through distinct IP addresses and obtain two-letter country codes
mappedIPs = ipaddresses.map(lambda x: (x[0], getCCA2(x[0])))

# mappedIP2: contains the IP address and CCA2 codes
# The schema is encoded in a string.
schemaString = "ip cca2"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Create DataFrame with schema
mappedIP2 = sqlContext.createDataFrame(mappedIPs, schema)

# Obtain the Country Codes 
fields = sc.textFile("/mnt/tardis6/countrycodes/").map(lambda l: l.split(","))
countrycodes = fields.map(lambda x: Row(cn=x[0], cca2=x[1], cca3=x[2]))

# Country Codes DataFrame:
#   Create DataFrame (inferring schema using reflection)
countryCodesDF = sqlContext.createDataFrame(countrycodes)

# Join countrycodes with mappedIPsDF so we can have IP address and three-letter ISO country codes
mappedIP3 = mappedIP2 \
  .join(countryCodesDF, mappedIP2.cca2 == countryCodesDF.cca2, "left_outer") \
  .select(mappedIP2.ip, mappedIP2.cca2, countryCodesDF.cca3, countryCodesDF.cn)
  
# Register the mapping table
mappedIP3.registerTempTable("mappedIP3")


In [48]:
"""
val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic")
  .option("startingOffsets", "latest")
  .load()
  
  # Subscribe to 1 topic
ds1 = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics
ds2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
ds3 = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
"""