In [1]:
import os
import asyncio

import h5py

In [2]:
import file_transfer.datamover as dm

Testzone

Writing the download and cleaning as coroutines


In [3]:
s3handle = dm.S3EnramHandler(bucket_name="lw-enram", profile_name="lw-enram")

In [10]:
def download_from_s3(s3handle):
    while True:
        file_key = (yield)
        s3handle.download_file(file_key)

In [8]:
test_list = [
    "be/jab/2016/11/20/23/bejab_vp_20161120233000.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120233500.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120234000.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120234500.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120235000.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120235500.h5"]

In [9]:
# initiate downloader
downloader = download_from_s3(s3handle)
downloader.send(None)
download_all = [downloader.send(file) for file in test_list]


starting
be/jab/2016/11/20/23/bejab_vp_20161120233000.h5
downloaded be/jab/2016/11/20/23/bejab_vp_20161120233000.h5
be/jab/2016/11/20/23/bejab_vp_20161120233500.h5
downloaded be/jab/2016/11/20/23/bejab_vp_20161120233500.h5
be/jab/2016/11/20/23/bejab_vp_20161120234000.h5
downloaded be/jab/2016/11/20/23/bejab_vp_20161120234000.h5
be/jab/2016/11/20/23/bejab_vp_20161120234500.h5
downloaded be/jab/2016/11/20/23/bejab_vp_20161120234500.h5
be/jab/2016/11/20/23/bejab_vp_20161120235000.h5
downloaded be/jab/2016/11/20/23/bejab_vp_20161120235000.h5
be/jab/2016/11/20/23/bejab_vp_20161120235500.h5
downloaded be/jab/2016/11/20/23/bejab_vp_20161120235500.h5

Tasks scheduling


In [61]:
test_list = [
    "be/jab/2016/11/20/23/bejab_vp_20161120233000.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120233500.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120234000.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120234500.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120235000.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120235500.h5",
    "nl/dhl/2017/08/04/01/nldhl_vp_20170804010000.h5"]

In [62]:
def check_h5(file_key):
    try:
        h5py.File(file_key, mode="r")
        return None
    except:
        return file_key

In [127]:
import aiobotocore

In [138]:
aiobotocore.AioSession(profile_name="lw-enram")


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-138-390709882e16> in <module>()
----> 1 aiobotocore.AioSession(profile_name="lw-enram")

/home/stijn_vanhoey/miniconda2/envs/inbobase/lib/python3.5/site-packages/aiobotocore/session.py in __init__(self, *args, **kwargs)
     13         self._loop = kwargs.pop('loop', None)
     14 
---> 15         super().__init__(*args, **kwargs)
     16 
     17     def create_client(self, service_name, region_name=None, api_version=None,

TypeError: __init__() got an unexpected keyword argument 'profile_name'

In [136]:
loop = asyncio.get_event_loop()
session = aiobotocore.get_session(loop=loop)
session.create_client(config=)


Out[136]:
['lw-enram', 'default']

In [116]:
session = aiobotocore.get_session(loop=loop)
client = session.create_client('s3')
#loop.run_until_complete(go(loop))


---------------------------------------------------------------------------
DataNotFoundError                         Traceback (most recent call last)
<ipython-input-116-ae8a884c81bd> in <module>()
      1 session = aiobotocore.get_session(loop=loop)
----> 2 client = session.create_client('s3')
      3 #loop.run_until_complete(go(loop))

/home/stijn_vanhoey/miniconda2/envs/inbobase/lib/python3.5/site-packages/aiobotocore/session.py in create_client(self, service_name, region_name, api_version, use_ssl, verify, endpoint_url, aws_access_key_id, aws_secret_access_key, aws_session_token, config)
     64         else:
     65             credentials = self.get_credentials()
---> 66         endpoint_resolver = self.get_component('endpoint_resolver')
     67         exceptions_factory = self.get_component('exceptions_factory')
     68         client_creator = AioClientCreator(

/home/stijn_vanhoey/.local/lib/python3.5/site-packages/botocore/session.py in get_component(self, name)

/home/stijn_vanhoey/.local/lib/python3.5/site-packages/botocore/session.py in get_component(self, name)

/home/stijn_vanhoey/.local/lib/python3.5/site-packages/botocore/session.py in create_default_resolver()

/home/stijn_vanhoey/.local/lib/python3.5/site-packages/botocore/loaders.py in _wrapper(self, *args, **kwargs)

/home/stijn_vanhoey/.local/lib/python3.5/site-packages/botocore/loaders.py in load_data(self, name)

DataNotFoundError: Unable to load data for: endpoints

In [ ]:


In [108]:
async def download_from_s3(file_key):
    session = aiobotocore.get_session(loop=loop)
    
    
    s3handle = dm.S3EnramHandler(bucket_name="lw-enram", profile_name="lw-enram")
    s3handle.download_file(file_key) 
    return file_key

In [109]:
async def main(file_list):
    downloaded_files = [download_from_s3(file) for file in testfiles]
    
    results = []
    for next_to_complete in asyncio.as_completed(downloaded_files):
        file_key = await next_to_complete
        if check_h5(file_key):
            results.append(file_key)
            
    print('results: {!r}'.format(results))
    return results

In [110]:
%%time
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(main(test_list))


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<timed exec> in <module>()

/home/stijn_vanhoey/miniconda2/envs/inbobase/lib/python3.5/asyncio/base_events.py in run_until_complete(self, future)
    464             raise RuntimeError('Event loop stopped before Future completed.')
    465 
--> 466         return future.result()
    467 
    468     def stop(self):

/home/stijn_vanhoey/miniconda2/envs/inbobase/lib/python3.5/asyncio/futures.py in result(self)
    291             self._tb_logger = None
    292         if self._exception is not None:
--> 293             raise self._exception
    294         return self._result
    295 

/home/stijn_vanhoey/miniconda2/envs/inbobase/lib/python3.5/asyncio/tasks.py in _step(***failed resolving arguments***)
    237                 # We use the `send` method directly, because coroutines
    238                 # don't have `__iter__` and `__next__` methods.
--> 239                 result = coro.send(None)
    240             else:
    241                 result = coro.throw(exc)

<ipython-input-109-03b8c28593bf> in main(file_list)
      4     results = []
      5     for next_to_complete in asyncio.as_completed(downloaded_files):
----> 6         file_key = await next_to_complete
      7         if check_h5(file_key):
      8             results.append(file_key)

/home/stijn_vanhoey/miniconda2/envs/inbobase/lib/python3.5/asyncio/tasks.py in _wait_for_one()
    498             # Dummy value from _on_timeout().
    499             raise futures.TimeoutError
--> 500         return f.result()  # May raise f.exception().
    501 
    502     for f in todo:

/home/stijn_vanhoey/miniconda2/envs/inbobase/lib/python3.5/asyncio/futures.py in result(self)
    291             self._tb_logger = None
    292         if self._exception is not None:
--> 293             raise self._exception
    294         return self._result
    295 

/home/stijn_vanhoey/miniconda2/envs/inbobase/lib/python3.5/asyncio/tasks.py in _step(***failed resolving arguments***)
    237                 # We use the `send` method directly, because coroutines
    238                 # don't have `__iter__` and `__next__` methods.
--> 239                 result = coro.send(None)
    240             else:
    241                 result = coro.throw(exc)

<ipython-input-108-3a744067d36b> in download_from_s3(file_key)
      1 async def download_from_s3(file_key):
      2     s3handle = dm.S3EnramHandler(bucket_name="lw-enram", profile_name="lw-enram")
----> 3     await s3handle.download_file(file_key)
      4     return file_key

TypeError: object NoneType can't be used in 'await' expression

Comparison with nonsync


In [104]:
%%time
results = []
s3handle = dm.S3EnramHandler(bucket_name="lw-enram", profile_name="lw-enram")
for file_key in testfiles:
    s3handle.download_file(file_key) 
    if check_h5(file_key):
            results.append(file_key)


CPU times: user 1.22 s, sys: 224 ms, total: 1.45 s
Wall time: 24.3 s

In [106]:
results


Out[106]:
[]

In [107]:
1


Out[107]:
1

In [ ]:


In [ ]:


In [100]:
testfiles = []
for j, file in enumerate(s3handle.bucket.objects.all()):
    if file.key.endswith(".h5"):
        testfiles.append(file.key)
    if j == 200:
        break

In [101]:
testfiles


Out[101]:
['be/jab/2016/11/20/23/bejab_vp_20161120233000.h5',
 'be/jab/2016/11/20/23/bejab_vp_20161120233500.h5',
 'be/jab/2016/11/20/23/bejab_vp_20161120234000.h5',
 'be/jab/2016/11/20/23/bejab_vp_20161120234500.h5',
 'be/jab/2016/11/20/23/bejab_vp_20161120235000.h5',
 'be/jab/2016/11/20/23/bejab_vp_20161120235500.h5',
 'be/wid/2016/11/20/23/bewid_vp_20161120233000.h5',
 'be/wid/2016/11/20/23/bewid_vp_20161120233500.h5',
 'be/wid/2016/11/20/23/bewid_vp_20161120234000.h5',
 'be/wid/2016/11/20/23/bewid_vp_20161120234500.h5',
 'be/wid/2016/11/20/23/bewid_vp_20161120235000.h5',
 'be/wid/2016/11/20/23/bewid_vp_20161120235500.h5',
 'ch/lem/2016/11/20/23/chlem_vp_20161120233000.h5',
 'ch/lem/2016/11/20/23/chlem_vp_20161120234500.h5',
 'cz/brd/2016/11/20/23/czbrd_vp_20161120233000.h5',
 'cz/brd/2016/11/20/23/czbrd_vp_20161120234400.h5',
 'cz/brd/2016/11/20/23/czbrd_vp_20161120234500.h5',
 'cz/brd/2016/11/20/23/czbrd_vp_20161120235900.h5',
 'cz/brd/2017/01/14/23/czbrd_vp_20170114233000.h5',
 'cz/brd/2017/01/14/23/czbrd_vp_20170114234500.h5',
 'cz/brd/2017/01/15/03/czbrd_vp_20170115034500.h5',
 'cz/brd/2017/01/15/04/czbrd_vp_20170115043000.h5',
 'cz/brd/2017/01/15/05/czbrd_vp_20170115054500.h5',
 'cz/brd/2017/01/15/06/czbrd_vp_20170115060000.h5',
 'cz/brd/2017/01/15/06/czbrd_vp_20170115061500.h5',
 'cz/brd/2017/01/15/06/czbrd_vp_20170115063000.h5',
 'cz/brd/2017/01/15/11/czbrd_vp_20170115110000.h5',
 'cz/brd/2017/01/15/11/czbrd_vp_20170115111500.h5',
 'cz/brd/2017/01/15/11/czbrd_vp_20170115113000.h5',
 'cz/brd/2017/01/15/13/czbrd_vp_20170115130000.h5',
 'cz/brd/2017/01/15/14/czbrd_vp_20170115143000.h5',
 'cz/brd/2017/01/15/17/czbrd_vp_20170115171500.h5',
 'cz/brd/2017/01/15/17/czbrd_vp_20170115174500.h5',
 'cz/brd/2017/01/15/18/czbrd_vp_20170115180000.h5',
 'cz/brd/2017/01/15/18/czbrd_vp_20170115181500.h5',
 'cz/brd/2017/01/15/19/czbrd_vp_20170115193000.h5',
 'cz/brd/2017/01/15/21/czbrd_vp_20170115214500.h5',
 'cz/brd/2017/01/15/23/czbrd_vp_20170115231500.h5',
 'cz/brd/2017/01/16/04/czbrd_vp_20170116041500.h5',
 'cz/brd/2017/01/16/05/czbrd_vp_20170116054500.h5',
 'cz/brd/2017/01/16/06/czbrd_vp_20170116060000.h5',
 'cz/brd/2017/01/16/10/czbrd_vp_20170116100000.h5',
 'cz/brd/2017/01/16/11/czbrd_vp_20170116111500.h5',
 'cz/brd/2017/01/16/11/czbrd_vp_20170116113000.h5',
 'cz/brd/2017/01/16/14/czbrd_vp_20170116144500.h5',
 'cz/brd/2017/01/16/15/czbrd_vp_20170116154500.h5',
 'cz/brd/2017/01/16/17/czbrd_vp_20170116174500.h5',
 'cz/brd/2017/01/16/18/czbrd_vp_20170116180000.h5',
 'cz/brd/2017/01/16/20/czbrd_vp_20170116204500.h5',
 'cz/brd/2017/01/16/23/czbrd_vp_20170116231500.h5',
 'cz/brd/2017/01/16/23/czbrd_vp_20170116233000.h5',
 'cz/brd/2017/01/17/02/czbrd_vp_20170117024500.h5',
 'cz/brd/2017/01/17/04/czbrd_vp_20170117043000.h5',
 'cz/brd/2017/01/17/05/czbrd_vp_20170117054500.h5',
 'cz/brd/2017/01/17/06/czbrd_vp_20170117060000.h5',
 'cz/brd/2017/01/17/06/czbrd_vp_20170117061500.h5',
 'cz/brd/2017/01/17/06/czbrd_vp_20170117064500.h5',
 'cz/brd/2017/01/17/11/czbrd_vp_20170117113000.h5',
 'cz/brd/2017/01/17/11/czbrd_vp_20170117114500.h5',
 'cz/brd/2017/01/17/12/czbrd_vp_20170117124500.h5',
 'cz/brd/2017/02/07/01/czbrd_vp_20170207014500.h5',
 'cz/brd/2017/02/12/05/czbrd_vp_20170212054500.h5',
 'cz/brd/2017/02/12/11/czbrd_vp_20170212111500.h5',
 'cz/brd/2017/02/12/22/czbrd_vp_20170212224500.h5',
 'cz/brd/2017/02/13/06/czbrd_vp_20170213060000.h5',
 'cz/brd/2017/02/13/18/czbrd_vp_20170213184500.h5',
 'cz/brd/2017/04/03/03/czbrd_vp_20170403033000.h5',
 'cz/brd/2017/04/06/11/czbrd_vp_20170406113000.h5',
 'cz/brd/2017/04/10/18/czbrd_vp_20170410181500.h5',
 'cz/brd/2017/04/10/19/czbrd_vp_20170410191500.h5',
 'cz/brd/2017/04/21/06/czbrd_vp_20170421063000.h5',
 'cz/brd/2017/05/03/09/czbrd_vp_20170503090000.h5',
 'cz/brd/2017/05/03/11/czbrd_vp_20170503114500.h5',
 'cz/brd/2017/05/03/14/czbrd_vp_20170503141500.h5',
 'cz/brd/2017/05/03/14/czbrd_vp_20170503144500.h5',
 'cz/brd/2017/05/03/19/czbrd_vp_20170503190000.h5',
 'cz/brd/2017/05/03/20/czbrd_vp_20170503203000.h5',
 'cz/brd/2017/05/03/22/czbrd_vp_20170503220000.h5',
 'cz/brd/2017/05/03/23/czbrd_vp_20170503231500.h5',
 'cz/brd/2017/05/04/00/czbrd_vp_20170504004500.h5',
 'cz/brd/2017/05/04/04/czbrd_vp_20170504041500.h5',
 'cz/brd/2017/05/04/05/czbrd_vp_20170504050000.h5',
 'cz/brd/2017/05/04/06/czbrd_vp_20170504060000.h5',
 'cz/brd/2017/05/04/07/czbrd_vp_20170504071500.h5',
 'cz/brd/2017/05/04/08/czbrd_vp_20170504081500.h5',
 'cz/brd/2017/05/10/07/czbrd_vp_20170510074500.h5',
 'cz/brd/2017/05/10/09/czbrd_vp_20170510090000.h5',
 'cz/brd/2017/05/10/10/czbrd_vp_20170510100000.h5',
 'cz/brd/2017/05/10/10/czbrd_vp_20170510104500.h5',
 'cz/brd/2017/05/10/12/czbrd_vp_20170510121500.h5',
 'cz/brd/2017/05/10/13/czbrd_vp_20170510130000.h5',
 'cz/brd/2017/05/10/16/czbrd_vp_20170510160000.h5',
 'cz/brd/2017/05/10/16/czbrd_vp_20170510161500.h5',
 'cz/brd/2017/05/10/16/czbrd_vp_20170510164500.h5',
 'cz/brd/2017/05/10/19/czbrd_vp_20170510194500.h5',
 'cz/brd/2017/05/10/20/czbrd_vp_20170510201500.h5',
 'cz/brd/2017/05/10/20/czbrd_vp_20170510203000.h5',
 'cz/brd/2017/05/11/09/czbrd_vp_20170511094500.h5',
 'cz/brd/2017/05/16/04/czbrd_vp_20170516041500.h5',
 'cz/brd/2017/05/16/12/czbrd_vp_20170516120000.h5',
 'cz/brd/2017/05/16/13/czbrd_vp_20170516134500.h5',
 'cz/brd/2017/05/19/09/czbrd_vp_20170519091500.h5',
 'cz/brd/2017/05/19/19/czbrd_vp_20170519191500.h5',
 'cz/brd/2017/05/21/19/czbrd_vp_20170521194500.h5',
 'cz/brd/2017/05/23/08/czbrd_vp_20170523081500.h5',
 'cz/brd/2017/05/23/10/czbrd_vp_20170523101500.h5',
 'cz/brd/2017/05/24/09/czbrd_vp_20170524091500.h5',
 'cz/brd/2017/05/24/10/czbrd_vp_20170524104500.h5',
 'cz/brd/2017/05/24/23/czbrd_vp_20170524234500.h5',
 'cz/brd/2017/05/27/02/czbrd_vp_20170527024500.h5',
 'cz/brd/2017/05/30/07/czbrd_vp_20170530071500.h5',
 'cz/brd/2017/05/31/22/czbrd_vp_20170531224500.h5',
 'cz/brd/2017/06/02/05/czbrd_vp_20170602050000.h5',
 'cz/brd/2017/06/03/17/czbrd_vp_20170603174500.h5',
 'cz/brd/2017/06/07/03/czbrd_vp_20170607030000.h5',
 'cz/brd/2017/06/11/08/czbrd_vp_20170611081500.h5',
 'cz/brd/2017/06/11/14/czbrd_vp_20170611140000.h5',
 'cz/brd/2017/06/15/23/czbrd_vp_20170615230000.h5',
 'cz/brd/2017/06/16/04/czbrd_vp_20170616044500.h5',
 'cz/brd/2017/06/21/00/czbrd_vp_20170621003000.h5',
 'cz/brd/2017/06/23/01/czbrd_vp_20170623010000.h5',
 'cz/brd/2017/06/27/06/czbrd_vp_20170627061500.h5',
 'cz/brd/2017/06/27/11/czbrd_vp_20170627113000.h5',
 'cz/brd/2017/06/27/14/czbrd_vp_20170627140000.h5',
 'cz/brd/2017/06/30/10/czbrd_vp_20170630101500.h5',
 'cz/brd/2017/07/03/04/czbrd_vp_20170703041500.h5',
 'cz/brd/2017/07/03/09/czbrd_vp_20170703094500.h5',
 'cz/brd/2017/07/07/19/czbrd_vp_20170707191500.h5',
 'cz/brd/2017/07/08/13/czbrd_vp_20170708130000.h5',
 'cz/brd/2017/07/13/18/czbrd_vp_20170713180000.h5',
 'cz/brd/2017/07/15/17/czbrd_vp_20170715174500.h5',
 'cz/brd/2017/07/18/00/czbrd_vp_20170718004500.h5',
 'cz/brd/2017/07/19/19/czbrd_vp_20170719190000.h5',
 'cz/brd/2017/07/20/02/czbrd_vp_20170720021500.h5',
 'cz/brd/2017/07/22/07/czbrd_vp_20170722073000.h5',
 'cz/brd/2017/07/22/10/czbrd_vp_20170722101500.h5',
 'cz/brd/2017/07/22/14/czbrd_vp_20170722143000.h5',
 'cz/brd/2017/07/28/02/czbrd_vp_20170728024500.h5',
 'cz/brd/2017/08/04/03/czbrd_vp_20170804030000.h5',
 'cz/brd/2017/08/06/07/czbrd_vp_20170806073000.h5',
 'cz/brd/2017/08/07/19/czbrd_vp_20170807190000.h5',
 'cz/brd/2017/08/08/05/czbrd_vp_20170808054500.h5',
 'cz/brd/2017/08/10/08/czbrd_vp_20170810084500.h5',
 'cz/brd/2017/08/10/16/czbrd_vp_20170810163000.h5',
 'cz/brd/2017/08/10/20/czbrd_vp_20170810200000.h5',
 'cz/brd/2017/08/19/22/czbrd_vp_20170819223000.h5',
 'cz/brd/2017/08/19/22/czbrd_vp_20170819224500.h5',
 'cz/brd/2017/08/19/23/czbrd_vp_20170819231500.h5',
 'cz/brd/2017/08/19/23/czbrd_vp_20170819233000.h5',
 'cz/brd/2017/08/19/23/czbrd_vp_20170819234500.h5',
 'cz/brd/2017/08/20/22/czbrd_vp_20170820223000.h5',
 'cz/brd/2017/08/20/22/czbrd_vp_20170820224500.h5',
 'cz/brd/2017/08/20/23/czbrd_vp_20170820230000.h5',
 'cz/brd/2017/08/20/23/czbrd_vp_20170820231500.h5',
 'cz/brd/2017/08/20/23/czbrd_vp_20170820233000.h5',
 'cz/brd/2017/08/20/23/czbrd_vp_20170820234500.h5',
 'cz/brd/2017/08/21/00/czbrd_vp_20170821000000.h5',
 'cz/brd/2017/08/21/00/czbrd_vp_20170821001500.h5',
 'cz/brd/2017/08/21/00/czbrd_vp_20170821003000.h5',
 'cz/brd/2017/08/21/00/czbrd_vp_20170821004500.h5',
 'cz/brd/2017/08/21/01/czbrd_vp_20170821010000.h5',
 'cz/brd/2017/08/21/01/czbrd_vp_20170821011500.h5',
 'cz/brd/2017/08/21/01/czbrd_vp_20170821013000.h5',
 'cz/brd/2017/08/21/01/czbrd_vp_20170821014500.h5',
 'cz/brd/2017/08/21/02/czbrd_vp_20170821020000.h5',
 'cz/brd/2017/08/21/02/czbrd_vp_20170821021500.h5',
 'cz/brd/2017/08/21/02/czbrd_vp_20170821023000.h5',
 'cz/brd/2017/08/21/02/czbrd_vp_20170821024500.h5',
 'cz/brd/2017/08/21/03/czbrd_vp_20170821030000.h5',
 'cz/brd/2017/08/21/03/czbrd_vp_20170821031500.h5',
 'cz/brd/2017/08/21/03/czbrd_vp_20170821033000.h5',
 'cz/brd/2017/08/21/03/czbrd_vp_20170821034500.h5',
 'cz/brd/2017/08/21/04/czbrd_vp_20170821040000.h5',
 'cz/brd/2017/08/21/04/czbrd_vp_20170821041500.h5',
 'cz/brd/2017/08/21/04/czbrd_vp_20170821043000.h5',
 'cz/brd/2017/08/21/04/czbrd_vp_20170821044500.h5',
 'cz/brd/2017/08/21/05/czbrd_vp_20170821050000.h5',
 'cz/brd/2017/08/21/05/czbrd_vp_20170821051500.h5',
 'cz/brd/2017/08/21/05/czbrd_vp_20170821053000.h5',
 'cz/brd/2017/08/21/05/czbrd_vp_20170821054500.h5',
 'cz/brd/2017/08/21/06/czbrd_vp_20170821060000.h5',
 'cz/brd/2017/08/21/06/czbrd_vp_20170821061500.h5',
 'cz/brd/2017/08/21/06/czbrd_vp_20170821063000.h5',
 'cz/brd/2017/08/21/06/czbrd_vp_20170821064500.h5',
 'cz/brd/2017/08/21/07/czbrd_vp_20170821070000.h5',
 'cz/brd/2017/08/21/07/czbrd_vp_20170821071500.h5',
 'cz/brd/2017/08/21/07/czbrd_vp_20170821073000.h5',
 'cz/brd/2017/08/21/07/czbrd_vp_20170821074500.h5',
 'cz/brd/2017/08/21/08/czbrd_vp_20170821080000.h5',
 'cz/brd/2017/08/21/08/czbrd_vp_20170821081500.h5',
 'cz/brd/2017/08/21/08/czbrd_vp_20170821083000.h5',
 'cz/brd/2017/08/21/08/czbrd_vp_20170821084500.h5',
 'cz/brd/2017/08/21/09/czbrd_vp_20170821090000.h5',
 'cz/brd/2017/08/21/09/czbrd_vp_20170821091500.h5',
 'cz/brd/2017/08/21/09/czbrd_vp_20170821093000.h5',
 'cz/brd/2017/08/21/09/czbrd_vp_20170821094500.h5']

In [ ]:


In [ ]:


In [ ]:

For real


In [3]:
s3handle = dm.S3EnramHandler(bucket_name="lw-enram", profile_name="lw-enram")

In [4]:
async def download_h5(file):  
    await s3handle.download_file(file.key)
    try:
        h5py.File(file.key, mode="r")
    except:
        print(file.key)
        return file.key
    await os.remove(file.key)

In [9]:
async def download_h5(file):  
    await s3handle.download_file(file)
    try:
        h5py.File(file, mode="r")
        print(file, 'readable')
    except:
        print(file)
        return file
    await os.remove(file)

In [10]:
test_list = [
    "be/jab/2016/11/20/23/bejab_vp_20161120233000.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120233500.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120234000.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120234500.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120235000.h5",
    "be/jab/2016/11/20/23/bejab_vp_20161120235500.h5"]

In [11]:
#h5_files = [download_h5(file) for file in s3handle.bucket.objects.all()]
h5_files = [download_h5(file) for file in test_list]

In [12]:
loop = asyncio.get_event_loop()  
loop.run_until_complete(asyncio.wait(h5_files))  
loop.close()


---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-12-eed822ddfe85> in <module>()
      1 loop = asyncio.get_event_loop()
----> 2 loop.run_until_complete(asyncio.wait(h5_files))
      3 loop.close()

/home/stijn_vanhoey/miniconda2/envs/inbobase/lib/python3.5/asyncio/base_events.py in run_until_complete(self, future)
    441         Return the Future's result, or raise its exception.
    442         """
--> 443         self._check_closed()
    444 
    445         new_task = not futures.isfuture(future)

/home/stijn_vanhoey/miniconda2/envs/inbobase/lib/python3.5/asyncio/base_events.py in _check_closed(self)
    355     def _check_closed(self):
    356         if self._closed:
--> 357             raise RuntimeError('Event loop is closed')
    358 
    359     def _asyncgen_finalizer_hook(self, agen):

RuntimeError: Event loop is closed

In [13]:
h5_files


Out[13]:
[<coroutine object download_h5 at 0x7fbca6fc4fc0>,
 <coroutine object download_h5 at 0x7fbca6f6b4c0>,
 <coroutine object download_h5 at 0x7fbca6f6b5c8>,
 <coroutine object download_h5 at 0x7fbca6f6b3b8>,
 <coroutine object download_h5 at 0x7fbca6f6b570>,
 <coroutine object download_h5 at 0x7fbca6f6ba98>]

In [ ]: