In [1]:
import asyncio
import itertools
import os

import h5py
In [2]:
import file_transfer.datamover as dm
In [3]:
# Handler for the lw-enram bucket, authenticated through the lw-enram AWS profile.
s3handle = dm.S3EnramHandler(profile_name="lw-enram", bucket_name="lw-enram")
In [10]:
def download_from_s3(s3handle):
    """Generator-based consumer that downloads every key sent into it.

    Prime it once with ``next()`` (or ``send(None)``). After that, each
    ``send(key)`` calls ``s3handle.download_file(key)`` and yields ``None``
    back. The loop never terminates on its own; the generator ends only when
    it is closed or an exception propagates out of ``download_file``.

    NOTE(review): the ``async def download_from_s3`` further down in this
    notebook reuses this name and shadows this generator once it is defined.
    """
    while True:
        key = yield
        s3handle.download_file(key)
In [8]:
# Six consecutive 5-minute vp files under be/jab/2016/11/20/23 (23:30 through 23:55).
test_list = [
    "be/jab/2016/11/20/23/bejab_vp_2016112023{:02d}00.h5".format(minute)
    for minute in range(30, 60, 5)
]
In [9]:
# Prime the generator so it is parked at its first `yield`, then push every key through it.
# Each send returns the generator's yielded value (None), so download_all is just a
# list of None placeholders; the downloads themselves are the side effect.
downloader = download_from_s3(s3handle)
next(downloader)
download_all = list(map(downloader.send, test_list))
In [61]:
# The six 5-minute be/jab files from 2016-11-20 23:30-23:55, plus one nl/dhl file.
test_list = [
    "be/jab/2016/11/20/23/bejab_vp_2016112023{:02d}00.h5".format(minute)
    for minute in range(30, 60, 5)
] + ["nl/dhl/2017/08/04/01/nldhl_vp_20170804010000.h5"]
In [62]:
def check_h5(file_key):
    """Return ``file_key`` if the local file cannot be opened as HDF5, else None.

    Parameters
    ----------
    file_key : str
        Local path to check. The notebook passes S3 keys here, presumably
        because download_file writes each object to that relative path
        (TODO confirm against S3EnramHandler).

    Returns
    -------
    str or None
        The key when opening fails, None when the file opens cleanly.
        Callers treat a truthy result as "unreadable".
    """
    try:
        # Open and immediately close. Leaving the handle open leaks a file
        # descriptor per check and can block later removal of the file.
        with h5py.File(file_key, mode="r"):
            pass
    except OSError:
        # h5py reports missing, truncated or non-HDF5 files as OSError. A bare
        # except would also swallow KeyboardInterrupt and genuine bugs.
        return file_key
    return None
In [127]:
import aiobotocore
In [138]:
# botocore's Session selects a named profile through `profile=`; `profile_name=` is
# boto3's spelling and is not a parameter of botocore/aiobotocore sessions.
session = aiobotocore.AioSession(profile="lw-enram")
In [136]:
# Build an async S3 client that uses the same named AWS profile as the S3EnramHandler cells.
# NOTE(review): assumes the pre-1.0 aiobotocore API, where get_session accepts loop= and
# create_client returns the client directly - confirm against the installed version.
loop = asyncio.get_event_loop()
session = aiobotocore.get_session(loop=loop)
session.set_config_variable("profile", "lw-enram")
client = session.create_client("s3")
Out[136]:
In [116]:
# Async S3 client on a fresh session bound to the event loop from the cell above.
# No profile is selected on this session.
session = aiobotocore.get_session(loop=loop)
client = session.create_client('s3')
In [ ]:
In [108]:
async def download_from_s3(file_key):
    """Download one key from the lw-enram bucket without blocking the event loop.

    S3EnramHandler.download_file is called synchronously everywhere else in
    this notebook. Calling it directly inside a coroutine would block the loop
    and serialize every "concurrent" download, so it runs in the loop's
    default executor instead.

    Parameters
    ----------
    file_key : str
        Bucket key to download.

    Returns
    -------
    str
        ``file_key`` itself, so callers of ``asyncio.as_completed`` know which
        download just finished.

    NOTE(review): this shadows the generator-based ``download_from_s3``
    defined earlier in the notebook.
    """
    def _download():
        # A fresh handler per call, created inside the worker thread, so that
        # no handler/boto3 resource object is shared between threads.
        handler = dm.S3EnramHandler(bucket_name="lw-enram", profile_name="lw-enram")
        handler.download_file(file_key)

    await asyncio.get_event_loop().run_in_executor(None, _download)
    return file_key
In [109]:
async def main(file_list):
    """Download every key in ``file_list`` concurrently and collect unreadable ones.

    Parameters
    ----------
    file_list : iterable of str
        Bucket keys to download and check. The previous body ignored this
        argument and read the global ``testfiles`` instead.

    Returns
    -------
    list of str
        Keys whose downloaded file check_h5 could not open, in completion
        order (not input order).
    """
    pending = [download_from_s3(file_key) for file_key in file_list]
    results = []
    for next_to_complete in asyncio.as_completed(pending):
        file_key = await next_to_complete
        # check_h5 returns the key only when the file could not be opened.
        if check_h5(file_key):
            results.append(file_key)
    print('results: {!r}'.format(results))
    return results
In [110]:
%%time
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(main(test_list))
Comparison with the synchronous (sequential) version
In [104]:
%%time
# Sequential baseline: download each key and check it one at a time, for comparison
# with the asyncio timing above.
# NOTE(review): `testfiles` is only defined in a later cell, so this cell fails when the
# notebook is run top to bottom on a fresh kernel.
results = []
s3handle = dm.S3EnramHandler(bucket_name="lw-enram", profile_name="lw-enram")
for file_key in testfiles:
    s3handle.download_file(file_key)
    # check_h5 returns the key only when the file could not be opened
    if check_h5(file_key):
        results.append(file_key)
In [106]:
# Keys whose downloaded file check_h5 could not open (an empty list means all were readable).
results
Out[106]:
In [107]:
# NOTE(review): stray scratch cell - it only evaluates the literal 1; candidate for deletion.
1
Out[107]:
In [ ]:
In [ ]:
In [100]:
# Sample of bucket keys for the timing comparison: the .h5 keys among the first
# N_OBJECTS_SCANNED objects of the bucket listing. The previous loop broke at j == 200
# and so scanned 201 objects.
N_OBJECTS_SCANNED = 200
testfiles = [
    obj.key
    for obj in itertools.islice(s3handle.bucket.objects.all(), N_OBJECTS_SCANNED)
    if obj.key.endswith(".h5")
]
In [101]:
# Show the sample size and a few example keys instead of dumping the whole list.
len(testfiles), testfiles[:5]
Out[101]:
In [ ]:
In [ ]:
In [ ]:
In [3]:
# Rebuild the lw-enram bucket handler (same bucket and AWS profile as earlier cells).
s3handle = dm.S3EnramHandler(profile_name="lw-enram", bucket_name="lw-enram")
In [4]:
async def download_h5(file):
    """Download one bucket object, check it opens as HDF5, and clean up.

    Parameters
    ----------
    file : object with a ``.key`` attribute
        A bucket object summary, as yielded by ``s3handle.bucket.objects.all()``.

    Returns
    -------
    str or None
        ``file.key`` when the downloaded file cannot be opened (the local copy
        is kept for inspection). None when it is readable, in which case the
        local copy is deleted.

    The previous body awaited the synchronous ``download_file`` and
    ``os.remove`` return values, which raises TypeError. Those calls are now
    synchronous or run in the default executor.

    NOTE(review): redefined by the key-based ``download_h5`` in the next cell.
    """
    # download_file is called synchronously elsewhere in this notebook; run it in a
    # worker thread so several downloads can proceed while the loop stays responsive.
    # NOTE(review): assumes S3EnramHandler.download_file is safe to call from several
    # threads at once - confirm.
    await asyncio.get_event_loop().run_in_executor(None, s3handle.download_file, file.key)
    try:
        # Close the handle right away so the file can be removed afterwards.
        with h5py.File(file.key, mode="r"):
            pass
    except OSError:
        print(file.key)
        return file.key
    os.remove(file.key)
In [9]:
async def download_h5(file):
    """Download one bucket key, check it opens as HDF5, and clean up.

    Parameters
    ----------
    file : str
        Bucket key. It is also used as the local path that is opened and
        removed (presumably download_file writes to that path - TODO confirm).

    Returns
    -------
    str or None
        ``file`` when the downloaded file cannot be opened (the local copy is
        kept for inspection). None when it is readable, in which case the
        local copy is deleted.

    The previous body awaited the synchronous ``download_file`` and
    ``os.remove`` return values, which raises TypeError. Those calls are now
    synchronous or run in the default executor.
    """
    # download_file is called synchronously elsewhere in this notebook; run it in a
    # worker thread so several downloads can proceed while the loop stays responsive.
    # NOTE(review): assumes S3EnramHandler.download_file is safe to call from several
    # threads at once - confirm.
    await asyncio.get_event_loop().run_in_executor(None, s3handle.download_file, file)
    try:
        # Close the handle right away so the file can be removed afterwards.
        with h5py.File(file, mode="r"):
            pass
    except OSError:
        print(file)
        return file
    print(file, 'readable')
    os.remove(file)
In [10]:
# Six consecutive 5-minute vp files under be/jab/2016/11/20/23 (23:30 through 23:55).
test_list = [
    "be/jab/2016/11/20/23/bejab_vp_2016112023{:02d}00.h5".format(minute)
    for minute in range(30, 60, 5)
]
In [11]:
# One not-yet-scheduled coroutine per key; nothing is downloaded until they run on the loop.
# To check the whole bucket, map over the object keys from s3handle.bucket.objects.all()
# instead of test_list.
h5_files = list(map(download_h5, test_list))
In [12]:
loop = asyncio.get_event_loop()
# gather accepts bare coroutines on every Python version (asyncio.wait rejects them from
# 3.11 on) and returns their results in input order. return_exceptions=True lets every
# download finish and keeps a failed one's exception in h5_results instead of aborting.
h5_results = loop.run_until_complete(asyncio.gather(*h5_files, return_exceptions=True))
# The loop is deliberately left open: closing the current event loop makes every later
# get_event_loop().run_until_complete(...) in this kernel fail with "Event loop is closed".
# download_h5 returns the key of each unreadable file and None otherwise, so this shows
# the unreadable keys (plus any exceptions raised by individual downloads).
[result for result in h5_results if result is not None]
In [13]:
# NOTE(review): h5_files holds the coroutine objects created by download_h5(...), not their
# results - inspect the value returned by the run_until_complete cell instead.
h5_files
Out[13]:
In [ ]: