In [1]:
# Delete this cell to re-enable tracebacks
import sys
# Handle to the running shell; hide_traceback below calls its traceback helpers.
# NOTE(review): get_ipython is not imported in this notebook — presumably it is
# supplied by the IPython runtime.
ipython = get_ipython()

def hide_traceback(exc_tuple=None, filename=None, tb_offset=None,
                   exception_only=False, running_compiled_code=False):
    """Show only the final "ExceptionType: message" line of an exception.

    Intended to be assigned to ``ipython.showtraceback`` (see the
    commented-out line following this function).

    Args:
        exc_tuple: optional ``(etype, value, tb)`` triple describing the
            exception to show. When omitted, the exception currently being
            handled (``sys.exc_info()``) is used.
        filename, tb_offset, exception_only, running_compiled_code:
            accepted for call compatibility and ignored.

    Returns:
        Whatever ``ipython._showtraceback`` returns, or ``None`` when there
        is no exception to report.
    """
    # Prefer the explicitly supplied exception: the caller may invoke this
    # after the ``except`` block has ended, when sys.exc_info() is all None.
    etype, value, tb = sys.exc_info() if exc_tuple is None else exc_tuple
    if value is None:
        # Nothing to report; avoids AttributeError on ``None.__cause__``.
        return None
    value.__cause__ = None  # suppress chained exceptions
    summary = ipython.InteractiveTB.get_exception_only(etype, value)
    return ipython._showtraceback(etype, value, summary)

#ipython.showtraceback = hide_traceback

In [2]:
# JSON output syntax highlighting
from __future__ import print_function
from pygments import highlight
from pygments.lexers import JsonLexer, TextLexer
from pygments.formatters import HtmlFormatter
from IPython.display import display, HTML
from IPython.core.interactiveshell import InteractiveShell

# Class-level setting, so it applies to the shell running this notebook.
# NOTE(review): per the IPython docs, "all" displays the value of every
# expression statement in a cell rather than only the last one — confirm.
InteractiveShell.ast_node_interactivity = "all"

def json_print(inpt):
    """Render ``inpt`` as syntax-highlighted HTML.

    The argument is converted with ``str()``. Text beginning with ``{`` is
    highlighted with the JSON lexer; anything else (including the empty
    string) is passed through the plain-text lexer.

    Args:
        inpt: any object; its ``str()`` form is what gets rendered.

    Returns:
        An ``IPython.display.HTML`` object holding the Pygments style sheet
        followed by the highlighted markup.
    """
    text = str(inpt)
    formatter = HtmlFormatter()
    # startswith() rather than text[0]: indexing raises IndexError on "".
    lexer = JsonLexer() if text.startswith('{') else TextLexer()
    return HTML('<style type="text/css">{}</style>{}'.format(
        formatter.get_style_defs('.highlight'),
        highlight(text, lexer, formatter)))

# Shadow the built-in print in this notebook's namespace: later print(...)
# calls go through json_print and return highlighted HTML instead.
globals()['print'] = json_print

TAXIICollection

The TAXIICollection suite contains TAXIICollectionStore, TAXIICollectionSource, and TAXIICollectionSink. TAXIICollectionStore pushes and retrieves STIX content to local/remote TAXII Collection(s). TAXIICollectionSource retrieves STIX content from local/remote TAXII Collection(s). TAXIICollectionSink pushes STIX content to local/remote TAXII Collection(s). Each of the interfaces is designed to be bound to a Collection from the taxii2client library (taxii2client.Collection), where all TAXIICollection API calls will be executed through that Collection instance.

A note on TAXII2 searching/filtering of STIX content: TAXII2 server implementations natively support searching on the STIX2 object properties: id, type and version; API requests made to TAXII2 can contain filter arguments for those 3 properties. However, the TAXIICollection suite supports searching on all STIX2 common object properties (see Filters documentation for full listing). This works simply by augmenting the filtering that is done remotely at the TAXII2 server instance. TAXIICollection will separate any supplied queries into TAXII-supported filters and non-supported filters. During a TAXIICollection API call, TAXII2-supported filters get inserted into the TAXII2 server request (to be evaluated at the server). The rest of the filters are kept locally and then applied to the STIX2 content that is returned from the TAXII2 server, before being returned from the TAXIICollection API call.

TAXIICollection API

TAXIICollection Examples

TAXIICollectionSource


In [3]:
from stix2 import TAXIICollectionSource
from taxii2client import Collection

