Parser Code Lab

Intended Audience

This lab is for people who want to learn how to write and execute a Plaso parser in Python. This tutorial assumes:

  • You have a functional development environment
  • You have used Plaso
  • You are familiar with the Python programming language
  • You are looking to write a parser (as opposed to a plugin, which will be a separate codelab)

Objective

This lab will teach you how to write a text parser with tests for the Plaso framework. By the end you will be able to:

  • Write a text parser for a CSV file
  • Write a formatter for the events created by the parser
  • Write unit tests for the parser
  • Run the parser as part of plaso/log2timeline

Expectations

This lab should take you a couple of hours to complete, depending partly on any build issues you run into. We are not attempting to get you to check in code yet; the goal is to demonstrate how a parser is written. For this to become a checked-in parser, you would need to write a parser for a file format that is not already parsed and split the code here into several files (layout explained below).

Introduction

Welcome to writing a Plaso parser! From the outside, writing a parser can be daunting, but once you get your dev environment going, you've fought half the battle. This code lab features a text parser, but the formula can be used for any type. Other codelabs for plugin writing will be available soon as well.

Before Starting

Get familiar with the developers guide and more specifically the style guide. To make the code easier to maintain we follow a style guide, partially based on the Google Python Style Guide but slightly modified to fit our needs.

We also follow a code review process that is discussed on the code review site.

This is an iPython notebook. If you are not familiar with it, here is a brief introduction: it is basically an iPython shell wrapped in a GUI (a browser window). You can execute any Python code you wish, and quickly go back, edit and re-run code. To run the code, click the cell containing the code segment and press "Shift+Enter". The bracket on the left will change to indicate that the cell has been executed, and you may see some output below it (if the code segment produced any output).

One thing to make note of is that some of the code segments depend on previous code segments having been executed. So in order for this codelab to work properly you need to execute EVERY code segment that is presented here, especially all class declarations and import statements, but to be sure just execute them all [except those explicitly stated as optional].

The first thing we need to do is make sure your development environment is up to date. Run the following code snippet by clicking the cell and pressing "Shift+Enter". If you see a printed warning, upgrade the tool before continuing.


In [ ]:
# Check the plaso version to make sure we have the latest plaso version installed (we need the dev version)
import plaso
# Check plaso dependencies
from plaso import dependencies

installed_version = plaso.__version__
installed_date = plaso.VERSION_DATE

# Now we need to download the latest version from github to compare.
# Import a library to make the HTTP connection.
import urllib2

# The URL to the plaso init file that contains version information.
url = u'https://raw.githubusercontent.com/log2timeline/plaso/master/plaso/__init__.py'

# Download the file.
response = urllib2.urlopen(url)

latest_date = None
latest_version = None
dev_version = False
line = response.readline()
while line:
  line = line.strip()
  if line.startswith(u'VERSION_DATE'):
    latest_date = line.split()[-1]
    latest_date = latest_date[1:-1]
  elif line.startswith(u'__version__'):
    latest_version = line.split()[-1]
    latest_version = latest_version[1:-1]

  line = response.readline()

if dev_version:
  latest_version = u'{0:s}_{1:s}'.format(latest_version, latest_date)

print u'*'*40
print u'Checking plaso:'
print u'-'*40

if latest_version != installed_version:
  print u'You are not running the latest version of plaso, please update.'
  print u'Current running version: {0:s} should be {1:s}'.format(installed_version, latest_version)

elif latest_date != installed_date:
  print u'There have been code commits since your last update, please upgrade before continuing.'
  print u'Current running version: {0:s} should be {1:s}'.format(installed_date, latest_date)  
    
else:
  print u'BINGO, you have the latest version of plaso installed.'

print u'\n'
print u'*'*80
print u'Check dependencies:'
print u'-'*80


# TODO: Re-enable this! When enabled for some reason no output was produced later.
#if dependencies.CheckDependencies(latest_version_check=True):
#  print u'You are all set, all dependencies are up-to-date'
#else:
#  print u'Make sure to go over the dependency list and install or upgrade the missing dependencies.'
#  print u'See the "Development Release" tab in the sidebar at: https://github.com/log2timeline/plaso/wiki'

To make it easier to get documentation about the various classes we use in the codelab, we'll start by defining a simple function that prints out help (to execute it, click the code segment below and press "Shift+Enter").


In [ ]:
import inspect

# Let's put this in a method so we can easily call it from other parts of the codelab.
def PrintClassHelp(class_object, filter_string=''):
  """Prints a help string for a given class object.

  Args:
    class_object: The class that we are about to inspect.
    filter_string: Filter class members that start with a particular string.
  """
  # Print the docstring of the class.
  print u''
  print class_object.__doc__
   
  # Print information for every member function.
  additional_members = []
  for member_name, member_value in inspect.getmembers(class_object):
    # Check to see if we are filtering out members starting with
    # a particular string.
    if filter_string and not member_name.startswith(filter_string):
      continue
    if inspect.ismethod(member_value):
      args = inspect.getargspec(member_value)
      doc_string = member_value.__doc__
        
      print u'{0}{1:>20s}({2}){0:>10}\n\n{3}\n{4}\n\n'.format(
          '*'*5, member_name, u','.join(args.args), doc_string, '-'*80)
    else:
      if not member_name.startswith('_'):
        additional_members.append(u'{} = {}'.format(member_name, repr(member_value)))

  if additional_members:
    print '\n'
    print '*** Additional Members of Class ***\n\n ',
    print u'\n  '.join(additional_members)

Disclaimer

During this codelab we will be using the iPython notebook interface for everything, which means we have all the classes and code in a single file. Once we deploy the code to the actual codebase we would need to save the code in several places, typically something like:

  • plaso/parsers/myparser.py
  • tests/parsers/myparser.py
  • plaso/formatters/myparser.py
  • tests/formatters/myparser.py

To include the new parser and formatter in the tool, make the necessary changes to:

  • plaso/parsers/__init__.py
  • plaso/formatters/__init__.py

We may also want to change plaso/frontend/presets.py to include the parser in a preset.

We are, however, omitting all these details to make the codelab easier to follow. The notebook can also be used to test and play with parsers without touching the codebase; once a parser is fully functional, create the necessary files and start the code review process.

There are also a lot of comments in the code in this codelab that would typically be omitted from a released parser. To see the actual code that is used as an example here click on one of the below links:

Writing the Parser

We are going to write the parser completely in this iPython notebook, and test it there too. There is no need for anything else than this notebook (and the plaso libs available). The test file used for the example is included in the codelab so no need even to download that.

Before writing a parser (here we assume we are parsing a text file), ask yourself these questions:

  • Examine the lines, what do they contain?
  • Is there a header?
  • How are the timestamps formatted?
  • Is the timestamp stored in the system's local time zone or a common one (like UTC)?
  • Does the file have a single line per entry? As in, does a single line in the log file represent a single event?
  • Does the line have a fixed delimiter?
  • Does every line have the same number of fields?
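
Several of these questions can be answered with a few lines of Python before any parser code is written. The following is a minimal sketch, assuming a tab-delimited sample; the helper name CountFields and the sample lines are made up for illustration and are not part of Plaso.

```python
import collections


def CountFields(lines, delimiter='\t'):
  """Counts how many lines have a given number of delimited fields.

  Args:
    lines: an iterable of text lines.
    delimiter: the suspected field delimiter.

  Returns:
    A collections.Counter mapping number of fields to number of lines.
  """
  counter = collections.Counter()
  for line in lines:
    line = line.rstrip('\r\n')
    # Skip empty lines, they say nothing about the structure.
    if not line:
      continue
    counter[len(line.split(delimiter))] += 1
  return counter


# Made-up sample lines; in practice read them from the log file.
sample_lines = [
    'a\tb\tc\n',
    'd\te\tf\n',
    'g\th\n']

field_counts = CountFields(sample_lines)
# A single key means every line has the same number of fields.
print(sorted(field_counts.items()))
```

If the counter ends up with more than one key, the file either uses a different delimiter or does not have a fixed number of fields per line, which hints at a different parser class.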

Remember that we are not going to submit this parser for review, since it is already checked in; this is only for demonstration purposes. Please refer to the plaso roadmap for open parser assignments (or add your own).

The header

First things first: every file checked into the project needs a header. That header contains, among other things, a docstring as well as import statements.

The first line should be an encoding definition. After that comes a docstring whose lines should not be longer than 80 characters. If you need more than a single line to describe the parser, keep the first line to at most 80 characters, ending with a dot, and put the more detailed description two lines down (an example of that can be seen below).

The import order is defined in the style guide:

Imports are always put at the top of the file, just after any module comments and doc strings and before module globals and constants. 
Imports should be grouped with the order being most generic to least generic:

+ standard library imports
+ third-party imports
+ application-specific imports

Within each grouping, imports should be sorted lexicographically, ignoring case, according to each module's full package path.

In [ ]:
# -*- coding: utf-8 -*-
"""Parser for McAfee Anti-Virus Logs.

McAfee AV uses 4 logs to track when scans were run, when virus databases were
updated, and when files match the virus database."""

# The standard library logging module, used to warn about unparsable lines.
import logging

# A library that contains the EventObject for text based parsers.
from plaso.events import text_events
from plaso.lib import errors

# A library that contains all timestamp manipulations.
from plaso.lib import timelib

# A library that contains the parser manager, required for registering new parser.
from plaso.parsers import manager

# A library that contains assistants for text parsing.
from plaso.parsers import text_parser

The Parser Class

We now need to know what kind of parser you are trying to implement. Documentation describing the various assistants available to parser writers is still in progress. For now we are trying to parse a simple text file with one line per entry. For text parsing we currently have the following choices:

  • Simple Text Parser (TextCSVParser): Good for CSV files or other files with fixed delimiter and the same number of fields in every line.
  • Single line text parser (PyparsingSingleLineTextParser): Good for other types of text files where we have a single line per entry.
  • Multi line text parser (PyparsingMultiLineTextParser): Good for text files that may have multiple lines per entry.
  • Slow lexical text parser (SlowLexicalTextParser): Should not be used for anything since it is very slow.

Which Parser Class is best for AccessProtectionLog.txt?

How do you break up a line? Look at it in your favorite text editor.

    9/27/2013 2:42:26 PM Blocked by Access Protection rule  SOMEDOMAIN\someUser C:\Windows\System32\procexp64.exe C:\Program Files (x86)\McAfee\Common Framework
    \UdaterUI.exe Common Standard Protection:Prevent termination of McAfee processes Action blocked : Terminate

What will you call the fields? What is the separator?

If you look at the line you may notice (not immediately visible in the notebook) that the text file is delimited by a tab character ('\t') and it consists of eight fields. If we break them down we can name them as:

  • date
  • time
  • status
  • username
  • filename
  • trigger_location
  • rule
  • action
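
To make the field breakdown concrete, here is a small sketch that splits the first sample line on the tab character and pairs each value with its field name. It uses only plain Python and is an illustration, not how Plaso does the splitting internally; the tab positions are written out explicitly as '\t'.

```python
COLUMNS = [
    'date', 'time', 'status', 'username', 'filename',
    'trigger_location', 'rule', 'action']

# One line from the sample log, written with explicit tab characters.
line = (
    '9/27/2013\t2:42:26 PM\tBlocked by Access Protection rule \t'
    'SOMEDOMAIN\\someUser\tC:\\Windows\\System32\\procexp64.exe\t'
    'C:\\Program Files (x86)\\McAfee\\Common Framework\\UdaterUI.exe\t'
    'Common Standard Protection:Prevent termination of McAfee processes\t'
    'Action blocked : Terminate')

# Split on the delimiter and pair each value with its column name.
values = line.split('\t')
row = dict(zip(COLUMNS, values))

print(len(values))
print(row['date'])
print(row['action'])
```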

We are using an already checked-in parser as an example. To avoid namespace collisions we append the word "Foo" or "foo" to many of the class names and other fields.

VerifyRow()

During the extraction process a worker gets a copy of every file. That file needs to be evaluated to find the correct parser to parse it. The goal of the VerifyRow() function is to define a way for each parser to evaluate a single line or part of the text file to verify that this line matches the format the parser is meant to parse. This function returns True or False.

What makes your file the special, unique snowflake that it is? Will it always have the same number of columns? Is the timestamp format unique? What kind of strings are common in the lines? Use these as a base for creating VerifyRow().

It is important that this function identifies the file in question both quickly and accurately. By default the tool collects every file found inside a disk image and attempts to parse it using this parser, so verification must be fast to avoid wasting time and accurate to avoid false hits.
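
The idea of checking cheap, distinctive properties first can be sketched without any Plaso classes. The function below is a hypothetical stand-alone helper (LooksLikeAccessProtectionRow is not a Plaso name) that uses the standard library datetime module in place of timelib and assumes the month-first, 12 hour format of the sample file.

```python
import datetime

# Strings that are typical for the status field of this log.
_STATUS_MARKERS = ('Access Protection', 'Would be blocked')


def LooksLikeAccessProtectionRow(row):
  """Determines if a row dict resembles a McAfee Access Protection log line.

  This is a hypothetical helper for illustration, not part of Plaso.

  Args:
    row: a dict mapping column names to values.

  Returns:
    True if the row looks like the expected format, False otherwise.
  """
  # Cheapest check first: the number of fields.
  if len(row) != 8:
    return False

  # Next, the date and time must parse in the expected format.
  try:
    datetime.datetime.strptime(
        '{0} {1}'.format(row['date'], row['time']), '%m/%d/%Y %I:%M:%S %p')
  except (KeyError, ValueError):
    return False

  # Finally, look for strings that are typical for this log.
  status = row.get('status', '')
  return any(marker in status for marker in _STATUS_MARKERS)


good_row = {
    'date': '9/27/2013', 'time': '2:42:26 PM',
    'status': 'Blocked by Access Protection rule ',
    'username': 'SOMEDOMAIN\\someUser', 'filename': 'procexp64.exe',
    'trigger_location': 'UdaterUI.exe', 'rule': 'some rule',
    'action': 'Action blocked : Terminate'}
bad_row = dict(good_row, date='not a date')

print(LooksLikeAccessProtectionRow(good_row))
print(LooksLikeAccessProtectionRow(bad_row))
```

Ordering the checks from cheapest to most expensive means most unrelated files are rejected after a single length comparison.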


In [ ]:
PrintClassHelp(text_parser.TextCSVParser, 'VerifyRow')

_GetTimestamp()

The timestamp in this text file is split into 2 fields, the Date and the Time.

The date is a string like "9/27/2013". The sample was obtained from an American system, so the month is first (dates depend on local settings). Anecdotal reports suggest that European systems are day first, but for this code lab we will ignore that.

The time is a string like "2:42:26 PM". It's 12 hour time, and written in the local computer's time zone.

You can write your own timestamp parser, but the cool kids are all using timelib to create Plaso 64-bit timestamps the easy way. All time manipulations should be done using timelib. If there is no suitable function inside the common timelib you will have to add one. This is done to make sure we can easily test and fix issues that may arise from time manipulations. Plaso stores its timestamps as the number of microseconds since the Epoch in UTC.
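
To illustrate what "microseconds since the Epoch in UTC" means for the sample date and time strings, here is a sketch that uses only the standard library. In a real parser timelib should do this work; the function name ToMicroseconds and the utc_offset_hours parameter are assumptions made for this example, and daylight saving time is ignored.

```python
import calendar
import datetime


def ToMicroseconds(date_string, time_string, utc_offset_hours=0):
  """Converts month-first date and 12 hour time strings to a timestamp.

  Args:
    date_string: the date, e.g. '9/27/2013'.
    time_string: the time, e.g. '2:42:26 PM'.
    utc_offset_hours: hypothetical fixed offset of the local time zone from
        UTC, in hours (e.g. -7 for UTC-7).

  Returns:
    The number of microseconds since Jan 1, 1970 00:00:00 UTC.
  """
  local_time = datetime.datetime.strptime(
      '{0} {1}'.format(date_string, time_string), '%m/%d/%Y %I:%M:%S %p')
  # Local time = UTC + offset, hence UTC = local time - offset.
  utc_time = local_time - datetime.timedelta(hours=utc_offset_hours)
  seconds = calendar.timegm(utc_time.timetuple())
  return seconds * 1000000 + utc_time.microsecond


print(ToMicroseconds('9/27/2013', '2:42:26 PM'))
print(ToMicroseconds('9/27/2013', '2:42:26 PM', utc_offset_hours=-7))
```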

timelib has the class Timestamp which has the functions to make the proper timestamp:

  • FromPythonDatetime
  • FromTimeString
  • FromHFSTime
  • FromHfsPlusTime
  • FromCocoaTime
  • FromFatDateTime
  • FromWebKitTime
  • FromFiletime
  • FromPosixTime
  • FromTimeParts

What is the best function to convert our string into a timestamp? When you're writing the function, think about error checking and confirming you have something to work with. Also, you'll want to write a full docstring for this one, as it takes several arguments.

In our text parser we define a small function that takes the timestamp as it is, in text format, and translates it into the proper integer value (microseconds since Epoch in UTC). This function is named _GetTimestamp:

  def _GetTimestamp(self, date, time, timezone):

Let's explore the timelib class a bit, see what functions exist within the timelib.Timestamp class and how they are used:


In [ ]:
# Import the library we are about to inspect.
from plaso.lib import timelib

# You can easily change the name of the class here if you want to explore a different
# class and its members.
PrintClassHelp(timelib.Timestamp)

ParseRow()

Now to the actual meat of the parser, ParseRow(). The text parser reads each line in the text file, splits it up into columns, assigns each value a key according to the COLUMNS definition and passes the result as a dict object to the ParseRow() function. The purpose of ParseRow()? To take that dict and turn it into an EventObject representing that particular log line.

For example, suppose the COLUMNS class constant is defined with two fields and the text file contains two comma-separated values per line. A dict called "row" is then created for each line. Given:

COLUMNS = ['foo', 'bar']

And the text file is

stuff, more stuff
egg, ham
hamster, mouse

The resulting parser would call the ParseRow function three times, each time assigning the row dict the following values:

{'foo': 'stuff', 'bar': 'more stuff'}

{'foo': 'egg', 'bar': 'ham'}

{'foo': 'hamster', 'bar': 'mouse'}
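
The same mapping from lines to row dicts can be reproduced with the standard library csv module. This is only a sketch of the idea; it is not the TextCSVParser code, and the skipinitialspace option is used here because the sample values have a space after each comma.

```python
import csv

COLUMNS = ['foo', 'bar']

lines = [
    'stuff, more stuff',
    'egg, ham',
    'hamster, mouse']

# skipinitialspace drops the space that follows each comma in this sample.
reader = csv.DictReader(
    lines, fieldnames=COLUMNS, delimiter=',', skipinitialspace=True)

# Each row maps the column names to the values found on one line.
rows = [dict(row) for row in reader]
for row in rows:
  print(row)
```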


The goal of the ParseRow function is to extract meaning from the log line, create an event object and push that to the queue. You can use the default event class from the event library or you can make your own event class. The latter is good when you want to add some logic to a row or event (see the section on convenience event objects). Creating a minimal event class lets you set the data type.

def ParseRow(self, parser_mediator, row_offset, row):
    """Parses a row and extract event objects.

    Args:
      parser_mediator: a parser mediator object (instance of ParserMediator).
      row_offset: the offset of the row.
      row: a dictionary containing all the fields as denoted in the
           COLUMNS class list.
    """
    try:
      timestamp = self._GetTimestamp(
          row[u'date'], row[u'time'], parser_mediator.timezone)
    except errors.TimestampError as exception:
      parser_mediator.ProduceParseError(
          u'Unable to parse time string: [{0:s} {1:s}] with error {2:s}'.format(
              repr(row[u'date']), repr(row[u'time']), exception))
      return

    if timestamp is None:
      return

    event_object = McafeeAVEvent(timestamp, row_offset, row)
    parser_mediator.ProduceEvent(event_object)

In [ ]:
PrintClassHelp(text_parser.TextCSVParser, 'ParseRow')

You may have noticed the method is passed a "parser_mediator" object. The parser mediator is an object that is passed to each parser (and plugin) and allows it to interact with other parts of Plaso, essentially being a mediator. Here you can see how it provides functionality to produce events that will be queued for storage and producing parser error reports.

The ParseRow() code in this codelab also shows how the parser mediator object can be used to access information about the system collected during pre-processing. In this case, parser_mediator.timezone provides the system's time zone. This part of Plaso is undergoing extensive development at the moment, and more features will be added in the near future.
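
The role of the mediator can be sketched with a stand-in object that simply collects what a parser produces. FakeMediator and ParseRowSketch are hypothetical names used only for this illustration; only the method names ProduceEvent and ProduceParseError and the timezone attribute are taken from the codelab, and the real ParserMediator does considerably more.

```python
class FakeMediator(object):
  """Hypothetical stand-in for a parser mediator; it only collects output."""

  def __init__(self):
    self.events = []
    self.errors = []
    # Placeholder for pre-processing information such as the time zone.
    self.timezone = 'UTC'

  def ProduceEvent(self, event_object):
    """Stores an event instead of queueing it for storage."""
    self.events.append(event_object)

  def ProduceParseError(self, message):
    """Stores a parse error message instead of reporting it."""
    self.errors.append(message)


def ParseRowSketch(mediator, row_offset, row):
  """Turns a row dict into an event, or reports a parse error."""
  if not row.get('date') or not row.get('time'):
    mediator.ProduceParseError(
        'Missing date or time at offset {0:d}'.format(row_offset))
    return
  # A tuple stands in for a real event object in this sketch.
  mediator.ProduceEvent((row_offset, dict(row)))


mediator = FakeMediator()
ParseRowSketch(mediator, 0, {'date': '9/27/2013', 'time': '2:42:26 PM'})
ParseRowSketch(mediator, 120, {'date': '', 'time': ''})

print(len(mediator.events))
print(mediator.errors)
```

Because the parser only talks to the mediator, a stand-in like this is also a convenient way to observe a parser's output in a unit test.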

Write a Convenience Event Object - McafeeAVEvent

Each timestamped event is described as an EventObject. Sometimes it is easier to write a convenience class for creating the EventObject, for example if the same calculations are made to normalize the timestamp for every event object, or the same conditions are applied before assigning a particular value (e.g. compare variables foo and bar and assign the value of foobar depending on that comparison).

Generally, you will put the Event class in the parser file at the top.

The prime thing to have in an Event class is the DATA_TYPE. It is in essence a set of categories, separated by ':'.

DATA_TYPE = 'av:mcafee:accessprotectionlog'

If you would like to transform the row, you can implement the __init__() function. Make sure you add a Doc String spelling out what you pass in.

With the McAfee AV log, we put the "Date" and "Time" into the timestamp variable, so we don't need them in the row. It would thus be a good idea to remove them from the list of attributes before creating the event.

Write the McafeeAVEvent class and implement __init__().


In [ ]:
class McafeeAVEventFoo(text_events.TextEvent):
  """Convenience class for McAfee AV Log events """

  # This has to match the DATA_TYPE attribute in the formatter class (discussed later)
  DATA_TYPE = u'av:mcafee:accessprotectionlog:foo'

  def __init__(self, timestamp, offset, attributes):
    """Initializes a McAfee AV Log Event.

    Args:
      timestamp: the timestamp time value. The timestamp contains the
                 number of microseconds since Jan 1, 1970 00:00:00 UTC.
      offset: the offset of the attributes.
      attributes: dict of elements from the AV log line.
    """
    del attributes[u'time']
    del attributes[u'date']
    super(McafeeAVEventFoo, self).__init__(timestamp, offset, attributes)
    self.full_path = attributes[u'filename']

For this example we used the assistant event object TextEvent. To get a bit more help about the __init__ function of the class, run the following:


In [ ]:
PrintClassHelp(text_events.TextEvent, '__init__')

Here is the entire class for the parser.


In [ ]:
class McafeeAccessProtectionParserFoo(text_parser.TextCSVParser):
  """Parses the McAfee AV Access Protection Log."""
  # The above docstring has to be short and descriptive as that is used when
  # listing up all available parsers in the tool.

  # The name of the parser. This is the name of the parser as it will appear
  # when parser.parser_name is called, and is displayed in all statistics. Also
  # when parser selection is made. This has to be unique, since no two parsers can
  # share the same name.
  # This also has to be simple and descriptive (and lower case).
  NAME = u'mcafee_protection_foo'

  # The description field serves as a short one-line description of the purpose of
  # the parser and is used when printing out information about each parser, eg when
  # the front-end is called with "--info".
  DESCRIPTION = u'Parser for McAfee AV Access Protection log files.'

  # The value separator defines the delimiter of the text file. By default this is set
  # to ',', so it parses typical CSV files. If you need to override that, as we do
  # when parsing this tab-delimited log, define it here.
  VALUE_SEPARATOR = b'\t'

  # If there is a header before the lines start it can be defined here, and
  # the number of header lines that need to be skipped before the parsing
  # starts.
  NUMBER_OF_HEADER_LINES = 0
    
  # If there is a special quote character used inside the structured text
  # it can be defined here.
  QUOTE_CHAR = '"'

  # Define the columns of the McAfee AV Access Protection Log.
  COLUMNS = [u'date', u'time', u'status', u'username', u'filename',
             u'trigger_location', u'rule', u'action']

  def _GetTimestamp(self, date, time, timezone):
    """Return a 64-bit signed timestamp in microseconds since Epoch.

     The timestamp is made up of two strings, the date and the time, separated
     by a tab. The time is in local time. The month and day can be either 1 or 2
     characters long.  E.g.: 7/30/2013\t10:22:48 AM

     Args:
       date: The string representing the date.
       time: The string representing the time.
       timezone: The timezone object.

     Returns:
       A plaso timestamp value, microseconds since Epoch in UTC.
    """

    if not (date and time):
      logging.warning('Unable to extract timestamp from McAfee AV logline.')
      return

    # TODO: Figure out how McAfee sets Day First and use that here.
    # The in-file time format is '07/30/2013\t10:22:48 AM'.
    return timelib.Timestamp.FromTimeString(
        u'{0:s} {1:s}'.format(date, time), timezone=timezone)

  def VerifyRow(self, parser_mediator, row):
    """Verify that this is a McAfee AV Access Protection Log file.

    Args:
      parser_mediator: a parser mediator object (instance of ParserMediator).
      row: A single row from the CSV file.

    Returns:
      True if this is the correct parser, False otherwise.
    """

    if len(row) != 8:
      return False

    # This file can have the UTF-8 byte order mark at the beginning of the
    # first row.
    # TODO: Find out all the code pages this can have.  Asked McAfee 10/31.
    if row['date'][0:3] == '\xef\xbb\xbf':
      row['date'] = row['date'][3:]

    # Check the date format!
    # If it doesn't pass, then this isn't a McAfee AV Access Protection Log
    try:
      timestamp = self._GetTimestamp(
          row['date'], row['time'], parser_mediator.timezone)
    except (TypeError, ValueError, errors.TimestampError):
      return False

    # _GetTimestamp returns None when the date or time field is empty.
    if timestamp is None:
      return False

    # Use the presence of these strings as a backup or in case of partial file.
    if ('Access Protection' not in row['status'] and
        'Would be blocked' not in row['status']):
      return False

    return True
    
  def ParseRow(self, parser_mediator, row_offset, row):
    """Parses a row and extract event objects.
    Args:
      parser_mediator: a parser mediator object (instance of ParserMediator).
      row_offset: the offset of the row.
      row: a dictionary containing all the fields as denoted in the
           COLUMNS class list.
    """
    try:
      timestamp = self._GetTimestamp(
          row[u'date'], row[u'time'], parser_mediator.timezone)
    except errors.TimestampError as exception:
      parser_mediator.ProduceParseError(
          u'Unable to parse time string: [{0:s} {1:s}] with error {2:s}'.format(
              repr(row[u'date']), repr(row[u'time']), exception))
      return

    if timestamp is None:
      return

    event_object = McafeeAVEventFoo(timestamp, row_offset, row)
    parser_mediator.ProduceEvent(event_object)    
    
manager.ParsersManager.RegisterParser(McafeeAccessProtectionParserFoo)

[optional code segment] If you make some modifications to the class above you need to first de-register it before you can register it again. To be able to de-register it use the code block below:


In [ ]:
# OPTIONAL CODE BLOCK !! DON'T EXECUTE UNLESS YOU HAVE MADE SOME 
# CHANGES TO THE PARSER CODE AND WANT TO REGISTER IT AGAIN!!!
manager.ParsersManager.DeregisterParser(McafeeAccessProtectionParserFoo)
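
Why the de-registration step is needed can be illustrated with a small registry sketch. This is not Plaso's ParsersManager: SketchRegistry and DemoParser are hypothetical names, and the sketch assumes that registering the same parser name twice is rejected, which is what the requirement for unique parser names suggests.

```python
class SketchRegistry(object):
  """Hypothetical registry keyed on the lower-case NAME of a class."""

  _classes = {}

  @classmethod
  def Register(cls, parser_class):
    """Registers a class, refusing names that are already taken."""
    name = parser_class.NAME.lower()
    if name in cls._classes:
      raise KeyError('Parser already registered: {0:s}'.format(name))
    cls._classes[name] = parser_class

  @classmethod
  def Deregister(cls, parser_class):
    """Removes a previously registered class."""
    name = parser_class.NAME.lower()
    if name not in cls._classes:
      raise KeyError('Parser not registered: {0:s}'.format(name))
    del cls._classes[name]


class DemoParser(object):
  """Made-up parser class that only carries a name."""
  NAME = 'demo_parser'


SketchRegistry.Register(DemoParser)
try:
  # Re-running the class definition cell would end up here.
  SketchRegistry.Register(DemoParser)
  registered_twice = True
except KeyError:
  registered_twice = False

# De-registering first makes the second registration succeed.
SketchRegistry.Deregister(DemoParser)
SketchRegistry.Register(DemoParser)
print(registered_twice)
```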

Writing the Formatter

Have you ever noticed the message string when you print out an event?


In [ ]:
import datetime

from plaso.formatters import manager as formatter_manager
from plaso.formatters import mediator as formatter_mediator
from plaso.lib import event
from plaso.lib import eventdata
from plaso.lib import timelib

# Import the McAfee AV formatter.
from plaso.formatters import mcafeeav

# Create a date object with the current date.
datetime_now = datetime.datetime.utcnow()

# Let's create a dummy event.
demo_event = event.EventObject()

# We need to set the data type so that it matches the McAfee AV protection log.
# [this is the checked in code, so we are omitting the "foo" that was added for
# the purpose of this codelab].
demo_event.data_type = 'av:mcafee:accessprotectionlog'

# Let's imagine that this comes from this made up filename and
# it contains these bogus values.
demo_event.filename = r'c:\some\path\i\do\not\know\file.txt'
demo_event.username = u'john'
demo_event.trigger_location = u'Home'
demo_event.status = u'updating'
demo_event.rule = u'my dummy rule'
demo_event.action = '[DONE]'

# Set the timestamp.
demo_event.timestamp = timelib.Timestamp.FromPythonDatetime(datetime_now)
demo_event.timestamp_desc = eventdata.EventTimestamp.WRITTEN_TIME

# And print the string.
print demo_event.GetString()

# And to re-iterate, let's print the message string.
formatter_mediator_object = formatter_mediator.FormatterMediator()
message_string, _ = formatter_manager.FormattersManager.GetMessageStrings(formatter_mediator_object, demo_event)

print u'MESSAGE STRING: {}'.format(message_string)

Did you notice that in the event above you never really told it how to construct this message string? How does the tool then know how to print it out?

That is the purpose of the formatter. The formatter is a simple class that defines what attributes need to be used and how they are put together to form this message string. You're going to need one for any parser you create (or more importantly any data type that exists).

The way the formatter works is that it looks at the data_type attribute in the EventObject and matches that to the formatter's DATA_TYPE attribute. If they are the same, the formatter proceeds to process that EventObject and construct the message string.
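
The matching of data_type against DATA_TYPE can be pictured as a dictionary lookup. The sketch below uses made-up stand-ins (SketchEvent, SketchFormatter, FORMATTERS and GetMessage are all hypothetical) rather than the Plaso formatter manager, and returning None for an unknown data type is an assumption of this example.

```python
class SketchEvent(object):
  """Hypothetical event object that stores arbitrary attributes."""

  def __init__(self, data_type, **attributes):
    self.data_type = data_type
    self.__dict__.update(attributes)


class SketchFormatter(object):
  """Hypothetical formatter for a single data type."""
  DATA_TYPE = 'av:mcafee:accessprotectionlog:foo'
  FORMAT_STRING = 'File Name: {filename} User: {username}'


# Formatters are looked up by their DATA_TYPE.
FORMATTERS = {SketchFormatter.DATA_TYPE: SketchFormatter}


def GetMessage(event):
  """Returns the message string, or None if no formatter matches."""
  formatter = FORMATTERS.get(event.data_type)
  if formatter is None:
    return None
  return formatter.FORMAT_STRING.format(**vars(event))


matching_event = SketchEvent(
    'av:mcafee:accessprotectionlog:foo', filename='a.exe', username='john')
# Note the typo in the data type: the lookup silently finds nothing.
typo_event = SketchEvent(
    'av:mcafee:accesprotectionlog:foo', filename='a.exe', username='john')

print(GetMessage(matching_event))
print(GetMessage(typo_event))
```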

Formatters go in separate files under plaso/formatters.

For the most part, you're just setting some values with formats. You'll want to set up structures that you want to see in your timeline.

Most importantly (to re-iterate), the DATA_TYPE must match the data_type attribute from the EventObject from the last section. Watch out for typos here -- there is no warning.

There are two formatters that you can use, the simple EventFormatter and the ConditionalEventFormatter. The former should only be used if you are absolutely sure all the attributes mentioned there are going to be set for each and every event object created. What that means is that for the vast majority of the formatters the ConditionalEventFormatter should be the formatter of choice. But for this simple text file we know that there are always 8 fields in the line and every field is always going to be set, so we can use the EventFormatter class.

There are two class constants that should always be set, regardless of the choice of formatter:

  • SOURCE_SHORT: This should match one of the common sources, e.g. LOG, WEBHIST, etc. It should closely correspond to the TLN format by H. Carvey as a short description of the source, almost like a short name for the category of the source.
  • SOURCE_LONG: Since the category itself is not sufficient to describe the source, the extra field SOURCE_LONG defines it further. For instance, a browser history extracted from the Chrome browser has SOURCE_SHORT set to WEBHIST, indicating that it comes from a web history, while SOURCE_LONG contains the text "Chrome History", setting it apart from other browsers.

For the simple EventFormatter two class constants have to be set (or at least one):

  • FORMAT_STRING: A Unicode string that contains formatting information; place all attribute names in {}. This is just a typical Python formatting string, so all typical rules apply. Timestamp, filename/path, username, hostname and similar information is presented in other fields and should not be a part of the message string.
  • FORMAT_STRING_SHORT: This is only needed when you think the resulting message string may exceed 80 characters in width and you don't want it to be shortened automatically. That is, if you don't want the short message string to contain just the first 77 characters of the longer version, you can construct your own condensed one.
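
The default shortening described for FORMAT_STRING_SHORT can be sketched as follows. The function name ShortenMessage is made up, and appending '...' after the first 77 characters (to reach the 80 character limit) is an assumption of this sketch rather than something stated above.

```python
def ShortenMessage(message, max_length=80):
  """Shortens a message string to at most max_length characters.

  Args:
    message: the full message string.
    max_length: the maximum length of the short message string.

  Returns:
    The message itself if it is short enough, otherwise its first
    max_length - 3 characters followed by '...'.
  """
  if len(message) <= max_length:
    return message
  return message[:max_length - 3] + '...'


long_message = 'File Name: ' + 'x' * 100
short_message = ShortenMessage(long_message)

print(len(short_message))
print(ShortenMessage('Action blocked : Terminate'))
```

A hand-written FORMAT_STRING_SHORT avoids this blind truncation by choosing which attributes matter most in a condensed view.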

If you use the conditional formatter you need to define the following class constants:

  • FORMAT_STRING_PIECES: The same as the FORMAT_STRING, except that this is a list and only one attribute name should be defined per entry. If an attribute is not set in the event object then that particular entry in the list will be omitted.
  • FORMAT_STRING_SHORT_PIECES: Same as the FORMAT_STRING_SHORT except in the same format as FORMAT_STRING_PIECES, that is as a list.
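
The behaviour of the list-based constants, where an entry is dropped if its attribute is not set, can be sketched with the standard library string.Formatter. FormatConditional is a hypothetical helper, not the ConditionalEventFormatter implementation, and joining the pieces with a single space is an assumption of this example.

```python
import string


def FormatConditional(pieces, attributes, separator=' '):
  """Builds a message from pieces, skipping those with unset attributes.

  Args:
    pieces: a list of format strings, one attribute name per entry.
    attributes: a dict of attribute names and values.
    separator: the string placed between the formatted pieces.

  Returns:
    The message string.
  """
  formatter = string.Formatter()
  parts = []
  for piece in pieces:
    # Collect the attribute names referenced by this piece.
    names = [name for _, name, _, _ in formatter.parse(piece) if name]
    if all(attributes.get(name) is not None for name in names):
      parts.append(piece.format(**attributes))
  return separator.join(parts)


PIECES = [
    'File Name: {filename}',
    'User: {username}',
    '{action}']

# The username attribute is not set, so its piece is omitted.
message = FormatConditional(
    PIECES, {'filename': 'C:\\a.exe', 'action': 'Terminate'})
print(message)
```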

Write the Formatter named mcafee.py:

class McafeeAccessProtectionLogEventFormatter(interface.EventFormatter):

An example of this formatter, here built on the ConditionalEventFormatter and its list-based constants:


In [ ]:
"""The McAfee AV Logs file event formatter."""

from plaso.formatters import interface as formatter_interface
from plaso.formatters import manager as formatter_manager


class McafeeAccessProtectionLogEventFormatterFoo(
    formatter_interface.ConditionalEventFormatter):
  """Formatter for a McAfee Access Protection Log event."""

  DATA_TYPE = u'av:mcafee:accessprotectionlog:foo'

  FORMAT_STRING_PIECES = [
      u'File Name: {filename}',
      u'User: {username}',
      u'{trigger_location}',
      u'{status}',
      u'{rule}',
      u'{action}']

  FORMAT_STRING_SHORT_PIECES = [
      u'{filename}',
      u'{action}']

  SOURCE_LONG = u'McAfee Access Protection Log'
  SOURCE_SHORT = u'LOG'


formatter_manager.FormattersManager.RegisterFormatter(
    McafeeAccessProtectionLogEventFormatterFoo)

Test The Parser

It is very important to test the parser, to see if it can at least parse our sample dataset.

Create a temp test file

Let's create a test file so we can play with this parser:


In [ ]:
import tempfile

testfile_path = u''
text_to_enter = r"""9/27/2013	2:42:26 PM	Blocked by Access Protection rule 	SOMEDOMAIN\someUser	C:\Windows\System32\procexp64.exe	C:\Program Files (x86)\McAfee\Common Framework\UdaterUI.exe	Common Standard Protection:Prevent termination of McAfee processes	Action blocked : Terminate
9/27/2013	2:42:39 PM	Blocked by Access Protection rule 	SOMEDOMAIN\someUser	C:\Windows\System32\procexp64.exe	C:\Program Files (x86)\McAfee\Common Framework\FrameworkService.exe	Common Standard Protection:Prevent termination of McAfee processes	Action blocked : Terminate
9/27/2013	2:42:39 PM	Blocked by Access Protection rule 	SOMEDOMAIN\someUser	C:\Windows\System32\procexp64.exe	C:\Program Files (x86)\McAfee\Common Framework\UdaterUI.exe	Common Standard Protection:Prevent termination of McAfee processes	Action blocked : Terminate
9/27/2013	2:42:40 PM	Blocked by Access Protection rule 	SOMEDOMAIN\someUser	C:\Windows\System32\procexp64.exe	C:\Program Files (x86)\McAfee\Common Framework\McTray.exe	Common Standard Protection:Prevent termination of McAfee processes	Action blocked : Terminate
7/17/2013	1:49:34 PM	Would be blocked by Access Protection rule  (rule is currently not enforced) 	NT AUTHORITY\SYSTEM	C:\Windows\System32\powercfg.exe	\REGISTRY\USER\.DEFAULT\Software\Microsoft\Windows\CurrentVersion\Internet Settings\ZoneMap\AutoDetect	Anti-spyware Standard Protection:Protect Internet Explorer favorites and settings	Action blocked : Create
7/17/2013	1:49:34 PM	Would be blocked by Access Protection rule  (rule is currently not enforced) 	NT AUTHORITY\SYSTEM	C:\Windows\System32\powercfg.exe	C:\Windows\System32\config\systemprofile\AppData\Local\Microsoft\Windows\Temporary Internet Files\Content.IE5\index.dat	Anti-virus Maximum Protection:Protect cached files from password and email address stealers	Action blocked : Read
7/17/2013	1:53:31 PM	Would be blocked by Access Protection rule  (rule is currently not enforced) 	TheGrid\clu	C:\Windows\system32\taskhost.exe	C:\Windows\Temp\SDIAG_1893e055-45e8-4dda-a6fc-036616ec15c7\DiagPackage.dll	Common Maximum Protection:Prevent creation of new executable files in the Windows folder	Action blocked : Create
7/17/2013	1:53:32 PM	Would be blocked by Access Protection rule  (rule is currently not enforced) 	TheGrid\clu	C:\Windows\System32\sdiagnhost.exe	\REGISTRY\USER\S-1-5-21-218510691-2140962509-2033415169-18142\Software\Microsoft\Windows\CurrentVersion\Internet Settings\ZoneMap\AutoDetect	Anti-spyware Standard Protection:Protect Internet Explorer favorites and settings	Action blocked : Create
7/30/2013	10:06:05 AM	Would be blocked by Access Protection rule  (rule is currently not enforced) 	NT AUTHORITY\SYSTEM	C:\Windows\TEMP\InstallPlugin_11_8_800_94.exe	C:\Windows\Temp\{49568447-C9D4-4C19-942B-4472959CBC07}\fpb.tmp	Anti-spyware Maximum Protection:Prevent all programs from running files from the Temp folder	Action blocked : Execute
7/30/2013	10:06:06 AM	Would be blocked by Access Protection rule  (rule is currently not enforced) 	NT AUTHORITY\SYSTEM	C:\Windows\TEMP\InstallPlugin_11_8_800_94.exe	C:\Windows\Temp\{05007B29-A945-4346-8B04-7DD2F5453280}\InstallFlashPlayer.exe	Common Maximum Protection:Prevent creation of new executable files in the Windows folder	Action blocked : Create
7/30/2013	10:18:02 AM	Would be blocked by port blocking rule  (rule is currently not enforced) 	C:\Windows\SysWOW64\Macromed\Flash\FlashPlayerUpdateService.exe	Common Maximum Protection:Prevent HTTP communication	23.56.2.70:443
7/30/2013	10:22:48 AM	Would be blocked by Access Protection rule  (rule is currently not enforced) 	NT AUTHORITY\SYSTEM	C:\Windows\system32\svchost.exe	C:\Users\tron\AppData\Roaming\Mozilla\Firefox\prfD430.tmp	Common Standard Protection:Protect Mozilla & FireFox files and settings	Action blocked : Create
7/30/2013	10:22:48 AM	Would be blocked by Access Protection rule  (rule is currently not enforced) 	NT AUTHORITY\SYSTEM	C:\Windows\system32\svchost.exe	C:\Users\tron\AppData\Roaming\Mozilla\Firefox\Profiles\w77xlhgl.default\webapps\prfD432.tmp	Common Standard Protection:Protect Mozilla & FireFox files and settings	Action blocked : Delete
7/30/2013	10:22:48 AM	Would be blocked by Access Protection rule  (rule is currently not enforced) 	NT AUTHORITY\SYSTEM	C:\Windows\system32\svchost.exe	C:\Users\tron\AppData\Roaming\Mozilla\Firefox\Profiles\w77xlhgl.default\prfD431.tmp	Common Standard Protection:Protect Mozilla & FireFox files and settings	Action blocked : Create
"""

with tempfile.NamedTemporaryFile(delete=False) as fh:
  testfile_path = fh.name
  fh.write(text_to_enter)

print u'Test file created: {}'.format(testfile_path)

[optional code segment] If you want to see the actual content of the file, to confirm that you saved it successfully, you can use the code below:


In [ ]:
lines_in_file = 0

with open(testfile_path, 'rb') as fh:
  for line in fh:
    print line,
    lines_in_file += 1
    
print u'With a total number of lines: {0:d}'.format(lines_in_file)

Parse the File

We can use the code below to test our parsing, to see if the parser is capable of parsing the text file we provided it with.


In [ ]:
from plaso.engine import knowledge_base
from plaso.engine import queue
from plaso.engine import single_process

from plaso.lib import errors
from dfvfs.lib import definitions

from plaso.parsers import mediator as parsers_mediator

from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import resolver as path_spec_resolver


class CodelabEventObjectQueueConsumer(queue.ItemQueueConsumer):
  """Class that implements a list event object queue consumer."""

  def __init__(self, event_queue):
    """Initializes the list event object queue consumer.

    Args:
      event_queue: the event object queue (instance of Queue).
    """
    super(CodelabEventObjectQueueConsumer, self).__init__(event_queue)
    self.event_objects = []

  def _ConsumeItem(self, event_object, **unused_kwargs):
    """Consumes an event object callback for ConsumeEventObjects."""
    self.event_objects.append(event_object)


# Create a mock knowledgebase object.
knowledge_base_object = knowledge_base.KnowledgeBase()

# Set the config to none at the moment.
config = None

# Create the parser object.
test_parser = McafeeAccessProtectionParserFoo()

# Open the file, or get a file entry.
path_spec = path_spec_factory.Factory.NewPathSpec(
    definitions.TYPE_INDICATOR_OS, location=testfile_path)
file_entry = path_spec_resolver.Resolver.OpenFileEntry(path_spec)

print u'Parsing file using: {}'.format(test_parser.NAME)

# Create a parser mediator and the necessary queues.
event_queue = single_process.SingleProcessQueue()
event_queue_producer = single_process.SingleProcessItemQueueProducer(event_queue)

parser_error_queue = single_process.SingleProcessQueue()
parser_error_queue_producer = single_process.SingleProcessItemQueueProducer(
      parser_error_queue)

this_parser_mediator = parsers_mediator.ParserMediator(
    event_queue_producer, parser_error_queue_producer, knowledge_base_object)

# Parse the file using the parser.
this_parser_mediator.SetFileEntry(file_entry)
test_parser.Parse(this_parser_mediator)

test_consumer = CodelabEventObjectQueueConsumer(event_queue)
test_consumer.ConsumeItems()

event_objects = test_consumer.event_objects

print u'Processing of file is done.'
print u'Able to extract: {} events from the file.'.format(len(event_objects))
print u'There are in total: {0:d} lines in the file, which should match the number of extracted events'.format(
    lines_in_file)
if lines_in_file != len(event_objects):
  print u'MISMATCH!'

You should see 14 events being extracted from the file.

We can also play a bit with the test file to see if everything parsed correctly.


In [ ]:
from dfvfs.helpers import text_file

# Open the file and compare the content to what was extracted.
# This only works for a simple text file with a single line per record
# and is just for demonstration purposes here in this codelab.
# It also depends on the previous code segments having been executed.
file_object = file_entry.GetFileObject()
text_file_object = text_file.TextFile(file_object)

for index, event_object in enumerate(event_objects):
  line = text_file_object.readline()
  print u'*' * 80
  print u'    EVENT NUMBER: {}'.format(index)
  print u'-'*80
  print u'Line:'
  print line
  print u'Event:'
  print event_object.GetString()
  print u''

line = text_file_object.readline()

if line:
  print u'Unparsed lines still in file.'
  counter = 0
  while line:
    # Uncomment if you want the actual line printed out that wasn't parsed.
    # print line
    line = text_file_object.readline()
    counter += 1

  print u'Lines not parsed: {}'.format(counter)

Writing the Tests

Unit tests are designed to make sure your code is doing what you intended it to do, as well as to let other people know when their refactor broke your code. This will also assist you when you are writing your code by doing a sanity check on your parser to make sure it works the way you expect it to.

The tests go in their own file, so create tests/parsers/mcafee.py.

Then fill in the rest of the header -- Doc String and imports. 'unittest' makes this a unit test file.

You need to import the formatter, and likely rename it since its name will conflict with the parser name. You don't actually use the formatter directly in the file, so you end up with the pylint statement shown below. You'll see the other imports throughout this code lab. Since everything is in the same namespace here we don't really need to import the formatter, but in a regular test file this typically needs to be added:

# pylint: disable-msg=unused-import
from plaso.formatters import mcafeeav as mcafeeav_formatter

The pylint statement makes sure that pylint does not complain about an unused import. We are not using the formatter directly; we are just importing it so that it gets registered (without the import the formatter is never registered and the test will not work).

TestCase and setUp()

For a parser test we will use the test_lib.ParserTestCase test case. This is a simple class that inherits from the base unittest.TestCase class but adds a few functions to make it easier to test Plaso parsers. You may want to add a setUp() function to open the sample file and set any other variables you expect in the background, like a copy of the parser object.

For a text parser, you just need to get the file path of the test file in question with the self._GetTestFilePath([filename]) function. The available helpers depend on the test library in use; for instance, you have access to self._GetKeyFromFile(self, path, key_path) if you are writing a test for a Windows Registry plugin. Please consult the test library for the currently available functions (below is a short code segment that prints out the help for the parser test_lib).

Also, for a test to run automatically, its function name needs to start with a lower case "test", e.g. "testParsing", "testFoo", "testBar".
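
The naming rule can be checked with the standard unittest loader. The sketch below uses a hypothetical test case, ExampleTest, that exists only for this illustration; it asks the loader which methods it would run.

```python
import unittest


class ExampleTest(unittest.TestCase):
  """A hypothetical test case used only for this illustration."""

  def testParsing(self):
    """Picked up automatically: the name starts with "test"."""
    self.assertTrue(True)

  def helperParsing(self):
    """Not picked up: the name does not start with "test"."""
    self.fail(u'This method is never run by the test loader.')


loader = unittest.TestLoader()
test_names = loader.getTestCaseNames(ExampleTest)
print(test_names)
```

Only testParsing is listed, so helper functions can live in the same class without being executed as tests.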

However since the tests are kept in a separate directory (tests vs. inside plaso) they are not part of the packaged version of plaso.

Thus we need to include the test lib here in the codelab.


In [ ]:
import os
import unittest

from dfvfs.lib import definitions
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import resolver as path_spec_resolver

from plaso.engine import knowledge_base
from plaso.engine import queue
from plaso.engine import single_process
from plaso.formatters import manager as formatters_manager
from plaso.formatters import mediator as formatters_mediator
from plaso.lib import event
from plaso.parsers import mediator


class TestItemQueueConsumer(queue.ItemQueueConsumer):
  """Class that implements a list event object queue consumer."""

  def __init__(self, event_queue):
    """Initializes the list event object queue consumer.

    Args:
      event_queue: the event object queue (instance of Queue).
    """
    super(TestItemQueueConsumer, self).__init__(event_queue)
    self.event_objects = []

  def _ConsumeItem(self, event_object, **unused_kwargs):
    """Consumes an item callback for ConsumeItems."""
    self.event_objects.append(event_object)


class ParserTestCase(unittest.TestCase):
  """The unit test case for a parser."""

  _DATA_PATH = os.path.join(os.getcwd(), u'data')
  _TEST_DATA_PATH = os.path.join(os.getcwd(), u'test_data')

  # Show full diff results, part of TestCase so does not follow our naming
  # conventions.
  maxDiff = None

  def _GetEventObjects(self, event_generator):
    """Retrieves the event objects from the event generator.

    This function will extract event objects from a generator.

    Args:
      event_generator: the event generator as returned by the parser.

    Returns:
      A list of event objects (instances of EventObject).
    """
    event_objects = []

    for event_object in event_generator:
      self.assertIsInstance(event_object, event.EventObject)
      # Every event needs to have its parser and pathspec fields set, so that
      # it's possible to trace its provenance.
      self.assertIsNotNone(event_object.pathspec)
      self.assertIsNotNone(event_object.parser)
      event_objects.append(event_object)

    return event_objects

  def _GetEventObjectsFromQueue(self, event_queue_consumer):
    """Retrieves the event objects from the queue consumer.

    Args:
      event_queue_consumer: the event object queue consumer object (instance of
                            TestItemQueueConsumer).

    Returns:
      A list of event objects (instances of EventObject).
    """
    event_queue_consumer.ConsumeItems()

    event_objects = []
    for event_object in event_queue_consumer.event_objects:
      self.assertIsInstance(event_object, event.EventObject)
      event_objects.append(event_object)

    return event_objects

  def _GetParserMediator(
      self, event_queue, parse_error_queue, knowledge_base_values=None,
      file_entry=None, parser_chain=None):
    """Retrieves a parser context object.

    Args:
      event_queue: the event queue (instance of Queue).
      parse_error_queue: the parse error queue (instance of Queue).
      knowledge_base_values: optional dict containing the knowledge base
                             values. The default is None.
      file_entry: optional dfVFS file_entry object (instance of dfvfs.FileEntry)
                  being parsed.
      parser_chain: Optional string containing the parsing chain up to this
                    point. The default is None.

    Returns:
      A parser context object (instance of ParserMediator).
    """
    event_queue_producer = queue.ItemQueueProducer(event_queue)
    parse_error_queue_producer = queue.ItemQueueProducer(parse_error_queue)

    knowledge_base_object = knowledge_base.KnowledgeBase()
    if knowledge_base_values:
      for identifier, value in knowledge_base_values.iteritems():
        knowledge_base_object.SetValue(identifier, value)

    new_mediator = mediator.ParserMediator(
        event_queue_producer, parse_error_queue_producer,
        knowledge_base_object)
    if file_entry:
      new_mediator.SetFileEntry(file_entry)

    if parser_chain:
      new_mediator.parser_chain = parser_chain
    return new_mediator

  def _GetShortMessage(self, message_string):
    """Shortens a message string to a maximum of 80 character width.

    Args:
      message_string: the message string.

    Returns:
      The same short message string, if it is longer than 80 characters it will
      be shortened to its first 77 characters followed by a "...".
    """
    if len(message_string) > 80:
      return u'{0:s}...'.format(message_string[0:77])

    return message_string

  def _GetTestFilePath(self, path_segments):
    """Retrieves the path of a test file relative to the test data directory.

    Args:
      path_segments: the path segments inside the test data directory.

    Returns:
      A path of the test file.
    """
    # Note that we need to pass the individual path segments to os.path.join
    # and not a list.
    return os.path.join(self._TEST_DATA_PATH, *path_segments)

  def _GetTestFileEntryFromPath(self, path_segments):
    """Creates a file entry that references a file in the test dir.

    Args:
      path_segments: the path segments inside the test data directory.

    Returns:
      A file entry object (instance of dfvfs.FileEntry).
    """
    path = self._GetTestFilePath(path_segments)
    path_spec = path_spec_factory.Factory.NewPathSpec(
        definitions.TYPE_INDICATOR_OS, location=path)
    return path_spec_resolver.Resolver.OpenFileEntry(path_spec)

  def _ParseFile(self, parser_object, path, knowledge_base_values=None):
    """Parses a file using the parser object.

    Args:
      parser_object: the parser object.
      path: the path of the file to parse.
      knowledge_base_values: optional dict containing the knowledge base
                             values. The default is None.

    Returns:
      An event object queue consumer object (instance of
      TestItemQueueConsumer).
    """
    path_spec = path_spec_factory.Factory.NewPathSpec(
        definitions.TYPE_INDICATOR_OS, location=path)
    return self._ParseFileByPathSpec(
        parser_object, path_spec, knowledge_base_values=knowledge_base_values)

  def _ParseFileByPathSpec(
      self, parser_object, path_spec, knowledge_base_values=None):
    """Parses a file using the parser object.

    Args:
      parser_object: the parser object.
      path_spec: the path specification of the file to parse.
      knowledge_base_values: optional dict containing the knowledge base
                             values. The default is None.

    Returns:
      An event object queue consumer object (instance of
      TestItemQueueConsumer).
    """
    event_queue = single_process.SingleProcessQueue()
    event_queue_consumer = TestItemQueueConsumer(event_queue)

    parse_error_queue = single_process.SingleProcessQueue()

    parser_mediator = self._GetParserMediator(
        event_queue, parse_error_queue,
        knowledge_base_values=knowledge_base_values)
    file_entry = path_spec_resolver.Resolver.OpenFileEntry(path_spec)
    parser_mediator.SetFileEntry(file_entry)

    # AppendToParserChain needs to be run after SetFileEntry.
    parser_mediator.AppendToParserChain(parser_object)

    parser_object.Parse(parser_mediator)

    return event_queue_consumer

  def _TestGetMessageStrings(
      self, event_object, expected_message, expected_message_short):
    """Tests the formatting of the message strings.

       This function invokes the GetMessageStrings function of the event
       formatter on the event object and compares the resulting messages
       strings with those expected.

    Args:
      event_object: the event object (instance of EventObject).
      expected_message: the expected message string.
      expected_message_short: the expected short message string.
    """
    formatter_mediator = formatters_mediator.FormatterMediator(
        data_location=self._DATA_PATH)
    message, message_short = (
        formatters_manager.FormattersManager.GetMessageStrings(
            formatter_mediator, event_object))
    self.assertEqual(message, expected_message)
    self.assertEqual(message_short, expected_message_short)

  def _TestGetSourceStrings(
      self, event_object, expected_source, expected_source_short):
    """Tests the formatting of the source strings.

       This function invokes the GetSourceStrings function of the event
       formatter on the event object and compares the resulting source
       strings with those expected.

    Args:
      event_object: the event object (instance of EventObject).
      expected_source: the expected source string.
      expected_source_short: the expected short source string.
    """
    # TODO: change this to return the long variant first so it is consistent
    # with GetMessageStrings.
    source_short, source = (
        formatters_manager.FormattersManager.GetSourceStrings(event_object))
    self.assertEqual(source, expected_source)
    self.assertEqual(source_short, expected_source_short)

To see further details about the parser test assistant:


In [ ]:
PrintClassHelp(ParserTestCase)

Write the setUp() function for this class definition:

class McafeeAccessProtectionUnitTest(unittest.TestCase):
  """A unit test for the McAfee AV Access Protection Log parser."""

  def setUp(self):

While we're setting up the boilerplate of the test, let's add the main function to the bottom of the file. Then we can run the test on its own.

if __name__ == '__main__':
  unittest.main()

Writing the Test

The outline of the main test is to create and run the parser, then check that the parser results are correct. You should check a variety of attributes in one row and something about the file in general.

The test needs to start with the word "test". Let's use testParse(). The assertions should include:

  • How many entries were created?
  • For entry[1], are the timestamp, username, and full_path correct?
  • For entry[1], are the message strings formatted correctly?
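
To work out an expected timestamp by hand, the date and time columns of a log line can be converted to microseconds since Epoch. The sketch below is plain Python and independent of Plaso; to_plaso_timestamp is a hypothetical helper, and it assumes that the time in the log is treated as UTC.

```python
import calendar
import datetime


def to_plaso_timestamp(date_string, time_string):
  """Converts e.g. "9/27/2013", "2:42:26 PM" to microseconds since Epoch.

  Hypothetical helper; it assumes that the log time is UTC.
  """
  date_time = datetime.datetime.strptime(
      u'{0:s} {1:s}'.format(date_string, time_string),
      u'%m/%d/%Y %I:%M:%S %p')
  # timegm() interprets the time tuple as UTC and returns whole seconds.
  return calendar.timegm(date_time.timetuple()) * 1000000


print(to_plaso_timestamp(u'9/27/2013', u'2:42:26 PM'))
print(to_plaso_timestamp(u'9/27/2013', u'2:42:39 PM'))
```

For the first two lines of the test file this gives 1380292946000000 and 1380292959000000, the values asserted for entry[0] and entry[1] in the test.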

In [ ]:
class McafeeAccessProtectionUnitTestFoo(ParserTestCase):
  """Tests for the McAfee AV Log parser."""

  def setUp(self):
    """Sets up the needed objects used throughout the test."""
    self._parser = McafeeAccessProtectionParserFoo()

  def testParse(self):
    """Tests the Parse function."""
    # When the file is checked in, it should be stored in the test_data folder
    # but for the purposes of this codelab we are storing the content of the
    # test file in a temporary file created earlier so we comment out the
    # actual call to location and use the temporary file instead.
    #test_file = self._GetTestFilePath(['AccessProtectionLog.txt'])
    test_file = testfile_path
    event_queue_consumer = self._ParseFile(self._parser, test_file)
    event_objects = self._GetEventObjectsFromQueue(event_queue_consumer)

    # The file contains 14 lines which results in 14 event objects.
    self.assertEqual(len(event_objects), 14)

    # Test that the UTF-8 byte order mark gets removed from the first line.
    event_object = event_objects[0]

    self.assertEqual(event_object.timestamp, 1380292946000000)

    # Test this entry:
    # 9/27/2013 2:42:39 PM  Blocked by Access Protection rule
    #   SOMEDOMAIN\someUser C:\Windows\System32\procexp64.exe C:\Program Files
    # (x86)\McAfee\Common Framework\FrameworkService.exe  Common Standard
    # Protection:Prevent termination of McAfee processes  Action blocked :
    # Terminate

    event_object = event_objects[1]

    self.assertEqual(event_object.timestamp, 1380292959000000)
    self.assertEqual(event_object.username, u'SOMEDOMAIN\\someUser')
    self.assertEqual(
        event_object.full_path, u'C:\\Windows\\System32\\procexp64.exe')

    expected_msg = (
        u'File Name: C:\\Windows\\System32\\procexp64.exe '
        u'User: SOMEDOMAIN\\someUser '
        u'C:\\Program Files (x86)\\McAfee\\Common Framework\\Frame'
        u'workService.exe '
        u'Blocked by Access Protection rule  '
        u'Common Standard Protection:Prevent termination of McAfee processes '
        u'Action blocked : Terminate')
    expected_msg_short = (
        u'C:\\Windows\\System32\\procexp64.exe '
        u'Action blocked : Terminate')

    self._TestGetMessageStrings(event_object, expected_msg, expected_msg_short)

Running the Test

How will you know what the format string should look like? Well, it's time to run the code we have. Typically the parser needs to be "compiled" (built and installed) before the test is able to import it, which means that we need to make sure the parser gets picked up for compilation.

Open plaso/parsers/__init__.py. Add an import statement for your new parser. Save the file. Repeat for the formatters.

Normally to run the tests you would either need to run:

$ python run_tests.py

Or to compile:

$ python setup.py build && sudo python setup.py install

And then you can run the test directly using:

$ python tests/parsers/mcafeeav.py

Rinse and repeat as you write the tests. If you change the parser, you need to recompile. If you just change the test, you don't.

However since we are writing this in our notebook we just need to make sure we've run all the previous code segments, and if you make changes, just re-run it.

To run the test itself, execute the below code:


In [ ]:
import unittest

my_suite = unittest.TestSuite()
my_suite.addTest(McafeeAccessProtectionUnitTestFoo('testParse'))

results = unittest.TextTestRunner(verbosity=3).run(my_suite)

if results.errors:
  print u'Errors came up while trying to run test.'
  for error in results.errors:
    if isinstance(error, basestring):
      print error
    else:
      for sub_error in error:
        print sub_error
elif results.failures:
  print u'Failures came up while trying to run test.'
  for failure in results.failures:
    if isinstance(failure, basestring):
      print failure
    else:
      for sub_failure in failure:
        print sub_failure
else:
  print u'All came out clean.'
  print results

If all went well you should have a fully functioning parser by now, ready to parse every text file matching the parser profile.

You can start playing around and making changes to the parser, to see what happens when changes are introduced, or continue and create a new parser.

The Assignment


Now we have gone through step-by-step how a simple text based parser is created. It is time to take what we've learned so far and create a new parser.

The remainder of the codelab revolves around writing a text parser that parses a simple log format. Write the parser and the unittest to successfully parse that text file.

Let's start with a simple text file.


In [ ]:
mystery_path = u''
text_to_enter = r"""date,user,request,result,action,extra
03/04/2013 21:20:32.3411,foobar,get all the data,no data here,GET,some weird stuff happening here
03/12/2013 13:41:52.1231,foobar,get all the data,"data loading,stuff happening, here it is",GET,1235 bytes transferred
06/05/2013 02:02:12.3421,john,insert data,success,PUT,1023 bytes inserted into attribute database
07/12/2013 06:23:42.7019,john,get all the data,no data here,GET,seems to be an empty database
12/24/2013 18:00:00.1234,dude,get some data,"gathering data, please wait",GET,10 bytes transferred
01/14/2014 12:24:10.562,mike,get some data,"gathering data,please wait",GET,123155324 bytes transferred
"""

with tempfile.NamedTemporaryFile(delete=False) as fh:
  mystery_path = fh.name
  fh.write(text_to_enter)

print u'Test file created: {}'.format(mystery_path)

Fill in all the details in the parser so that it successfully parses this file.
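
Before filling in the parser it helps to look at how the file splits into columns. Some values contain commas and are therefore quoted, which matters when choosing the separator and quote character. The sketch below uses only the standard csv module on two lines copied from the file above; it is not the Plaso text parser itself.

```python
import csv

sample_lines = [
    'date,user,request,result,action,extra',
    ('03/12/2013 13:41:52.1231,foobar,get all the data,'
     '"data loading,stuff happening, here it is",GET,'
     '1235 bytes transferred'),
]

reader = csv.reader(sample_lines, delimiter=',', quotechar='"')
header = next(reader)
row = dict(zip(header, next(reader)))

# The quoted field keeps its commas and stays a single value.
print(row['result'])
```

The first line of the file is a header naming six columns, so the parser needs to account for one header line and six column names.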


In [ ]:
class MysteryParser(text_parser.TextCSVParser):
    """Parses the mysterious text file created for this codelab."""
    
    # Need to fill in this value, remember to have it descriptive, lower-case
    # and short (can use underscore characters, eg: "super_text").
    NAME = ''
    
    # Need to fill in the value for the parser description.
    DESCRIPTION = 'This parser parses foo for bar'
    
    # If this value is necessary, then uncomment and set.
    #VALUE_SEPARATOR = ''
    
    # If there is a header before the lines start it can be defined here, and
    # the number of header lines that need to be skipped before the parsing
    # starts.
    #NUMBER_OF_HEADER_LINES = 0
    
    # If there is a special quote character used inside the structured text
    # it can be defined here.
    #QUOTE_CHAR = '"'
    
    # Define the columns of the log file.
    #COLUMNS = []
    
    def VerifyRow(self, parser_context, row):
        """Verify that this is truly our mystery file."""
        return False
    
    def ParseRow(self, parser_context, row_offset, row, file_entry=None):
        """Parse a single row from the mysterious log file."""
        pass
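
One common way to implement a verification check is to test a column that has a strict layout, such as the date. The sketch below shows that idea as a stand-alone function; looks_like_mystery_row is a hypothetical name and this is not the Plaso VerifyRow API, so the logic still needs to be adapted to the method signature above.

```python
import datetime


def looks_like_mystery_row(row):
  """Returns True if the "date" value matches the expected layout.

  Hypothetical stand-alone check, not the Plaso VerifyRow API.
  """
  try:
    datetime.datetime.strptime(
        row.get('date', ''), '%m/%d/%Y %H:%M:%S.%f')
  except ValueError:
    return False
  return True


print(looks_like_mystery_row({'date': '03/04/2013 21:20:32.3411'}))
print(looks_like_mystery_row({'date': '9/27/2013\t2:42:26 PM\tBlocked'}))
```

The first call uses a value from the mystery file and is accepted; the second resembles the start of a McAfee log line and is rejected because it has no fractional seconds.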

[optional code segment] Remember if you make changes to the parser you need to de-register it before you run the code segment again (to register it):


In [ ]:
# OPTIONAL DO NOT EXECUTE UNLESS YOU'VE MADE CHANGES TO THE PARSER CODE ABOVE AND
# NEED TO REGISTER THOSE CHANGES!

manager.ParsersManager.DeregisterParser(MysteryParser)
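
Why the de-registration step is needed can be illustrated with a toy registry. ToyParsersManager below is hypothetical and only loosely modelled on a parsers manager; it assumes that registering the same parser name twice is treated as an error, which may not match the real manager in every detail.

```python
class ToyParsersManager(object):
  """A toy registry used only for this illustration (hypothetical)."""

  _parser_classes = {}

  @classmethod
  def RegisterParser(cls, parser_class):
    name = parser_class.NAME.lower()
    if name in cls._parser_classes:
      raise KeyError(u'Parser already registered: {0:s}'.format(name))
    cls._parser_classes[name] = parser_class

  @classmethod
  def DeregisterParser(cls, parser_class):
    name = parser_class.NAME.lower()
    if name not in cls._parser_classes:
      raise KeyError(u'Parser not registered: {0:s}'.format(name))
    del cls._parser_classes[name]


class FirstVersion(object):
  NAME = 'mystery'


class SecondVersion(object):
  NAME = 'mystery'


ToyParsersManager.RegisterParser(FirstVersion)
try:
  # Registering an edited class under the same name is refused.
  ToyParsersManager.RegisterParser(SecondVersion)
except KeyError as exception:
  print(exception)

# Deregistering the old class first makes room for the new one.
ToyParsersManager.DeregisterParser(FirstVersion)
ToyParsersManager.RegisterParser(SecondVersion)
```

In a notebook, re-running a cell creates a new class object with the same name, which is the situation the two classes above imitate.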

You may need to create a convenience event object. A small piece of boilerplate is provided below, which you may modify as you wish. For instance, TimestampEvent may not be the most suitable class; look at the available classes in plaso.events.time_events.


In [ ]:
from plaso.events import time_events

print ''
for member_name, member_value in inspect.getmembers(time_events):
  if inspect.isclass(member_value):
    if event.EventObject in inspect.getmro(member_value):
      print member_name
      PrintClassHelp(member_value, '__init__')

Here is the skeleton for the convenience event object.


In [ ]:
class MysteryEvent(time_events.TimestampEvent):
  """Insert a nice little doc string here."""
    
  DATA_TYPE = 'text:mystery:entry'
    
  def __init__(self, timestamp, foo, bar):
    """Initializes the mysterious log file event.

    Args:
      timestamp: The timestamp value, whatever format it comes in.
      foo: A nice little attribute.
      bar: Another nice little attribute
    """
    # Call the parent class. At this point the timestamp needs to be in the
    # proper format, so we may need to convert it here so that it conforms to
    # the number of microseconds since Epoch UTC.
    # NOTE: timestamp_desc is not defined in this skeleton; set it to a
    # suitable timestamp description before running this code.
    super(MysteryEvent, self).__init__(
        timestamp, timestamp_desc, self.DATA_TYPE)
        
    # Set the other attributes
    self.foo = foo
    self.bar = bar
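
The date column of the mystery file has a varying number of fractional digits (".1231" in most lines, ".562" in the last one), so the conversion to microseconds needs some care. The sketch below is plain Python; mystery_timestamp is a hypothetical helper and it assumes that the log time is UTC.

```python
import calendar
import datetime


def mystery_timestamp(date_time_string):
  """Converts e.g. "03/12/2013 13:41:52.1231" to microseconds since Epoch.

  Hypothetical helper; it assumes that the log time is UTC.
  """
  date_time = datetime.datetime.strptime(
      date_time_string, '%m/%d/%Y %H:%M:%S.%f')
  seconds = calendar.timegm(date_time.timetuple())
  # %f right-pads the fraction, so ".562" becomes 562000 microseconds.
  return seconds * 1000000 + date_time.microsecond


print(mystery_timestamp('03/12/2013 13:41:52.1231'))
print(mystery_timestamp('01/14/2014 12:24:10.562'))
```

For the second data line of the file this gives 1363095712123100, the value used in the unit test later in this codelab.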

For the message string to be properly formatted we need to provide a formatter for the extracted event objects.


In [ ]:
class MysteryFormatter(formatter_interface.ConditionalEventFormatter):
  """Class that formats events from the mysterious log file."""
    
  DATA_TYPE = 'text:mystery:entry'
    
  # The format string.
  FORMAT_STRING_PIECES = [
      u'Foo: {foo}',
      u'Bar or even BARRR: {bar:20s}']
    
  FORMAT_STRING_SHORT_PIECES = [u'BAR: {bar}']
    
  SOURCE_LONG = 'Mysterious Log File'
  SOURCE_SHORT = 'LOG'
    
# This is not part of the "regular code", this is just added to make this codelab work.
# Register the formatter the same way as the McAfee formatter earlier in this codelab.
formatter_manager.FormattersManager.RegisterFormatter(MysteryFormatter)

[optional code segment] If you make changes to the formatter, you need to remove it from the registration before you register it again.


In [ ]:
# This is OPTIONAL code, no need to run unless you make changes to the formatter and want to update
# the registration (as in get the changes picked up).
# NOTE: this assumes that your version of the formatters manager provides
# DeregisterFormatter and that it raises KeyError for an unregistered formatter.
try:
  formatter_manager.FormattersManager.DeregisterFormatter(MysteryFormatter)
  print 'Formatter removed from manager.'
except KeyError:
  print 'Formatter not found in manager.'

And finally we need to create a unit test to make sure we are parsing the file properly.


In [ ]:
class MysteryParserTest(ParserTestCase):
  """Tests for the mysterious log file parser."""

  def setUp(self):
    """Sets up the needed objects used throughout the test."""
    self._parser = MysteryParser()

  def testParse(self):
    """Tests the Parse function."""
    test_file = mystery_path
    event_queue_consumer = self._ParseFile(self._parser, test_file)
    event_objects = self._GetEventObjectsFromQueue(event_queue_consumer)

    # At bare minimum we need to test that the parser successfully parsed
    # all the lines.
    self.assertEqual(len(event_objects), 6)

    # Read in at least one (or more) event objects and make sure we are parsing
    # them correctly. For instance let's test time parsing.
    event_object = event_objects[1]

    self.assertEqual(event_object.timestamp, 1363095712123100)
    # Add tests here to make sure the event attributes are extracted correctly.

    # And now we need to test our formatter, create a message string and test it.
    expected_msg = u'This is a message string.'
    expected_msg_short = u'The short version.'

    self._TestGetMessageStrings(event_object, expected_msg, expected_msg_short)
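
The placeholder comment in testParse asks for attribute tests. Below is a minimal sketch of what such checks could look like. FakeEvent and the attribute names username and action are hypothetical; substitute whatever your parser actually extracts. The sketch uses only the standard unittest module so it runs without Plaso.

```python
import unittest


class FakeEvent(object):
  """Hypothetical stand-in for an event object produced by the parser."""

  def __init__(self, timestamp, **attributes):
    self.timestamp = timestamp
    for name, value in attributes.items():
      setattr(self, name, value)


class AttributeCheckTest(unittest.TestCase):
  """Shows how extracted attributes can be compared to expected values."""

  def _CheckAttributes(self, event_object, expected_attributes):
    """Asserts that every expected attribute is set to the expected value."""
    for name, expected_value in sorted(expected_attributes.items()):
      # getattr() with a default turns a missing attribute into a readable
      # assertion failure instead of an AttributeError.
      value = getattr(event_object, name, None)
      self.assertEqual(
          value, expected_value,
          'attribute {0:s}: {1!r} != {2!r}'.format(
              name, value, expected_value))

  def testAttributes(self):
    """Checks the made-up attributes of a made-up event."""
    event_object = FakeEvent(
        1363095712123100, username='jdoe', action='login')
    self._CheckAttributes(
        event_object, {'username': 'jdoe', 'action': 'login'})


attribute_suite = unittest.TestLoader().loadTestsFromTestCase(
    AttributeCheckTest)
attribute_result = unittest.TestResult()
attribute_suite.run(attribute_result)
```

Keeping the expected values in one dictionary makes it easy to see at a glance which attributes a test covers.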

And run these tests to make sure we have everything covered.


In [ ]:
my_suite = unittest.TestSuite()
my_suite.addTest(MysteryParserTest('testParse'))

results = unittest.TextTestRunner(verbosity=3).run(my_suite)

if results.errors:
  print u'Errors came up while trying to run test.'
  for error in results.errors:
    if isinstance(error, basestring):
      print error
    else:
      for sub_error in error:
        print sub_error

# Report failures even when errors were also reported.
if results.failures:
  print u'Failures came up while trying to run test.'
  for failure in results.failures:
    if isinstance(failure, basestring):
      print failure
    else:
      for sub_failure in failure:
        print sub_failure

if results.wasSuccessful():
  print u'All came out clean.'
  print results

We also want to make sure the parser does not trigger on the other text file, the McAfee text log.


In [ ]:
# Create a mock preprocess object.
pre_obj = event.PreprocessObject()

# Create the parser object.
test_parser = MysteryParser(pre_obj, None)

# Open the file, or get a file entry.
path_spec = path_spec_factory.Factory.NewPathSpec(
    definitions.TYPE_INDICATOR_OS, location=testfile_path)
file_entry = path_spec_resolver.Resolver.OpenFileEntry(path_spec)

# Parse the file using the parser.
event_generator = test_parser.Parse(file_entry)

try:
  # This will trigger the Parse function to reach the first yield statement
  # thus trigger the verification routine.
  _ = next(event_generator)
    
  print u'We were able to parse the McAfee text log using the {}. Please correct the parser.'.format(
      test_parser.parser_name)
except errors.UnableToParseFile as exception:
  print u'Parsing the McAfee text log using the {} failed, as it should.'.format(
      test_parser.parser_name)
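
The call to next() matters because Parse is a generator: calling it only creates the generator object, and none of its code, including the verification, runs until the first item is requested. The sketch below illustrates this with a stand-in parser. FakeUnableToParseFile, FakeParse and the header check are hypothetical and are not the Plaso implementation.

```python
class FakeUnableToParseFile(Exception):
  """Hypothetical stand-in for errors.UnableToParseFile."""


def FakeParse(lines):
  """Yields the non-header lines after verifying the (made-up) header."""
  # Nothing in this function body runs until next() is called.
  if not lines or not lines[0].startswith('date,'):
    raise FakeUnableToParseFile('Not a recognised file.')
  for line in lines[1:]:
    yield line


def TriggersParser(lines):
  """Returns True if the stand-in parser accepts the lines."""
  generator = FakeParse(lines)  # No verification has happened yet.
  try:
    next(generator)
  except FakeUnableToParseFile:
    return False
  except StopIteration:
    # The file passed verification but contained no entries.
    return True
  return True


accepted = TriggersParser(['date,user,action', '2013-03-12,jdoe,login'])
rejected = TriggersParser(['some unrelated log line'])
header_only = TriggersParser(['date,user,action'])
```

Note the StopIteration branch: a file that passes verification but yields no events ends the generator immediately, so it may be worth handling that case separately in your own checks.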

Remember that some of the code segments introduced in the overview of the McAfee parser may help you here: change the calls to the McAfee parser so that they use the newly created parser, then execute them again.

Clean Up

While testing we created temporary files that we may want to delete. To delete them, use the code below:


In [ ]:
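import os

# Remove each temporary file, skipping any that has already been deleted so
# that this cell can be run more than once.
for path in (testfile_path, mystery_path):
  if path and os.path.exists(path):
    os.remove(path)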