In [21]:
# header for 2018-1 kernel
from pyCHX.chx_packages import *
from pyCHX.chx_generic_functions import append_txtfile
%matplotlib notebook
plt.rcParams.update({'figure.max_open_warning': 0})
plt.rcParams.update({ 'image.origin': 'lower' })
plt.rcParams.update({ 'image.interpolation': 'none' })
import pickle as cpk
#from pyCHX.chx_xpcs_xsvs_jupyter_V1 import *
#%run /home/yuzhang/pyCHX_link/pyCHX/chx_generic_functions.py
#%matplotlib inline
import papermill as pm
In [2]:
# import database -> this setup should eventually be hidden from the user inside the package
import os, time  # used below; likely also provided via chx_packages, made explicit here
import datetime
import pymongo
from tqdm import tqdm
from bson import ObjectId
import matplotlib.patches as mpatches
from IPython.display import clear_output
cli = pymongo.MongoClient('xf11id-ca')
samples_2 = cli.get_database('samples').get_collection('samples_2')
data_acquisition_collection = cli.get_database('samples').get_collection('data_acquisition_collection')
beamline_pos = cli.get_database('samples').get_collection('beamline_pos')
from databroker import Broker
dbx = Broker.named('temp') # for real applications, 'temp' would be 'chx'
print('available databases:')
# list_database_names/list_collection_names replace the deprecated
# database_names/collection_names (pymongo >= 3.6)
print(cli.list_database_names())
print('\navailable collections in database samples:')
print(cli.samples.list_collection_names())
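In [ ]:
# A quick sanity check (sketch). The bookkeeping used below lives in a single
# document with _id 'general_list'; its fields (compression_completed,
# analysis_completed, analysis_failed_userX, analysis_in_progress, ...) are
# assumed to be plain lists of uid strings.
gl = data_acquisition_collection.find_one({'_id': 'general_list'})
print({k: len(v) for k, v in gl.items() if isinstance(v, list)})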
In [4]:
username = 'koga'
def _chx_analysis_data(uid,
                       template_pipeline='/nsls2/xf11id1/analysis/2019_3/AutoRuns/%s/XPCS_Single_2019_V7.ipynb' % username,
                       outDir=None):
    '''YG. Oct 6, 2018. Run the XPCS analysis pipeline for one uid using papermill.
    Input:
        uid: string, the unique data id
        template_pipeline: str, the filename of the template pipeline
        outDir: str, the path for the output pipeline
    Output:
        save the executed pipeline to outDir
    '''
    if outDir is None:
        outDir = '/nsls2/xf11id1/analysis/2019_3/%s/ResPipelines/' % username
    os.makedirs(outDir, exist_ok=True)
    # strip the template's .ipynb extension before appending the uid,
    # to avoid output names like 'XPCS_Single_2019_V7.ipynb_<uid>.ipynb'
    template_name = template_pipeline.split('/')[-1].replace('.ipynb', '')
    output_pipeline = outDir + template_name + '_%s.ipynb' % uid
    pm.execute_notebook(
        template_pipeline, output_pipeline,
        parameters=dict(uid=uid,
                        username=username,
                        run_two_time=True,
                        run_dose=False),
        kernel_name='python3', report_mode=True)
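In [ ]:
# Example call with a uid taken from the commented override further below. Note
# that papermill injects the parameters dict into a new cell after the cell
# tagged 'parameters', so the template XPCS_Single_2019_V7.ipynb is assumed to
# contain such a tagged cell defining defaults for uid, username, run_two_time
# and run_dose.
#_chx_analysis_data('8ef4e9b2-61cb-4e4c-b184-9e09e1d484af')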
In [5]:
if True:
    temp1 = data_acquisition_collection.find_one({'_id':'general_list'})['compression_completed']
    temp2 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_completed']
    #temp3 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_failed']
    temp4 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_failed_userX']
    s2 = set(temp2)
    #s3 = set(temp3)
    s4 = set(temp4)
    #######################################
    ######Get uids awaiting analysis#######
    uids = [x for x in temp1 if x not in s2 and x not in s4]
    print(uids)
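In [ ]:
# The read-and-diff above is repeated in several cells below; a small helper
# (hypothetical, not part of pyCHX) wrapping the same logic, optionally also
# skipping uids already claimed in analysis_in_progress as the main loop does:
def get_pending_uids(failed_key='analysis_failed_userX'):
    gl = data_acquisition_collection.find_one({'_id': 'general_list'})
    done = set(gl['analysis_completed']) | set(gl[failed_key]) | set(gl.get('analysis_in_progress', []))
    # preserve the acquisition order from the compression list
    return [u for u in gl['compression_completed'] if u not in done]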
In [ ]:
temp1[-1], temp2[-1], temp4[-1]
In [12]:
def get_masked_analysis_database(start_uid):
    '''Given the uid of the first run to be analyzed, mask all earlier uids
       by writing them into the analysis_failed_userX list.'''
    temp1 = data_acquisition_collection.find_one({'_id':'general_list'})['compression_completed']
    for i, t in enumerate(temp1):
        if t == start_uid:
            print(i, t)
            data_acquisition_collection.update_one(
                {'_id':'general_list'}, {'$set':{'analysis_failed_userX': temp1[:i + 1]}})
            break
In [13]:
#start_fuid = '0b619b52-a8f8-4988-9947-ca40e009d2fc'
#get_masked_analysis_database( start_uid = start_fuid )
In [14]:
if True:
    # re-check the pending list (e.g. after masking with get_masked_analysis_database)
    temp1 = data_acquisition_collection.find_one({'_id':'general_list'})['compression_completed']
    temp2 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_completed']
    #temp3 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_failed']
    temp4 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_failed_userX']
    s2 = set(temp2)
    #s3 = set(temp3)
    s4 = set(temp4)
    #######################################
    ######Get uids awaiting analysis#######
    uids = [x for x in temp1 if x not in s2 and x not in s4]
    print(uids)
In [15]:
#data_acquisition_collection.update_one( {'_id':'general_list'},{'$set':{'analysis_failed_petrash': temp1[:797] } })
data_acquisition_collection.update_one( {'_id':'general_list'},{'$set':{'analysis_in_progress': [] } })
In [16]:
data_acquisition_collection.find_one({'_id':'general_list'})['analysis_in_progress']
In [26]:
uids
In [28]:
txtDir = '/nsls2/xf11id1/analysis/2019_3/%s/' % username
txt_filename = '2019_Sep_XPCS_AutoProcess_srv2.txt'
X = []  # accumulates one [fuid, sample, note, time, status] row per analyzed uid
txt_header = 'fuid, sample, notes, analysis_time(min), analysis_status'
print(txtDir)
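In [ ]:
# append_txtfile comes from pyCHX.chx_generic_functions (imported in the header).
# A minimal stand-in sketch of its assumed behavior -- append delimited rows to a
# text file, writing the header only when the file is first created:
import numpy as np
def append_txtfile_sketch(filename, data, fmt='%s', delimiter=',', header=''):
    new_file = not os.path.exists(filename)
    with open(filename, 'a') as f:
        np.savetxt(f, np.asarray(data, dtype=object), fmt=fmt,
                   delimiter=delimiter, header=header if new_file else '')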
# get list of uids
end_of_compression_key = 'none' # stop if this key is the next uid up; 'none': no stop key, rely on the empty-list timeout below
empty_list_timeout = 3600 * 24 * 6 # [s] stop if the list of pending uids stays empty for this long
time_count = 0
run_condition = True
while run_condition:
    clear_output()
    temp1 = data_acquisition_collection.find_one({'_id':'general_list'})['compression_completed']
    temp2 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_completed']
    temp3 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_failed_userX']
    temp4 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_in_progress']
    s2 = set(temp2)
    s3 = set(temp3)
    s4 = set(temp4)
    #######################################
    ######Get uids awaiting analysis#######
    uids = [x for x in temp1 if x not in s2 and x not in s3 and x not in s4]
    ######################################
    #uids = [ '8ef4e9b2-61cb-4e4c-b184-9e09e1d484af' ]
    if uids: # list of uids is not empty
        print('uid list for analysis is NOT empty, found ' + str(len(uids)) + ' uids awaiting analysis.')
        time_count = 0
        if end_of_compression_key != 'none' and uids[0] == end_of_compression_key: # the next uid up IS the stop key
            run_condition = False
            print('Stop key for analysis detected!')
        else:
            uid = uids[0]
            print('Doing data analysis for uid ' + uid)
            t0 = time.time()
            try:
                # mark this uid as in progress so other analysis servers skip it
                temp4.append(uid)
                data_acquisition_collection.update_one({'_id':'general_list'},
                                {'$set':{'analysis_in_progress': temp4}})
                _chx_analysis_data(uid)
                # update the list of analyzed uids:
                temp2 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_completed']
                temp2.append(uid)
                data_acquisition_collection.update_one({'_id':'general_list'},
                                {'$set':{'analysis_completed': temp2}})
                status = 'success'
            except Exception:
                temp3 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_failed_userX']
                temp3.append(uid)
                data_acquisition_collection.update_one({'_id':'general_list'},
                                {'$set':{'analysis_failed_userX': temp3}})
                status = 'fail'
            ts = (time.time() - t0) / 60 # in minutes
            # db: the beamline databroker (expected to be provided by pyCHX.chx_packages)
            ss = db[uid]['start']
            sample = ss['sample']
            note = ss['Measurement']
            x = [uid, sample, note, ts, status]
            X.append(x)
            # remove this uid from analysis_in_progress
            temp4 = data_acquisition_collection.find_one({'_id':'general_list'})['analysis_in_progress']
            tx = [u for u in temp4 if u != uid]
            data_acquisition_collection.update_one({'_id':'general_list'},
                                {'$set':{'analysis_in_progress': tx}})
            # append only the newly finished row, so earlier rows are not duplicated in the file
            append_txtfile(txtDir + txt_filename, data=[x], fmt='%s',
                           delimiter=',', header=txt_header)
    else:
        if time_count > empty_list_timeout:
            print('uid list for analysis was empty for > ' + str(empty_list_timeout) + 's -> stop looking for new uids')
            run_condition = False
        else:
            time_count = time_count + 5
            print('list of uids for analysis is empty...going to look again in 5s.')
            time.sleep(5)
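In [ ]:
# Design note: the read-modify-write on 'analysis_in_progress' in the loop above
# is racy if several auto-analysis servers run it concurrently (the log filename
# suggests this one is srv2). A sketch (hypothetical helper) of an atomic claim
# using MongoDB's $addToSet, which adds the uid only if it is not already there:
def try_claim(uid):
    res = data_acquisition_collection.update_one(
        {'_id': 'general_list', 'analysis_in_progress': {'$ne': uid}},
        {'$addToSet': {'analysis_in_progress': uid}})
    return res.modified_count == 1  # True only for the worker that won the claim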