# Bind a taxii2client Collection to the TAXII2 server endpoint
collection = Collection(
    "http://127.0.0.1:5000/trustgroup1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/",
    user="admin",
    password="Password0",
)
# Hand that collection to a TAXIICollectionSource; all lookups go through it
tc_source = TAXIICollectionSource(collection)

# Look up one object by STIX id, and all versions of another
stix_obj = tc_source.get("malware--c0931cc6-c75e-47e5-9036-78fabc95d4ec")
stix_obj_versions = tc_source.all_versions("indicator--6770298f-0fd8-471a-ab8c-1c658a46574e")

# Show what came back
print(stix_obj)
print("-------")
for so in stix_obj_versions:
    print(so)


Out[3]:
{
    "type": "malware",
    "spec_version": "2.1",
    "id": "malware--c0931cc6-c75e-47e5-9036-78fabc95d4ec",
    "created": "2017-01-27T13:49:53.997Z",
    "modified": "2017-01-27T13:49:53.997Z",
    "name": "Poison Ivy",
    "description": "Poison Ivy",
    "malware_types": [
        "remote-access-trojan"
    ],
    "is_family": true
}
Out[3]:
-------
Out[3]:
{
    "type": "indicator",
    "spec_version": "2.1",
    "id": "indicator--6770298f-0fd8-471a-ab8c-1c658a46574e",
    "created": "2016-11-03T12:30:59.000Z",
    "modified": "2016-11-03T12:30:59.000Z",
    "name": "Malicious site hosting downloader",
    "description": "Accessing this url will infect your machine with malware.",
    "indicator_types": [
        "url-watchlist"
    ],
    "pattern": "[url:value = 'http://z4z10farb.cn/4712']",
    "pattern_type": "stix",
    "pattern_version": "2.1",
    "valid_from": "2017-01-27T13:49:53.935382Z"
}
Out[3]:
{
    "type": "indicator",
    "spec_version": "2.1",
    "id": "indicator--6770298f-0fd8-471a-ab8c-1c658a46574e",
    "created": "2016-11-03T12:30:59.000Z",
    "modified": "2016-12-25T12:30:59.444Z",
    "name": "Malicious site hosting downloader",
    "description": "Accessing this url will infect your machine with malware. Updated indicator",
    "indicator_types": [
        "url-watchlist"
    ],
    "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
    "pattern_type": "stix",
    "pattern_version": "2.1",
    "valid_from": "2017-01-27T13:49:53.935382Z"
}
Out[3]:
{
    "type": "indicator",
    "spec_version": "2.1",
    "id": "indicator--6770298f-0fd8-471a-ab8c-1c658a46574e",
    "created": "2016-11-03T12:30:59.000Z",
    "modified": "2017-01-27T13:49:53.935Z",
    "name": "Malicious site hosting downloader",
    "description": "Accessing this url will infect your machine with malware. This is the last updated indicator",
    "indicator_types": [
        "url-watchlist"
    ],
    "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
    "pattern_type": "stix",
    "pattern_version": "2.1",
    "valid_from": "2016-11-03T12:30:59Z"
}

In [4]:
from stix2 import Filter

# Retrieve multiple objects from the TAXIICollectionSource with a filter;
# this one matches on the "type" property
f1 = Filter("type", "=", "indicator")
indicators = tc_source.query([f1])

# Show every match
for indicator in indicators:
    print(indicator)


Out[4]:
{
    "type": "indicator",
    "spec_version": "2.1",
    "id": "indicator--cd981c25-8042-4166-8945-51178443bdac",
    "created": "2014-05-08T09:00:00.000Z",
    "modified": "2014-05-08T09:00:00.000Z",
    "name": "File hash for Poison Ivy variant",
    "indicator_types": [
        "file-hash-watchlist"
    ],
    "pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
    "pattern_type": "stix",
    "pattern_version": "2.1",
    "valid_from": "2014-05-08T09:00:00Z"
}
Out[4]:
{
    "type": "indicator",
    "spec_version": "2.1",
    "id": "indicator--6770298f-0fd8-471a-ab8c-1c658a46574e",
    "created": "2016-11-03T12:30:59.000Z",
    "modified": "2017-01-27T13:49:53.935Z",
    "name": "Malicious site hosting downloader",
    "description": "Accessing this url will infect your machine with malware. This is the last updated indicator",
    "indicator_types": [
        "url-watchlist"
    ],
    "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
    "pattern_type": "stix",
    "pattern_version": "2.1",
    "valid_from": "2016-11-03T12:30:59Z"
}
Out[4]:
{
    "type": "indicator",
    "spec_version": "2.1",
    "id": "indicator--d8f573d9-5796-4d3f-98fd-d3b953738520",
    "created": "2020-06-26T19:04:05.608201Z",
    "modified": "2020-06-26T19:04:05.608201Z",
    "description": "Smokey Bear implant",
    "pattern": "[file:hashes.'SHA-256' = '09c7e05a39a59428743635242e4a867c932140a909f12a1e54fa7ee6a440c73b']",
    "pattern_type": "stix",
    "pattern_version": "2.1",
    "valid_from": "2020-06-26T19:04:05.608201Z"
}
Out[4]:
{
    "type": "indicator",
    "spec_version": "2.1",
    "id": "indicator--acd03fef-b8df-4c45-bd89-1ede90c39959",
    "created": "2020-06-26T19:04:09.476525Z",
    "modified": "2020-06-26T19:04:09.476525Z",
    "description": "Smokey Bear implant",
    "pattern": "[file:hashes.'SHA-256' = '09c7e05a39a59428743635242e4a867c932140a909f12a1e54fa7ee6a440c73b']",
    "pattern_type": "stix",
    "pattern_version": "2.1",
    "valid_from": "2020-06-26T19:04:09.476525Z"
}

TAXIICollectionSink


In [5]:
from stix2 import TAXIICollectionSink, ThreatActor

# Create a TAXIICollectionSink on the existing collection; STIX content is
# pushed through it
tc_sink = TAXIICollectionSink(collection)

# Define a new STIX threat-actor
ta = ThreatActor(
    name="Teddy Bear",
    threat_actor_types=["nation-state"],
    sophistication="innovator",
    resource_level="government",
    goals=[
        "compromising environment NGOs",
        "water-hole attacks geared towards energy sector",
    ],
)

# Push the threat-actor through the sink
tc_sink.add(ta)

TAXIICollectionStore


In [6]:
from stix2 import TAXIICollectionStore

# Create a TAXIICollectionStore - note the same collection instance can
# be reused for the store
tc_store = TAXIICollectionStore(collection)

# Retrieve a STIX object by id from the TAXII Collection through the
# TAXIICollectionStore itself (tc_store, not the tc_source created earlier)
stix_obj2 = tc_store.get("malware--c0931cc6-c75e-47e5-9036-78fabc95d4ec")

print(stix_obj2)


Out[6]:
{
    "type": "malware",
    "spec_version": "2.1",
    "id": "malware--c0931cc6-c75e-47e5-9036-78fabc95d4ec",
    "created": "2017-01-27T13:49:53.997Z",
    "modified": "2017-01-27T13:49:53.997Z",
    "name": "Poison Ivy",
    "description": "Poison Ivy",
    "malware_types": [
        "remote-access-trojan"
    ],
    "is_family": true
}

In [7]:
from stix2 import Indicator

# Build a new STIX indicator, then add it through the TAXIICollectionStore
ind = Indicator(
    description="Smokey Bear implant",
    pattern_type="stix",
    pattern="[file:hashes.'SHA-256' = '09c7e05a39a59428743635242e4a867c932140a909f12a1e54fa7ee6a440c73b']",
)

tc_store.add(ind)

Bug and Workaround

You may get an error similar to the following when adding STIX objects to a TAXIICollectionStore or TAXIICollectionSink:

TypeError: Object of type ThreatActor is not JSON serializable

This is a known bug and we are working to fix it. For more information, see this GitHub issue. In the meantime, try this workaround:


In [ ]:
# json and Bundle are not imported by any earlier cell, so bring them in here
import json

from stix2 import Bundle

# Wrap the object in a Bundle, serialize it, and parse the result back with
# json.loads so the sink receives parsed JSON rather than the ThreatActor
tc_sink.add(json.loads(Bundle(ta).serialize()))

Or bypass the TAXIICollection altogether and interact with the collection itself:


In [ ]:
# json and Bundle are not imported by any earlier cell, so bring them in here
import json

from stix2 import Bundle

# Pass the parsed JSON of the serialized bundle straight to the
# taxii2client Collection, bypassing the TAXIICollection wrappers
collection.add_objects(json.loads(Bundle(ta).serialize()))