In [1]:
# Notebook setup: extend the import path, import the WebView client classes,
# and create the client instance that the test cells below call start_run() on.
import sys
# Must run before the project imports below so that modules under ../app
# are importable (presumably where the `webview` package lives -- TODO confirm).
sys.path.append("../app")
# NOTE(review): `time` is not used by any cell visible here; confirm it is still needed.
import time
from webview import WebView
from webview.run import Run
# NOTE(review): credentials are hard-coded; presumably a throwaway test
# account -- confirm, and do not replace them with real credentials here.
webview = WebView(username='crawl', password='test')
In [2]:
# TODO: handle the 'no effigy' exception, then re-enable the disabled test below.
#run = webview.start_run(workflow_file="../sum-display.kar", parameters={'x':12}, synchronous=True)
#assert run.error() == 'no effigy'
#assert run.outputs() == []
In [3]:
# Test: synchronous run that succeeds.
# Starts ../workflows/sum-webview1.kar with x=12 and synchronous=True, then
# checks the reported workflow name, the final status, that no error was
# recorded, and that the outputs are exactly [{'sum': 13}].
run = webview.start_run(workflow_file="../workflows/sum-webview1.kar", parameters={'x':12}, synchronous=True)
status = run.status()
# Attach the status dict to the assertions so a failure shows what came back.
assert status['workflowName'] == 'sum-webview1', status
assert run.workflow_name() == 'sum-webview1'
assert status['status'] == 'complete', status
# Compare against None by identity rather than equality (PEP 8).
error = run.error()
assert error is None, "unexpected run error: {!r}".format(error)
outputs = run.outputs()
assert outputs == [{'sum':13}], "unexpected outputs: {!r}".format(outputs)
In [4]:
# Test: asynchronous run that succeeds.
# Starts ../workflows/sum-webview1.kar with x=12 and synchronous=False, then
# checks that no error was recorded and that the outputs are [{'sum': 13}].
# NOTE(review): nothing here waits for the run explicitly; this presumably
# relies on run.error()/run.outputs() blocking until the run finishes -- confirm.
run = webview.start_run(workflow_file="../workflows/sum-webview1.kar", parameters={'x':12}, synchronous=False)
# Fetch each result once and print both before asserting, so the diagnostics
# are visible even when an assertion fails.
error = run.error()
print("Error = {}".format(error))
outputs = run.outputs()
print("Outputs = {}".format(outputs))
# Compare against None by identity rather than equality (PEP 8).
assert error is None, "unexpected run error: {!r}".format(error)
assert outputs == [{'sum':13}], "unexpected outputs: {!r}".format(outputs)
In [5]:
# Test: synchronous run that fails.
# Starts the "ListSharedRuns.kar" workflow by name with a parameter 'x' and
# expects start_run() to raise with the exact message below.
expected_error = 'Error executing workflow: Error setting parameters: Workflow does not have settable parameter x'
try:
    webview.start_run(workflow_name="ListSharedRuns.kar", parameters={'x':1}, synchronous=True)
except Exception as err:
    assert str(err) == expected_error, "unexpected error message: {!r}".format(str(err))
else:
    # Without this branch the test would pass silently when nothing is raised.
    # It lives in `else` so that `except Exception` above cannot swallow it.
    raise AssertionError("start_run() did not raise for parameter 'x'")
In [6]:
# Test: asynchronous run that fails.
# Starts ../workflows/exception.kar with synchronous=False and checks that the
# status dict reports the expected error text, the 'error' state and the
# workflow name, and that run.error() matches the status dict's runError.
run = webview.start_run(synchronous=False, workflow_file="../workflows/exception.kar")
status = run.status()
# (key, expected value) pairs, checked in this order.
expected_status = (
    ('runError', 'Model triggered an exception.\n in .exception.Throw Exception'),
    ('status', 'error'),
    ('workflowName', 'exception'),
)
assert all(status[key] == wanted for key, wanted in expected_status)
assert status['runError'] == run.error()
In [7]:
# Test: bad run id.
# Builds a Run directly from an id string and expects is_running() to raise
# with the exact message 'Error: Bad run id.'.
run = Run(username='crawl', password='test', id='urn:lsid:uuid:dae23ddf-fff-4dcc-9ca8-fff:87:232')
try:
    run.is_running()
except Exception as err:
    assert str(err) == 'Error: Bad run id.', "unexpected error message: {!r}".format(str(err))
else:
    # Without this branch the test would pass silently when nothing is raised.
    # It lives in `else` so that `except Exception` above cannot swallow it.
    raise AssertionError("is_running() did not raise for a bad run id")
In [ ]: