Windows Registry Plugin Code Lab

Intended Audience

This lab is for people who want to learn how to write and execute a Plaso plugin in Python. This tutorial assumes:

  • You have a functional development environment
  • You have used Plaso
  • You are familiar with the Python programming language
  • You are looking to write a plugin (as opposed to a parser, which is covered in a separate codelab)

Objective

This lab will teach you how to write a Windows Registry plugin with tests for the Plaso framework. By the end you will be able to:

  • Write a Windows Registry plugin for plaso
  • Write unit tests for the plugin
  • Run the plugin as part of plaso/log2timeline

Expectations

This lab should take you a couple of hours to complete, although some of that time depends entirely on any unexpected build issues you run into. We are not asking you to check in code yet; the goal is to demonstrate how a plugin is written. To turn this into a checked-in plugin you would need to write a plugin for a registry key/value that is not already parsed, and split the code here into several files (the layout is explained below).

Introduction

Welcome to writing a Plaso plugin! From the outside, writing a plugin can be daunting, but once you get your development environment going, you've fought half the battle. This codelab features a simple Windows Registry plugin, but the formula can be used for any type of plugin (another codelab demonstrates a SQLite database plugin, and the hope is that between these two codelabs plugin writing is more or less covered). You may also be interested in the text parser codelab.

Before Starting

Get familiar with the developer's guide, and more specifically the style guide. To make the code easier to maintain we follow a style guide that is partially based on the Google Python Style Guide but slightly modified to fit our needs.

We also follow a code review process that is discussed on the style guide site.

This is an iPython notebook; if you are not familiar with it, here is a brief introduction. It is basically an iPython shell wrapped in a pretty GUI (a browser window). You can execute any Python code you wish, and quickly go back, edit and re-run code. To run a code segment, click on it and press "Shift+Enter". The bracket on the left then changes to indicate that the segment has been executed, and any output the segment produces is shown below it.

One thing to note is that some code segments depend on previous code segments having been executed. For this codelab to work properly you need to execute EVERY code segment presented here, especially all class declarations and import statements. To be safe, just execute them all [except those explicitly marked as optional].

To make it easier to get documentation about the various classes we use in the codelab, we start by defining a simple function that prints out help (to execute it, click the code segment below and press "Shift+Enter").


In [ ]:
import inspect

# Let's put this in a method so we can easily call it from other parts of the codelab.
def PrintClassHelp(class_object, filter_string=''):
  """Prints a help string for a given class object.

  Args:
    class_object: The class that we are about to inspect.
    filter_string: Filter class members that start with a particular string.
  """
  # Print the docstring of the class.
  print u''
  print class_object.__doc__
   
  # Print information for every member function.
  additional_members = []
  for member_name, member_value in inspect.getmembers(class_object):
    # Check to see if we are filtering out members starting with
    # a particular string.
    if filter_string and not member_name.startswith(filter_string):
      continue
    if inspect.ismethod(member_value):
      args = inspect.getargspec(member_value)
      doc_string = member_value.__doc__
        
      print u'{0}{1:>20s}({2}){0:>10}\n\n{3}\n{4}\n\n'.format(
          '*'*5, member_name, u','.join(args.args), doc_string, '-'*80)
    else:
      if member_name.startswith('_'):
        continue
      if member_name in ['classes', 'parent_class', 'plugin_feature', 'top_level_class']:
        continue
        
      additional_members.append(u'{} = {}'.format(member_name, repr(member_value)))

  if additional_members:
    print '\n'
    print '*** Additional Members of Class ***\n\n ',
    print u'\n  '.join(additional_members)

Disclaimer

During this codelab we will be using the iPython notebook interface for everything, which means we have all the classes and code in a single file. Once we deploy the code to the actual codebase we would need to save the code in several places, typically something like:

  • plaso/parsers/winreg_plugins/myplugin.py
  • plaso/parsers/winreg_plugins/myplugin_test.py
  • plaso/formatters/myplugin.py [this is likely not needed in the case of a Windows Registry plugin]

And make necessary changes to:

  • plaso/parsers/winreg_plugins/__init__.py
  • plaso/formatters/__init__.py [again, likely not needed]

to include the new plugin and formatter in the tool. We may also want to change plaso/frontend/presets.py to include the plugin in a preset. Whether that is needed depends on the plugin itself: sometimes you want to include the parser and all its plugins, sometimes a specific plugin. For Windows Registry plugins, as here, you typically don't load one specific registry plugin; you load them all.

We are, however, omitting all these details to make the codelab easier to follow. The same setup can be used to test and experiment with a plugin without touching the codebase; once the plugin is fully functional, create the necessary files and start the code review process.

There are also a lot of comments in the code in this codelab that would typically be omitted from a released plugin. To see the actual code that is used as an example here click on one of the below links:

Writing the Plugin

We are going to write the plugin completely in this iPython notebook, and test it here too. Nothing else is needed other than this notebook, a sample registry file and the plaso libraries.

Before writing a plugin to parse a particular registry key, ask yourself these questions:

  • Examine the registry key itself. What values does it have?
  • Are there any subkeys that need to be included?
  • How are the timestamps formatted? Is the only timestamp the last time the registry key was modified, or are there other timestamps embedded in the content of one of the keys/values?
  • Are there any binary values that need to be interpreted?
  • Are there any keys/values that need some decoding?
  • What does the registry key's last written time mean? What does it signify?

Remember that we are not going to submit this plugin for review: it is already checked in and is used here only for demonstration purposes. Please refer to the plaso roadmap for open parser/plugin assignments (or add your own).

Before we start looking at the code we need to download the registry hive to a temporary location so that we can use it for the remainder of this codelab. For this you need an Internet connection.


In [ ]:
# Import a library to make the HTTP connection.
import urllib2

# Import a library so that we can create a temporary file.
import tempfile

# The URL to the SYSTEM hive we are about to use for our testing.
url = u'https://github.com/log2timeline/plaso/raw/master/test_data/SYSTEM'

# Download the file.
response = urllib2.urlopen(url)
data = response.read()

# Save it in a temporary file (we don't want it to be deleted).
test_file = tempfile.NamedTemporaryFile(delete=False)

# Save the name since that is what we will refer to later in the code.
test_registry_file_name = test_file.name

# Write data to it.
test_file.write(data)

# Close the file.
test_file.close()

# Print a confirmation.
print u'File downloaded and saved'

The header

First things first: every file checked into the project needs a header. Among other things, that header contains copyright information and import statements.

The first line after the copyright statement is the docstring, which needs to be changed for each file. It should not be wider than 80 characters. If you need more than a single line to describe the parser, still use a first line of at most 80 characters ending with a period, then add a more detailed description two lines down (an example of this can be seen below).
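The docstring rules above are simple enough to check mechanically. The sketch below uses a hypothetical helper, CheckModuleDocstring (not part of plaso or pylint), that encodes only the rules stated in this section: a summary line of at most 80 characters ending with a period, followed by a blank line if a longer description comes after it.

```python
def CheckModuleDocstring(docstring, max_width=80):
  """Returns a list of problems found in a module docstring.

  Hypothetical helper; it only encodes the rules described in this codelab.
  """
  problems = []
  lines = docstring.split('\n')
  summary = lines[0]
  if len(summary) > max_width:
    problems.append('summary line exceeds {0:d} characters'.format(max_width))
  if not summary.endswith('.'):
    problems.append('summary line does not end with a period')
  # A detailed description must be separated from the summary by a blank line.
  if len(lines) > 1 and lines[1].strip():
    problems.append('missing blank line after the summary line')
  return problems


print(CheckModuleDocstring(
    'Plug-in to collect the Less Frequently Used Keys.\n\n'
    'This demonstrates how a longer line should be broken up.'))
```

For the docstring used in the header below, the sketch reports no problems and prints an empty list.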

The import order is defined in the style guide:

Imports are always put at the top of the file, just after any module comments and doc strings and before module globals and constants. 
Imports should be grouped with the order being most generic to least generic:

+ standard library imports
+ third-party imports
+ application-specific imports

Within each grouping, imports should be sorted lexicographically, ignoring case, according to each module's full package path.
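To make the ordering rule concrete, here is a minimal sketch with a hypothetical OrderImports helper (not a plaso or linter API). It takes each group as a list of (full module path, import statement) pairs, emits the groups from most generic to least generic, and sorts each group by the full module path, ignoring case.

```python
def OrderImports(standard, third_party, application):
  """Orders import statements following the grouping rules above.

  Each argument is a list of (full_module_path, import_statement) tuples.
  Non-empty groups are separated by a blank line.
  """
  blocks = []
  for group in (standard, third_party, application):
    if not group:
      continue
    # Sort lexicographically by the full package path, ignoring case.
    ordered = sorted(group, key=lambda item: item[0].lower())
    blocks.append('\n'.join(statement for _, statement in ordered))
  return '\n\n'.join(blocks)


print(OrderImports(
    [('logging', 'import logging')],
    [],
    [('plaso.parsers.winreg_plugins.interface',
      'from plaso.parsers.winreg_plugins import interface'),
     ('plaso.events.windows_events',
      'from plaso.events import windows_events')]))
```

Applied to the imports of the header below, this reproduces their order: logging first, then the two plaso imports sorted by module path.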

In [ ]:
#!/usr/bin/python
#
# Copyright 2014 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Plug-in to collect the Less Frequently Used Keys.

This demonstrates how a longer line should be broken up."""

import logging

# We need to be able to create new event objects, specifically
# for a Windows registry event we would like to get access to the 
# WindowsRegistryEvent EventObject.
from plaso.events import windows_events
# Import the registry plugin interface.
from plaso.parsers.winreg_plugins import interface

The Plugin Class

We now need to know what kind of plugin we are trying to implement. The "Write A Plugin" section of the plaso documentation site touches on the generic plugin interface and what needs to be done to write a plugin. For now, we know we are trying to parse a specific Windows Registry key using a registry plugin. Looking at the registry plugin section, we notice that two different types of plugins are available:

  • KeyPlugin: Used when we are parsing a specific Windows Registry key (path to the registry key is used).
  • ValuePlugin: Used when the plugin works against any Windows Registry key that has a specific set of values.
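The difference between the two plugin types comes down to what is matched. The sketch below works on plain lists of key paths and value names; the helper names are hypothetical stand-ins rather than the plaso interface, and the case-insensitive comparison is an assumption based on Windows Registry key and value names being case-insensitive.

```python
def KeyPluginMatches(reg_keys, key_path):
  """A key-based plugin matches when the key path is one of its keys."""
  return key_path.lower() in [path.lower() for path in reg_keys]


def ValuePluginMatches(reg_values, value_names):
  """A value-based plugin matches any key that has all of its values."""
  wanted = set(name.lower() for name in reg_values)
  return wanted.issubset(name.lower() for name in value_names)


print(KeyPluginMatches(
    [u'\\ControlSet001\\Control\\Session Manager'],
    u'\\ControlSet001\\Control\\session manager'))
print(ValuePluginMatches(['BootExecute'], ['BootExecute', 'ObjectDirectories']))
```

Both calls print True: the key plugin cares only about where the key is, the value plugin only about what the key contains.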

Which plugin is best for our example of a BootExecute Value from the Session Manager key?

Which key is that? It is in the current control set of the SYSTEM hive, inside the key \CurrentControlSet\Control\Session Manager. Since we have a fixed key, it makes sense to use the key-based plugin to parse it. Let's look at some of the other things we need to consider.

We need to examine the registry key itself and its values, which requires the file and some way of examining its content. Let's open the registry file for later use.


In [ ]:
# Import necessary libraries from dfVFS so we can open up the file.
from dfvfs.lib import definitions
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import resolver as path_spec_resolver

# Import the Windows Registry cache library so we can calculate
# the current control set among other things.
from plaso.winreg import cache
# Import the winregistry library so we can open up the registry hive.
from plaso.winreg import winregistry


# Find the file and get a handle to it.
path_spec = path_spec_factory.Factory.NewPathSpec(
    definitions.TYPE_INDICATOR_OS, location=test_registry_file_name)
file_entry = path_spec_resolver.Resolver.OpenFileEntry(path_spec)

# Open the registry file.
registry = winregistry.WinRegistry(winregistry.WinRegistry.BACKEND_PYREGF)
winreg_file = registry.OpenFile(file_entry, codepage='cp1252')

# Build a registry cache.
# This will among others calculate the current control set.
registry_cache = cache.WinRegistryCache()
registry_cache.BuildCache(winreg_file, 'SYSTEM')

Now we have the registry file open and can start exploring the registry. Since the key is in the current control set, we need to know which control set that is. The registry cache we already built should detect that automatically; let's see what it found.


In [ ]:
print registry_cache.attributes.get('current_control_set', 'Not found')

As you can see, the registry cache detected the current control set as "ControlSet001". We can verify that manually:


In [ ]:
select_key = winreg_file.GetKeyByPath(r'\Select')
current_value = select_key.GetValue('Current')

print u'ControlSet{:03d}'.format(current_value.data)

We've manually verified the current control set, so the key that we are interested in is therefore: \ControlSet001\Control\Session Manager

Let's open that up and look at the values of the key.


In [ ]:
lfu_key = winreg_file.GetKeyByPath('\\ControlSet001\\Control\\Session Manager')

print u'Key has {} values.'.format(lfu_key.number_of_values)
print u'Key has {} subkeys.'.format(lfu_key.number_of_subkeys)

print u'Values:'
for value in lfu_key.GetValues():
    print u'  {} = {}'.format(value.name, value.data)

print u'*'*80
print u'Sub keys:'
for sub_key in lfu_key.GetSubkeys():
    print u'  {} [{} keys, {} values]'.format(
        sub_key.path, sub_key.number_of_subkeys, sub_key.number_of_values)

There is a value there called BootExecute, and that is the value that we are interested in, see this Windows documentation.

We don't need any subkeys for this particular registry plugin. There is no timestamp stored in the data set; the only timestamp we are interested in is the last written time of the registry key itself.

We are now ready to start writing the plugin itself, so let's go over the Windows Registry plugin interface and what needs to be implemented.

One important disclaimer: since we are using an already checked-in plugin as an example, we append the word "Foo" or "foo" to many of the class names and other fields to avoid namespace collisions.

Important Class Constants

In plaso terms, as soon as we have more than a single "parser" that attempts to parse a particular file format, we convert that into a plugin system. A generic parser then takes care of all file format parsing, leaving the plugins to do minimal work: defining a few class constants that are used to match a particular file or file segment to what the plugin is designed to parse, and a function to process the collected data.

For a Windows Registry KeyPlugin we need to define the following class attributes:

  • NAME: Name of the plugin, this should be short and concise but still descriptive. Naming convention for Windows Registry plugins is "winreg_PLUGIN".
  • REG_KEYS: This is a list of all registry keys this plugin is designed to parse. The plugin does not need to define software redirects (like Wow6432, that's done automatically).
  • REG_TYPE: Defining which registry hive this plugin is written for, values are NTUSER, SAM, SOFTWARE, SYSTEM, etc. If the plugin should work against every registry hive, then it should have the value of 'any'.
  • URLS: This is a list of URLs that can be used to read additional information about this particular registry key. This could be a link to Technet or some blogs discussing how to interpret the values (or both).

For our plugin's purposes we have these values:

NAME = 'winreg_boot_execute'

REG_KEYS = [u'\\{current_control_set}\\Control\\Session Manager']
REG_TYPE = 'SYSTEM'
URLS = ['http://technet.microsoft.com/en-us/library/cc963230.aspx']

Note that this registry plugin depends on the current control set, which may differ between registry hives. The registry cache code executed in one of the previous steps detected the current control set and saved it as "current_control_set". This is one of the attributes we can use in REG_KEYS: just put the attribute name inside curly brackets ({}).
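A minimal sketch of how a {current_control_set} placeholder could be resolved, assuming the cache attributes behave like a plain dictionary and the substitution works like Python's str.format. Both helper functions are hypothetical; in the codelab this work is done by WinRegistryCache. The control set name is derived from the Current value of the Select key, exactly as in the manual check earlier.

```python
def CurrentControlSetName(select_current_value):
  """Builds the control set key name from the Select key's Current value."""
  return u'ControlSet{0:03d}'.format(select_current_value)


def ExpandRegKeys(reg_keys, cache_attributes):
  """Replaces {attribute} placeholders in REG_KEYS with cached values."""
  return [path.format(**cache_attributes) for path in reg_keys]


cache_attributes = {'current_control_set': CurrentControlSetName(1)}
print(ExpandRegKeys(
    [u'\\{current_control_set}\\Control\\Session Manager'], cache_attributes))
```

With a Current value of 1, the plugin's single REG_KEYS entry expands to \ControlSet001\Control\Session Manager, the key we opened manually above.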

Let's look at other attributes calculated by the registry cache object.


In [ ]:
for class_name, cache_class in cache.WinRegCachePlugin.classes.items():
    print '*'*80
    print class_name
    print '*'*80
    PrintClassHelp(cache_class)
    print '-'*80

GetEntries

The underlying parser takes care of opening up the registry hive, reading in every registry key there is and then comparing that key to the available registry plugins. The parser uses the REG_KEYS list value to determine if a registry plugin should be used against a particular key. Once a plugin is found the GetEntries() function of the plugin is called. The purpose of this function is to extract all potential event objects available from that registry key and yield them.
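The matching and dispatching described above can be sketched in a few lines. Everything here is hypothetical: FakeKey, EchoPlugin and ParseKeys only mimic the flow (expand REG_KEYS, compare each key path, call GetEntries with extra keyword arguments) and are not the actual WinRegistryParser code. The sketch yields results from GetEntries as this paragraph describes, while the plugin in this codelab hands its events to parser_context.ProduceEvent instead.

```python
class FakeKey(object):
  """Minimal in-memory stand-in for a registry key (hypothetical)."""

  def __init__(self, path, values=None):
    self.path = path
    self.values = values or {}


class EchoPlugin(object):
  """Hypothetical key plugin that yields one tuple per matching key."""

  REG_KEYS = [u'\\{current_control_set}\\Control\\Session Manager']

  def GetEntries(self, key=None, registry_type=None, **unused_kwargs):
    # Extra keyword arguments, such as codepage, are accepted and ignored.
    yield (key.path, registry_type, dict(key.values))


def ParseKeys(keys, plugins, cache_attributes, registry_type):
  """Runs GetEntries of every plugin whose expanded REG_KEYS match a key."""
  events = []
  for key in keys:
    for plugin in plugins:
      # Expand {attribute} placeholders and compare case-insensitively.
      plugin_paths = [
          path.format(**cache_attributes).lower() for path in plugin.REG_KEYS]
      if key.path.lower() in plugin_paths:
        events.extend(plugin.GetEntries(
            key=key, registry_type=registry_type, codepage='cp1252'))
  return events


keys = [
    FakeKey(u'\\Select'),
    FakeKey(u'\\ControlSet001\\Control\\Session Manager',
            {u'BootExecute': u'autocheck autochk *'})]
events = ParseKeys(
    keys, [EchoPlugin()], {'current_control_set': u'ControlSet001'}, 'SYSTEM')
print(len(events))
```

Only the Session Manager key matches, so one result is collected; the \Select key is skipped because no plugin lists it.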

Several objects may be passed to the GetEntries function, and which ones depends on the plugin implementation. One of them is a general purpose plugin cache that is not used in the Windows Registry plugins; some plugins may define their own cache object that is passed to them via this function.

Let's examine the Windows Registry class plugin implementation:


In [ ]:
PrintClassHelp(interface.RegistryPlugin, 'GetEntries')

By convention, each plugin implementation (Registry, SQLite, Bencode, etc.) defines a method named "GetEntries", which is called to run a plugin against a particular file. To explain the arguments in a little more detail:

The parser_context object is an object passed to each plugin (and parser) that provides access to shared Plaso functionality, like the event output queue, system knowledge base and error signalling functionality. This part of Plaso is undergoing extensive development at the moment, and more features will be added in near future.

The object called "key" is the registry key the plugin should attempt to parse, the file_entry object is a reference to the registry file that contains the key being parsed, registry_type is the type of the registry file (for example SYSTEM), and the codepage is the ASCII codepage that should be used to interpret the key data. Finally, we need to catch other attributes that may be sent to the plugin but that we are not using, hence **unused_kwargs.

The proper definition of the GetEntries function for a Windows registry plugin is therefore:

def GetEntries(
    self, parser_context, file_entry=None, key=None, registry_type=None,
    **unused_kwargs):

In [ ]:
from plaso.parsers import plugins

PrintClassHelp(plugins.BasePlugin, 'GetEntries')

Timestamp

When dealing with the Windows Registry, the most common timestamp is the Windows FILETIME timestamp. To translate it into the microseconds since Epoch that Plaso uses internally, we use the timelib library, more specifically timelib.Timestamp.FromFiletime(file_time).
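The FILETIME arithmetic itself is straightforward: a FILETIME counts 100-nanosecond intervals since 1601-01-01 UTC, while Plaso uses microseconds since 1970-01-01 UTC. The sketch below shows only that arithmetic; it is not the timelib implementation, which may additionally validate its input.

```python
import datetime

# Number of 100-nanosecond intervals between 1601-01-01 and 1970-01-01.
FILETIME_EPOCH_DELTA = 116444736000000000


def FiletimeToMicroseconds(filetime):
  """Converts a FILETIME value into microseconds since 1970-01-01 UTC."""
  # Shift the epoch, then divide by 10 to go from 100ns units to microseconds.
  return (filetime - FILETIME_EPOCH_DELTA) // 10


microseconds = FiletimeToMicroseconds(130330080000000000)
print(datetime.datetime(1970, 1, 1) + datetime.timedelta(
    microseconds=microseconds))
```

The FILETIME value in the example corresponds to 2014-01-01 00:00:00 UTC, which is what the last line prints.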

However, there may be other timestamps embedded in a key value. Let's examine what other options we've got.


In [ ]:
# Import the library we are about to inspect.
from plaso.lib import timelib

# You can easily change the name of the class here if you want to explore a
# different class and its members.
PrintClassHelp(timelib.Timestamp)

Event Object - WindowsRegistryEvent

Each timestamped event is described as an EventObject. It is almost always worth creating a convenience class that makes it easier to create EventObjects specific to the events your plugin or parser produces. Since we are writing a Windows Registry plugin, however, a convenience EventObject that we can take advantage of already exists: WindowsRegistryEvent.

Let's examine the WindowsRegistryEvent:


In [ ]:
from plaso.events import windows_events

PrintClassHelp(windows_events.WindowsRegistryEvent, '__init__')

The event object is pretty simple really. We need to set the following arguments:

  • key_name: The full path to the registry key, stored in key.path.
  • value_dict: A dict object that contains all the text that we would like to be contained in the message string. By default the formatter prints this out as "key_1 = value_1, key_2 = value_2, etc.", however this can easily be changed so the registry event is presented differently.
  • timestamp: The timestamp, either of the registry key itself or extracted from the value.
  • usage: The description of the meaning of the timestamp; by default this is LAST_WRITTEN.
  • offset: The offset in bytes into the registry file where this value is; typically this does not need to be filled in.
  • registry_type: The type of the registry file that the event was extracted from. Almost always, the right thing to do is to pass along the registry_type that GetEntries() was called with, as described above.
  • urls: A list of URLs that provide more information or context about the event.
  • source_append: By default the source attribute is "REGISTRY_TYPE key", e.g. "SOFTWARE key". However, some plugins may want to add additional information that further describes the source, e.g. "NTUSER key UserAssist". This value is designed for that.
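To illustrate the default message layout described for value_dict, here is a hypothetical helper that builds a "name = data" string. Sorting the value names is an assumption made here so the output is deterministic; the actual WindowsRegistryEvent formatter may order or present the values differently.

```python
def BuildRegistryMessage(value_dict):
  """Joins a value_dict into a "name = data, name = data" string.

  Hypothetical helper; value names are sorted to keep the output stable.
  """
  return u', '.join(
      u'{0:s} = {1!s}'.format(name, data)
      for name, data in sorted(value_dict.items()))


print(BuildRegistryMessage({'foo': 'bar', 'stuff': 'more stuff'}))
```

For the value_dict used in the formatter demo further down, this prints foo = bar, stuff = more stuff.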

Typically a plugin only needs to worry about filling in these values:

  • key_name
  • value_dict
  • timestamp

And that's really it: we only need to fill in the values of the class constants and then implement the GetEntries function.

Here is the entire code for the BootExecutePlugin:


In [ ]:
class BootExecutePluginFoo(interface.KeyPlugin):
  """Plug-in to collect the BootExecute Value from the Session Manager key."""

  # Change the name to "foo" to distinguish it from the checked in version.
  NAME = 'winreg_boot_execute_foo'
  DESCRIPTION = u'Parser for Boot Execution Registry data.'

  # Define the path to the registry key (notice the use of the current_control_set).
  # Here we define a list of all the registry keys this plugin supports, since registry plugins
  # sometimes support more than a single key. In this particular case we are only interested in
  # a single key, so we define a single entry in the list.
  REG_KEYS = [u'\\{current_control_set}\\Control\\Session Manager']
  # REG_KEYS = [u'\\ControlSet001\\Control\\Session Manager']
    
  # This is only designed to parse a registry key from the SYSTEM hive.
  REG_TYPE = 'SYSTEM'
    
  URLS = ['http://technet.microsoft.com/en-us/library/cc963230.aspx']

  def GetEntries(
      self, parser_context, file_entry=None, key=None, registry_type=None,
      **unused_kwargs):
    """Gather the BootExecute Value, compare to default, return event. 
    
    The rest of the values in the Session Manager key are in a separate event.

    Args:
      parser_context: A parser context object (instance of ParserContext).
      file_entry: optional file entry object (instance of dfvfs.FileEntry).
                  The default is None.
      key: Optional Registry key (instance of winreg.WinRegKey).
           The default is None.
      registry_type: Optional Registry type string. The default is None.
    """
    text_dict = {}
    for value in key.GetValues():
      if value.name == 'BootExecute':
        # MSDN claims that the data type of this value is REG_BINARY,
        # although REG_MULTI_SZ has been observed to be used as well.
        if value.DataIsString():
          value_string = value.data
        elif value.DataIsMultiString():
          value_string = u''.join(value.data)
        elif value.DataIsBinaryData():
          value_string = value.data
        else:
          value_string = u''
          error_string = (
              u'Key: {0:s}, value: {1:s}: unsupported value data type: '
              u'{2:s}.').format(key.path, value.name, value.data_type_string)
          # Output an error for Plaso to store.
          parser_context.ProduceParseError(
              self.NAME, error_string, file_entry=file_entry)

        value_dict = {'BootExecute': value_string}
        event_object = windows_events.WindowsRegistryEvent(
            key.last_written_timestamp, key.path, value_dict, offset=key.offset,
            registry_type=registry_type, urls=self.URLS)
        parser_context.ProduceEvent(event_object, plugin_name=self.NAME)

      else:
        text_dict[value.name] = value.data
    
    event_object = windows_events.WindowsRegistryEvent(
        key.last_written_timestamp, key.path, text_dict, offset=key.offset,
        registry_type=registry_type, urls=self.URLS)
    parser_context.ProduceEvent(event_object, plugin_name=self.NAME)
  
from plaso.parsers import winreg
winreg.WinRegistryParser.RegisterPlugin(BootExecutePluginFoo)
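Because the data type branching in GetEntries only runs inside plaso, the hypothetical function below reproduces the same decisions on plain Python data so they can be tried in isolation. The data type names stand in for the DataIsString(), DataIsMultiString() and DataIsBinaryData() checks; which Registry types each check accepts is an assumption.

```python
def BootExecuteString(data_type, data, separator=u''):
  """Normalizes BootExecute value data, mirroring the plugin's branches.

  Args:
    data_type: registry data type name; a hypothetical stand-in for the
        DataIsString(), DataIsMultiString() and DataIsBinaryData() checks.
    data: the value data.
    separator: string used to join REG_MULTI_SZ entries; the plugin above
        joins them with an empty string.

  Returns:
    A (value_string, error_string) tuple; error_string is None on success.
  """
  # Assumed to be the types that DataIsString() accepts.
  if data_type in ('REG_SZ', 'REG_EXPAND_SZ'):
    return data, None
  if data_type == 'REG_MULTI_SZ':
    return separator.join(data), None
  if data_type == 'REG_BINARY':
    return data, None
  return u'', u'unsupported value data type: {0:s}'.format(data_type)


print(BootExecuteString('REG_MULTI_SZ', [u'autocheck autochk *']))
```

The separator parameter exists because joining several REG_MULTI_SZ entries with an empty string, as the plugin does, runs the boot programs together; passing u', ' keeps them apart.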

[optional code segment] If you modify the class above, you need to de-register it before you can register it again. To de-register it, use the code block below:


In [ ]:
# OPTIONAL CODE BLOCK !! DON'T EXECUTE UNLESS YOU HAVE MADE SOME 
# CHANGES TO THE PARSER CODE AND WANT TO REGISTER IT AGAIN!!!
winreg.WinRegistryParser.DeregisterPlugin(BootExecutePluginFoo)

Writing the Formatter

Have you ever noticed the message string when you print out an event?


In [ ]:
import datetime

from plaso.formatters import manager as formatters_manager

# Import the Windows Registry formatter.
from plaso.formatters import winreg

datetime_now = datetime.datetime.utcnow()

# Let's create a dummy event.
demo_event = windows_events.WindowsRegistryEvent(
    timelib.Timestamp.FromPythonDatetime(datetime_now),
    '\\dummy\\path\\key',
    {'foo': 'bar', 'stuff': 'more stuff'})

# And print the string.
print demo_event.GetString()

# And to re-iterate, let's print the message string.
message_string, _ = formatters_manager.EventFormatterManager.GetMessageStrings(demo_event)

print u'MESSAGE STRING: {}'.format(message_string)

Did you notice that in the event above you never really told it how to construct this message string? How does the tool then know how to print it out?

That is the purpose of the formatter. The formatter is a simple class that defines what attributes to use and how they are put together to form this message string.

You're going to need one for any parser or plugin you create (or, more precisely, for any new data type you introduce).

The formatter works by looking at the data_type attribute of the EventObject and matching it to the formatter's DATA_TYPE attribute. If they are the same, the formatter processes that EventObject and constructs the message string.

Formatters go in separate files under plaso/formatters. Don't forget the copyright at the top!

For the most part, you're just defining format strings that reference event attributes, describing the structures you want to see in your timeline.

Most importantly (to re-iterate), the DATA_TYPE must match the data_type attribute from the EventObject from the last section. Watch out for typos here -- there is no warning.
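A dictionary keyed by data type is enough to show why a typo in DATA_TYPE goes unnoticed. The registry, decorator and formatter class below are hypothetical and much simpler than plaso's formatter manager; the point is only that a lookup with a misspelled key returns nothing instead of raising an error.

```python
# Maps a DATA_TYPE string to the formatter class registered for it.
FORMATTERS = {}


def RegisterFormatter(formatter_class):
  """Registers a formatter class under its DATA_TYPE (hypothetical helper)."""
  FORMATTERS[formatter_class.DATA_TYPE] = formatter_class
  return formatter_class


@RegisterFormatter
class ExampleFormatter(object):
  """Hypothetical formatter for events whose data_type is codelab:example."""
  DATA_TYPE = 'codelab:example'
  FORMAT_STRING = u'{text}'


def FindFormatter(data_type):
  """Returns the formatter class for data_type, or None if there is none."""
  # A plain dictionary lookup: a misspelled data type simply finds nothing.
  return FORMATTERS.get(data_type)


print(FindFormatter('codelab:example') is ExampleFormatter)
print(FindFormatter('codelab:exmaple'))
```

The first lookup prints True; the misspelled one prints None without any warning, which is exactly the failure mode to watch out for.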

There are two formatters that you can use, the simple EventFormatter and the ConditionalEventFormatter. The former should only be used if you are absolutely sure all the attributes mentioned there are going to be set for each and every event object created. That means that for the vast majority of the formatters the ConditionalEventFormatter should be the formatter of choice.

There are two class constants that should always be set, regardless of which formatter you choose:

  • SOURCE_SHORT: This should match one of the common sources, e.g. LOG, WEBHIST, etc. This should closely correspond to the TLN format by H. Carvey as a short description of the source, almost like a short name for the category of the source.
  • SOURCE_LONG: Since the category itself is not sufficient to describe the source we have an extra field called SOURCE_LONG that further defines that, for instance a browser history extracted from Chrome browser will have the source short set to WEBHIST, indicating that this comes from a web history, but the SOURCE_LONG contains the text "Chrome History", setting that apart from other browsers.

For the simple EventFormatter, FORMAT_STRING has to be set, while FORMAT_STRING_SHORT is optional:

  • FORMAT_STRING: A Unicode string that contains formatting information, with all attribute names placed in {}. This is a typical Python format string, so all the usual rules apply. Timestamp, filename/path, username, hostname and similar information is presented in other fields and should not be part of the message string.
  • FORMAT_STRING_SHORT: This is only needed when the resulting message string may exceed 80 characters in width. Without it, the short message string just contains the first 77 characters of the longer version; with it, you can construct your own condensed version instead.
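The fallback behavior for the short message can be sketched as follows. The hypothetical ShortMessage helper assumes that the truncated text is the first 77 characters followed by three dots, so the result stays within 80 characters; plaso's own formatter code is not shown here.

```python
def ShortMessage(message, short_message=None, max_width=80):
  """Returns the short message, falling back to the truncated long one."""
  if short_message is None:
    short_message = message
  if len(short_message) > max_width:
    # Keep the first 77 characters and mark the cut with three dots.
    short_message = u'{0:s}...'.format(short_message[:max_width - 3])
  return short_message


print(len(ShortMessage(u'x' * 100)))
print(ShortMessage(u'a very long message ' * 10, u'condensed version'))
```

Without a short form, a 100-character message is cut to 80 characters; when a condensed FORMAT_STRING_SHORT result is supplied, it is used as-is.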

If you use the conditional formatter you need to define the following class constants:

  • FORMAT_STRING_PIECES: The same as the FORMAT_STRING, except that this is a list and only one attribute name should be defined per entry. If an attribute is not set in the event object then that particular entry in the list will be omitted.
  • FORMAT_STRING_SHORT_PIECES: Same as the FORMAT_STRING_SHORT except in the same format as FORMAT_STRING_PIECES, that is as a list.
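The way FORMAT_STRING_PIECES drops unset attributes can be sketched with the standard library's string.Formatter().parse(), which extracts the field name from each piece. The helper and attribute names are hypothetical, and joining the remaining pieces with a single space is an assumption about the default separator.

```python
import string


def ConditionalMessage(format_string_pieces, attributes, separator=u' '):
  """Joins only the pieces whose attributes are set in the event.

  Each piece is expected to reference a single attribute, as required above.
  """
  formatter = string.Formatter()
  message_pieces = []
  for piece in format_string_pieces:
    # parse() yields (literal_text, field_name, format_spec, conversion).
    field_names = [
        field_name for _, field_name, _, _ in formatter.parse(piece)
        if field_name]
    # Skip the piece when its attribute is missing from the event.
    if any(attributes.get(name) is None for name in field_names):
      continue
    message_pieces.append(piece.format(**attributes))
  return separator.join(message_pieces)


pieces = [u'Key: {key_path}', u'BootExecute: {boot_execute}', u'Comment: {comment}']
print(ConditionalMessage(
    pieces, {'key_path': u'Session Manager',
             'boot_execute': u'autocheck autochk *'}))
```

Because the hypothetical comment attribute is not set, its piece is omitted and the message reads Key: Session Manager BootExecute: autocheck autochk *.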

HOWEVER, for our purposes we don't need to write a formatter, since we are using an already existing event object, the WindowsRegistryEvent, for which an event formatter already exists.


In [ ]:
from plaso.formatters import winreg as winreg_formatter

PrintClassHelp(winreg_formatter.WinRegistryGenericFormatter)

Test The Plugin

It is very important to test the plugin, at the very least to see whether it can handle our sample dataset.

Parse the Key Using The Plugin

We can use the code below to test whether the plugin is capable of parsing the registry key we provide it with.


In [ ]:
from plaso.artifacts import knowledge_base
from plaso.lib import errors
from plaso.lib import queue
from plaso.parsers import context
from plaso.parsers import test_lib


# Create the plugin object, using the registry cache
# we made earlier.
test_plugin = BootExecutePluginFoo(reg_cache=registry_cache)

# Create a parser context object to handle key input/event output.
event_queue = queue.SingleThreadedQueue()
event_queue_producer = queue.EventObjectQueueProducer(event_queue)
parse_error_queue = queue.SingleThreadedQueue()
parse_error_queue_producer = queue.ParseErrorQueueProducer(
        parse_error_queue)
knowledge_base_object = knowledge_base.KnowledgeBase()
context_obj = context.ParserContext(
        event_queue_producer, parse_error_queue_producer,
        knowledge_base_object)

print u'Parsing key using: {}'.format(test_plugin.plugin_name)

# We already have the registry key extracted from a previous step.
# We can just pass that to the plugin.
print lfu_key.path
test_plugin.Process(parser_context=context_obj, key=lfu_key)

# Set up a consumer to read events emitted by our plugin.
event_queue_consumer = test_lib.TestEventObjectQueueConsumer(event_queue)
# Read all the events.
event_queue_consumer.ConsumeEventObjects()
event_objects = event_queue_consumer.event_objects

print u'Processing of registry key is done.'
print u'Able to extract: {} events from the key.'.format(len(event_objects))

Let's print out the event objects that we managed to extract from this plugin.


In [ ]:
# Print out the content of the extracted events.
for index, event_object in enumerate(event_objects):
  print u'*' * 80
  print u'    EVENT NUMBER: {}'.format(index)
  print u'-'*80
  print u'Event:'
  print event_object.GetString()
  print u''

Writing the Tests

Unit tests are designed to make sure your code is doing what you intended it to do, as well as to let other people know when their refactor broke your code. This will also assist you when you are writing your code by doing a sanity check on your parser to make sure it works the way you expect it to.

The tests go in their own file, in this case plaso/parsers/winreg_plugins/lfu_test.py.

For the tests to work, the formatter needs to be imported. The formatter module typically has the same name as the parser it formats (although not in the case of a Windows Registry plugin), so we may need to import it under a different name. The test file does not use the formatter directly, so a pylint statement is needed to suppress the resulting warning during linting. You will see the other imports throughout this codelab. Everything here lives in the same namespace, so we don't actually need to import the formatter. This is what is typically added:

# pylint: disable-msg=unused-import
from plaso.formatters import winreg as winreg_formatter

The pylint statement stops pylint from complaining about an unused import. We do not use the formatter directly. We only import it so that it gets registered (otherwise formatting will not work).

TestCase and setUp()

For a plugin test we use the appropriate plugin test library, in this case plaso/parsers/winreg_plugins/test_lib.py and its RegistryPluginTestCase class. This is a simple class that inherits from the plaso parser test library (which in turn inherits from unittest.TestCase). It adds a few functions that make it easier to test Windows Registry plugins. You may want to add a setUp() function to open the sample file and set any other variables you expect to be available in the background, like pre-processing results. Pre-processing discovers system variables such as the time zone and CurrentControlSet, so you can create a local pre-processing object to mimic what you would expect from the global one.

For a Windows Registry plugin you typically open the registry hive using self._GetTestFilePath, define the registry key path and use self._GetKeyFromFile to get the key.

In lfu_test, however, we work with a mock registry key, so the registry cache needs to be pre-built and the key created by hand.
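As a reminder of how setUp() behaves: unittest calls it before every test method, so each test gets a fresh fixture. The minimal sketch below uses only the standard library. The dictionary stands in for the registry cache's attributes. Expanding the key path with str.format is an assumption made here for illustration. FakePluginTest is hypothetical.

```python
import unittest


class FakePluginTest(unittest.TestCase):
  """Shows that setUp() builds a fresh fixture for every test method."""

  def setUp(self):
    # Stand-in for registry_cache.attributes, pre-populated the way
    # pre-processing would populate it.
    self._attributes = {'current_control_set': 'ControlSet001'}

  def testExpandKeyPath(self):
    key_path = u'\\{current_control_set}\\Control\\Session Manager'
    expanded = key_path.format(**self._attributes)
    self.assertEqual(expanded, u'\\ControlSet001\\Control\\Session Manager')
    # Mutating the fixture here does not leak into other test methods.
    self._attributes['current_control_set'] = 'ControlSet002'

  def testFixtureIsFresh(self):
    self.assertEqual(self._attributes['current_control_set'], 'ControlSet001')


fixture_suite = unittest.TestLoader().loadTestsFromTestCase(FakePluginTest)
fixture_result = unittest.TextTestRunner(verbosity=0).run(fixture_suite)
print(fixture_result.wasSuccessful())  # True
```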

Let's first look at what functions are available to us in the registry test library.


In [ ]:
from plaso.parsers.winreg_plugins import test_lib

PrintClassHelp(test_lib.RegistryPluginTestCase)

Write the setUp() function for this class definition:

class TestBootExecutePlugin(test_lib.RegistryPluginTestCase):
  """Tests for the LFU BootExecute Windows Registry plugin."""

  def setUp(self):
    """Sets up the needed objects used throughout the test."""
    # Fill in here.

While we're setting up the boilerplate of the test, let's add the main block to the bottom of the file. That lets us run the test file on its own.

if __name__ == '__main__':
  unittest.main()

Writing the Test

The outline of the main test is to create and run the plugin, then check that its results are correct. You should check a variety of attributes of at least one extracted event, as well as something about the extracted events in general.

The test method name needs to start with the word "test", since that is how unittest finds test methods. Let's use testProcess(), since that is what we are testing: the Process function of the plugin. The assertions should include:

  • How many entries were created?
  • For entry[1], are the timestamp, username, and full_path correct?
  • For entry[1], are the message strings formatted correctly?
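The checklist above can be sketched as a stand-alone test against a hypothetical FakeProcess() function that returns FakeEvent objects (no plaso involved). It only shows the order of the checks: first the count, then one event's attributes, then its message. It also shows that unittest only collects methods whose names start with "test".

```python
import unittest


class FakeEvent(object):
  """Hypothetical event carrying the attributes the checklist asks about."""

  def __init__(self, timestamp, message):
    self.timestamp = timestamp
    self.message = message


def FakeProcess():
  """Hypothetical stand-in for running a plugin and collecting its events."""
  return [
      FakeEvent(1346445929000000, u'BootExecute: autocheck autochk *'),
      FakeEvent(1346445929000000, u'GlobalFlag: 0')]


class FakeProcessTest(unittest.TestCase):

  def _HelperNotATest(self):
    # Never collected: the name does not start with "test".
    raise AssertionError(u'should never run')

  def testProcess(self):
    event_objects = FakeProcess()
    # How many entries were created?
    self.assertEqual(len(event_objects), 2)
    # Are the attributes of a specific entry correct?
    self.assertEqual(event_objects[0].timestamp, 1346445929000000)
    # Is the message string formatted correctly?
    self.assertEqual(
        event_objects[0].message, u'BootExecute: autocheck autochk *')


checklist_loader = unittest.TestLoader()
print(checklist_loader.getTestCaseNames(FakeProcessTest))  # ['testProcess']
checklist_result = unittest.TextTestRunner(verbosity=0).run(
    checklist_loader.loadTestsFromTestCase(FakeProcessTest))
```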

In [ ]:
# from plaso.lib import eventdata
from plaso.winreg import test_lib as winreg_test_lib


class TestBootExecutePluginFoo(test_lib.RegistryPluginTestCase):
  """Tests for the LFU BootExecute Windows Registry plugin."""

  def setUp(self):
    """Sets up the needed objects used throughout the test."""
    registry_cache = cache.WinRegistryCache()
    registry_cache.attributes['current_control_set'] = 'ControlSet001'
    # This would have to be changed to reflect the actual plugin, but since we
    # are in the same namespace we can do this.
    self._plugin = BootExecutePluginFoo(reg_cache=registry_cache)

  def testProcess(self):
    """Tests the Process function."""
    # In tests we put in the full path (as in not use the current_control_set
    # attribute).
    key_path = u'\\ControlSet001\\Control\\Session Manager'
    values = []

    # Here we are creating a test key, a fake registry key instead of reading
    # one from an actual registry file (which is preferred).
    values.append(winreg_test_lib.TestRegValue(
        'BootExecute', 'autocheck autochk *\x00'.encode('utf_16_le'), 7, 123))
    values.append(winreg_test_lib.TestRegValue(
        'CriticalSectionTimeout', '2592000'.encode('utf_16_le'), 1, 153))
    values.append(winreg_test_lib.TestRegValue(
        'ExcludeFromKnownDlls', '\x00'.encode('utf_16_le'), 7, 163))
    values.append(winreg_test_lib.TestRegValue(
        'GlobalFlag', '0'.encode('utf_16_le'), 1, 173))
    values.append(winreg_test_lib.TestRegValue(
        'HeapDeCommitFreeBlockThreshold', '0'.encode('utf_16_le'), 1, 183))
    values.append(winreg_test_lib.TestRegValue(
        'HeapDeCommitTotalFreeThreshold', '0'.encode('utf_16_le'), 1, 203))
    values.append(winreg_test_lib.TestRegValue(
        'HeapSegmentCommit', '0'.encode('utf_16_le'), 1, 213))
    values.append(winreg_test_lib.TestRegValue(
        'HeapSegmentReserve', '0'.encode('utf_16_le'), 1, 223))
    values.append(winreg_test_lib.TestRegValue(
        'NumberOfInitialSessions', '2'.encode('utf_16_le'), 1, 243))

    winreg_key = winreg_test_lib.TestRegKey(
        key_path, 1346445929000000, values, 153)

    event_queue_consumer = self._ParseKeyWithPlugin(self._plugin, winreg_key)
    event_objects = self._GetEventObjectsFromQueue(event_queue_consumer)

    self.assertEquals(len(event_objects), 2)

    event_object = event_objects[0]

    # Timestamp is: Fri, 31 Aug 2012 20:45:29 GMT
    self.assertEquals(event_object.timestamp, 1346445929000000)

    expected_string = (
        u'[{0:s}] BootExecute: autocheck autochk *').format(key_path)

    self._TestGetMessageStrings(event_object, expected_string, expected_string)

    event_object = event_objects[1]

    expected_msg = (
        u'[{0:s}] '
        u'CriticalSectionTimeout: 2592000 '
        u'ExcludeFromKnownDlls: [] '
        u'GlobalFlag: 0 '
        u'HeapDeCommitFreeBlockThreshold: 0 '
        u'HeapDeCommitTotalFreeThreshold: 0 '
        u'HeapSegmentCommit: 0 '
        u'HeapSegmentReserve: 0 '
        u'NumberOfInitialSessions: 2').format(key_path)

    expected_msg_short = (
        u'[{0:s}] CriticalSectionTimeout: 2592000 Excl...').format(key_path)

    self._TestGetMessageStrings(event_object, expected_msg, expected_msg_short)
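The expected timestamps in these tests are integers counting microseconds since 1970-01-01 00:00:00 UTC. When writing assertions, a small standard-library helper is one way to compute such values. ToMicroseconds is hypothetical and not part of plaso.

```python
import calendar
import datetime


def ToMicroseconds(date_time):
  """Converts a naive datetime, taken to be UTC, into microseconds since epoch."""
  # calendar.timegm() interprets the time tuple as UTC (unlike time.mktime()).
  seconds = calendar.timegm(date_time.utctimetuple())
  return seconds * 1000000 + date_time.microsecond


# Fri, 31 Aug 2012 20:45:29 GMT, as used in the BootExecute test.
print(ToMicroseconds(datetime.datetime(2012, 8, 31, 20, 45, 29)))
# 1346445929000000
```

The same helper works for timestamps with a fractional second, since the microsecond field is added on after the whole seconds are scaled.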

Running the Test

How will you know what the format string should look like? Well, it's time to run the code we have. Typically the plugin needs to be "compiled" before the test will be able to import it, so we need to make sure the plugin gets picked up for compilation.

Open plaso/parsers/winreg_plugins/__init__.py, add an import statement for your new plugin and save the file.

Normally, to run the tests you would either run all of them:

python run_tests.py

Or build and install plaso:

python setup.py build && sudo python setup.py install

And then run the test directly using:

python plaso/parsers/winreg_plugins/lfu_test.py

Rinse and repeat as you write the tests. If you change the plugin, you need to recompile. If you only change the test, you don't.

However, since we are writing this in our notebook, we only need to make sure we've run all the previous code segments. If you make changes, just re-run them.

To run the test itself, execute the below code:


In [ ]:
import unittest

my_suite = unittest.TestSuite()
my_suite.addTest(TestBootExecutePluginFoo('testProcess'))

results = unittest.TextTestRunner(verbosity=3).run(my_suite)

# Each entry in results.errors and results.failures is a
# (test case, formatted traceback) tuple. Report both lists, since a single
# run can contain errors as well as failures.
if results.errors:
  print u'Errors came up while trying to run test.'
  for test_case, traceback_text in results.errors:
    print test_case
    print traceback_text
if results.failures:
  print u'Failures came up while trying to run test.'
  for test_case, traceback_text in results.failures:
    print test_case
    print traceback_text
if results.wasSuccessful():
  print u'All came out clean.'
  print results

If all went well you should have a fully functioning plugin by now, ready to parse every BootExecute registry key you may encounter.

You can start playing around and making changes to the plugin, to see what happens when changes are introduced, or continue and create a new plugin.

The Assignment

We have now gone step by step through how a simple Windows Registry plugin is created. It is time to take what we've learned so far and create a new plugin.

The remainder of the codelab revolves around writing a registry plugin that parses a simple registry key. Write the plugin and the unit test to successfully parse it.

Let's start with a simple registry key from our test registry hive.


In [ ]:
mystery_key_path = '\\ControlSet001\\Control\\ProductOptions'
mystery_key = winreg_file.GetKeyByPath(mystery_key_path)

print mystery_key

Fill in all the details in the plugin so that it successfully parses this key.
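If you are unsure where to start, here is a plain-Python sketch of one possible shape for the logic. It reads every value in the key into a dictionary and emits a single event stamped with the key's last written time. FakeValue, FakeKey and CollectValues are hypothetical stand-ins. The value names and data are illustrative, not read from the test hive, and the dictionary keys of the result are made up. In the real plugin you work with the key object passed to GetEntries() and produce a plaso event instead of returning a dictionary.

```python
class FakeValue(object):
  """Hypothetical registry value: a name and already-decoded data."""

  def __init__(self, name, data):
    self.name = name
    self.data = data


class FakeKey(object):
  """Hypothetical registry key: a path, a last written time and values."""

  def __init__(self, path, last_written_timestamp, values):
    self.path = path
    self.last_written_timestamp = last_written_timestamp
    self._values = values

  def GetValues(self):
    return iter(self._values)


def CollectValues(key):
  """Returns one event-like dictionary describing every value in the key."""
  text_dict = {}
  for value in key.GetValues():
    # Multi-string data (a list) is joined so it prints on a single line.
    if isinstance(value.data, list):
      text_dict[value.name] = u' '.join(value.data)
    else:
      text_dict[value.name] = value.data
  return {
      'timestamp': key.last_written_timestamp,
      'key_path': key.path,
      'values': text_dict}


sketch_key = FakeKey(
    u'\\ControlSet001\\Control\\ProductOptions', 1316267019129177,
    [FakeValue(u'ProductType', u'WinNT'),
     FakeValue(u'ProductSuite', [u'Terminal Server'])])
sketch_event = CollectValues(sketch_key)
print(sketch_event['values'])
```

Producing a single event for the whole key matches the unit test's expectation of one event object further down. A plugin that needs to highlight one value, as the BootExecute plugin does, could emit a separate event for it instead.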


In [ ]:
class MysteryPlugin(interface.KeyPlugin):
  """Plug-in to collect something from something."""

  # Change the name to something more descriptive.
  NAME = 'mystery_plugin'

  # Change the description field.
  DESCRIPTION = 'describe me'

  # Define the path to the registry key (notice the use of the
  # current_control_set attribute).
  REG_KEYS = [u'\\{current_control_set}\\Control\\ProductOptions']

  # This is only designed to parse a registry key from the SYSTEM hive.
  REG_TYPE = 'SYSTEM'

  # Fill in here.
  URLS = []

  def GetEntries(
      self, parser_context, file_entry=None, key=None, registry_type=None,
      **unused_kwargs):
    """Gather data from our plugin, please rewrite me."""


from plaso.parsers import winreg
winreg.WinRegistryParser.RegisterPlugin(MysteryPlugin)

[optional code segment] Remember that if you make changes to the plugin, you need to deregister it before you run the code segment above again to register the changed version:


In [ ]:
# OPTIONAL DO NOT EXECUTE UNLESS YOU'VE MADE CHANGES TO THE PARSER CODE ABOVE AND
# NEED TO REGISTER THOSE CHANGES!

winreg.WinRegistryParser.DeregisterPlugin(MysteryPlugin)
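Why deregister first? A common design is to keep plugins in a class-level dictionary keyed by name and to refuse duplicate names, so re-running the registration cell fails until the old entry is removed. The sketch below illustrates that pattern with a hypothetical PluginRegistry and DemoPlugin. It is an assumption about the general design, not a copy of plaso's implementation.

```python
class PluginRegistry(object):
  """Hypothetical name-keyed plugin registry that rejects duplicates."""

  _plugin_classes = {}

  @classmethod
  def RegisterPlugin(cls, plugin_class):
    name = plugin_class.NAME.lower()
    if name in cls._plugin_classes:
      raise KeyError(u'Plugin already registered: {0:s}'.format(name))
    cls._plugin_classes[name] = plugin_class

  @classmethod
  def DeregisterPlugin(cls, plugin_class):
    name = plugin_class.NAME.lower()
    if name not in cls._plugin_classes:
      raise KeyError(u'Plugin not registered: {0:s}'.format(name))
    del cls._plugin_classes[name]


class DemoPlugin(object):
  NAME = 'demo_plugin'


PluginRegistry.RegisterPlugin(DemoPlugin)
try:
  # Registering the same name a second time fails...
  PluginRegistry.RegisterPlugin(DemoPlugin)
  duplicate_rejected = False
except KeyError:
  duplicate_rejected = True

# ...so deregister first, then register the (changed) class again.
PluginRegistry.DeregisterPlugin(DemoPlugin)
PluginRegistry.RegisterPlugin(DemoPlugin)
print(duplicate_rejected)  # True
```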

And finally we need to create a unit test to make sure we are parsing the key properly.


In [ ]:
class MysteryPluginTest(test_lib.RegistryPluginTestCase):
  """Tests for the mysterious registry key."""

  def setUp(self):
    """Sets up the needed objects used throughout the test."""
    # Once we would actually commit the code in we would use a real file
    # that is inside our test directory. But for now we call the temporary
    # file that we defined earlier.
    # test_file = self._GetTestFilePath(['SYSTEM'])
    self.test_file = test_registry_file_name

    registry_cache = cache.WinRegistryCache()
    registry_cache.attributes['current_control_set'] = 'ControlSet001'

    self._plugin = MysteryPlugin(reg_cache=registry_cache)

  def testProcess(self):
    """Tests the Process function and change me, pretty please."""
    # Put in the actual path, test files do not expand the key using attributes.
    key_path = '\\ControlSet001\\Control\\ProductOptions'
    winreg_key = self._GetKeyFromFile(self.test_file, key_path)

    event_queue_consumer = self._ParseKeyWithPlugin(self._plugin, winreg_key)
    event_objects = self._GetEventObjectsFromQueue(event_queue_consumer)

    # At bare minimum we need to test that the plugin extracted the
    # expected number of events from the key.
    self.assertEquals(len(event_objects), 1)

    # Let's read in this event object and verify some of its fields.
    event_object = event_objects[0]

    # Make sure the timestamp is the correct one, in microseconds since the
    # epoch: 2011-09-17T13:43:39.129177 UTC. For example, with GNU date:
    # echo $(($(date -u -d "2011-09-17T13:43:39.129177" +%s%N) / 1000))
    self.assertEquals(event_object.timestamp, 1316267019129177)
    # Add here some tests to make sure we are parsing/extracting attribute names.

    # And now we need to test our formatter, create a message string and test it.
    expected_msg = u'This is a message string.'
    expected_msg_short = u'The short version.'

    self._TestGetMessageStrings(event_object, expected_msg, expected_msg_short)

And run these tests to make sure we have everything covered.


In [ ]:
my_suite = unittest.TestSuite()
my_suite.addTest(MysteryPluginTest('testProcess'))

results = unittest.TextTestRunner(verbosity=3).run(my_suite)

# Each entry in results.errors and results.failures is a
# (test case, formatted traceback) tuple. Report both lists, since a single
# run can contain errors as well as failures.
if results.errors:
  print u'Errors came up while trying to run test.'
  for test_case, traceback_text in results.errors:
    print test_case
    print traceback_text
if results.failures:
  print u'Failures came up while trying to run test.'
  for test_case, traceback_text in results.failures:
    print test_case
    print traceback_text
if results.wasSuccessful():
  print u'All came out clean.'
  print results

Also remember that the walkthrough of the BootExecute plugin introduced some code segments that may help you during this assignment. Change the calls to the BootExecute plugin into calls to your newly created plugin and execute them again.

And you can also start playing a bit with this registry hive to find other interesting keys (since the key in the assignment is not really that interesting).

This code segment below is necessary to set up our "exploration" environment.


In [ ]:
# Import the libraries from preg, so we can start exploring the registry.
from plaso.frontend import preg

# Open up the registry hive.
print test_registry_file_name
preg.OpenHive(test_registry_file_name, None)

# Get access to the iPython shell we are in, so we can start modifying it a bit.
shell = get_ipython()

# Adding "magic" functions.
shell.register_magics(preg.MyMagics)

# Registering command completion for the magic commands.
shell.set_hook('complete_command', preg.CdCompleter, str_key='%cd')
shell.set_hook('complete_command', preg.VerboseCompleter, str_key='%ls')
shell.set_hook('complete_command', preg.VerboseCompleter, str_key='%parse')
shell.set_hook('complete_command', preg.PluginCompleter, str_key='%plugin')

And now we can start exploring this registry file.

Try commands like:

  • cd KEY
  • ls [-v]
  • parse [-v]
  • plugin PLUGIN_NAME

Tab completion for key names should work (please try it out and play with it). Which means you can type something like:

cd C<TAB>

And the tab completion will try to complete that registry key.
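Conceptually, completing a key name means matching what you have typed against the names of the subkeys at the current location. Registry key names are case-insensitive, so this sketch compares lowercased names. CompleteKeyName is hypothetical and is not preg's CdCompleter. The subkey names are example root keys of a SYSTEM hive.

```python
def CompleteKeyName(typed, subkey_names):
  """Returns the subkey names that start with the typed prefix, ignoring case."""
  prefix = typed.lower()
  return sorted(
      name for name in subkey_names if name.lower().startswith(prefix))


subkeys = [
    u'ControlSet001', u'ControlSet002', u'MountedDevices', u'Select',
    u'Setup']
print(CompleteKeyName(u'C', subkeys))
print(CompleteKeyName(u'se', subkeys))
```

Typing "C" offers both control sets, and the lowercase "se" still matches "Select" and "Setup".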

Begin with something like the code segment below and then change it at will. The % in front of the commands marks them as "magic" commands. It is not strictly necessary, but it is kept just in case, since some IPython/notebook setups do not seem to recognize the commands without it.


In [ ]:
%ls
%cd \ControlSet001\Enum\USBSTOR
%ls
%cd Disk&Ven_HP&Prod_v100w&Rev_1024\AA951D0000007252&0
%ls -v
%parse
%pwd

Clean Up

Earlier in this codelab we created a temporary file that we may want to delete. To delete it, use the code below:


In [ ]:
import os

if test_registry_file_name and os.path.exists(test_registry_file_name):
  os.remove(test_registry_file_name)
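For reference, a temporary-file lifecycle like the one in this codelab can be sketched with the standard library alone. First create a named file that survives being closed, then use its path, and finally remove it only if it still exists, so that re-running the clean-up does not raise an error. The file name suffix and content here are placeholders, not a registry hive.

```python
import os
import tempfile

# delete=False keeps the file on disk after the handle is closed, so other
# code (such as a registry parser) can open it by name.
with tempfile.NamedTemporaryFile(suffix='.hive', delete=False) as temp_file:
  temp_file.write(b'placeholder data')
  temp_file_name = temp_file.name

print(os.path.exists(temp_file_name))  # True

# Guard against the file already being gone, e.g. when the cell is re-run.
if temp_file_name and os.path.exists(temp_file_name):
  os.remove(temp_file_name)

print(os.path.exists(temp_file_name))  # False
```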