In [ ]:
import os
import sys

try:
    from synapse.lib.jupyter import *
except ImportError:
    # The import failed, so retry after putting the repository root on sys.path.
    # The relative path assumes this notebook sits three directories below the
    # root synapse directory; adjust it if the notebook lives somewhere else.
    synroot = os.path.abspath('../../../')
    sys.path.insert(0, synroot)
    from synapse.lib.jupyter import *
In [ ]:
# For printing command example output
import synapse.lib.cell as s_cell
import synapse.lib.stormsvc as s_stormsvc
class MySvcApi(s_cell.CellApi, s_stormsvc.StormSvc):
    '''
    API class for the minimal "mysvc" Storm service used by the command examples.
    '''

    # Service name and version
    _storm_svc_name = 'mysvc'
    _storm_svc_vers = (0, 0, 1)

    # One Storm package holding a single command, mysvc.get, whose Storm
    # prints a fixed greeting.
    _storm_svc_pkgs = (
        {
            'name': 'mysvc',
            'version': (0, 0, 1),
            'commands': (
                {
                    'name': 'mysvc.get',
                    'descr': 'Example Storm service command.',
                    'storm': '$lib.print("Hello Storm service.")',
                },
            ),
        },
    )
class MySvc(s_cell.Cell):
    '''
    Minimal Cell that uses MySvcApi as its cellapi.
    '''

    cellapi = MySvcApi
In [ ]:
cmdr, svcprox = await getTempCoreCmdrStormsvc('mysvc', MySvc.anit, svcconf=None, outp=None)
svcs = await cmdr.core.callStorm('return($lib.service.list())')
assert len(svcs) == 1
svcurl = svcs[0]['url']
svciden = svcs[0]['iden']
await cmdr.storm(f'$lib.service.del({svciden})')
svcs = await cmdr.core.callStorm('return($lib.service.list())')
assert len(svcs) == 0
Services are added to a Cortex with the `service.add` command.
In [ ]:
await cmdr.runCmdLine(f'storm service.add mysvc {svcurl}')
Services that have been connected to the Cortex can be listed with the `service.list` command.
In [ ]:
await cmdr.runCmdLine(f'storm service.list')
In [ ]:
import sys
import asyncio
import synapse.lib.cell as s_cell
import synapse.lib.stormsvc as s_stormsvc
# For convenience the Storm definitions are kept in this module; they are
# typically placed in a separate storm.py file and imported into service.py.
# Further Storm commands could be written to call the additional Telepath endpoints.
svc_name = 'example'
svc_guid = '0ecc1eb65659a0f07141bc1a360abda3'  # can be generated with synapse.common.guid()
svc_vers = (0, 0, 1)

# Storm keyed by service event name; the "add" entry creates the meta:source
# node (named "Example data") identified by svc_guid.
svc_evts = {
    'add': {
        'storm': f'[(meta:source={svc_guid} :name="Example data")]',
    },
}
# Storm source for the service's ingest module (registered below under the
# name "<svc_name>.ingest"). Its ingest_ips(data, srcguid) function creates an
# inet:ipv4 node for every item in data, links each one to the meta:source
# node identified by srcguid with a lightweight "seen" edge, collects the
# nodes in a set and returns that set.
svc_mod_ingest_storm = '''
function ingest_ips(data, srcguid) {
$results = $lib.set()
for $ip in $data {
[ inet:ipv4=$ip ]
// Lightweight edge back to meta:source
{ [ <(seen)+ { meta:source=$srcguid } ] }
{ +inet:ipv4 $results.add($node) }
}
| spin |
return($results)
}
'''
# Description text for the example.get command, including usage examples.
# The first line of this description will display in the Storm help
svc_cmd_get_desc = '''
Query the Example service.
Examples:
# Query the service and create an IPv4 node
inet:fqdn=good.com | example.get
# Query the service and yield the created inet:ipv4 node
inet:fqdn=good.com | example.get --yield
'''
# Node forms declared for the example.get command: what it takes as input
# and what it produces as output.
svc_cmd_get_forms = {
    'input': ['inet:fqdn'],
    'output': ['inet:ipv4'],
}

# Command-line switches for example.get. Both are store_true flags that
# default to False.
svc_cmd_get_args = (
    (
        '--yield',
        {
            'default': False,
            'action': 'store_true',
            'help': 'Whether to yield the created nodes to the output stream.',
        },
    ),
    (
        '--debug',
        {
            'default': False,
            'action': 'store_true',
            'help': 'Enable debug output.',
        },
    ),
)
# Extra configuration for example.get; the command's Storm reads it back
# as $cmdconf.srcguid.
svc_cmd_get_conf = {'srcguid': svc_guid}
# Storm implementation of the example.get command.
#
# The init block looks up the service via $cmdconf.svciden, imports the
# example.ingest module and captures the --debug / --yield options. For each
# inbound node: an inet:fqdn node supplies its repr as the query, any other
# form yields an empty query plus a warning. When a query is present the
# service's getData endpoint is called; on a true status the returned data is
# handed to ingest_ips, otherwise the returned message is emitted as a warning.
# With --yield the inbound node is dropped and the ingested results are
# yielded instead; with --debug the query is printed before the call.
svc_cmd_get_storm = '''
init {
$svc = $lib.service.get($cmdconf.svciden)
$ingest = $lib.import(example.ingest)
$srcguid = $cmdconf.srcguid
$debug = $cmdopts.debug
$yield = $cmdopts.yield
}
// $node is a special variable that references the inbound Node object
$form = $node.form()
switch $form {
"inet:fqdn": {
$query=$node.repr()
}
*: {
$query=""
$lib.warn("Example service does not support {form} nodes", form=$form)
}
}
// Yield behavior to drop the inbound node
if $yield { spin }
// Call the service endpoint and ingest the results
if $query {
if $debug { $lib.print("example.get query: {query}", query=$query) }
$retn = $svc.getData($query)
if $retn.status {
$results = $ingest.ingest_ips($retn.data, $srcguid)
if $yield {
for $result in $results { $lib.print($result) yield $result }
}
} else {
$lib.warn("example.get error: {err}", err=$retn.mesg)
}
}
'''
# Command definitions provided by the service (a single "<svc_name>.get" command).
svc_cmds = (
    {
        'name': f'{svc_name}.get',
        'descr': svc_cmd_get_desc,
        'cmdargs': svc_cmd_get_args,
        'cmdconf': svc_cmd_get_conf,
        'forms': svc_cmd_get_forms,
        'storm': svc_cmd_get_storm,
    },
)

# Storm modules provided by the service (a single "<svc_name>.ingest" module).
svc_mods = (
    {
        'name': f'{svc_name}.ingest',
        'storm': svc_mod_ingest_storm,
    },
)

# The Storm package definition: name, version, modules and commands.
svc_pkgs = (
    {
        'name': svc_name,
        'version': svc_vers,
        'modules': svc_mods,
        'commands': svc_cmds,
    },
)
class ExampleApi(s_cell.CellApi, s_stormsvc.StormSvc):
    '''
    The Telepath API for the Example service.

    Every endpoint forwards to the same-named method on the underlying Cell.
    '''

    # StormSvc mixin defaults that a service has to override
    _storm_svc_name = svc_name
    _storm_svc_vers = svc_vers
    _storm_svc_evts = svc_evts
    _storm_svc_pkgs = svc_pkgs

    async def getData(self, query):
        '''
        Return the Cell's getData() result for the given query.
        '''
        retn = await self.cell.getData(query)
        return retn

    async def getInfo(self):
        '''
        Return the Cell's getInfo() result.

        The caller is first checked with _reqUserAllowed() against ('example', 'info').
        '''
        await self._reqUserAllowed(('example', 'info'))
        info = await self.cell.getInfo()
        return info

    @s_cell.adminapi()
    async def getAdminInfo(self):
        '''
        Return the Cell's getAdminInfo() result (wrapped by the adminapi decorator).
        '''
        info = await self.cell.getAdminInfo()
        return info
class Example(s_cell.Cell):
    '''
    The Cell behind the Example service; ExampleApi is its cellapi.
    '''

    cellapi = ExampleApi

    confdefs = {
        'api_key': {
            'type': 'string',
            'description': 'API key for accessing an external service.',
        },
        'api_url': {
            'type': 'string',
            'description': 'The URL for an external service.',
            'default': 'https://example.com',
        },
    }

    async def __anit__(self, dirn, conf):
        await s_cell.Cell.__anit__(self, dirn, conf=conf)

        # Keep the configured API key and URL on the instance.
        self.apikey = self.conf.get('api_key')
        self.apiurl = self.conf.get('api_url')

    async def getData(self, query):
        '''
        Return a result dictionary for the query.

        The dictionary always carries "status", "data" and "mesg" keys. Best
        practice is to return a status and an optional message so that callers
        can detect an error.
        '''
        # Retrieving and parsing real data would go here; this example only
        # recognizes a single query value.
        if query != 'good.com':
            return {
                'status': False,
                'data': None,
                'mesg': 'An error occurred during data retrieval.',
            }

        return {
            'status': True,
            'data': ['1.2.3.4', '5.6.7.8'],
            'mesg': None,
        }

    async def getInfo(self):
        '''
        Return the generic info dictionary.
        '''
        return {'generic': 'info'}

    async def getAdminInfo(self):
        '''
        Return the admin info dictionary.
        '''
        return {'admin': 'info'}
In [ ]:
cmdr, svcprox = await getTempCoreCmdrStormsvc('example', Example.anit, svcconf=None, outp=None)
msgs = await cmdr.storm('service.list')
assert(any(['True (example)' in str(msg) for msg in msgs]))
nodes = await cmdr.eval(f'meta:source={svc_guid}')
assert(len(nodes) == 1)
await cmdr.eval('[inet:fqdn=good.com inet:fqdn=bad.com]')
nodes = await cmdr.eval(f'inet:fqdn=good.com | example.get --yield')
assert(len(nodes) == 2)
assert(all(node[0][0] == 'inet:ipv4' for node in nodes))
nodes = await cmdr.eval(f'inet:fqdn=good.com | example.get')
assert(len(nodes) == 1)
assert(nodes[0][0][0] == 'inet:fqdn')
msgs = await cmdr.storm('inet:fqdn=bad.com | example.get')
assert(any(['error occurred' in str(msg) for msg in msgs]))
info = await svcprox.getInfo()
assert(info == {'generic': 'info'})
info = await svcprox.getAdminInfo()
assert(info == {'admin': 'info'})
_ = await cmdr.fini()
_ = await svcprox.fini()