Scattering and SBG-FS file stream read notebook

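This notebook sets up Seven Bridges (CGC) tasks that combine scattering with SBG file-system (SBG-FS) streaming of input files. It runs many samples in scatter-plus-batch mode: the project's BAM files are split into batches, and one task is created for each batch.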


In [1]:
import sevenbridges as sbg
from sevenbridges.errors import SbgError
from sevenbridges.http.error_handlers import *
import re
import datetime
import binpacking
# Confirm the SDK imported and report its version (the outputs below were produced with 0.7.2).
print("SBG library imported.")
print(sbg.__version__)


SBG library imported.
0.7.2

Logging into your account on CGC

Authenticate with the API token stored under the `default` profile of your local SBG configuration file.


In [2]:
# Build the API client from the named profile of the local SBG config, wiring in
# the error handlers imported from sevenbridges.http.error_handlers
# (rate limiting, maintenance windows, general errors).
prof = 'default'
config_file = sbg.Config(profile=prof)
api = sbg.Api(
    config=config_file,
    error_handlers=[
        rate_limit_sleeper,
        maintenance_sleeper,
        general_error_sleeper,
    ],
)
print("Api Configured!!")
# Same text as the Python 2 two-item print statement: label, one space, user repr.
print("%s %s" % ("Api Username : ", api.users.me()))


Api Configured!!
Api Username :  <User: username=anellor1>

Finding the project


In [3]:
my_project = api.projects.get(id='anellor1/omfgene')  # owner/project-slug; source of the BAMs and the app

In [4]:
# List everyone with access to the project, then show its billing group id.
for m in my_project.get_members():
    print(m)
print(my_project.billing_group)


<Member: username=anellor1>
<Member: username=elehnert>
<Member: username=raunaqm_cgc>
aaf01c96-dc89-4b54-b0e9-46397b3d907d

Listing BAM files in the project


In [5]:
#Listing all files in a project
files = [f for f in api.files.query(project=my_project,limit=100).all() if f.name.endswith(".bam")]
print len(files)


11096

Get the app to run


In [6]:
# Name of the app's input port that receives the list of BAM files for a task.
input_port_app = 'input_file'
# The wrapper app (owner/project/app-slug) that each task will run.
app = api.apps.get(id="anellor1/omfgene/omfgene-wrapper")
print(app.name)


omfgene-wrapper

Set up the number of files per task


In [7]:
import math  # retained from the original cell; the arithmetic below is integer-only
inputs = {}

# Batch sizing: decide how many files each task receives and how many tasks are needed.
num_files = len(files)
num_hosts = 10  # instances in workflow
jobs_per_host = 36  # threads per instance
minutes_per_run = 25  # estimated minutes per run (presumably one file on one thread)
task_minutes = 300  # target wall-clock budget for a single task (5 hours)

# Sequential rounds each thread can finish within the task budget.
# NOTE: despite its (historical) name this is rounds per task, not per hour.
# `//` keeps the value an int on Python 2 and 3; a float here would break the
# list slicing done when building each task's inputs. max(1, ...) avoids a
# zero batch size (and a ZeroDivisionError below) if minutes_per_run > task_minutes.
runs_per_hour = max(1, task_minutes // minutes_per_run)
# Files per task (historical name): rounds x threads per host x hosts.
tasks_per_run = runs_per_hour * jobs_per_host * num_hosts
# Number of tasks needed to cover every file (exact integer ceiling division).
num_runs = (num_files + tasks_per_run - 1) // tasks_per_run

In [8]:
print("{} {} {}".format(num_files, tasks_per_run, num_runs))  # total files, files per task, task count


11096 4320 3

Create one draft task per batch of files and run it


In [9]:
# Set to False to only create (and validate) the draft tasks without starting them.
RUN_TASKS = True

for run_index in range(num_runs):
    # Contiguous slice of `files` for this task; the last batch may be smaller.
    low_bound = run_index * tasks_per_run
    high_bound = min((run_index + 1) * tasks_per_run, num_files)
    input_files = files[low_bound:high_bound]

    task_name = "OMFGene task Run:{}, NumFiles:{}, TimeStamp {}".format(
        run_index + 1, high_bound - low_bound, datetime.datetime.now())

    # Fresh inputs dict per task so nothing carries over between iterations.
    inputs = {input_port_app: input_files}
    # run=False creates a draft task so validation errors can be checked first.
    my_task = api.tasks.create(name=task_name, project=my_project,
                               app=app, inputs=inputs, run=False)
    if my_task.errors:
        # `errors` is a data attribute (it is truth-tested just above), not a
        # method; calling it would raise TypeError instead of reporting errors.
        print(my_task.errors)
    else:
        print('Your task %s is ready to go' % my_task.name)
        if RUN_TASKS:
            my_task.run()


Your task OMFGene task Run:1, NumFiles:4320, TimeStamp 2017-04-11 15:50:08.901118 is ready to go
Your task OMFGene task Run:2, NumFiles:4320, TimeStamp 2017-04-11 15:50:27.317010 is ready to go
Your task OMFGene task Run:3, NumFiles:2456, TimeStamp 2017-04-11 15:50:43.074345 is ready to go

In [ ]: