AlienVault USM event visualizer

  • Search
  • Graphistry helper
  • Plot

Plug in your credentials in the first cells, put in a custom search, then plot.


In [ ]:
!pip install requests_oauthlib -q

In [ ]:
from unimatrix import UnimatrixClient
import graphistry

#graphistry.register(key='MY_API_KEY', server='labs.graphistry.com')
client = UnimatrixClient("my_subdomain", "client_id", "client_secret")

In [ ]:
params = {'plugin': 'AlienVault Agent - Windows EventLog', 'event_name':'Network connection'}
events_df = client.get_events_df(num_items=25000, params=params)

print('# rows', len(events_df))
events_df.sample(3)

Helper


In [ ]:
def graph(events_df):
    red_events = events_df
    red_events = red_events.assign(source_process= red_events['source_process'].apply(lambda v: v.split('\\')[-1]),
                                   destination_organisation=red_events.destination_organisation.fillna('INTERNAL'))
    red_events = red_events.assign(source_process_id=red_events.apply(lambda row: str(row['source_process_id']) + '::' + str(row['source_canonical']), axis=1)).reset_index()

    edges = {
        'source_username': ['source_canonical', 'source_process_id', 'source_process_id'],
        'source_process_id': ['source_process', 'iocs', 'event_category', 'watchlist'],
        'source_canonical': ['source_address', 'source_process', 'source_name', 'source_process_id', 
                           'destination_name', 'destination_canonical', 'destination_address'],
        'destination_canonical': ['destination_name', 'destination_address']
    }

    node_types = list(edges.keys())
    for dsts in edges.values():
        node_types.extend(dsts)
    node_types = list(set(node_types))    

    g = graphistry.hypergraph(
        red_events.astype(str), 
        entity_types=node_types, 
        direct=True,
        opts={
            'EDGES': edges,
            'CATEGORIES': {
                'asset': ['source_canonical', 'source_address', 'source_canonical', 'destination_name', 'destination_canonical', 'destination_address' ]
            }
        })['graph']

    types = list(g._nodes['category'].unique())
    type_to_color = {t: types.index(t) % 10 for t in types}

    events = list(g._edges['event_category'].unique())
    event_to_color = {t: events.index(t) % 10 for t in events}

    g = g.nodes(g._nodes.assign(p_color=g._nodes['category'].apply(lambda t: type_to_color[t]))).bind(point_color='p_color')
    g = g.edges(g._edges[~(g._edges.dst.str.match('.*::nan') | g._edges.src.str.match('.*::nan'))])
    g = g.edges(g._edges.assign(e_color=g._edges['event_category'].apply(lambda t: event_to_color[t]))).bind(edge_color='e_color')

    return g

Visualize


In [ ]:
graph(events_df).plot()

In [ ]: