This lab is for people who want to learn how to write and execute a Plaso plugin in Python. This tutorial assumes:
This lab will teach you how to write a SQLite database plugin with tests for the Plaso framework. By the end you will be able to:
This lab should take you a couple of hours to complete, although some of that depends on the build issues you might run into. We are not asking you to check in code yet; the goal is to demonstrate how a plugin is written. For a plugin to be checked in, you need to write it against a SQLite database that is not already parsed and split the code shown here into several files (the layout is explained below).
Welcome to writing a Plaso plugin! From the outside, writing a plugin can be daunting, but once you get your dev environment going, you've fought half the battle. This codelab features a simple SQLite database plugin, but the formula can be used for any type of plugin (another codelab will demonstrate a Windows Registry plugin, and the hope is that these two codelabs together more or less cover plugin writing). You may also be interested in the text parser codelab.
Get familiar with the developers guide and more specifically the style guide. To make the code easier to maintain we follow a style guide, partially based on the Google Python Style Guide but slightly modified to fit our needs.
We also follow a code review process that is discussed on the style guide site.
This is an IPython notebook. If you are not familiar with it, here is a brief introduction: it is basically an IPython shell wrapped in a pretty GUI (a browser window). You can execute any Python code you wish, and quickly go back, edit and re-run code. To run the code, click the code segment and press "Shift+Enter"; the bracket on the left changes to indicate that the segment has been executed, and any output it produced appears below it.
One thing to make note of is that some of the code segments depend on previous code segments having been executed. So in order for this codelab to work properly you need to execute EVERY code segment that is presented here, especially all class declarations and import statements, but to be sure just execute them all [except those explicitly stated as optional].
To make it easier to get documentation about various classes we may use in the codelab we'll start with defining a simple function to print out help (so to execute, click the code segment below and press "SHIFT+ENTER").
In [1]:
import inspect

# Let's put this in a function so we can easily call it from other parts of the codelab.
def PrintClassHelp(class_object, filter_string=''):
  """Prints a help string for a given class object.

  Args:
    class_object: The class that we are about to inspect.
    filter_string: Only show class members whose names start with this string.
  """
  # Print the docstring of the class.
  print u''
  print class_object.__doc__

  # Print information for every member function.
  additional_members = []
  for member_name, member_value in inspect.getmembers(class_object):
    # Check to see if we are filtering out members that do not start with
    # a particular string.
    if filter_string and not member_name.startswith(filter_string):
      continue

    if inspect.ismethod(member_value):
      args = inspect.getargspec(member_value)
      doc_string = member_value.__doc__
      print u'{0}{1:>20s}({2}){0:>10}\n\n{3}\n{4}\n\n'.format(
          '*'*5, member_name, u','.join(args.args), doc_string, '-'*80)
    else:
      if member_name.startswith('_'):
        continue
      if member_name in ['classes', 'parent_class', 'plugin_feature', 'top_level_class']:
        continue
      additional_members.append(u'{} = {}'.format(member_name, repr(member_value)))

  if additional_members:
    print '\n'
    print '*** Additional Members of Class ***\n\n ',
    print u'\n '.join(additional_members)
During this codelab we will use the IPython notebook interface for everything, which means that all the classes and code live in a single file. Once the code is deployed to the actual codebase, it needs to be saved in several places, typically something like:
And make necessary changes to:
to include the new plugin and formatter in the tool. We may also want to change plaso/frontend/presets.py to include the plugin in a preset. This depends on the plugin itself: sometimes you want to include the parser and all its plugins (as with the Windows Registry plugins), and sometimes you only want to load specific plugins, which is typically the case with SQLite plugins.
We are however omitting all these details to make the codelab easier to follow along. This can also be used for people to test their plugins and play with them without the need to mess with the codebase and once the plugin is fully functional then create the necessary files and start the code review process.
There are also a lot of comments in the code in this codelab that would typically be omitted from a released plugin. To see the actual code that is used as an example here click on one of the below links (we will only be using parts of that code for demonstration):
We are going to write the plugin completely in this IPython notebook, and test it there too. Nothing else is needed beyond this notebook, a sample SQLite database and the plaso libraries.
Before writing a plugin (and here we assume we are attempting to parse a particular SQLite database), ask yourself these questions:
Remember that we are not about to submit this plugin for review, since it is already checked in; it is used here only for demonstration purposes. Please refer to the plaso roadmap for open parser/plugin assignments (or add your own).
Before we start looking at the code we need to download the SQLite database to a temporary location so that we can use it for the remainder of this codelab. For this you need an Internet connection.
In [ ]:
# Import a library to make the HTTP connection.
import urllib2
# Import a library so that we can create a temporary file.
import tempfile
# The URL to the Skype SQLite database we are about to use for our testing.
url = u'https://github.com/log2timeline/plaso/raw/master/test_data/skype_main.db'
# Download the file.
response = urllib2.urlopen(url)
data = response.read()
# Save it in a temporary file (we don't want it to be deleted).
test_file = tempfile.NamedTemporaryFile(delete=False)
# Save the name since that is what we will refer to later in the code.
test_database_path = test_file.name
# Write data to it.
test_file.write(data)
# Close the file.
test_file.close()
First things first: every file checked into the project needs a header. That header contains, among other things, copyright information as well as import statements.
The first line after the copyright statement is the docstring, which needs to be changed. It should not be wider than 80 characters. If you need more than a single line to describe the parser, still keep the first line to at most 80 characters, ending with a period. You can then add a more detailed description two lines down (an example of that can be seen below).
The import order is defined in the style guide:
Imports are always put at the top of the file, just after any module comments and doc strings and before module globals and constants.
Imports should be grouped with the order being most generic to least generic:
+ standard library imports
+ third-party imports
+ application-specific imports
Within each grouping, imports should be sorted lexicographically, ignoring case, according to each module's full package path.
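As a small illustration of the sorting rule, the sketch below orders a list of module paths lexicographically while ignoring case. The helper name sort_imports is made up for this example and is not part of plaso or its tooling; the module paths are the application-specific imports used later in this codelab.

```python
def sort_imports(module_paths):
    """Returns module paths sorted lexicographically, ignoring case."""
    return sorted(module_paths, key=lambda path: path.lower())


# Application-specific imports used later in this codelab, shuffled on purpose.
application_imports = [
    'plaso.parsers.sqlite_plugins.interface',
    'plaso.events.time_events',
    'plaso.parsers.sqlite',
]

sorted_imports = sort_imports(application_imports)
for module_path in sorted_imports:
    print(module_path)
```

Sorting on the full package path is what places plaso.events before plaso.parsers in the header shown in the next code segment.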
In [ ]:
#!/usr/bin/python
#
# Copyright 2013 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This file contains a basic Skype SQLite parser."""
import logging
# We need to be able to create new event objects, and more specifically
# get access to timestamped events.
from plaso.events import time_events
# Import the SQLite parser itself.
from plaso.parsers import sqlite as sqlite_parser
# Import the interface for all SQLite database plugins.
from plaso.parsers.sqlite_plugins import interface
We now need to know what kind of plugin we are trying to implement. The "Write A Plugin" section of the plaso documentation site goes briefly into the generic plugin interface and what needs to be done in order to write a plugin. For now we know that we are trying to parse a specific SQLite database using a plugin. If we look at the SQLite database plugin section we notice that the plugin interface is relatively simple.
In [ ]:
# PrintClassHelp prints its output itself and returns None, so we call it directly.
PrintClassHelp(interface.SQLitePlugin)
Let's create a database file object from the file we just downloaded.
In [ ]:
# Import necessary libraries from dfVFS so we can open up the file.
from dfvfs.lib import definitions
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import resolver as path_spec_resolver
# Find the file and get a handle to it.
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_OS, location=test_database_path)
file_entry = path_spec_resolver.Resolver.OpenFileEntry(path_spec)
# Open up the SQLite database.
database = sqlite_parser.SQLiteDatabase(file_entry)
database.Open()
# Create a SQLite cache object.
database_cache = sqlite_parser.SQLiteCache()
Now we've got the database opened for future use and we can start exploring it. Let's look at what tables are in the database:
In [ ]:
# Import sqlite3 so that we can catch database errors.
import sqlite3

# The SQL command we need to issue to get table names.
table_sql = 'SELECT name FROM sqlite_master WHERE type="table"'

# Create a small function that we can later re-use to execute SQL commands and get the results back.
def QueryForResults(sql_command):
  """Execute a SQLite database query and return back a generator of results."""
  cursor = database.cursor
  try:
    results = cursor.execute(sql_command)
  except sqlite3.DatabaseError as exception:
    print u'SQLite error occurred: <{0:s}>'.format(exception)
    raise

  for row in results:
    yield row

print '*'*40 + ' TABLES ' + '*'*40
for row in QueryForResults(table_sql):
  print u' + ',
  print row[0]
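The same table listing can be tried without plaso, using only the sqlite3 module from the standard library. This is a minimal sketch: the in-memory database stands in for skype_main.db, and its two tables are empty placeholders that only borrow table names from this codelab. The query uses single quotes around the string literal; SQLite also accepts the double-quoted form used above, but in standard SQL double quotes denote identifiers.

```python
import sqlite3

# An in-memory database stands in for skype_main.db (an assumption made
# for this example; the real schema is much larger).
connection = sqlite3.connect(':memory:')
connection.execute('CREATE TABLE Chats (id INTEGER PRIMARY KEY, name TEXT)')
connection.execute(
    'CREATE TABLE Messages (id INTEGER PRIMARY KEY, chatname TEXT)')


def list_table_names(connection):
    """Returns the sorted names of all tables in the database."""
    cursor = connection.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'")
    return sorted(row[0] for row in cursor)


table_names = list_table_names(connection)
for name in table_names:
    print(' + {0}'.format(name))
```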
There are quite a few tables there that could be of assistance to us. Let's look at the table definition, or the schema of these tables.
In [ ]:
for row in QueryForResults('SELECT name, sql FROM sqlite_master WHERE type="table"'):
  print u'{0} {1} {0}'.format('*'*10, row[0])
  print row[1]
  print ''
Let's look for any columns there that may have timestamps associated with them or that could otherwise provide us with value.
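One way to narrow the search is to scan the schema for column names that hint at a timestamp. The sketch below is only an aid for exploration and assumes that timestamp columns contain "time" or "date" in their names, which will not hold for every database. The helper find_time_columns and the two-table schema are made up for this example.

```python
import sqlite3

connection = sqlite3.connect(':memory:')
connection.execute(
    'CREATE TABLE Messages (id INTEGER, author TEXT, timestamp INTEGER)')
connection.execute(
    'CREATE TABLE Transfers (id INTEGER, starttime INTEGER, filename TEXT)')


def find_time_columns(connection, hints=('time', 'date')):
    """Maps table names to the columns whose names contain one of the hints."""
    matches = {}
    tables = connection.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'").fetchall()
    for (table_name,) in tables:
        # PRAGMA table_info yields one row per column; index 1 is the name.
        # The table name comes from sqlite_master, not from user input.
        columns = connection.execute(
            'PRAGMA table_info("{0}")'.format(table_name))
        hits = [
            column[1] for column in columns
            if any(hint in column[1].lower() for hint in hints)]
        if hits:
            matches[table_name] = hits
    return matches


time_columns = find_time_columns(connection)
for table_name in sorted(time_columns):
    print('{0}: {1}'.format(table_name, ', '.join(time_columns[table_name])))
```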
We can also look for VIEW tables:
In [ ]:
for row in QueryForResults('SELECT name, sql FROM sqlite_master WHERE type="view"'):
  print u'{0} {1} {0}'.format('*', row[0])
  print row[1]
  print ''
Looking at the tables above we may have spotted the tables Chats and Messages... let's review them a bit more.
In [ ]:
for row in QueryForResults('SELECT name, sql FROM sqlite_master WHERE type = "table" AND (name = "Chats" OR name = "Messages")'):
  print u'{0} Table: {1} {0}'.format('*'*10, row[0])
  #white_space = len('CREATE TABLE {} ('.format(row[0])) - 1
  sql_string = row[1].replace('(', '(\n ')
  print u'\n '.join(sql_string.split(','))
  print ''
Inspecting these above tables can lead us to a SQL query like the one below:
In [ ]:
chat_query = (
'SELECT c.id, c.participants, c.friendlyname AS title, '
'm.author AS author, m.from_dispname AS from_displayname, '
'm.body_xml, m.timestamp, c.dialog_partner FROM Chats c, Messages m '
'WHERE c.name = m.chatname')
for index, row in enumerate(QueryForResults(chat_query)):
  print '-'*80
  print ' ROW RESULT: {}'.format(index)
  print '-'*80
  for key in row.keys():
    print u'{} -> {}'.format(key, row[key])
  print '*'*80
  print ''
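The loop above relies on rows that support keys() and access by column name. With the plain sqlite3 module that behaviour comes from setting the connection's row_factory to sqlite3.Row. The sketch below shows this on a cut-down, made-up version of the Chats and Messages tables; it only assumes the handful of columns it creates itself.

```python
import sqlite3

connection = sqlite3.connect(':memory:')
# Without this row factory, rows are plain tuples and have no keys() method.
connection.row_factory = sqlite3.Row
connection.execute('CREATE TABLE Chats (name TEXT, friendlyname TEXT)')
connection.execute(
    'CREATE TABLE Messages (chatname TEXT, author TEXT, timestamp INTEGER)')
# Made-up sample data.
connection.execute("INSERT INTO Chats VALUES ('#a/$b', 'Made-up chat title')")
connection.execute(
    "INSERT INTO Messages VALUES ('#a/$b', 'made_up_user', 1375236600)")

query = (
    'SELECT c.friendlyname AS title, m.author AS author, m.timestamp '
    'FROM Chats c, Messages m WHERE c.name = m.chatname')

rows = connection.execute(query).fetchall()
for row in rows:
    for key in row.keys():
        print('{0} -> {1}'.format(key, row[key]))
```

The AS aliases in the query decide which key names the callback functions later see, which is why the plugin queries rename columns such as friendlyname to title.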
One important disclaimer: since we are using an already checked-in plugin as an example, we append the word "Foo" or "foo" to many of the class names and other fields to avoid namespace collisions.
In plaso terms, as soon as we have more than a single "parser" that attempts to parse a particular file format, we convert that to a plugin system. A very generic parser can then take care of all the file format parsing, leaving the plugins to do minimal work: defining a few class constants that are used to match the particular file or file segments to what the plugin is designed to parse, and a function to process the data collected.
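For SQLite database plugins we need to define the following class attributes:
+ NAME: the name of the plugin.
+ DESCRIPTION: a short description of the plugin.
+ QUERIES: a list of tuples, each containing a SQL query and the name of the callback function that processes its rows.
+ REQUIRED_TABLES: the set of tables that must be present in the database for the plugin to be used.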
The underlying parser takes care of opening the database, querying the available tables and comparing that list to the one you provided in REQUIRED_TABLES. If REQUIRED_TABLES is a subset of the actual tables, the parser takes the queries defined in QUERIES and executes them one by one on the database.
For every row that comes out of each query, the parser calls the callback function named in the QUERIES list with the row, the cache object and potentially other objects as well.
For instance, if we define the SQL command:
QUERIES = [('SELECT foo FROM bar', 'ParseFoo')]
Then the parser executes the command "SELECT foo FROM bar" and for every result that comes from that the function "ParseFoo" is called. This call back function needs to be defined in the plugin and accept the correct parameters:
def ParseFoo(self, parser_context, row, query=None, **unused_kwargs):
Or if you need access to a database cache object:
def ParseFoo(self, parser_context, row, query=None, cache=None, **unused_kwargs):
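The mechanism described above (check REQUIRED_TABLES, run each query, look up the callback by name) can be sketched in a few lines. This is not plaso's parser: ExamplePlugin and process are hypothetical names, the parser_context argument is left out, and the callback simply records what it was called with.

```python
import sqlite3


class ExamplePlugin(object):
    """Hypothetical plugin; only the attribute names mirror the codelab."""

    QUERIES = [('SELECT foo FROM bar', 'ParseFoo')]
    REQUIRED_TABLES = frozenset(['bar'])

    def __init__(self):
        self.seen = []

    def ParseFoo(self, row, query=None, **unused_kwargs):
        # Extra keyword arguments such as cache are swallowed by
        # **unused_kwargs when the callback does not need them.
        self.seen.append((row['foo'], query))


def process(connection, plugin):
    """Runs every query of the plugin if all required tables are present."""
    cursor = connection.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'")
    tables = frozenset(row[0] for row in cursor)
    if not plugin.REQUIRED_TABLES.issubset(tables):
        return False

    for query, callback_name in plugin.QUERIES:
        # QUERIES stores the callback as a string, so look it up by name.
        callback = getattr(plugin, callback_name)
        for row in connection.execute(query):
            callback(row, query=query, cache=None)
    return True


connection = sqlite3.connect(':memory:')
connection.row_factory = sqlite3.Row
connection.execute('CREATE TABLE bar (foo TEXT)')
connection.executemany('INSERT INTO bar VALUES (?)', [('one',), ('two',)])

plugin = ExamplePlugin()
matched = process(connection, plugin)
print(matched)
print(sorted(plugin.seen))
```

Accepting **unused_kwargs in every callback is what lets the caller pass the same set of keyword arguments to all of them, whether or not a given callback uses the cache.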
The SQLite database cache object that gets passed as a parameter to each SQLite call back function is an object that can be used to store cached data that you may need. Let's imagine a scenario...
A database table defines a list of paths and filenames. Each row in the database contains the file name and an identification value for the parent value. To fully construct the path one would need to follow that parent id, query the database for that ID value, and see if that entry had a parent, etc. Some of this could be achieved using complex JOIN statements in SQLite, however sometimes it is just easier to run a single SQL command to get all these values, store that as a cache and then quickly look up values in the cache when needed.
That is what the cache object can be used for. Let's look at the definition for the cache:
In [ ]:
PrintClassHelp(sqlite_parser.SQLiteCache)
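The path scenario described above can be sketched with plain sqlite3 and a dict standing in for the cache object. The files table, its columns and the helper build_path are all made up for this example; the point is only that one query fills the cache and every later lookup is a dict access.

```python
import sqlite3

connection = sqlite3.connect(':memory:')
connection.execute(
    'CREATE TABLE files (id INTEGER, parent_id INTEGER, name TEXT)')
connection.executemany('INSERT INTO files VALUES (?, ?, ?)', [
    (1, None, 'Users'),
    (2, 1, 'alice'),
    (3, 2, 'notes.txt')])

# A single query fills the cache: id -> (parent_id, name).
path_cache = {}
for file_id, parent_id, name in connection.execute(
        'SELECT id, parent_id, name FROM files'):
    path_cache[file_id] = (parent_id, name)


def build_path(file_id, cache):
    """Walks the parent chain in the cache and returns the full path."""
    segments = []
    seen = set()
    while file_id is not None and file_id in cache and file_id not in seen:
        seen.add(file_id)  # Guards against a cyclic parent chain.
        file_id, name = cache[file_id]
        segments.append(name)
    return '/' + '/'.join(reversed(segments))


full_path = build_path(3, path_cache)
print(full_path)
```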
Let's look at one example on how to use the cache:
In [ ]:
# Create new cache instance.
another_cache = sqlite_parser.SQLiteCache()
# Create a SQL command we would like to issue to the database and cache the results of.
sql_command = u'SELECT parent_id, partner_handle AS skypeid, partner_dispname AS skypename FROM transfers'
# Get a cursor so we can execute the SQL command.
cursor = database.cursor
# Execute the SQL command and get back a result set.
results = cursor.execute(sql_command)
# And now we can build the cache based on that result set.
# This gets explained a bit more later on.
another_cache.CacheQueryResults(
results, 'destination', 'parent_id', ('skypeid', 'skypename'))
# And fetch the recently added entry here.
destination_dict = another_cache.GetResults('destination')
print destination_dict
To be able to better understand the CacheQueryResults function let's print the docstring:
In [ ]:
PrintClassHelp(another_cache.CacheQueryResults)
OK, so let's go back to that code of ours and see how that builds up that cache dict object:
cache.CacheQueryResults(
results, 'destination', 'parent_id', ('skypeid', 'skypename'))
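We call the function with the parameters set as:
+ results: the result set returned by cursor.execute().
+ 'destination': the name under which the cache stores the resulting dict.
+ 'parent_id': the column whose value is used as the key of the dict.
+ ('skypeid', 'skypename'): the columns whose values are stored for each key.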
To go over this specific example, what we are asking the cache to do is to create a dict object that is called "destination" and store the results from each row in that in the following way:
cache.destination = {}
for row in results:
  cache.destination[row['parent_id']] = (row['skypeid'], row['skypename'])
This is roughly what happens. What we end up with is a dict that we can look up by "parent_id" to get the appropriate values for "skypeid" and "skypename".
An example here:
In [ ]:
skype_id, skype_name = destination_dict.get(23445435)
print u'ID: {}\nNAME: {}'.format(skype_id, skype_name)
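The behaviour described above can be reproduced without plaso. The function below is a simplified re-implementation written for this example, not the code of SQLiteCache, and the rows are made-up dicts rather than real query results. It also shows why the plugin later passes a default to get(): a missing key returns None, which cannot be unpacked into two names.

```python
def cache_query_results(rows, key_name, value_names):
    """Builds a dict keyed on key_name holding tuples of the named values."""
    cache = {}
    for row in rows:
        cache[row[key_name]] = tuple(row[name] for name in value_names)
    return cache


# Made-up rows standing in for the result of the transfers query.
rows = [
    {'parent_id': 10, 'skypeid': 'made_up_id', 'skypename': 'Made Up Name'},
    {'parent_id': 11, 'skypeid': 'other_id', 'skypename': 'Other Name'}]

destination = cache_query_results(rows, 'parent_id', ('skypeid', 'skypename'))

# A default of the right shape keeps the unpacking safe for unknown keys.
skype_id, skype_name = destination.get(10, (None, None))
missing_id, missing_name = destination.get(99, (None, None))

print('ID: {0}\nNAME: {1}'.format(skype_id, skype_name))
```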
In [ ]:
# Import the library we are about to inspect.
from plaso.lib import timelib
# You can easily change the name of the class here if you want to explore a different
# class and its members.
PrintClassHelp(timelib.Timestamp)
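The dummy event near the end of this codelab divides the value of timelib.Timestamp.GetNow() by 1000000 to obtain a POSIX value, which suggests that plaso timestamps count microseconds since the Epoch. Under that assumption, the conversions look roughly like the sketch below. The helper names are made up for this example and are not timelib functions.

```python
import datetime

MICROSECONDS_PER_SECOND = 1000000


def posix_to_microseconds(posix_time):
    """Converts POSIX seconds into microseconds since the Epoch."""
    return int(posix_time) * MICROSECONDS_PER_SECOND


def microseconds_to_datetime(timestamp):
    """Converts microseconds since the Epoch into a naive UTC datetime."""
    seconds, microseconds = divmod(timestamp, MICROSECONDS_PER_SECOND)
    return datetime.datetime(1970, 1, 1) + datetime.timedelta(
        seconds=seconds, microseconds=microseconds)


timestamp = posix_to_microseconds(1375236600)
print(timestamp)
print(microseconds_to_datetime(timestamp).isoformat())
```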
In [ ]:
class SkypeChatEvent(time_events.PosixTimeEvent):
  """Convenience class for a Skype event."""

  # Define the data type, this is very important and needs to have a 1:1
  # mapping to the formatter for the event.
  DATA_TYPE = 'skype:event:chat'

  def __init__(self, row, to_account):
    """Build a Skype Event from a single row.

    Args:
      row: A row object (instance of sqlite3.Row) that contains the
           extracted data from a single row in the database.
      to_account: A string containing the accounts (excluding the
                  author) of the conversation.
    """
    # First thing we need to do is to call the "super" class or the parent
    # of the event. This particular event object inherits from a class
    # called time_events.PosixTimeEvent, which expects the timestamp sent to
    # it to be POSIX or Epoch in UTC.
    # The parameters that need to be passed to the PosixTimeEvent are:
    #   timestamp: Timestamp in POSIX or Epoch since UTC.
    #   timestamp_desc: Description of the timestamp, eg "Last Written Time".
    #   data_type: The data type of this event object.
    super(SkypeChatEvent, self).__init__(
        row['timestamp'], 'Chat from Skype', self.DATA_TYPE)

    # We can now set other attributes that we need in order to better format
    # the message. These attributes vary depending on the source they come from.
    self.title = row['title']
    self.text = row['body_xml']
    self.from_account = u'{0:s} <{1:s}>'.format(
        row['from_displayname'], row['author'])
    self.to_account = to_account
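To experiment with this pattern outside of plaso, the parent class can be replaced by a small stand-in. In the sketch below, ExampleTimeEvent is a hypothetical substitute that only stores what it is given; it does not reproduce what PosixTimeEvent does with the timestamp. A plain dict is enough to mimic a row because the event class only uses key-based access.

```python
class ExampleTimeEvent(object):
    """Hypothetical stand-in for a parent class such as PosixTimeEvent."""

    def __init__(self, posix_time, usage, data_type=None):
        self.timestamp = posix_time
        self.timestamp_desc = usage
        if data_type:
            self.data_type = data_type


class ExampleChatEvent(ExampleTimeEvent):
    """Chat event that copies the attributes it needs out of a row."""

    DATA_TYPE = 'example:event:chat'

    def __init__(self, row, to_account):
        super(ExampleChatEvent, self).__init__(
            row['timestamp'], 'Chat from Example', self.DATA_TYPE)
        self.title = row['title']
        self.text = row['body_xml']
        self.from_account = '{0:s} <{1:s}>'.format(
            row['from_displayname'], row['author'])
        self.to_account = to_account


# Made-up values; the keys match the aliases used in the chat query.
row = {
    'timestamp': 1375236600, 'title': 'Made-up title',
    'body_xml': 'Made-up message', 'from_displayname': 'Made Up',
    'author': 'made_up_user'}
event = ExampleChatEvent(row, 'another_user')

for name, value in sorted(vars(event).items()):
    print('{0} = {1!r}'.format(name, value))
```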
The event object is pretty simple really. We need to set the following keys:
This inherits from time_events.PosixTimeEvent:
In [ ]:
PrintClassHelp(time_events.PosixTimeEvent, '__init__')
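The parent class needs the following parameters:
+ timestamp: the timestamp in POSIX (Epoch) time, in UTC.
+ timestamp_desc: a description of the timestamp, e.g. "Last Written Time".
+ data_type: the data type of the event object.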
And now we just need to define the rest of the needed EventObjects for the plugin to work:
In [ ]:
class SkypeAccountEvent(time_events.PosixTimeEvent):
  """Convenience class for account information."""

  DATA_TYPE = 'skype:event:account'

  def __init__(
      self, timestamp, usage, identifier, full_name, display_name, email,
      country):
    """Initialize the event.

    Args:
      timestamp: The POSIX timestamp value.
      usage: A string containing the description string of the timestamp.
      identifier: The row identifier.
      full_name: A string containing the full name of the Skype account holder.
      display_name: A string containing the chosen display name of the account
                    holder.
      email: A string containing the registered email address of the account
             holder.
      country: A string containing the chosen home country of the account
               holder.
    """
    super(SkypeAccountEvent, self).__init__(timestamp, usage)
    self.offset = identifier
    self.username = u'{0:s} <{1:s}>'.format(full_name, display_name)
    self.display_name = display_name
    self.email = email
    self.country = country
    self.data_type = self.DATA_TYPE


class SkypeSMSEvent(time_events.PosixTimeEvent):
  """Convenience EventObject for SMS."""

  DATA_TYPE = 'skype:event:sms'

  def __init__(self, row, dst_number):
    """Read the information related to the SMS.

    Args:
      row: row from the SQL query.
        row['time_sms']: timestamp when the SMS was sent.
        row['dstnum_sms']: number that receives the SMS.
        row['msg_sms']: text sent in the SMS.
      dst_number: phone number the user sent the SMS to.
    """
    super(SkypeSMSEvent, self).__init__(
        row['time_sms'], 'SMS from Skype', self.DATA_TYPE)
    self.number = dst_number
    self.text = row['msg_sms']


class SkypeCallEvent(time_events.PosixTimeEvent):
  """Convenience EventObject for the calls."""

  DATA_TYPE = 'skype:event:call'

  def __init__(self, timestamp, call_type, user_start_call,
               source, destination, video_conference):
    """Contains information if the call was cancelled, accepted or finished.

    Args:
      timestamp: the timestamp of the event.
      call_type: WAITING, STARTED, FINISHED.
      user_start_call: boolean, true indicates that the owner
                       account started the call.
      source: the account which started the call.
      destination: the account which gets the call.
      video_conference: boolean, if true it was a videoconference.
    """
    super(SkypeCallEvent, self).__init__(
        timestamp, 'Call from Skype', self.DATA_TYPE)
    self.call_type = call_type
    self.user_start_call = user_start_call
    self.src_call = source
    self.dst_call = destination
    self.video_conference = video_conference


class SkypeTransferFileEvent(time_events.PosixTimeEvent):
  """Evaluate the action of sending a file."""

  DATA_TYPE = 'skype:event:transferfile'

  def __init__(self, row, timestamp, action_type, source, destination):
    """Actions related to sending files.

    Args:
      row: row from the SQL query.
        filepath: path of the file.
        filename: name of the file.
        filesize: size of the file.
      timestamp: when the action happened.
      action_type: GETSOLICITUDE, SENDSOLICITUDE, ACCEPTED, FINISHED.
      source: The account that sent the file.
      destination: The account that received the file.
    """
    super(SkypeTransferFileEvent, self).__init__(
        timestamp, 'File transfer from Skype', self.DATA_TYPE)
    self.offset = row['id']
    self.action_type = action_type
    self.source = source
    self.destination = destination
    self.transferred_filepath = row['filepath']
    self.transferred_filename = row['filename']
    try:
      self.transferred_filesize = int(row['filesize'])
    except ValueError:
      logging.debug(u'Unknown filesize {0:s}'.format(
          self.transferred_filename))
      self.transferred_filesize = 0
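SkypeTransferFileEvent converts the file size with int() and falls back to 0 when that raises ValueError. A column that is NULL in the database arrives as None, and int(None) raises TypeError rather than ValueError, so a helper that catches both may be more forgiving. The function to_file_size below is a hypothetical helper written for this example; it is not part of the plugin.

```python
def to_file_size(value, default=0):
    """Converts a database value to an integer file size, or the default."""
    try:
        return int(value)
    except (TypeError, ValueError):
        # TypeError covers None (a NULL column), ValueError covers strings
        # that are not numbers.
        return default


for raw_value in ('1024', 'not a number', None):
    print('{0!r} -> {1}'.format(raw_value, to_file_size(raw_value)))
```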
And that's it really, we only need to worry about filling in the values for the class constants and then create the appropriate call back functions (and potentially create some assistant event objects).
Here is the code for the Skype plugin as used in this codelab. A single query and its callback function would be enough to show how a plugin works, but all five queries are included to show how a plugin can be extended to cover several tables.
In [ ]:
class SkypePluginFoo(interface.SQLitePlugin):
"""SQLite plugin for Skype main.db SQlite database file."""
# Append foo to the name so this can be registered on the side with the other Skype
# parser. This is also only a partial parser (for simplicity reasons).
NAME = 'skype_foo'
# Provide the description field.
DESCRIPTION = u'Parser for Skype SQLite database files.'
# Queries for building cache. Since we do want to use cache for the Skype plugin
# we define the SQLite commands for it here. There are two cache queries we want
# to perform, one to get destinations from the transfers table and the other to
# get source from the same table.
QUERY_DEST_FROM_TRANSFER = (
u'SELECT parent_id, partner_handle AS skypeid, '
u'partner_dispname AS skypename FROM transfers')
QUERY_SOURCE_FROM_TRANSFER = (
u'SELECT pk_id, partner_handle AS skypeid, '
u'partner_dispname AS skypename FROM transfers')
# Define the needed queries. We could only have one query here to show the
# plugin and how it works without the extra complexity, but instead all are
# incldued, to show how a plugin can be expanded to include queries to several
# tables.
QUERIES = [
(('SELECT c.id, c.participants, c.friendlyname AS title, '
'm.author AS author, m.from_dispname AS from_displayname, '
'm.body_xml, m.timestamp, c.dialog_partner FROM Chats c, Messages m '
'WHERE c.name = m.chatname'), 'ParseChat'),
(('SELECT id, fullname, given_displayname, emails, '
'country, profile_timestamp, authreq_timestamp, '
'lastonline_timestamp, mood_timestamp, sent_authrequest_time, '
'lastused_timestamp FROM Accounts'), 'ParseAccountInformation'),
(('SELECT id, target_numbers AS dstnum_sms, timestamp AS time_sms, '
'body AS msg_sms FROM SMSes'), 'ParseSMS'),
(('SELECT id, partner_handle, partner_dispname, offer_send_list, '
'starttime, accepttime, finishtime, filepath, filename, filesize, '
'status, parent_id, pk_id FROM Transfers'), 'ParseFileTransfer'),
(('SELECT c.id, cm.guid, c.is_incoming, '
'cm.call_db_id, cm.videostatus, c.begin_timestamp AS try_call, '
'cm.start_timestamp AS accept_call, cm.call_duration '
'FROM Calls c, CallMembers cm '
'WHERE c.id = cm.call_db_id;'), 'ParseCall')]
# The required tables.
REQUIRED_TABLES = frozenset(
['Chats', 'Accounts', 'Conversations', 'Contacts', 'SMSes', 'Transfers',
'CallMembers', 'Calls'])
def ParseAccountInformation(
self, parser_context, row, query=None, **unused_kwargs):
"""Parses the Accounts database.
Args:
parser_context: A parser context object (instance of ParserContext).
row: The row resulting from the query.
query: Optional query string. The default is None.
"""
if row['profile_timestamp']:
event_object = SkypeAccountEvent(
row['profile_timestamp'], u'Profile Changed', row['id'],
row['fullname'], row['given_displayname'], row['emails'],
row['country'])
parser_context.ProduceEvent(
event_object, plugin_name=self.NAME, query=query)
if row['authreq_timestamp']:
event_object = SkypeAccountEvent(
row['authreq_timestamp'], u'Authenticate Request', row['id'],
row['fullname'], row['given_displayname'], row['emails'],
row['country'])
parser_context.ProduceEvent(
event_object, plugin_name=self.NAME, query=query)
if row['lastonline_timestamp']:
event_object = SkypeAccountEvent(
row['lastonline_timestamp'], u'Last Online', row['id'],
row['fullname'], row['given_displayname'], row['emails'],
row['country'])
parser_context.ProduceEvent(
event_object, plugin_name=self.NAME, query=query)
if row['mood_timestamp']:
event_object = SkypeAccountEvent(
row['mood_timestamp'], u'Mood Event', row['id'],
row['fullname'], row['given_displayname'], row['emails'],
row['country'])
parser_context.ProduceEvent(
event_object, plugin_name=self.NAME, query=query)
if row['sent_authrequest_time']:
event_object = SkypeAccountEvent(
row['sent_authrequest_time'], u'Auth Request Sent', row['id'],
row['fullname'], row['given_displayname'], row['emails'],
row['country'])
parser_context.ProduceEvent(
event_object, plugin_name=self.NAME, query=query)
if row['lastused_timestamp']:
event_object = SkypeAccountEvent(
row['lastused_timestamp'], u'Last Used', row['id'],
row['fullname'], row['given_displayname'], row['emails'],
row['country'])
parser_context.ProduceEvent(
event_object, plugin_name=self.NAME, query=query)
def ParseChat(self, parser_context, row, query=None, **unused_kwargs):
"""Parses a chat message row.
Args:
parser_context: A parser context object (instance of ParserContext).
row: The row resulting from the query.
query: Optional query string. The default is None.
"""
to_account = ''
accounts = []
participants = row['participants'].split(' ')
for participant in participants:
if participant != row['author']:
accounts.append(participant)
to_account = u', '.join(accounts)
if not to_account:
if row['dialog_partner']:
to_account = row['dialog_partner']
else:
to_account = u'Unknown User'
event_object = SkypeChatEvent(row, to_account)
parser_context.ProduceEvent(
event_object, plugin_name=self.NAME, query=query)
def ParseSMS(self, parser_context, row, query=None, **unused_kwargs):
"""Parse SMS.
Args:
parser_context: A parser context object (instance of ParserContext).
row: The row resulting from the query.
query: Optional query string. The default is None.
"""
dst_number = row['dstnum_sms'].replace(' ', '')
event_object = SkypeSMSEvent(row, dst_number)
parser_context.ProduceEvent(
event_object, plugin_name=self.NAME, query=query)
def ParseCall(self, parser_context, row, query=None, **unused_kwargs):
"""Parse the calls taking into accounts some rows.
Args:
parser_context: A parser context object (instance of ParserContext).
row: The row resulting from the query.
query: Optional query string. The default is None.
"""
try:
aux = row['guid']
if aux:
aux_list = aux.split('-')
src_aux = aux_list[0]
dst_aux = aux_list[1]
else:
src_aux = u'Unknown [no GUID]'
dst_aux = u'Unknown [no GUID]'
except IndexError:
src_aux = u'Unknown [{0:s}]'.format(row['guid'])
dst_aux = u'Unknown [{0:s}]'.format(row['guid'])
if row['is_incoming'] == '0':
user_start_call = True
source = src_aux
if row['ip_address']:
destination = u'{0:s} <{1:s}>'.format(dst_aux, row['ip_address'])
else:
destination = dst_aux
else:
user_start_call = False
source = src_aux
destination = dst_aux
if row['videostatus'] == '3':
video_conference = True
else:
video_conference = False
event_object = SkypeCallEvent(
row['try_call'], 'WAITING', user_start_call, source, destination,
video_conference)
parser_context.ProduceEvent(
event_object, plugin_name=self.NAME, query=query)
if row['accept_call']:
event_object = SkypeCallEvent(
row['accept_call'], 'ACCEPTED', user_start_call, source, destination,
video_conference)
parser_context.ProduceEvent(
event_object, plugin_name=self.NAME, query=query)
if row['call_duration']:
try:
timestamp = int(row['accept_call']) + int(row['call_duration'])
event_object = SkypeCallEvent(
timestamp, 'FINISHED', user_start_call, source, destination,
video_conference)
parser_context.ProduceEvent(
event_object, plugin_name=self.NAME, query=query)
except ValueError:
logging.debug((
u'[{0:s}] Unable to determine when the call {0:s} was '
u'finished.').format(self.NAME, row['id']))
def ParseFileTransfer(
self, parser_context, row, cache=None, database=None, query=None,
**unused_kwargs):
"""Parse the transfer files.
There is no direct relationship between who sends the file and
who accepts the file.
Args:
parser_context: A parser context object (instance of ParserContext).
row: the row with all information related with the file transfers.
query: Optional query string. The default is None.
      cache: a cache object (instance of SQLiteCache).
      database: A database object (instance of SQLiteDatabase).
    """
    source_dict = cache.GetResults('source')
    if not source_dict:
      cursor = database.cursor
      results = cursor.execute(self.QUERY_SOURCE_FROM_TRANSFER)
      cache.CacheQueryResults(
          results, 'source', 'pk_id', ('skypeid', 'skypename'))
      source_dict = cache.GetResults('source')

    dest_dict = cache.GetResults('destination')
    if not dest_dict:
      cursor = database.cursor
      results = cursor.execute(self.QUERY_DEST_FROM_TRANSFER)
      cache.CacheQueryResults(
          results, 'destination', 'parent_id', ('skypeid', 'skypename'))
      dest_dict = cache.GetResults('destination')

    source = u'Unknown'
    destination = u'Unknown'

    if row['parent_id']:
      destination = u'{0:s} <{1:s}>'.format(
          row['partner_handle'], row['partner_dispname'])
      skype_id, skype_name = source_dict.get(row['parent_id'], [None, None])
      if skype_name:
        source = u'{0:s} <{1:s}>'.format(skype_id, skype_name)
    else:
      source = u'{0:s} <{1:s}>'.format(
          row['partner_handle'], row['partner_dispname'])
      if row['pk_id']:
        skype_id, skype_name = dest_dict.get(row['pk_id'], [None, None])
        if skype_name:
          destination = u'{0:s} <{1:s}>'.format(skype_id, skype_name)

    if row['status'] == 8:
      if row['starttime']:
        event_object = SkypeTransferFileEvent(
            row, row['starttime'], 'GETSOLICITUDE', source, destination)
        parser_context.ProduceEvent(
            event_object, plugin_name=self.NAME, query=query)

      if row['accepttime']:
        event_object = SkypeTransferFileEvent(
            row, row['accepttime'], 'ACCEPTED', source, destination)
        parser_context.ProduceEvent(
            event_object, plugin_name=self.NAME, query=query)

      if row['finishtime']:
        event_object = SkypeTransferFileEvent(
            row, row['finishtime'], 'FINISHED', source, destination)
        parser_context.ProduceEvent(
            event_object, plugin_name=self.NAME, query=query)

    elif row['status'] == 2 and row['starttime']:
      event_object = SkypeTransferFileEvent(
          row, row['starttime'], 'SENDSOLICITUDE', source, destination)
      parser_context.ProduceEvent(
          event_object, plugin_name=self.NAME, query=query)


# And finally we need to register this plugin.
sqlite_parser.SQLiteParser.RegisterPlugin(SkypePluginFoo)
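The two cache lookups in the transfer callback follow the same pattern: run a query once, store its rows in a dictionary keyed on one column, and reuse that dictionary for every later row. The sketch below imitates that pattern with the standard sqlite3 module so you can try it in isolation. Note that `build_lookup`, the `accounts` table and the sample row are hypothetical names made up for this illustration; this is not the SQLiteCache implementation.

```python
import sqlite3


def build_lookup(cursor, query, key_name, value_names):
  """Builds a {key: [values]} dictionary from the rows of a query.

  Hypothetical helper that mimics the idea of caching query results;
  it is not the SQLiteCache API.
  """
  lookup = {}
  for row in cursor.execute(query):
    lookup[row[key_name]] = [row[name] for name in value_names]
  return lookup


# An in-memory database with a made-up row, for illustration only.
connection = sqlite3.connect(':memory:')
connection.row_factory = sqlite3.Row
cursor = connection.cursor()
cursor.execute(
    'CREATE TABLE accounts (pk_id INTEGER, skypeid TEXT, skypename TEXT)')
cursor.execute(
    "INSERT INTO accounts VALUES (1, 'gen.beringer', 'Gen Beringer')")
connection.commit()

source_dict = build_lookup(
    cursor, 'SELECT pk_id, skypeid, skypename FROM accounts',
    'pk_id', ('skypeid', 'skypename'))

# A hit returns both values; a miss falls back to [None, None], so the
# result can always be unpacked into two names without checking first.
skype_id, skype_name = source_dict.get(1, [None, None])
missing_id, missing_name = source_dict.get(99, [None, None])
```

The `[None, None]` default is what lets the callback write `skype_id, skype_name = ...` unconditionally and then simply test `if skype_name:`.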
[optional code segment] If you modify the class above, you first need to de-register it before you can register it again. To de-register it, use the code block below:
In [ ]:
# OPTIONAL CODE BLOCK !! DON'T EXECUTE UNLESS YOU HAVE MADE SOME
# CHANGES TO THE PARSER CODE AND WANT TO REGISTER IT AGAIN!!!
sqlite_parser.SQLiteParser.DeregisterPlugin(SkypePluginFoo)
In [ ]:
from plaso.formatters import manager as formatter_manager
# Import the skype formatter.
from plaso.formatters import skype as skype_formatter
timestamp_now = timelib.Timestamp.GetNow()
timestamp_posix = timestamp_now // 1000000
# Let's create a dummy event.
# For that we need to mimic the "row" object:
dummy_row = {
'timestamp': timestamp_posix,
'title': 'This is a good title.',
'body_xml': 'Please pick up now...',
'from_displayname': 'Secret Caller',
'author': 'Me, myself and Irene'}
dummy_event = SkypeChatEvent(dummy_row, 'le baron')
# And print the string.
print dummy_event.GetString()
# And to re-iterate, let's print the message string.
message_string, _ = formatter_manager.EventFormatterManager.GetMessageStrings(dummy_event)
print u'MESSAGE STRING: {}'.format(message_string)
Did you notice that in the event above you never really told it how to construct this message string? How does the tool then know how to print it out?
That is the purpose of the formatter. The formatter is a simple class that defines what attributes to use and how they are put together to form this message string.
You're going to need one for any plugin or parser you create (or, more precisely, one for every data type that exists).
The way the formatter works is that it looks at the data_type attribute in the EventObject and matches that to the formatter's DATA_TYPE attribute. If they are the same, the formatter proceeds to process that EventObject and constructs the message string.
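To make the matching step concrete, here is a minimal sketch of a lookup keyed on the data type. `ChatFormatter`, `FORMATTERS` and `get_message_string` are hypothetical names used only for this illustration, and the real formatter manager does considerably more; the point is only that a data type with no matching DATA_TYPE yields no message string and raises no error.

```python
class ChatFormatter(object):
  """Hypothetical formatter, identified by its DATA_TYPE constant."""
  DATA_TYPE = 'skype:event:chat'
  FORMAT_STRING = u'From: {from_account} Text: {text}'


# Hypothetical registry: data type -> formatter class.
FORMATTERS = {ChatFormatter.DATA_TYPE: ChatFormatter}


def get_message_string(event_values):
  """Returns a message string, or None when no formatter matches."""
  formatter = FORMATTERS.get(event_values.get('data_type'))
  if formatter is None:
    return None
  return formatter.FORMAT_STRING.format(**event_values)


matching = get_message_string({
    'data_type': 'skype:event:chat',
    'from_account': u'Secret Caller',
    'text': u'Please pick up now...'})

# A typo in the data type means that no formatter is found.
mismatched = get_message_string({
    'data_type': 'skype:event:chta',
    'from_account': u'Secret Caller',
    'text': u'Please pick up now...'})
```

Here `matching` holds a formatted string while `mismatched` is simply None, which is why a misspelled data type is easy to overlook.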
Formatters go in separate files under plaso/formatters. Don't forget the copyright at the top!
For the most part, you're just setting some values with formats. You'll want to set up structures that you want to see in your timeline.
Most importantly (to re-iterate), the DATA_TYPE must match the data_type attribute from the EventObject from the last section. Watch out for typos here -- there is no warning.
There are two formatters that you can use, the simple EventFormatter and the ConditionalEventFormatter. The former should only be used if you are absolutely sure all the attributes mentioned there are going to be set for each and every event object created. That means that for the vast majority of the formatters the ConditionalEventFormatter should be the formatter of choice.
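The difference between the two approaches can be sketched without Plaso at all. In the snippet below, `format_simple` and `format_conditional` are hypothetical helpers, and pairing each piece with an attribute name is an assumption made to keep the example short; it is not how Plaso stores its format pieces.

```python
def format_simple(format_string, values):
  """Formats every attribute at once; fails if one is missing."""
  return format_string.format(**values)


def format_conditional(pieces, values):
  """Joins only the pieces whose attribute is present and set.

  Each piece is an (attribute name, format string) pair. This pairing is
  an assumption made for the sketch, not the Plaso data layout.
  """
  parts = []
  for name, piece in pieces:
    if values.get(name) is not None:
      parts.append(piece.format(**values))
  return u' '.join(parts)


PIECES = [
    ('from_account', u'From: {from_account}'),
    ('to_account', u'To: {to_account}'),
    ('text', u'Text: {text}')]

# An event where 'to_account' was never set.
values = {'from_account': u'Secret Caller', 'text': u'Pick up'}

conditional = format_conditional(PIECES, values)

try:
  format_simple(u'From: {from_account} To: {to_account}', values)
  simple_failed = False
except KeyError:
  simple_failed = True
```

With the missing attribute, the conditional variant just skips the "To:" piece, while the single format string raises a KeyError.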
There are two class constants that should always be set, regardless of the choice of formatter:
For the simple EventFormatter two class constants have to be set (or at least one):
If you use the conditional formatter you need to define the following class constants:
For our purposes we use the skype_formatter here:
In [ ]:
PrintClassHelp(skype_formatter.SkypeChatFormatter)
In [ ]:
from plaso.artifacts import knowledge_base
from plaso.lib import errors
from plaso.lib import queue
from plaso.parsers import context
from plaso.parsers import test_lib
# Create the plugin object.
test_plugin = SkypePluginFoo()
print u'Parsing file using: {}'.format(test_plugin.plugin_name)
# Create a parser context object to handle key input/event output.
event_queue = queue.SingleThreadedQueue()
event_queue_producer = queue.EventObjectQueueProducer(event_queue)
parse_error_queue = queue.SingleThreadedQueue()
parse_error_queue_producer = queue.ParseErrorQueueProducer(
parse_error_queue)
knowledge_base_object = knowledge_base.KnowledgeBase()
context_obj = context.ParserContext(
event_queue_producer, parse_error_queue_producer,
knowledge_base_object)
# Now we can start parsing the file using the plugin.
test_plugin.Process(parser_context=context_obj, cache=database_cache, database=database)
# Set up a consumer to read events emitted by our plugin.
event_queue_consumer = test_lib.TestEventObjectQueueConsumer(event_queue)
# Read all the events.
event_queue_consumer.ConsumeEventObjects()
event_objects = event_queue_consumer.event_objects
print u'Processing of SQLite database is done.'
print u'Able to extract: {} events from the database.'.format(len(event_objects))
Let's print out the event objects that we managed to extract from this plugin.
In [ ]:
# Print out the content of the extracted events.
for index, event_object in enumerate(event_objects):
  print u'*' * 80
  print u' EVENT NUMBER: {}'.format(index)
  print u'-' * 80
  print u'Event:'
  print event_object.GetString()
  print u''
Unit tests are designed to make sure your code is doing what you intended it to do, as well as to let other people know when their refactor broke your code. This will also assist you when you are writing your code by doing a sanity check on your plugin to make sure it works the way you expect it to.
The tests go in their own file, in this case plaso/parsers/sqlite_plugins/skype_test.py.
For the tests to work the formatter needs to be imported. However, since the formatter module typically has the same name as the plugin (or parser) module, we may need to import it under a different name. And since the formatter is never used directly in the test file, a pylint statement is needed to suppress the unused-import message during linting. You will see the other imports throughout this codelab. Because everything lives in the same namespace in this notebook we do not strictly need to import the formatter here, but in a test file this is typically added:
# pylint: disable-msg=unused-import
from plaso.formatters import skype as skype_formatter
The pylint statement makes sure that pylint does not complain about an unused import: we never use the formatter directly, we only import it so that it gets registered (otherwise the test will not work).
For a plugin test we will use the appropriate plugin test library, in this case the plaso/parsers/sqlite_plugins/test_lib (or test_lib.SQLitePluginTestCase). This is a simple class that inherits from the plaso parser test lib (which in turn inherits from the unittest.TestCase class), and adds a few functions to make it easier to test SQLite database plugins. You may want to add a setUp() function to open the sample file and set any other variables you expect in the background.
For a SQLite database plugin you typically need to build a cache (if one is needed for the plugin), open the test file from a path (using _GetTestFilePath) and call the "_ParseDatabaseFileWithPlugin" function of the test lib.
Let's first look at what functions are available to us in the SQLite test library.
In [ ]:
from plaso.parsers.sqlite_plugins import test_lib as sqlite_test_lib
PrintClassHelp(sqlite_test_lib.SQLitePluginTestCase)
Write the setUp() function for this class definition:
class SkypePluginTest(test_lib.SQLitePluginTestCase):
  """Tests for the Skype main.db history database plugin."""

  def setUp(self):
While we're setting up the boilerplate of the test, let's add the main function to the bottom of the file. Then we can run the test on its own.
if __name__ == '__main__':
  unittest.main()
The outline of the main test is to create and run the plugin, then check that the plugin results are correct. You should check a variety of attributes in one row and something about the extracted events in general.
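As a rough illustration of that outline, the sketch below uses only the standard unittest module. `ExampleTest` and its list of dictionaries are hypothetical stand-ins for a plugin test and its extracted events. It also shows that the default test loader only collects methods whose names start with "test".

```python
import unittest


class ExampleTest(unittest.TestCase):
  """Hypothetical test case; the event list stands in for plugin output."""

  def setUp(self):
    self._events = [
        {'data_type': 'example:chat', 'text': u'hello'},
        {'data_type': 'example:chat', 'text': u'bye'},
        {'data_type': 'example:call', 'text': None}]

  def testProcess(self):
    """Collected by unittest because the name starts with 'test'."""
    # Check something about the extracted events in general.
    self.assertEqual(len(self._events), 3)
    chats = [e for e in self._events if e['data_type'] == 'example:chat']
    self.assertEqual(len(chats), 2)
    # Check several attributes of one specific event.
    self.assertEqual(self._events[0]['data_type'], 'example:chat')
    self.assertEqual(self._events[0]['text'], u'hello')

  def checkNothing(self):
    """Not collected: the name does not start with 'test'."""
    self.fail('This method is never run by the loader.')


suite = unittest.TestLoader().loadTestsFromTestCase(ExampleTest)
test_count = suite.countTestCases()
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Only `testProcess` ends up in the suite, so the run succeeds even though `checkNothing` would fail if it were ever called.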
The name of the test function needs to start with the word "test". Let's use testProcess() (since that is what we are testing, the Process function of the plugin). The assertions should include:
In [ ]:
from plaso.formatters import skype as skype_formatter
from plaso.lib import timelib_test
from plaso.parsers.sqlite_plugins import skype as skype_plugin


class SkypePluginTest(sqlite_test_lib.SQLitePluginTestCase):
  """Tests for the Skype main.db history database plugin."""

  def setUp(self):
    """Sets up the needed objects used throughout the test."""
    self._plugin = skype_plugin.SkypePlugin()

  def testProcess(self):
    """Tests the Process function on a Skype History database file.

    The History file contains 24 events:
      4 call events
      4 transfers file events
      1 sms events
      15 chat events

    Events used:
      id = 16 -> SMS
      id = 22 -> Call
      id = 18 -> File
      id = 1 -> Chat
      id = 14 -> ChatRoom
    """
    # In the actual test file we would use the _GetTestFilePath but here we already
    # have a test file.
    # test_file = self._GetTestFilePath(['skype_main.db'])
    test_file = test_database_path
    cache = sqlite_parser.SQLiteCache()
    event_queue_consumer = self._ParseDatabaseFileWithPlugin(
        self._plugin, test_file, cache)
    event_objects = self._GetEventObjectsFromQueue(event_queue_consumer)

    calls = 0
    files = 0
    sms = 0
    chats = 0
    for event_object in event_objects:
      if event_object.data_type == 'skype:event:call':
        calls += 1
      if event_object.data_type == 'skype:event:transferfile':
        files += 1
      if event_object.data_type == 'skype:event:sms':
        sms += 1
      if event_object.data_type == 'skype:event:chat':
        chats += 1

    self.assertEquals(len(event_objects), 24)
    self.assertEquals(files, 4)
    self.assertEquals(sms, 1)
    self.assertEquals(chats, 15)
    self.assertEquals(calls, 3)

    # TODO: Split this up into separate functions for testing each type of
    # event, eg: testSMS, etc.
    sms_event_object = event_objects[16]
    call_event_object = event_objects[22]
    event_file = event_objects[18]
    chat_event_object = event_objects[1]
    chat_room_event_object = event_objects[14]

    # Test cache processing and format strings.
    expected_msg = (
        u'Source: gen.beringer <Gen Beringer> Destination: '
        u'european.bbq.competitor <European BBQ> File: secret-project.pdf '
        u'[SENDSOLICITUDE]')
    self._TestGetMessageStrings(
        event_objects[17], expected_msg, expected_msg[0:77] + '...')

    expected_timestamp = timelib_test.CopyStringToTimestamp(
        '2013-07-01 22:14:22')
    self.assertEquals(sms_event_object.timestamp, expected_timestamp)
    text_sms = (u'If you want I can copy '
                u'some documents for you, '
                u'if you can pay it... ;)')
    self.assertEquals(sms_event_object.text, text_sms)
    number = u'+34123456789'
    self.assertEquals(sms_event_object.number, number)

    expected_timestamp = timelib_test.CopyStringToTimestamp(
        '2013-10-24 21:49:35')
    self.assertEquals(event_file.timestamp, expected_timestamp)
    action_type = u'GETSOLICITUDE'
    self.assertEquals(event_file.action_type, action_type)
    source = u'gen.beringer <Gen Beringer>'
    self.assertEquals(event_file.source, source)
    destination = u'european.bbq.competitor <European BBQ>'
    self.assertEquals(event_file.destination, destination)
    transferred_filename = u'secret-project.pdf'
    self.assertEquals(event_file.transferred_filename, transferred_filename)
    filepath = u'/Users/gberinger/Desktop/secret-project.pdf'
    self.assertEquals(event_file.transferred_filepath, filepath)
    self.assertEquals(event_file.transferred_filesize, 69986)

    expected_timestamp = timelib_test.CopyStringToTimestamp(
        '2013-07-30 21:27:11')
    self.assertEquals(chat_event_object.timestamp, expected_timestamp)
    title = u'European Competitor | need to know if you got it..'
    self.assertEquals(chat_event_object.title, title)
    expected_msg = u'need to know if you got it this time.'
    self.assertEquals(chat_event_object.text, expected_msg)
    from_account = u'Gen Beringer <gen.beringer>'
    self.assertEquals(chat_event_object.from_account, from_account)
    self.assertEquals(chat_event_object.to_account, u'european.bbq.competitor')

    expected_timestamp = timelib_test.CopyStringToTimestamp(
        '2013-10-27 15:29:19')
    self.assertEquals(chat_room_event_object.timestamp, expected_timestamp)
    title = u'European Competitor, Echo123'
    self.assertEquals(chat_room_event_object.title, title)
    expected_msg = u'He is our new employee'
    self.assertEquals(chat_room_event_object.text, expected_msg)
    from_account = u'European Competitor <european.bbq.competitor>'
    self.assertEquals(chat_room_event_object.from_account, from_account)
    to_account = u'gen.beringer, echo123'
    self.assertEquals(chat_room_event_object.to_account, to_account)

    expected_timestamp = timelib_test.CopyStringToTimestamp(
        '2013-07-01 22:12:17')
    self.assertEquals(call_event_object.timestamp, expected_timestamp)
    self.assertEquals(call_event_object.dst_call, u'european.bbq.competitor')
    self.assertEquals(call_event_object.src_call, u'gen.beringer')
    self.assertEquals(call_event_object.user_start_call, False)
    self.assertEquals(call_event_object.video_conference, False)
How will you know what the format string should look like? Well, it's time to run the code we have. Typically the plugin needs to be "compiled" before the test will be able to import it, so we need to make sure the plugin gets picked up for compilation.
Open plaso/parsers/sqlite_plugins/__init__.py. Add an import statement for your new plugin. Save the file.
Normally to run the tests you would either need to run:
python run_tests.py
Or to compile:
python setup.py build && sudo python setup.py install
And then you can run the test directly using:
python plaso/parsers/sqlite_plugins/skype_test.py
Rinse and repeat as you write the tests. If you change the parser, you need to recompile. If you just change the test, you don't.
However since we are writing this in our notebook we just need to make sure we've run all the previous code segments, and if you make changes, just re-run it.
To run the test itself, execute the below code:
In [ ]:
import unittest

my_suite = unittest.TestSuite()
my_suite.addTest(SkypePluginTest('testProcess'))
results = unittest.TextTestRunner(verbosity=3).run(my_suite)

if results.errors:
  print u'Errors came up while trying to run test.'
  for error in results.errors:
    if isinstance(error, basestring):
      print error
    else:
      for sub_error in error:
        print sub_error
elif results.failures:
  print u'Failures came up while trying to run test.'
  for failure in results.failures:
    if isinstance(failure, basestring):
      print failure
    else:
      for sub_failure in failure:
        print sub_failure
else:
  print u'All came out clean.'

print results
If all went well you should have a fully functioning plugin by now, ready to parse every Skype main.db database you may encounter.
You can start playing around and making changes to the plugin, to see what happens when changes are introduced, or continue and create a new plugin.
Now we have gone through step-by-step how an example SQLite database plugin is created. It is time to take what we've learned so far and create a new plugin.
At the moment there is no example assignment for this codelab, like there is for the other codelabs. The assignment is therefore to create a new plugin from scratch for a SQLite database of your choice.
In [ ]:
import os

if test_database_path:
  os.remove(test_database_path)