Handling SC2017 PRP productions

This module provides functions to:

  • create the full workload (one document per parameter combination)
  • move each job from state to state: created → training → trained → generating → generated → transferring → transferred

In [ ]:
import sys
import subprocess
from time import time, sleep
from elasticsearch import Elasticsearch, exceptions as es_exceptions

# Elasticsearch instance that stores the workload bookkeeping index
es = Elasticsearch(['atlas-kibana.mwt2.org:9200'], timeout=60)
index_name = 'sc2017'

Create the index template


In [1]:
! curl -XPOST "atlas-kibana.mwt2.org:9200/_template/sc2017" -d @SC2017_template.json


{"acknowledged":true}

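The same template can also be installed through the Python client (a sketch, assuming SC2017_template.json is in the working directory):

In [ ]:
# Equivalent of the curl call above, using the elasticsearch-py client
import json

with open('SC2017_template.json') as f:
    template_body = json.load(f)
es.indices.put_template(name='sc2017', body=template_body)
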
For reference, the options accepted by the training script:

    [--nb-epochs NB_EPOCHS] [--batch-size BATCH_SIZE] [--latent-size LATENT_SIZE]
    [--disc-lr DISC_LR] [--gen-lr GEN_LR] [--adam-beta ADAM_BETA] [--prog-bar]
    [--no-attn] [--debug] [--d-pfx D_PFX] [--g-pfx G_PFX]
    dataset


In [ ]:
def create_workload():

    # clean all: drop the index if it already exists
    try:
        es.indices.delete(index='sc2017')
    except es_exceptions.NotFoundError:
        print("not there?")

    doc_id = 0

    # parameter grid: one job document per combination
    nb_epochs_grid = [50, 100, 200]
    disc_lr_grid   = [0.0001, 0.0002, 0.0005]
    gen_lr_grid    = [0.00001, 0.00002, 0.00005]
    attn_grid      = ['', '--no-attn']
    particle_grid  = ['gamma.yaml', 'eplus.yaml', 'pion.yaml']
    for nb_epochs in nb_epochs_grid:
        for disc_lr in disc_lr_grid:
            for gen_lr in gen_lr_grid:
                for attn in attn_grid:
                    for particle in particle_grid:
                        doc = {}
                        doc['created'] = int(time() * 1000)
                        doc['status'] = 'created'
                        doc['training'] = {
                            'nb-epochs': nb_epochs,
                            'disc-lr': disc_lr,
                            'gen-lr': gen_lr,
                            'particle': particle,
                            'output_folder': '/data/CaloGAN/weights/' + str(doc_id)
                        }
                        if attn != '':
                            doc['training']['options'] = attn
                        doc['generator'] = {
                            'input_folder': '/data/CaloGAN/weights/' + str(doc_id),
                            'output_folder': '/data/CaloGAN/outputs/' + str(doc_id),
                            'epochs': nb_epochs,
                            'sets': 10,
                            'showers': 100000
                        }
                        doc['transferring_options'] = ['root://faxbox.usatlas.org:1094//faxbox2/user/ivukotic/outputs/' + str(doc_id)]
                        es.create(index=index_name, doc_type='doc', id=doc_id, body=doc)
                        doc_id += 1


def get_training_job():
    # claim one 'created' job: mark it 'training' and return (id, document)
    my_query = {
        "size": 1,
        "query": {
            "term": {"status": "created"}
        }
    }

    res = es.search(index=index_name, body=my_query)
    res = res['hits']
    if res['total'] == 0:
        print('no training jobs at the moment.')
        return
    res_id = res['hits'][0]['_id']
    res = res['hits'][0]['_source']

    res['status'] = 'training'
    res['start_training'] = int(time() * 1000)
    es.update(index=index_name, doc_type='doc', id=res_id, body={"doc": res})
    return (res_id, res)

def get_generating_job():
    # claim one 'trained' job: mark it 'generating' and return (id, document)
    my_query = {
        "size": 1,
        "query": {
            "term": {"status": "trained"}
        }
    }

    res = es.search(index=index_name, body=my_query)
    res = res['hits']
    if res['total'] == 0:
        print('no generating jobs at the moment.')
        return
    res_id = res['hits'][0]['_id']
    res = res['hits'][0]['_source']

    res['status'] = 'generating'
    res['start_generating'] = int(time() * 1000)
    es.update(index=index_name, doc_type='doc', id=res_id, body={"doc": res})
    return (res_id, res)

def get_transferring_job():
    # claim one 'generated' job: mark it 'transferring' and return (id, document)
    my_query = {
        "size": 1,
        "query": {
            "term": {"status": "generated"}
        }
    }

    res = es.search(index=index_name, body=my_query)
    res = res['hits']
    if res['total'] == 0:
        print('no transferring jobs at the moment.')
        return
    res_id = res['hits'][0]['_id']
    res = res['hits'][0]['_source']

    res['status'] = 'transferring'
    res['start_transferring'] = int(time() * 1000)
    es.update(index=index_name, doc_type='doc', id=res_id, body={"doc": res})
    return (res_id, res)

def done_training(job_id):
    res = {'status': 'trained', 'end_training': int(time() * 1000)}
    es.update(index=index_name, doc_type='doc', id=job_id, body={"doc": res})

def done_generating(job_id):
    res = {'status': 'generated', 'end_generating': int(time() * 1000)}
    es.update(index=index_name, doc_type='doc', id=job_id, body={"doc": res})

def done_transferring(job_id):
    res = {'status': 'transferred', 'end_transferring': int(time() * 1000)}
    es.update(index=index_name, doc_type='doc', id=job_id, body={"doc": res})

def set_status(job_id, new_status):
    res = {'status': new_status}
    es.update(index=index_name, doc_type='doc', id=job_id, body={"doc": res})

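The three get_*_job functions differ only in the status they poll and the timestamp they set; a possible consolidation is sketched below (an editorial suggestion, not part of the original flow). Note that this search-then-update claim is not atomic, so two workers polling the same status could grab the same job.

In [ ]:
# A sketch: one generic claim covering all three transitions.
# Caveat: without versioned updates the claim can race under concurrency.
def claim_job(from_status, to_status):
    my_query = {"size": 1, "query": {"term": {"status": from_status}}}
    res = es.search(index=index_name, body=my_query)['hits']
    if res['total'] == 0:
        print('no', from_status, 'jobs at the moment.')
        return
    job_id = res['hits'][0]['_id']
    job = res['hits'][0]['_source']
    job['status'] = to_status
    job['start_' + to_status] = int(time() * 1000)
    es.update(index=index_name, doc_type='doc', id=job_id, body={"doc": job})
    return (job_id, job)

# claim_job('created', 'training') behaves like get_training_job()
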
Test all the steps


In [ ]:
def test_flow():

    create_workload()
    # make the freshly indexed documents searchable before the first claim
    es.indices.refresh(index=index_name)

    (job_id, job) = get_training_job()
    print(job_id, job)
    sleep(5)
    done_training(job_id)

    sleep(15)

    (job_id, job) = get_generating_job()
    print(job_id, job)
    sleep(5)
    done_generating(job_id)

    sleep(15)

    (job_id, job) = get_transferring_job()
    print(job_id, job)
    sleep(5)
    done_transferring(job_id)

In [ ]:
if __name__ == '__main__':
    if len(sys.argv) != 2:
        print('Usage - sc2017.py <creator|trainer|generator|transporter>')
        sys.exit(1)
    print('this pod will be:', sys.argv[1])

    role = sys.argv[1]
    if role == 'creator':
        create_workload()
    elif role == 'trainer':
        while True:
            res = get_training_job()
            if not res:
                print('waiting...')
                sleep(120)
                continue
            (job_id, job) = res
            print('training job:', job_id, '\nsetting up:\n', job)
            op = job['training']

            print('(re)create output directory')
            subprocess.check_output(['rm', '-rf', op['output_folder']])
            subprocess.check_output(['mkdir', '-p', op['output_folder']])

            options = []
            options.append('--output_folder=' + op['output_folder'])
            options.append('--nb-epochs=' + str(op['nb-epochs']))
            options.append('--disc-lr=' + str(op['disc-lr']))
            options.append('--gen-lr=' + str(op['gen-lr']))
            if 'options' in op:
                options.append(op['options'])
            options.append(op['particle'])
            print(options)
            output = subprocess.check_output(['/ML_platform_tests/tutorial/sc2017_prp/train.py'] + options)
            print(output)
            done_training(job_id)
            sleep(15)
    elif role == 'generator':
        while True:
            res = get_generating_job()
            if not res:
                print('waiting...')
                sleep(120)
                continue
            (job_id, job) = res
            print('generator job:', job_id, '\nsetting up:\n', job)
            g = job['generator']
            subprocess.check_output(['rm', '-rf', g['output_folder']])
            subprocess.check_output(['mkdir', '-p', g['output_folder']])
            options = [g['input_folder'], g['output_folder'], str(g['epochs']), str(g['sets']), str(g['showers'])]
            print(options)
            output = subprocess.check_output(['/ML_platform_tests/tutorial/sc2017_prp/generator.py'] + options)
            print(output)
            done_generating(job_id)
            sleep(15)
    elif role == 'transporter':
        while True:
            res = get_transferring_job()
            if not res:
                print('waiting...')
                sleep(120)
                continue
            (job_id, job) = res
            print('transporter job:', job_id, '\nsetting up:\n', job)
            output_folder = job['generator']['output_folder']
            # transferring_options is a list of destinations; append it to the argv
            # (passing the list itself as a single argument would crash subprocess)
            output = subprocess.check_output(['xrdcp', '-r', output_folder] + job['transferring_options'])
            print(output)
            done_transferring(job_id)
            sleep(60)

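None of the worker loops handle a failing step: subprocess.check_output raises CalledProcessError, which would crash the pod. One possible pattern is to catch the error and park the job in a 'failed' status (a sketch; 'failed' is a hypothetical extra state, not used elsewhere in this module):

In [ ]:
# Sketch of per-step error handling for the worker loops above.
# 'failed' is an illustrative extra state, not part of the lifecycle.
def run_step(job_id, cmd):
    try:
        print(subprocess.check_output(cmd))
        return True
    except subprocess.CalledProcessError as e:
        print('step failed with return code', e.returncode)
        set_status(job_id, 'failed')
        return False
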
In [ ]:
create_workload()

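To watch the workload progress, a terms aggregation on status gives a per-state count (a monitoring sketch; it assumes the template maps status as an aggregatable field):

In [ ]:
# Count documents per status (monitoring only)
res = es.search(index=index_name, body={
    "size": 0,
    "aggs": {"by_status": {"terms": {"field": "status"}}}
})
for bucket in res['aggregations']['by_status']['buckets']:
    print(bucket['key'], bucket['doc_count'])
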
In [ ]:
set_status(0, "created")