In [1]:
# Delete this cell to re-enable tracebacks
import sys
ipython = get_ipython()
def hide_traceback(exc_tuple=None, filename=None, tb_offset=None,
exception_only=False, running_compiled_code=False):
etype, value, tb = sys.exc_info()
value.__cause__ = None # suppress chained exceptions
return ipython._showtraceback(etype, value, ipython.InteractiveTB.get_exception_only(etype, value))
#ipython.showtraceback = hide_traceback
In [2]:
# JSON output syntax highlighting
from __future__ import print_function
from pygments import highlight
from pygments.lexers import JsonLexer, TextLexer
from pygments.formatters import HtmlFormatter
from IPython.display import display, HTML
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
def json_print(inpt):
string = str(inpt)
formatter = HtmlFormatter()
if string[0] == '{':
lexer = JsonLexer()
else:
lexer = TextLexer()
return HTML('<style type="text/css">{}</style>{}'.format(
formatter.get_style_defs('.highlight'),
highlight(string, lexer, formatter)))
globals()['print'] = json_print
The TAXIICollection suite contains TAXIICollectionStore, TAXIICollectionSource, and TAXIICollectionSink. TAXIICollectionStore pushes and retrieves STIX content to local/remote TAXII Collection(s). TAXIICollectionSource retrieves STIX content from local/remote TAXII Collection(s). TAXIICollectionSink pushes STIX content to local/remote TAXII Collection(s). Each of the interfaces is designed to be bound to a Collection from the taxii2client library (taxii2client.Collection), where all TAXIICollection API calls will be executed through that Collection instance.
A note on TAXII2 searching/filtering of STIX content: TAXII2 server implementations natively support searching on the STIX2 object properties: id, type and version; API requests made to TAXII2 can contain filter arguments for those 3 properties. However, the TAXIICollection suite supports searching on all STIX2 common object properties (see Filters documentation for full listing). This works simply by augmenting the filtering that is done remotely at the TAXII2 server instance. TAXIICollection will seperate any supplied queries into TAXII supported filters and non-supported filters. During a TAXIICollection API call, TAXII2 supported filters get inserted into the TAXII2 server request (to be evaluated at the server). The rest of the filters are kept locally and then applied to the STIX2 content that is returned from the TAXII2 server, before being returned from the TAXIICollection API call.
In [3]:
from stix2 import TAXIICollectionSource
from taxii2client import Collection
# establish TAXII2 Collection instance
collection = Collection("http://127.0.0.1:5000/trustgroup1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/", user="admin", password="Password0")
# supply the TAXII2 collection to TAXIICollection
tc_source = TAXIICollectionSource(collection)
#retrieve STIX objects by id
stix_obj = tc_source.get("malware--c0931cc6-c75e-47e5-9036-78fabc95d4ec")
stix_obj_versions = tc_source.all_versions("indicator--6770298f-0fd8-471a-ab8c-1c658a46574e")
#for visual purposes
print(stix_obj)
print("-------")
for so in stix_obj_versions:
print(so)
Out[3]:
Out[3]:
Out[3]:
Out[3]:
Out[3]:
In [4]:
from stix2 import Filter
# retrieve multiple object from TAXIICollectionSource
# by using filters
f1 = Filter("type","=", "indicator")
indicators = tc_source.query([f1])
#for visual purposes
for indicator in indicators:
print(indicator)
Out[4]:
Out[4]:
Out[4]:
Out[4]:
In [5]:
from stix2 import TAXIICollectionSink, ThreatActor
#create TAXIICollectionSINK and push STIX content to it
tc_sink = TAXIICollectionSink(collection)
# create new STIX threat-actor
ta = ThreatActor(name="Teddy Bear",
threat_actor_types=["nation-state"],
sophistication="innovator",
resource_level="government",
goals=[
"compromising environment NGOs",
"water-hole attacks geared towards energy sector",
])
tc_sink.add(ta)
In [6]:
from stix2 import TAXIICollectionStore
# create TAXIICollectionStore - note the same collection instance can
# be used for the store
tc_store = TAXIICollectionStore(collection)
# retrieve STIX object by id from TAXII Collection through
# TAXIICollectionStore
stix_obj2 = tc_source.get("malware--c0931cc6-c75e-47e5-9036-78fabc95d4ec")
print(stix_obj2)
Out[6]:
In [7]:
from stix2 import Indicator
# add STIX object to TAXIICollectionStore
ind = Indicator(description="Smokey Bear implant",
pattern_type="stix",
pattern="[file:hashes.'SHA-256' = '09c7e05a39a59428743635242e4a867c932140a909f12a1e54fa7ee6a440c73b']")
tc_store.add(ind)
You may get an error similar to the following when adding STIX objects to a TAXIICollectionStore or TAXIICollectionSink:
TypeError: Object of type ThreatActor is not JSON serializable
This is a known bug and we are working to fix it. For more information, see this GitHub issue In the meantime, try this workaround:
In [ ]:
tc_sink.add(json.loads(Bundle(ta).serialize()))
Or bypass the TAXIICollection altogether and interact with the collection itself:
In [ ]:
collection.add_objects(json.loads(Bundle(ta).serialize()))