SQLite Plugin Code Lab

Intended Audience

This lab is for people who want to learn how to write and execute a Plaso plugin in Python. This tutorial assumes:

  • You have a functional development environment
  • You have used Plaso
  • You are familiar with the Python programming language
  • You are looking to write a plugin (as opposed to a parser, which is covered in a separate codelab)

Objective

This lab will teach you how to write a SQLite database plugin with tests for the Plaso framework. By the end you will be able to:

  • Write a SQLite database plugin for plaso
  • Write unit tests for the plugin
  • Run the plugin as part of plaso/log2timeline

Expectations

This lab should take a couple of hours to complete, although some of that depends on any build issues you run into. We are not attempting to get you to check in code yet; the goal is to demonstrate how a plugin is written. For a plugin to be checked in, you need to write it against a SQLite database that is not already parsed and split the code shown here into several files (the layout is explained below).

Introduction

Welcome to writing a Plaso plugin! From the outside, writing a plugin can be daunting, but once you get your dev environment going, you've fought half the battle. This code lab features a simple SQLite database plugin, but the same formula can be used for any type of plugin. Another codelab will demonstrate a Windows Registry plugin, and the hope is that these two codelabs together more or less cover plugin writing. You may also be interested in the text parser codelab.

Before Starting

Get familiar with the developers guide and more specifically the style guide. To make the code easier to maintain we follow a style guide, partially based on the Google Python Style Guide but slightly modified to fit our needs.

We also follow a code review process that is discussed on the style guide site.

This is an iPython notebook. If you are not familiar with it, here is a brief introduction: it is basically an iPython shell wrapped in a pretty GUI (a browser window). You can execute any Python code you wish, and quickly go back, edit and re-run code. To run the code, click the cell containing the code segment and press "Shift+Enter". The bracket on the left will change to indicate that the cell has been executed, and any output the code segment produced appears below it.

One thing to make note of is that some of the code segments depend on previous code segments having been executed. So in order for this codelab to work properly you need to execute EVERY code segment that is presented here, especially all class declarations and import statements, but to be sure just execute them all [except those explicitly stated as optional].

To make it easier to get documentation about various classes we may use in the codelab we'll start with defining a simple function to print out help (so to execute, click the code segment below and press "SHIFT+ENTER").


In [1]:
import inspect

# Let's put this in a method so we can easily call it from other parts of the codelab.
def PrintClassHelp(class_object, filter_string=''):
  """Prints a help string for a given class object.

  Args:
    class_object: The class that we are about to inspect.
    filter_string: Filter class members that start with a particular string.
  """
  # Print the docstring of the class.
  print u''
  print class_object.__doc__
   
  # Print information for every member function.
  additional_members = []
  for member_name, member_value in inspect.getmembers(class_object):
    # Check to see if we are filtering out members starting with
    # a particular string.
    if filter_string and not member_name.startswith(filter_string):
      continue
    if inspect.ismethod(member_value):
      args = inspect.getargspec(member_value)
      doc_string = member_value.__doc__
        
      print u'{0}{1:>20s}({2}){0:>10}\n\n{3}\n{4}\n\n'.format(
          '*'*5, member_name, u','.join(args.args), doc_string, '-'*80)
    else:
      if member_name.startswith('_'):
        continue
      if member_name in ['classes', 'parent_class', 'plugin_feature', 'top_level_class']:
        continue
        
      additional_members.append(u'{} = {}'.format(member_name, repr(member_value)))

  if additional_members:
    print '\n'
    print '*** Additional Members of Class ***\n\n ',
    print u'\n  '.join(additional_members)

Disclaimer

During this codelab we will be using the iPython notebook interface for everything, which means we have all the classes and code in a single file. Once we deploy the code to the actual codebase we would need to save the code in several places, typically something like:

  • plaso/parsers/sqlite_plugins/myplugin.py
  • plaso/parsers/sqlite_plugins/myplugin_test.py
  • plaso/formatters/myplugin.py

And make necessary changes to:

  • plaso/parsers/sqlite_plugins/__init__.py
  • plaso/formatters/__init__.py

to include the new plugin and formatter in the tool. We may also want to change plaso/frontend/presets.py to include the plugin in a preset. Whether to do so depends on the plugin itself: sometimes you want to include the parser and all its plugins (as is the case for the Windows Registry plugins), and sometimes you only want to load specific plugins, which is typically the case with SQLite plugins.

We are, however, omitting all these details to make the codelab easier to follow. The notebook can also be used to test and experiment with a plugin without touching the codebase; once the plugin is fully functional, create the necessary files and start the code review process.

There are also a lot of comments in the code in this codelab that would typically be omitted from a released plugin. To see the actual code that is used as an example here click on one of the below links (we will only be using parts of that code for demonstration):

Writing the Plugin

We are going to write the plugin completely in this iPython notebook, and test it there too. Nothing is needed other than this notebook, a sample SQLite database and the plaso libraries.

Before writing a plugin, and here we assume we are attempting to parse a particular SQLite database, ask yourself these questions:

  • Examine the database itself, what are the table names?
  • What tables provide the information I'm trying to extract?
  • Has there been a change in table names, and schema in different versions?
  • Do I need to support older versions?
  • What does the schema look like for the tables that I'm interested in?
  • Are there any relations between the tables that I need to be aware of?
  • How are the timestamps formatted? And where are they stored?
  • Are there any defined "VIEWS" in the database that can help me understand the schema and how it is used?
  • Create the SQL commands and execute them using something like sqlite3 to test them first.
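
The checklist above can be tried out with nothing more than Python's standard sqlite3 module. The following is a minimal sketch, not part of the plugin: the in-memory database, its tables, view and rows are hypothetical stand-ins, and in practice you would connect to a copy of the database you are examining.

```python
import sqlite3

# Hypothetical stand-in for the database under examination. In practice you
# would call sqlite3.connect() on a copy of the real database file.
connection = sqlite3.connect(':memory:')
connection.executescript("""
    CREATE TABLE Chats (id INTEGER PRIMARY KEY, name TEXT, friendlyname TEXT);
    CREATE TABLE Messages (
        id INTEGER PRIMARY KEY, chatname TEXT, author TEXT, timestamp INTEGER);
    CREATE VIEW ChatMessages AS
        SELECT c.friendlyname, m.author, m.timestamp
        FROM Chats c, Messages m WHERE c.name = m.chatname;
    INSERT INTO Chats VALUES (1, 'chat-1', 'Lunch plans');
    INSERT INTO Messages VALUES (1, 'chat-1', 'a', 1000000000);
""")


def ListNames(connection, object_type):
  """Returns the sorted names of all tables or views in the database."""
  cursor = connection.execute(
      'SELECT name FROM sqlite_master WHERE type = ? ORDER BY name',
      (object_type,))
  return [row[0] for row in cursor]


table_names = ListNames(connection, 'table')
view_names = ListNames(connection, 'view')

# Try out a candidate query before putting it into a plugin.
candidate_rows = connection.execute(
    'SELECT c.friendlyname, m.author, m.timestamp '
    'FROM Chats c, Messages m WHERE c.name = m.chatname').fetchall()

print(table_names)
print(view_names)
print(candidate_rows)
```

Working on a copy of the database keeps the original evidence untouched while you experiment with queries.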

Remember that we are not about to submit this plugin for review, since it is already checked in; this is only for demonstration purposes. Please refer to the plaso roadmap for open parser/plugin assignments (or add your own).

Before we start looking at the code we need to download the SQLite database to a temporary location so that we can use it for the remainder of this codelab. For this you need an Internet connection.


In [ ]:
# Import a library to make the HTTP connection.
import urllib2

# Import a library so that we can create a temporary file.
import tempfile

# The URL to the Skype main.db SQLite database we are about to use for our testing.
url = u'https://github.com/log2timeline/plaso/raw/master/test_data/skype_main.db'

# Download the file.
response = urllib2.urlopen(url)
data = response.read()

# Save it in a temporary file (we don't want it to be deleted).
test_file = tempfile.NamedTemporaryFile(delete=False)

# Save the name since that is what we will refer to later in the code.
test_database_path = test_file.name

# Write data to it.
test_file.write(data)

# Close the file.
test_file.close()

The header

First things first, every file checked into the project needs a header. That header contains, among other things, copyright information as well as import statements.

The first line after the copyright statement is the doc string that needs to be changed, it should not be longer than 80 characters in width. If you need more than a single line to describe the parser please still only use max 80 characters as the first line, ending with a dot. Then you can create a more detailed description two lines down (an example of that can be seen below).

The import order is defined in the style guide:

Imports are always put at the top of the file, just after any module comments and doc strings and before module globals and constants. 
Imports should be grouped with the order being most generic to least generic:

+ standard library imports
+ third-party imports
+ application-specific imports

Within each grouping, imports should be sorted lexicographically, ignoring case, according to each module's full package path.

In [ ]:
#!/usr/bin/python
#
# Copyright 2013 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This file contains a basic Skype SQLite parser."""

import logging

# We need to be able to create new event objects, and more specifically
# get access to timestamped events.
from plaso.events import time_events

# Import the SQLite parser itself.
from plaso.parsers import sqlite as sqlite_parser

# Import the interface for all SQLite database plugins.
from plaso.parsers.sqlite_plugins import interface

The Plugin Class

We now need to know what kind of plugin we are trying to implement. The "Write A Plugin" section at the plaso documentation site goes slightly into the generic plugin interface and what needs to be done in order to write a plugin. For now, we know we are trying to parse a specific SQLite database using a plugin. If we look at the SQLite database plugin section we notice that the plugin interface is relatively simple.


In [ ]:
PrintClassHelp(interface.SQLitePlugin)

Let's create a database file object from the file we just downloaded.


In [ ]:
# Import necessary libraries from dfVFS so we can open up the file.
from dfvfs.lib import definitions
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import resolver as path_spec_resolver

# Find the file and get a handle to it.
path_spec = path_spec_factory.Factory.NewPathSpec(
    definitions.TYPE_INDICATOR_OS, location=test_database_path)
file_entry = path_spec_resolver.Resolver.OpenFileEntry(path_spec)

# Open up the SQLite database.
database = sqlite_parser.SQLiteDatabase(file_entry)
database.Open()

# Create a SQLite cache object.
database_cache = sqlite_parser.SQLiteCache()

Now we've got the database opened for future use and we can start exploring it. Let's look at what tables are in the database:


In [ ]:
# Import sqlite3 so that we can catch database errors.
import sqlite3

# The SQL command we need to issue to get table names.
table_sql = 'SELECT name FROM sqlite_master WHERE type="table"'

# Create a small function that we can re-use later to execute SQL commands and get the results back.
def QueryForResults(sql_command):
    """Executes a SQLite database query and returns a generator of results."""
    cursor = database.cursor
    try:
        results = cursor.execute(sql_command)
    except sqlite3.DatabaseError as exception:
        print u'SQLite error occurred: <{0:s}>'.format(exception)
        raise

    for row in results:
        yield row
        
print '*'*40 + ' TABLES ' + '*'*40
for row in QueryForResults(table_sql):
    print u'  + ',
    print row[0]

There are quite a few tables there that could be of assistance to us. Let's look at the table definitions, or the schema, of these tables.


In [ ]:
for row in QueryForResults('SELECT name, sql FROM sqlite_master WHERE type="table"'):
    print u'{0}  {1}  {0}'.format('*'*10, row[0])
    print row[1]
    print ''

Let's look for any columns there that may have timestamps associated with them or that could otherwise provide us with value.
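
Scanning the schema for timestamp candidates can also be done programmatically. The sketch below uses only the standard sqlite3 module and a hypothetical in-memory database. Matching on the substring "time" is an assumption made for illustration; real databases may store timestamps in columns with other names.

```python
import sqlite3

# Hypothetical stand-in database with a few timestamp-like columns.
connection = sqlite3.connect(':memory:')
connection.executescript("""
    CREATE TABLE Messages (
        id INTEGER PRIMARY KEY, author TEXT, body_xml TEXT, timestamp INTEGER);
    CREATE TABLE Accounts (
        id INTEGER PRIMARY KEY, fullname TEXT, profile_timestamp INTEGER,
        sent_authrequest_time INTEGER);
""")


def FindTimeColumns(connection):
  """Returns a dict of table name to column names that contain 'time'."""
  candidates = {}
  tables = connection.execute(
      "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")
  for (table_name,) in tables.fetchall():
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    columns = connection.execute(
        'PRAGMA table_info("{0:s}")'.format(table_name))
    names = [column[1] for column in columns if 'time' in column[1].lower()]
    if names:
      candidates[table_name] = names
  return candidates


time_columns = FindTimeColumns(connection)
print(time_columns)
```

A name match is only a hint; inspect a few values from each candidate column to confirm how the timestamp is actually encoded.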

We can also look for VIEW tables:


In [ ]:
for row in QueryForResults('SELECT name, sql FROM sqlite_master WHERE type="view"'):
    print u'{0}  {1}  {0}'.format('*', row[0])
    print row[1]
    print ''

Looking at the tables above we may have spotted the tables Chats and Messages... let's review them a bit more.


In [ ]:
for row in QueryForResults('SELECT name, sql FROM sqlite_master WHERE type = "table" AND (name = "Chats" OR name = "Messages")'):
    print u'{0}  Table: {1}  {0}'.format('*'*10, row[0])
    #white_space = len('CREATE TABLE {} ('.format(row[0])) - 1
    sql_string = row[1].replace('(', '(\n    ')
    print u'\n   '.join(sql_string.split(','))
    print ''

Inspecting these above tables can lead us to a SQL query like the one below:


In [ ]:
chat_query = (
    'SELECT c.id, c.participants, c.friendlyname AS title, '
    'm.author AS author, m.from_dispname AS from_displayname, '
    'm.body_xml, m.timestamp, c.dialog_partner FROM Chats c, Messages m '
    'WHERE c.name = m.chatname')

for index, row in enumerate(QueryForResults(chat_query)):
  print '-'*80
  print '      ROW RESULT: {}'.format(index)
  print '-'*80
  for key in row.keys():
    print u'{} -> {}'.format(key, row[key])
  print '*'*80
  print ''

One important disclaimer: since we are using an already checked in plugin as an example, we append the word "Foo" or "foo" to many of the class names and other fields to avoid namespace collisions.

Important Class Constants

In plaso terms, as soon as we have more than a single "parser" that attempts to parse a particular file format we convert that to a plugin system. A very generic parser can then take care of all the file format parsing, leaving the plugins to do minimal work: defining a few class constants that are used to match the particular file or file segments to what the plugin is designed to parse, and a function to process the data collected.

For SQLite database plugins we need to define the following class attributes:

  • NAME: Name of the plugin, this should be short and concise but still descriptive. Something like "skype" or "chrome", etc.
  • DESCRIPTION: A short description of what the plugin does, eg: "SQLite plugin for Skype main.db SQLite database file."
  • QUERIES: This is a list of all the SQL commands that should be run on the database and the name of the call back method we need to call with the query results.
  • REQUIRED_TABLES: The parser itself will determine if we are trying to parse a SQLite database. However, since there are so many SQLite databases out there we need some additional information to determine if this particular plugin should be executed. For that we need a list (actually a frozenset) of the table names that need to be defined in the database for this plugin to be considered.
  • URLS: [OPTIONAL] This is a list of URLs that can be used to read additional information about this particular database. This could be a link to some blogs discussing how to interpret the database, or a link to the developer of the database discussing the structure, etc.

Call back functions

The underlying parser takes care of opening up the database, querying the available tables and comparing that list to the list you provided in REQUIRED_TABLES. If REQUIRED_TABLES is a subset of the actual tables, the parser takes the queries defined in QUERIES and executes them one-by-one on the database.

For every row that comes out of each query the parser calls the named call back function that is defined in the QUERIES list with the row, cache object and potentially other objects as well.

For instance, if we define the SQL command:

QUERIES = [('SELECT foo FROM bar', 'ParseFoo')]

Then the parser executes the command "SELECT foo FROM bar" and for every result that comes from that the function "ParseFoo" is called. This call back function needs to be defined in the plugin and accept the correct parameters:

def ParseFoo(self, parser_context, row, query=None, **unused_kwargs):

Or if you need access to a database cache object:

def ParseFoo(self, parser_context, row, query=None, cache=None, **unused_kwargs):
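
The matching and dispatch steps described above can be sketched with the standard sqlite3 module. This is an illustration only, not the Plaso parser: ExamplePlugin and RunPlugin are hypothetical names, and the call back omits the parser_context argument that a real plugin receives.

```python
import sqlite3


class ExamplePlugin(object):
  """Hypothetical stand-in for a plugin; not the real Plaso interface."""

  NAME = 'example'
  QUERIES = [('SELECT foo FROM bar', 'ParseFoo')]
  REQUIRED_TABLES = frozenset(['bar'])

  def __init__(self):
    self.parsed_values = []

  def ParseFoo(self, row, query=None, **unused_kwargs):
    """Call back invoked once per row returned by the query."""
    self.parsed_values.append(row['foo'])


def RunPlugin(connection, plugin):
  """Runs the plugin queries if all required tables exist.

  Returns:
    True if the plugin was run, False if the database did not match.
  """
  cursor = connection.execute(
      "SELECT name FROM sqlite_master WHERE type = 'table'")
  table_names = frozenset(row['name'] for row in cursor)
  if not plugin.REQUIRED_TABLES.issubset(table_names):
    return False

  for query, callback_name in plugin.QUERIES:
    # Look up the call back by the name given in QUERIES.
    callback = getattr(plugin, callback_name)
    for row in connection.execute(query):
      callback(row, query=query)
  return True


connection = sqlite3.connect(':memory:')
connection.row_factory = sqlite3.Row
connection.executescript("""
    CREATE TABLE bar (foo TEXT);
    INSERT INTO bar (foo) VALUES ('first');
    INSERT INTO bar (foo) VALUES ('second');
""")

plugin = ExamplePlugin()
was_run = RunPlugin(connection, plugin)
print(was_run)
print(sorted(plugin.parsed_values))
```

Because the call back is looked up by name, a typo in QUERIES only surfaces when the query is run, which is one more reason to write unit tests for the plugin.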

The SQLite database cache object that gets passed as a parameter to each SQLite call back function is an object that can be used to store cached data that you may need. Let's imagine a scenario...

A database table defines a list of paths and filenames. Each row in the database contains the file name and an identification value for the parent value. To fully construct the path one would need to follow that parent id, query the database for that ID value, and see if that entry had a parent, etc. Some of this could be achieved using complex JOIN statements in SQLite, however sometimes it is just easier to run a single SQL command to get all these values, store that as a cache and then quickly look up values in the cache when needed.
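
The scenario above can be sketched as follows. The files table, its columns and the BuildPath helper are hypothetical and only illustrate the idea of running one query, caching the result in a dict and resolving parents from the cache instead of querying the database again.

```python
import sqlite3

# Hypothetical table in which each row points at its parent row.
connection = sqlite3.connect(':memory:')
connection.row_factory = sqlite3.Row
connection.executescript("""
    CREATE TABLE files (id INTEGER PRIMARY KEY, parent_id INTEGER, name TEXT);
    INSERT INTO files VALUES (1, NULL, 'Users');
    INSERT INTO files VALUES (2, 1, 'alice');
    INSERT INTO files VALUES (3, 2, 'report.txt');
""")

# One query builds the whole cache: id -> (parent_id, name).
parent_cache = {}
for row in connection.execute('SELECT id, parent_id, name FROM files'):
  parent_cache[row['id']] = (row['parent_id'], row['name'])


def BuildPath(file_id, cache):
  """Follows parent identifiers in the cache to build a full path."""
  segments = []
  seen = set()
  while file_id is not None and file_id not in seen:
    seen.add(file_id)  # Guard against cycles in malformed data.
    entry = cache.get(file_id)
    if entry is None:
      break  # The parent row is missing; stop with what we have.
    parent_id, name = entry
    segments.append(name)
    file_id = parent_id
  return '/' + '/'.join(reversed(segments))


full_path = BuildPath(3, parent_cache)
print(full_path)
```

The trade-off is memory for speed: the whole table is read once, after which each lookup is a dict access rather than another SQL query.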

That is what the cache object can be used for. Let's look at the definition for the cache:


In [ ]:
PrintClassHelp(sqlite_parser.SQLiteCache)

Let's look at one example on how to use the cache:


In [ ]:
# Create new cache instance.
another_cache = sqlite_parser.SQLiteCache()

# Create a SQL command we would like to issue to the database and cache the results of.
sql_command = u'SELECT parent_id, partner_handle AS skypeid, partner_dispname AS skypename FROM transfers'

# Get a cursor so we can execute the SQL command.
cursor = database.cursor
# Execute the SQL command and get back a result set.
results = cursor.execute(sql_command)

# And now we can build the cache based on that result set.
# This gets explained a bit more later on.
another_cache.CacheQueryResults(
    results, 'destination', 'parent_id', ('skypeid', 'skypename'))

# And fetch the recently added entry here.
destination_dict  = another_cache.GetResults('destination')

print destination_dict

To be able to better understand the CacheQueryResults function let's print the docstring:


In [ ]:
PrintClassHelp(another_cache.CacheQueryResults)

OK, so let's go back to that code of ours and see how that builds up that cache dict object:

cache.CacheQueryResults(
    results, 'destination', 'parent_id', ('skypeid', 'skypename'))

We call the function with the parameters set as:

  • sql_results: results. We need to first issue the database query, get the results back and pass that object in.
  • attribute_name: 'destination'. This is the name of the attribute we would like to store the results as in the cache.
  • key_name: 'parent_id'. This is the name of the row key we would like to use as key in the resulting dict that will be created.
  • values: ('skypeid', 'skypename'). This is the list (or single value) of row keys that we would like to be used as values in the resulting dict that is created.

To go over this specific example: we are asking the cache to create a dict object called "destination" and to store the results from each row in it in the following way:

cache.destination = {}
for row in results:
    cache.destination[row['parent_id']] = (row['skypeid'], row['skypename'])

This is roughly what happens. What we end up with is a dict that we can look up by "parent_id", and it will give us the appropriate values for "skypeid" and "skypename".
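
For readers who want to run that loop outside of Plaso, here is a self-contained sketch using the standard sqlite3 module. BuildLookup is a hypothetical helper that mirrors the loop above; it is not the Plaso implementation of CacheQueryResults, it only handles a sequence of value names, and the rows in the transfers table are made up.

```python
import sqlite3

# Hypothetical stand-in for the transfers table.
connection = sqlite3.connect(':memory:')
connection.row_factory = sqlite3.Row
connection.executescript("""
    CREATE TABLE transfers (
        parent_id INTEGER, partner_handle TEXT, partner_dispname TEXT);
    INSERT INTO transfers VALUES (100, 'alice.example', 'Alice');
    INSERT INTO transfers VALUES (101, 'bob.example', 'Bob');
""")


def BuildLookup(sql_results, key_name, values):
  """Builds a dict keyed on key_name with a tuple of the requested values."""
  lookup = {}
  for row in sql_results:
    lookup[row[key_name]] = tuple(row[name] for name in values)
  return lookup


results = connection.execute(
    'SELECT parent_id, partner_handle AS skypeid, '
    'partner_dispname AS skypename FROM transfers')
destination = BuildLookup(results, 'parent_id', ('skypeid', 'skypename'))

# Use a default so that a missing key does not break tuple unpacking.
skype_id, skype_name = destination.get(100, (None, None))
missing_id, missing_name = destination.get(999, (None, None))
print(skype_id)
print(skype_name)
```

Note that the row keys are the aliases from the SQL command (skypeid, skypename), not the original column names.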

An example here:


In [ ]:
skype_id, skype_name =  destination_dict.get(23445435)

print u'ID: {}\nNAME: {}'.format(skype_id, skype_name)

Timestamp

When dealing with SQLite databases, timestamps come in all shapes and formats. One of the more common formats is POSIX time, or Epoch time, in UTC. However, that may or may not be the case for the database you are examining.
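
As a rough illustration of what a POSIX timestamp looks like, the sketch below converts one to a UTC date and time using only the standard library. The value is hypothetical, and the microsecond scaling at the end is an assumption made for illustration rather than a description of what timelib does.

```python
import calendar
import datetime

# Hypothetical value, such as one read from a "timestamp" column.
posix_time = 1000000000

# Seconds since 1970-01-01 00:00:00 UTC, converted without involving the
# local time zone.
epoch = datetime.datetime(1970, 1, 1)
date_time = epoch + datetime.timedelta(seconds=posix_time)

# Converting back should give the original value.
round_trip = calendar.timegm(date_time.utctimetuple())

# Assumption for illustration: scale seconds to microseconds, as you would for
# a timeline that stores timestamps with microsecond precision.
microseconds = posix_time * 1000000

print(date_time.isoformat())
```

If the converted dates look implausible (decades off, or all in 1970), the column is probably not POSIX time in seconds and needs a different conversion.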

Let's examine what our options are:


In [ ]:
# Import the library we are about to inspect.
from plaso.lib import timelib

# You can easily change the name of the class here if you want to explore a different
# class and its members.
PrintClassHelp(timelib.Timestamp)

Event Object

Each timestamped event is described as an EventObject. It is often easier to write a convenience class that takes care of creating the EventObject.

Let's examine one such example:


In [ ]:
class SkypeChatEvent(time_events.PosixTimeEvent):
  """Convenience class for a Skype event."""

  # Define the data type, this is very important and needs to have a 1:1
  # mapping to the formatter for the event.
  DATA_TYPE = 'skype:event:chat'

  def __init__(self, row, to_account):
    """Build a Skype Event from a single row.

    Args:
      row: A row object (instance of sqlite3.Row) that contains the
           extracted data from a single row in the database.
      to_account: A string containing the accounts (excluding the
                  author) of the conversation.
    """
    # First thing we need to do is to call the "super" class, or the parent,
    # of the event. This particular event object inherits from a class
    # called time_events.PosixTimeEvent, which expects the timestamp sent to
    # it to be POSIX or Epoch time in UTC.
    # The parameters that need to be passed to the PosixTimeEvent are:
    #    timestamp: Timestamp in POSIX or Epoch time, in UTC.
    #    timestamp_desc: Description of the timestamp, eg "Last Written Time".
    #    data_type: The data type of this event object.
    super(SkypeChatEvent, self).__init__(
        row['timestamp'], 'Chat from Skype', self.DATA_TYPE)

    # We can now set other attributes that we need in order to better format
    # the message. These attributes vary depending on the source they come from.
    self.title = row['title']
    self.text = row['body_xml']
    self.from_account = u'{0:s} <{1:s}>'.format(
        row['from_displayname'], row['author'])
    self.to_account = to_account

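The event object is pretty simple really. Its constructor takes the following arguments:

  • row: A row object (instance of sqlite3.Row) that contains the extracted data from a single row in the database.
  • to_account: A string containing the accounts (excluding the author) of the conversation.

This inherits from time_events.PosixTimeEvent: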


In [ ]:
PrintClassHelp(time_events.PosixTimeEvent, '__init__')

The parent class needs the following parameters:

  • posix_time: The timestamp in POSIX time, or Epoch time (in UTC).
  • usage: This is the description of the meaning of the timestamp, eg: "Last Written", "Entry Created".
  • data_type: The data type of the EventObject, which needs to map 1:1 to the data type defined in the formatter for this event object.

And now we just need to define the rest of the needed EventObjects for the plugin to work:


In [ ]:
class SkypeAccountEvent(time_events.PosixTimeEvent):
  """Convenience class for account information."""

  DATA_TYPE = 'skype:event:account'

  def __init__(
      self, timestamp, usage, identifier, full_name, display_name, email,
      country):
    """Initialize the event.

    Args:
      timestamp: The POSIX timestamp value.
      usage: A string containing the description string of the timestamp.
      identifier: The row identifier.
      full_name: A string containing the full name of the Skype account holder.
      display_name: A string containing the chosen display name of the account
                    holder.
      email: A string containing the registered email address of the account
             holder.
      country: A string containing the chosen home country of the account
               holder.
    """
    super(SkypeAccountEvent, self).__init__(timestamp, usage)

    self.offset = identifier
    self.username = u'{0:s} <{1:s}>'.format(full_name, display_name)
    self.display_name = display_name
    self.email = email
    self.country = country
    self.data_type = self.DATA_TYPE


class SkypeSMSEvent(time_events.PosixTimeEvent):
  """Convenience EventObject for SMS."""

  DATA_TYPE = 'skype:event:sms'

  def __init__(self, row, dst_number):
    """Read the information related to the SMS.

    Args:
      row: The row from the SQL query.
        row['time_sms']: timestamp when the SMS was sent.
        row['dstnum_sms']: number which receives the SMS.
        row['msg_sms']: text sent in this SMS.
      dst_number: phone number to which the user sent the SMS.
    """
    super(SkypeSMSEvent, self).__init__(
        row['time_sms'], 'SMS from Skype', self.DATA_TYPE)

    self.number = dst_number
    self.text = row['msg_sms']


class SkypeCallEvent(time_events.PosixTimeEvent):
  """Convenience EventObject for the calls."""

  DATA_TYPE = 'skype:event:call'

  def __init__(self, timestamp, call_type, user_start_call,
               source, destination, video_conference):
    """Contains information if the call was cancelled, accepted or finished.

      Args:
        timestamp: the timestamp of the event.
        call_type: WAITING, STARTED, FINISHED.
        user_start_call: boolean, true indicates that the owner
                         account started the call.
        source: the account which started the call.
        destination: the account which gets the call.
        video_conference: boolean, if is true it was a videoconference.
    """

    super(SkypeCallEvent, self).__init__(
        timestamp, 'Call from Skype', self.DATA_TYPE)

    self.call_type = call_type
    self.user_start_call = user_start_call
    self.src_call = source
    self.dst_call = destination
    self.video_conference = video_conference


class SkypeTransferFileEvent(time_events.PosixTimeEvent):
  """Evaluate the action of sending a file."""

  DATA_TYPE = 'skype:event:transferfile'

  def __init__(self, row, timestamp, action_type, source, destination):
    """Actions related to sending files.

    Args:
      row:
        filepath: path of the file.
        filename: name of the file.
        filesize: size of the file.
      timestamp: when the action happens.
      action_type: GETSOLICITUDE, SENDSOLICITUDE, ACCEPTED, FINISHED.
      source: The account that sent the file.
      destination: The account that received the file.
    """

    super(SkypeTransferFileEvent, self).__init__(
        timestamp, 'File transfer from Skype', self.DATA_TYPE)

    self.offset = row['id']
    self.action_type = action_type
    self.source = source
    self.destination = destination
    self.transferred_filepath = row['filepath']
    self.transferred_filename = row['filename']
    try:
      self.transferred_filesize = int(row['filesize'])
    except ValueError:
      logging.debug(u'Unknown filesize {0:s}'.format(
          self.transferred_filename))
      self.transferred_filesize = 0

And that's it really, we only need to worry about filling in the values for the class constants and then create the appropriate call back functions (and potentially create some assistant event objects).

Here is part of the code for the Skype parser. The version on the site is longer; the one shown here is trimmed for simplicity.


In [ ]:
class SkypePluginFoo(interface.SQLitePlugin):
  """SQLite plugin for Skype main.db SQLite database file."""

  # Append foo to the name so this can be registered on the side with the other Skype
  # parser. This is also only a partial parser (for simplicity reasons).
  NAME = 'skype_foo'

  # Provide the description field.
  DESCRIPTION = u'Parser for Skype SQLite database files.'

  # Queries for building cache. Since we do want to use cache for the Skype plugin
  # we define the SQLite commands for it here. There are two cache queries we want
  # to perform, one to get destinations from the transfers table and the other to
  # get source from the same table.
  QUERY_DEST_FROM_TRANSFER = (
      u'SELECT parent_id, partner_handle AS skypeid, '
      u'partner_dispname AS skypename FROM transfers')
  QUERY_SOURCE_FROM_TRANSFER = (
      u'SELECT pk_id, partner_handle AS skypeid, '
      u'partner_dispname AS skypename FROM transfers')

  # Define the needed queries. We could have included just one query here to
  # show how the plugin works without the extra complexity, but instead all are
  # included, to show how a plugin can be expanded to include queries to several
  # tables.
  QUERIES = [
      (('SELECT c.id, c.participants, c.friendlyname AS title, '
        'm.author AS author, m.from_dispname AS from_displayname, '
        'm.body_xml, m.timestamp, c.dialog_partner FROM Chats c, Messages m '
        'WHERE c.name = m.chatname'), 'ParseChat'),
      (('SELECT id, fullname, given_displayname, emails, '
        'country, profile_timestamp, authreq_timestamp, '
        'lastonline_timestamp, mood_timestamp, sent_authrequest_time, '
        'lastused_timestamp FROM Accounts'), 'ParseAccountInformation'),
      (('SELECT id, target_numbers AS dstnum_sms, timestamp AS time_sms, '
        'body AS msg_sms FROM SMSes'), 'ParseSMS'),
      (('SELECT id, partner_handle, partner_dispname, offer_send_list, '
        'starttime, accepttime, finishtime, filepath, filename, filesize, '
        'status, parent_id, pk_id FROM Transfers'), 'ParseFileTransfer'),
      (('SELECT c.id, cm.guid, c.is_incoming, '
        'cm.call_db_id, cm.videostatus, c.begin_timestamp AS try_call, '
        'cm.start_timestamp AS accept_call, cm.call_duration '
        'FROM Calls c, CallMembers cm '
        'WHERE c.id = cm.call_db_id;'), 'ParseCall')]

  # The required tables.
  REQUIRED_TABLES = frozenset(
      ['Chats', 'Accounts', 'Conversations', 'Contacts', 'SMSes', 'Transfers',
       'CallMembers', 'Calls'])

  def ParseAccountInformation(
      self, parser_context, row, query=None, **unused_kwargs):
    """Parses the Accounts database.

    Args:
      parser_context: A parser context object (instance of ParserContext).
      row: The row resulting from the query.
      query: Optional query string. The default is None.
    """
    if row['profile_timestamp']:
      event_object = SkypeAccountEvent(
          row['profile_timestamp'], u'Profile Changed', row['id'],
          row['fullname'], row['given_displayname'], row['emails'],
          row['country'])
      parser_context.ProduceEvent(
          event_object, plugin_name=self.NAME, query=query)

    if row['authreq_timestamp']:
      event_object = SkypeAccountEvent(
          row['authreq_timestamp'], u'Authenticate Request', row['id'],
          row['fullname'], row['given_displayname'], row['emails'],
          row['country'])
      parser_context.ProduceEvent(
          event_object, plugin_name=self.NAME, query=query)

    if row['lastonline_timestamp']:
      event_object = SkypeAccountEvent(
          row['lastonline_timestamp'], u'Last Online', row['id'],
          row['fullname'], row['given_displayname'], row['emails'],
          row['country'])
      parser_context.ProduceEvent(
          event_object, plugin_name=self.NAME, query=query)

    if row['mood_timestamp']:
      event_object = SkypeAccountEvent(
          row['mood_timestamp'], u'Mood Event', row['id'],
          row['fullname'], row['given_displayname'], row['emails'],
          row['country'])
      parser_context.ProduceEvent(
          event_object, plugin_name=self.NAME, query=query)

    if row['sent_authrequest_time']:
      event_object = SkypeAccountEvent(
          row['sent_authrequest_time'], u'Auth Request Sent', row['id'],
          row['fullname'], row['given_displayname'], row['emails'],
          row['country'])
      parser_context.ProduceEvent(
          event_object, plugin_name=self.NAME, query=query)

    if row['lastused_timestamp']:
      event_object = SkypeAccountEvent(
          row['lastused_timestamp'], u'Last Used', row['id'],
          row['fullname'], row['given_displayname'], row['emails'],
          row['country'])
      parser_context.ProduceEvent(
          event_object, plugin_name=self.NAME, query=query)

  def ParseChat(self, parser_context, row, query=None, **unused_kwargs):
    """Parses a chat message row.

    Args:
      parser_context: A parser context object (instance of ParserContext).
      row: The row resulting from the query.
      query: Optional query string. The default is None.
    """
    to_account = ''
    accounts = []
    participants = row['participants'].split(' ')
    for participant in participants:
      if participant != row['author']:
        accounts.append(participant)
    to_account = u', '.join(accounts)

    if not to_account:
      if row['dialog_partner']:
        to_account = row['dialog_partner']
      else:
        to_account = u'Unknown User'

    event_object = SkypeChatEvent(row, to_account)
    parser_context.ProduceEvent(
        event_object, plugin_name=self.NAME, query=query)

  def ParseSMS(self, parser_context, row, query=None, **unused_kwargs):
    """Parses an SMS row.

    Args:
      parser_context: A parser context object (instance of ParserContext).
      row: The row resulting from the query.
      query: Optional query string. The default is None.
    """
    dst_number = row['dstnum_sms'].replace(' ', '')

    event_object = SkypeSMSEvent(row, dst_number)
    parser_context.ProduceEvent(
        event_object, plugin_name=self.NAME, query=query)

  def ParseCall(self, parser_context, row, query=None, **unused_kwargs):
    """Parses a call row.

    Args:
      parser_context: A parser context object (instance of ParserContext).
      row: The row resulting from the query.
      query: Optional query string. The default is None.
    """
    try:
      aux = row['guid']
      if aux:
        aux_list = aux.split('-')
        src_aux = aux_list[0]
        dst_aux = aux_list[1]
      else:
        src_aux = u'Unknown [no GUID]'
        dst_aux = u'Unknown [no GUID]'
    except IndexError:
      src_aux = u'Unknown [{0:s}]'.format(row['guid'])
      dst_aux = u'Unknown [{0:s}]'.format(row['guid'])

    if row['is_incoming'] == '0':
      user_start_call = True
      source = src_aux
      if row['ip_address']:
        destination = u'{0:s} <{1:s}>'.format(dst_aux, row['ip_address'])
      else:
        destination = dst_aux
    else:
      user_start_call = False
      source = src_aux
      destination = dst_aux

    if row['videostatus'] == '3':
      video_conference = True
    else:
      video_conference = False

    event_object = SkypeCallEvent(
        row['try_call'], 'WAITING', user_start_call, source, destination,
        video_conference)
    parser_context.ProduceEvent(
        event_object, plugin_name=self.NAME, query=query)

    if row['accept_call']:
      event_object = SkypeCallEvent(
          row['accept_call'], 'ACCEPTED', user_start_call, source, destination,
          video_conference)
      parser_context.ProduceEvent(
          event_object, plugin_name=self.NAME, query=query)

      if row['call_duration']:
        try:
          timestamp = int(row['accept_call']) + int(row['call_duration'])
          event_object = SkypeCallEvent(
              timestamp, 'FINISHED', user_start_call, source, destination,
              video_conference)
          parser_context.ProduceEvent(
              event_object, plugin_name=self.NAME, query=query)

        except ValueError:
          logging.debug((
              u'[{0:s}] Unable to determine when the call {1!s} was '
              u'finished.').format(self.NAME, row['id']))

  def ParseFileTransfer(
      self, parser_context, row, cache=None, database=None, query=None,
      **unused_kwargs):
    """Parses a file transfer row.

    There is no direct relationship between who sends the file and
    who accepts the file.

    Args:
      parser_context: A parser context object (instance of ParserContext).
      row: The row with all information related to the file transfer.
      cache: A cache object (instance of SQLiteCache).
      database: A database object (instance of SQLiteDatabase).
      query: Optional query string. The default is None.
    """
    source_dict = cache.GetResults('source')
    if not source_dict:
      cursor = database.cursor
      results = cursor.execute(self.QUERY_SOURCE_FROM_TRANSFER)
      cache.CacheQueryResults(
          results, 'source', 'pk_id', ('skypeid', 'skypename'))
      source_dict = cache.GetResults('source')

    dest_dict = cache.GetResults('destination')
    if not dest_dict:
      cursor = database.cursor
      results = cursor.execute(self.QUERY_DEST_FROM_TRANSFER)
      cache.CacheQueryResults(
          results, 'destination', 'parent_id', ('skypeid', 'skypename'))
      dest_dict = cache.GetResults('destination')

    source = u'Unknown'
    destination = u'Unknown'

    if row['parent_id']:
      destination = u'{0:s} <{1:s}>'.format(
          row['partner_handle'], row['partner_dispname'])
      skype_id, skype_name = source_dict.get(row['parent_id'], [None, None])
      if skype_name:
        source = u'{0:s} <{1:s}>'.format(skype_id, skype_name)
    else:
      source = u'{0:s} <{1:s}>'.format(
          row['partner_handle'], row['partner_dispname'])

      if row['pk_id']:
        skype_id, skype_name = dest_dict.get(row['pk_id'], [None, None])
        if skype_name:
          destination = u'{0:s} <{1:s}>'.format(skype_id, skype_name)

    if row['status'] == 8:
      if row['starttime']:
        event_object = SkypeTransferFileEvent(
            row, row['starttime'], 'GETSOLICITUDE', source, destination)
        parser_context.ProduceEvent(
            event_object, plugin_name=self.NAME, query=query)

      if row['accepttime']:
        event_object = SkypeTransferFileEvent(
            row, row['accepttime'], 'ACCEPTED', source, destination)
        parser_context.ProduceEvent(
            event_object, plugin_name=self.NAME, query=query)

      if row['finishtime']:
        event_object = SkypeTransferFileEvent(
            row, row['finishtime'], 'FINISHED', source, destination)
        parser_context.ProduceEvent(
            event_object, plugin_name=self.NAME, query=query)

    elif row['status'] == 2 and row['starttime']:
      event_object = SkypeTransferFileEvent(
          row, row['starttime'], 'SENDSOLICITUDE', source, destination)
      parser_context.ProduceEvent(
          event_object, plugin_name=self.NAME, query=query)


# And finally we need to register this plugin.
sqlite_parser.SQLiteParser.RegisterPlugin(SkypePluginFoo)
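
To see the GUID handling from ParseCall in isolation, here is a minimal sketch. The helper name split_call_guid and the sample GUID values are made up for illustration and are not part of the plugin; the logic is meant to mirror the try/except block in ParseCall above.

```python
def split_call_guid(guid):
  """Splits a call GUID into (source, destination) account names.

  Hypothetical helper: a missing GUID, or a GUID without a '-'
  separator, yields 'Unknown [...]' placeholders for both values.
  """
  if not guid:
    return u'Unknown [no GUID]', u'Unknown [no GUID]'

  parts = guid.split('-')
  try:
    return parts[0], parts[1]
  except IndexError:
    # Only one part was found, so neither side can be trusted.
    unknown = u'Unknown [{0:s}]'.format(guid)
    return unknown, unknown


# Made-up sample values, not taken from the test database.
well_formed = split_call_guid('alice-bob-1372716737')
malformed = split_call_guid('nodashes')
missing = split_call_guid(None)
```

Pulling small pieces of logic like this out of a callback makes them easy to try out in a notebook cell before wiring them into the plugin.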

Optional: if you modify the class above, you must de-register it before you can register it again. To de-register it, use the code block below:


In [ ]:
# OPTIONAL CODE BLOCK !! DON'T EXECUTE UNLESS YOU HAVE MADE SOME 
# CHANGES TO THE PARSER CODE AND WANT TO REGISTER IT AGAIN!!!
sqlite_parser.SQLiteParser.DeregisterPlugin(SkypePluginFoo)
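
Why de-register first? The sketch below is a toy registry, not Plaso's code. It assumes that plugins are stored by name and that registering a name twice is refused with a KeyError. ToyPluginRegistry and ToyPlugin are hypothetical names, and the real SQLiteParser may behave differently in the details.

```python
class ToyPluginRegistry(object):
  """Hypothetical stand-in for a parser that keeps a plugin registry."""

  _plugin_classes = {}

  @classmethod
  def RegisterPlugin(cls, plugin_class):
    name = plugin_class.NAME.lower()
    # Assumption: a name can only be registered once.
    if name in cls._plugin_classes:
      raise KeyError(u'Plugin already registered: {0:s}'.format(name))
    cls._plugin_classes[name] = plugin_class

  @classmethod
  def DeregisterPlugin(cls, plugin_class):
    name = plugin_class.NAME.lower()
    if name not in cls._plugin_classes:
      raise KeyError(u'Plugin not registered: {0:s}'.format(name))
    del cls._plugin_classes[name]


class ToyPlugin(object):
  """Hypothetical plugin, identified by its NAME."""
  NAME = 'toy_plugin'


ToyPluginRegistry.RegisterPlugin(ToyPlugin)

# Registering the same name again is refused in this sketch.
try:
  ToyPluginRegistry.RegisterPlugin(ToyPlugin)
  registered_twice = True
except KeyError:
  registered_twice = False

# After de-registering, the plugin can be registered again.
ToyPluginRegistry.DeregisterPlugin(ToyPlugin)
ToyPluginRegistry.RegisterPlugin(ToyPlugin)
```

In a notebook, re-running the cell that defines the class creates a new class object with the same NAME, which is the situation this sketch imitates.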

Writing the Formatter

Have you ever noticed the message string when you print out an event?


In [ ]:
from plaso.formatters import manager as formatter_manager

# Import the skype formatter.
from plaso.formatters import skype as skype_formatter

timestamp_now = timelib.Timestamp.GetNow()
timestamp_posix = timestamp_now // 1000000

# Let's create a dummy event.

# For that we need to mimic the "row" object:
dummy_row = {
    'timestamp': timestamp_posix,
    'title': 'This is a good title.',
    'body_xml': 'Please pick up now...',
    'from_displayname': 'Secret Caller',
    'author': 'Me, myself and Irene'}
    
dummy_event = SkypeChatEvent(dummy_row, 'le baron')

# And print the string.
print dummy_event.GetString()

# And to reiterate, let's print the message string.
message_string, _ = formatter_manager.EventFormatterManager.GetMessageStrings(dummy_event)

print u'MESSAGE STRING: {}'.format(message_string)

Did you notice that in the event above you never really told it how to construct this message string? How does the tool then know how to print it out?

That is the purpose of the formatter. The formatter is a simple class that defines what attributes to use and how they are put together to form this message string.

You're going to need one for any plugin or parser you create (or, more precisely, for every data type that exists).

The formatter works by looking at the data_type attribute of the EventObject and matching it against the formatter's DATA_TYPE attribute. If they are the same, the formatter processes that EventObject and constructs the message string.
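
The matching step can be pictured with a small stand-alone sketch. Everything here (ToyEvent, ToyChatFormatter, FORMATTERS, get_message_string) is hypothetical and only imitates the idea of looking up a formatter by data type; it is not how Plaso's formatter manager is implemented.

```python
class ToyEvent(object):
  """Hypothetical event: a data_type plus arbitrary attributes."""

  def __init__(self, data_type, **attributes):
    self.data_type = data_type
    self.__dict__.update(attributes)


class ToyChatFormatter(object):
  """Hypothetical formatter for one data type."""
  DATA_TYPE = 'toy:event:chat'
  FORMAT_STRING = u'From: {from_account} Text: {text}'


# Formatters are looked up by their DATA_TYPE.
FORMATTERS = {ToyChatFormatter.DATA_TYPE: ToyChatFormatter}


def get_message_string(event):
  """Returns the message string, or None if no formatter matches."""
  formatter = FORMATTERS.get(event.data_type)
  if formatter is None:
    return None
  # Extra attributes that the format string does not use are ignored.
  return formatter.FORMAT_STRING.format(**vars(event))


good_event = ToyEvent('toy:event:chat', from_account=u'alice', text=u'hello')
typo_event = ToyEvent('toy:event:chta', from_account=u'alice', text=u'hello')

good_message = get_message_string(good_event)
typo_message = get_message_string(typo_event)
```

In this sketch the event with the misspelled data type simply gets no message string, which is why a typo in DATA_TYPE is easy to miss.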

Formatters go in separate files under plaso/formatters. Don't forget the copyright at the top!

For the most part, you're just setting some values with formats. You'll want to set up structures that you want to see in your timeline.

Most importantly (to reiterate), the DATA_TYPE must match the data_type attribute of the EventObject from the last section. Watch out for typos here -- there is no warning.

There are two formatters that you can use, the simple EventFormatter and the ConditionalEventFormatter. The former should only be used if you are absolutely sure all the attributes mentioned there are going to be set for each and every event object created. That means that for the vast majority of the formatters the ConditionalEventFormatter should be the formatter of choice.

There are two class constants that should always be set, regardless of which formatter you choose:

  • SOURCE_SHORT: This should match one of the common sources, e.g. LOG, WEBHIST, etc. It should closely correspond to the TLN format by H. Carvey: a short description of the source, almost like a short name for the category of the source.
  • SOURCE_LONG: The category alone is not sufficient to describe the source, so an extra field called SOURCE_LONG defines it further. For instance, browser history extracted from the Chrome browser has SOURCE_SHORT set to WEBHIST, indicating that it comes from a web history, while SOURCE_LONG contains the text "Chrome History", setting it apart from other browsers.

For the simple EventFormatter there are two class constants to set; FORMAT_STRING is required and FORMAT_STRING_SHORT is optional:

  • FORMAT_STRING: A Unicode string that contains the formatting information, with every attribute name placed in {}. This is an ordinary Python format string, so the usual rules apply. Timestamp, filename/path, username, hostname and similar information is presented in other fields and should not be part of the message string.
  • FORMAT_STRING_SHORT: This is only needed when the resulting message string may exceed 80 characters in width and you don't want it to be shortened automatically. Instead of a short message string that just contains the first 77 characters of the longer version, you can construct your own condensed one.
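
The automatic shortening can be sketched as follows. The function name shorten_message and the exact threshold handling are assumptions based on the 80 and 77 character figures mentioned in the list, not Plaso's actual implementation.

```python
def shorten_message(message, max_length=80):
  """Returns the message, cut down to max_length characters if needed.

  Hypothetical helper: a message that is too long keeps its first
  max_length - 3 characters and gets '...' appended.
  """
  if len(message) <= max_length:
    return message
  return message[:max_length - 3] + u'...'


long_message = u'x' * 100
short_message = shorten_message(long_message)
unchanged_message = shorten_message(u'Already short enough.')
```

If the first 77 characters of your message are mostly boilerplate, that is a good sign you want to define FORMAT_STRING_SHORT yourself.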

If you use the conditional formatter you need to define the following class constants:

  • FORMAT_STRING_PIECES: The same as the FORMAT_STRING, except that this is a list and only one attribute name should be defined per entry. If an attribute is not set in the event object then that particular entry in the list will be omitted.
  • FORMAT_STRING_SHORT_PIECES: Same as the FORMAT_STRING_SHORT except in the same format as FORMAT_STRING_PIECES, that is as a list.
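
To make the "omitted if not set" behaviour concrete, here is a minimal sketch of how a list of pieces could be turned into a message string. The function build_conditional_message, the sample pieces and the choice to join the pieces with a single space are all assumptions made for illustration; this is not the ConditionalEventFormatter code.

```python
import string


def build_conditional_message(format_string_pieces, attributes):
  """Joins only the pieces whose attribute is set.

  Args:
    format_string_pieces: list of format strings, one attribute name each.
    attributes: dict of attribute names and values (hypothetical
        stand-in for the values of an event object).

  Returns:
    The message string, with pieces for unset attributes left out.
  """
  parser = string.Formatter()
  kept_pieces = []
  for piece in format_string_pieces:
    # Collect the attribute names referenced by this piece.
    names = [name for _, name, _, _ in parser.parse(piece) if name]
    if all(attributes.get(name) is not None for name in names):
      kept_pieces.append(piece.format(**attributes))
  # Assumption: the pieces are joined with a single space.
  return u' '.join(kept_pieces)


# Illustrative pieces, one attribute name per entry.
FORMAT_STRING_PIECES = [
    u'From: {from_account}',
    u'To: {to_account}',
    u'[{title}]',
    u'Message: [{text}]']

message = build_conditional_message(
    FORMAT_STRING_PIECES,
    {'from_account': u'Secret Caller', 'text': u'Please pick up now...'})
```

Here to_account and title are not set, so their pieces are dropped instead of raising a KeyError, which is what would happen with a single FORMAT_STRING.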

For our purposes we use the skype_formatter here:


In [ ]:
PrintClassHelp(skype_formatter.SkypeChatFormatter)

Test The Plugin

It is very important to test the plugin, to see if it can at least parse our sample dataset.

Parse the File Using The Plugin

We can use the code below to test our parsing, to see if the plugin is capable of parsing the SQLite database we provided it with.


In [ ]:
from plaso.artifacts import knowledge_base
from plaso.lib import errors
from plaso.lib import queue
from plaso.parsers import context
from plaso.parsers import test_lib

# Create the plugin object.
test_plugin = SkypePluginFoo()

print u'Parsing file using: {}'.format(test_plugin.plugin_name)

# Create a parser context object to handle the event output.
event_queue = queue.SingleThreadedQueue()
event_queue_producer = queue.EventObjectQueueProducer(event_queue)
parse_error_queue = queue.SingleThreadedQueue()
parse_error_queue_producer = queue.ParseErrorQueueProducer(
        parse_error_queue)
knowledge_base_object = knowledge_base.KnowledgeBase()
context_obj = context.ParserContext(
        event_queue_producer, parse_error_queue_producer,
        knowledge_base_object)

# Now we can start parsing the file using the plugin.
test_plugin.Process(parser_context=context_obj, cache=database_cache, database=database)

# Set up a consumer to read events emitted by our plugin.
event_queue_consumer = test_lib.TestEventObjectQueueConsumer(event_queue)
# Read all the events.
event_queue_consumer.ConsumeEventObjects()
event_objects = event_queue_consumer.event_objects

print u'Processing of SQLite database is done.'
print u'Able to extract: {} events from the database.'.format(len(event_objects))

Let's print out the event objects that we managed to extract from this plugin.


In [ ]:
# Print out the content of the extracted events.
for index, event_object in enumerate(event_objects):
  print u'*' * 80
  print u'    EVENT NUMBER: {}'.format(index)
  print u'-'*80
  print u'Event:'
  print event_object.GetString()
  print u''

Writing the Tests

Unit tests are designed to make sure your code is doing what you intended it to do, as well as to let other people know when their refactor broke your code. This will also assist you when you are writing your code by doing a sanity check on your plugin to make sure it works the way you expect it to.

The tests go in their own file, in this case the file plaso/parsers/sqlite_plugins/skype_test.py.

For the tests to work the formatter needs to be imported. However, since the formatter module typically has the same name as the plugin (or parser) module, we may need to import it under a different name. And since the formatter is never used directly in the test file, you need a pylint statement to suppress the unused-import message during linting. The other imports you will have seen throughout this code lab. Because everything lives in the same namespace here we don't strictly need to import the formatter, but this is what is typically added:

# pylint: disable-msg=unused-import
from plaso.formatters import skype as skype_formatter

The pylint statement is there so that pylint does not complain about an unused import: we are not using the formatter directly, we are just importing it so that it gets registered (otherwise the test will not work).

TestCase and setUp()

For a plugin test we will use the appropriate plugin test library, in this case the plaso/parsers/sqlite_plugins/test_lib (or test_lib.SQLitePluginTestCase). This is a simple class that inherits from the plaso parser test lib (which in turn inherits from the unittest.TestCase class), and adds a few functions to make it easier to test SQLite database plugins. You may want to add a setUp() function to open the sample file and set any other variables you expect in the background.

For a SQLite database plugin you typically need to build a cache (if one is needed for the plugin), open the test file from a path (using _GetTestFilePath) and call the "_ParseDatabaseFileWithPlugin" function of the test lib.

Let's first look at what functions are available to us in the SQLite test library.


In [ ]:
from plaso.parsers.sqlite_plugins import test_lib as sqlite_test_lib

PrintClassHelp(sqlite_test_lib.SQLitePluginTestCase)

Write the setUp() function for this class definition:

class SkypePluginTest(test_lib.SQLitePluginTestCase):
  """Tests for the Skype main.db history database plugin."""

  def setUp(self):

While we're setting up the boilerplate of the test, let's add the main function to the bottom of the file. Then we can run the test on its own.

if __name__ == '__main__':
  unittest.main()

Writing the Test

The outline of the main test is to create and run the plugin, then check that the plugin results are correct. You should check a variety of attributes in one row and something about the extracted events in general.

The name of the test method needs to start with the word "test". Let's use testProcess(), since that is what we are testing: the Process function of the plugin. The assertions should include:

  • How many entries were created?
  • For entry[1], are the timestamp, username, and full_path correct?
  • For entry[1], are the message strings formatted correctly?

In [ ]:
from plaso.formatters import skype as skype_formatter
from plaso.lib import timelib_test
from plaso.parsers.sqlite_plugins import skype as skype_plugin

class SkypePluginTest(sqlite_test_lib.SQLitePluginTestCase):
  """Tests for the Skype main.db history database plugin."""

  def setUp(self):
    """Sets up the needed objects used throughout the test."""
    self._plugin = skype_plugin.SkypePlugin()

  def testProcess(self):
    """Tests the Process function on a Skype History database file.

      The History file contains 24 events:
          4 call events
          4 transfers file events
          1 sms events
         15 chat events

      Events used:
        id = 16 -> SMS
        id = 22 -> Call
        id = 18 -> File
        id =  1 -> Chat
        id = 14 -> ChatRoom
    """
    # In the actual test file we would use the _GetTestFilePath but here we already
    # have a test file.
    # test_file = self._GetTestFilePath(['skype_main.db'])
    test_file = test_database_path
    
    cache = sqlite_parser.SQLiteCache()
    event_queue_consumer = self._ParseDatabaseFileWithPlugin(
        self._plugin, test_file, cache)
    event_objects = self._GetEventObjectsFromQueue(event_queue_consumer)

    calls = 0
    files = 0
    sms = 0
    chats = 0
    for event_object in event_objects:
      if event_object.data_type == 'skype:event:call':
        calls += 1
      if event_object.data_type == 'skype:event:transferfile':
        files += 1
      if event_object.data_type == 'skype:event:sms':
        sms += 1
      if event_object.data_type == 'skype:event:chat':
        chats += 1

    self.assertEquals(len(event_objects), 24)
    self.assertEquals(files, 4)
    self.assertEquals(sms, 1)
    self.assertEquals(chats, 15)
    self.assertEquals(calls, 3)

    # TODO: Split this up into separate functions for testing each type of
    # event, eg: testSMS, etc.
    sms_event_object = event_objects[16]
    call_event_object = event_objects[22]
    event_file = event_objects[18]
    chat_event_object = event_objects[1]
    chat_room_event_object = event_objects[14]

    # Test cache processing and format strings.
    expected_msg = (
        u'Source: gen.beringer <Gen Beringer> Destination: '
        u'european.bbq.competitor <European BBQ> File: secret-project.pdf '
        u'[SENDSOLICITUDE]')

    self._TestGetMessageStrings(
        event_objects[17], expected_msg, expected_msg[0:77] + '...')

    expected_timestamp = timelib_test.CopyStringToTimestamp(
        '2013-07-01 22:14:22')
    self.assertEquals(sms_event_object.timestamp, expected_timestamp)
    text_sms = (u'If you want I can copy '
                u'some documents for you, '
                u'if you can pay it... ;)')
    self.assertEquals(sms_event_object.text, text_sms)
    number = u'+34123456789'
    self.assertEquals(sms_event_object.number, number)

    expected_timestamp = timelib_test.CopyStringToTimestamp(
        '2013-10-24 21:49:35')
    self.assertEquals(event_file.timestamp, expected_timestamp)

    action_type = u'GETSOLICITUDE'
    self.assertEquals(event_file.action_type, action_type)
    source = u'gen.beringer <Gen Beringer>'
    self.assertEquals(event_file.source, source)
    destination = u'european.bbq.competitor <European BBQ>'
    self.assertEquals(event_file.destination, destination)
    transferred_filename = u'secret-project.pdf'
    self.assertEquals(event_file.transferred_filename, transferred_filename)
    filepath = u'/Users/gberinger/Desktop/secret-project.pdf'
    self.assertEquals(event_file.transferred_filepath, filepath)
    self.assertEquals(event_file.transferred_filesize, 69986)

    expected_timestamp = timelib_test.CopyStringToTimestamp(
        '2013-07-30 21:27:11')
    self.assertEquals(chat_event_object.timestamp, expected_timestamp)

    title = u'European Competitor | need to know if you got it..'
    self.assertEquals(chat_event_object.title, title)
    expected_msg = u'need to know if you got it this time.'
    self.assertEquals(chat_event_object.text, expected_msg)
    from_account = u'Gen Beringer <gen.beringer>'
    self.assertEquals(chat_event_object.from_account, from_account)
    self.assertEquals(chat_event_object.to_account, u'european.bbq.competitor')

    expected_timestamp = timelib_test.CopyStringToTimestamp(
        '2013-10-27 15:29:19')
    self.assertEquals(chat_room_event_object.timestamp, expected_timestamp)

    title = u'European Competitor, Echo123'
    self.assertEquals(chat_room_event_object.title, title)
    expected_msg = u'He is our new employee'
    self.assertEquals(chat_room_event_object.text, expected_msg)
    from_account = u'European Competitor <european.bbq.competitor>'
    self.assertEquals(chat_room_event_object.from_account, from_account)
    to_account = u'gen.beringer, echo123'
    self.assertEquals(chat_room_event_object.to_account, to_account)

    expected_timestamp = timelib_test.CopyStringToTimestamp(
        '2013-07-01 22:12:17')
    self.assertEquals(call_event_object.timestamp, expected_timestamp)

    self.assertEquals(call_event_object.dst_call, u'european.bbq.competitor')
    self.assertEquals(call_event_object.src_call, u'gen.beringer')
    self.assertEquals(call_event_object.user_start_call, False)
    self.assertEquals(call_event_object.video_conference, False)

Running the Test

How will you know what the format string should look like? Well, it's time to run the code we have. Typically the plugin needs to be "compiled" before the test will be able to import it, so we need to make sure the plugin gets picked up for compilation.

Open plaso/parsers/sqlite_plugins/__init__.py. Add an import statement for your new plugin. Save the file.

Normally to run the tests you would either need to run:

python run_tests.py

Or to compile:

python setup.py build && sudo python setup.py install

And then you can run the test directly using:

python plaso/parsers/sqlite_plugins/skype_test.py

Rinse and repeat as you write the tests. If you change the parser, you need to recompile. If you just change the test, you don't.

However since we are writing this in our notebook we just need to make sure we've run all the previous code segments, and if you make changes, just re-run it.

To run the test itself, execute the code below:


In [ ]:
import unittest
my_suite = unittest.TestSuite()
my_suite.addTest(SkypePluginTest('testProcess'))

results = unittest.TextTestRunner(verbosity=3).run(my_suite)

if results.errors:
  print u'Errors came up while trying to run test.'
  for error in results.errors:
    if isinstance(error, basestring):
      print error
    else:
      for sub_error in error:
        print sub_error
elif results.failures:
  print u'Failures came up while trying to run test.'
  for failure in results.failures:
    if isinstance(failure, basestring):
      print failure
    else:
      for sub_failure in failure:
        print sub_failure
else:
  print u'All came out clean.'
  print results

If all went well you should have a fully functioning plugin by now, ready to parse every Skype main.db database you may encounter.

You can start playing around and making changes to the plugin, to see what happens when changes are introduced, or continue and create a new plugin.

The Assignment

Now we have gone through step-by-step how an example SQLite database plugin is created. It is time to take what we've learned so far and create a new plugin.

At the moment there is no example assignment for this codelab, as there is for the other codelabs. The assignment is therefore to create a new plugin from scratch for a SQLite database of your choice.

Clean Up

During our test code we created a temporary file, that we may want to delete. To delete it, use the code below:


In [ ]:
import os

# Only remove the temporary file if it is still there.
if test_database_path and os.path.isfile(test_database_path):
  os.remove(test_database_path)