How to use ERA5 in Zarr format

Zarr is a relatively new storage format which, thanks to its simple yet well-designed specification, makes large datasets easily accessible to distributed computing. In a Zarr dataset, the arrays are divided into chunks and compressed; these individual chunks can be stored as files on a filesystem or as objects in a cloud storage bucket, while the metadata are stored in lightweight .json files. Zarr works well on both local filesystems and cloud-based object stores, and existing datasets can easily be converted to Zarr via xarray's Zarr functions.
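
As a quick illustration of that last point, here is a minimal sketch of converting an existing dataset to Zarr with xarray. The file names and chunk sizes below are purely illustrative and are not part of the ERA5 bucket:


In [ ]:
import xarray as xr

# Hypothetical example: convert an existing NetCDF file to a Zarr store.
ds = xr.open_dataset('example.nc')        # any existing dataset (illustrative name)
ds = ds.chunk({'time': 24})               # pick chunk sizes suited to your access pattern
ds.to_zarr('example.zarr')                # writes the compressed chunks plus .json metadata
ds_back = xr.open_zarr('example.zarr')    # lazy, chunk-aware read-back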

In this example we show how to use the Zarr-formatted ERA5 data that Planet OS shares on S3. Once the data is read in, we demonstrate some simple operations on it.


In [1]:
%matplotlib notebook
import xarray as xr
import datetime
import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import LocalCluster, Client
import s3fs
import cartopy.crs as ccrs
import boto3

First we look into the zarr folder of the era5-pds bucket to find out which variables are available. Assuming that all variables are available for all years, we inspect one arbitrary year-month of data.


In [2]:
bucket = 'era5-pds'
# Make sure the prefix ends with a trailing /
prefix = 'zarr/2008/01/data/'

s3_client = boto3.client('s3')
result = s3_client.list_objects(Bucket=bucket, Prefix=prefix, Delimiter='/')
for o in result.get('CommonPrefixes'):
    print(o.get('Prefix'))


zarr/2008/01/data/air_pressure_at_mean_sea_level.zarr/
zarr/2008/01/data/air_temperature_at_2_metres.zarr/
zarr/2008/01/data/air_temperature_at_2_metres_1hour_Maximum.zarr/
zarr/2008/01/data/air_temperature_at_2_metres_1hour_Minimum.zarr/
zarr/2008/01/data/dew_point_temperature_at_2_metres.zarr/
zarr/2008/01/data/eastward_wind_at_100_metres.zarr/
zarr/2008/01/data/eastward_wind_at_10_metres.zarr/
zarr/2008/01/data/integral_wrt_time_of_surface_direct_downwelling_shortwave_flux_in_air_1hour_Accumulation.zarr/
zarr/2008/01/data/lwe_thickness_of_surface_snow_amount.zarr/
zarr/2008/01/data/northward_wind_at_100_metres.zarr/
zarr/2008/01/data/northward_wind_at_10_metres.zarr/
zarr/2008/01/data/precipitation_amount_1hour_Accumulation.zarr/
zarr/2008/01/data/sea_surface_temperature.zarr/
zarr/2008/01/data/snow_density.zarr/
zarr/2008/01/data/surface_air_pressure.zarr/

In [3]:
client = Client()
client


Out[3]:

Client

Cluster

  • Workers: 4
  • Cores: 16
  • Memory: 17.18 GB
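
If the defaults do not suit your machine, the cluster size can also be set explicitly via the LocalCluster import above; the worker count and memory limit below are illustrative:


In [ ]:
cluster = LocalCluster(n_workers=4, threads_per_worker=4, memory_limit='4GB')
client = Client(cluster)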

In [4]:
fs = s3fs.S3FileSystem(anon=False)
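
The era5-pds bucket is part of the AWS Open Data program, so if you have no AWS credentials configured you can usually read it anonymously (assuming the bucket policy still allows public reads):


In [ ]:
# Anonymous access variant -- no credentials required for a public bucket.
fs = s3fs.S3FileSystem(anon=True)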

Here we define some helper functions that read the Zarr data in, one month at a time.


In [5]:
def inc_mon(indate):
    # Return the first day of the month following indate.
    if indate.month < 12:
        return datetime.datetime(indate.year, indate.month+1, 1)
    else:
        return datetime.datetime(indate.year+1, 1, 1)

def gen_d_range(start, end):
    # Build a list of month-start datetimes covering [start, end].
    rr = []
    while start <= end:
        rr.append(start)
        start = inc_mon(start)
    return rr

def get_z(dtime, var):
    # Open the Zarr store for one variable and one year/month as a lazy xarray Dataset.
    f_zarr = 'era5-pds/zarr/{year}/{month:02d}/data/{var}.zarr/'.format(year=dtime.year, month=dtime.month, var=var)
    return xr.open_zarr(s3fs.S3Map(f_zarr, s3=fs))

def gen_zarr_range(start, end, var):
    # Open one dataset per month in the range; nothing is downloaded yet.
    return [get_z(tt, var) for tt in gen_d_range(start, end)]
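
A quick way to check the helpers is to open a single month and inspect the resulting dataset; it is lazy, so printing it downloads only metadata, not the chunks themselves:


In [ ]:
# Hypothetical single-month check; no chunk data is transferred yet.
ds = get_z(datetime.datetime(2008, 1, 1), 'air_temperature_at_2_metres')
print(ds)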

This is where we read in the data. We need to define the time range and the variable name; in this example we also select only the area over Australia.


In [6]:
%%time
tmp_a = gen_zarr_range(datetime.datetime(1979,1,1), datetime.datetime(2020,3,31), 'air_temperature_at_2_metres')
tmp_all = xr.concat(tmp_a, dim='time0')
# Select Australia (ERA5 latitudes are stored in descending order)
# and convert from Kelvin to degrees Celsius.
tmp = tmp_all.air_temperature_at_2_metres.sel(lon=slice(110,160), lat=slice(-10,-45)) - 273.15
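
Opening one store per month over four decades issues several hundred sequential metadata reads against S3, so this cell can take a while. If the stores were written with consolidated metadata (an assumption worth checking for this bucket), a variant of get_z that passes consolidated=True to xr.open_zarr reduces the number of requests per store:


In [ ]:
def get_z_consolidated(dtime, var):
    # Variant of get_z: read one consolidated .zmetadata file instead of many
    # per-array .json files. This only works if the store was written that way.
    f_zarr = 'era5-pds/zarr/{year}/{month:02d}/data/{var}.zarr/'.format(
        year=dtime.year, month=dtime.month, var=var)
    return xr.open_zarr(s3fs.S3Map(f_zarr, s3=fs), consolidated=True)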



Here we read in another variable, this time for only one month, since we will use it solely for masking.


In [ ]:
sea_data = gen_zarr_range(datetime.datetime(2018,1,1), datetime.datetime(2018,1,1),'sea_surface_temperature')
sea_data_all = xr.concat(sea_data, dim='time0').sea_surface_temperature.sel(lon=slice(110,160),lat=slice(-10,-45))

We use the sea surface temperature data to build a land-sea mask: SST is undefined (NaN) over land, so the NaN cells mark the land points.


In [ ]:
sea_data_all0 = sea_data_all[0].values
# True over land (where SST is NaN), False over the ocean.
mask = np.isnan(sea_data_all0)
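
A quick sanity check of the mask (assuming SST really is NaN exactly over land) is to look at the land fraction of the selected region:


In [ ]:
# Fraction of grid cells flagged as land; for a window that includes
# this much ocean it should be well below 1.
print('land fraction:', mask.mean())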

Next we mask out the data over the sea. To compute average temperatures over land, it is important to exclude the data over the ocean.


In [ ]:
# Keep only the land points (mask is True over land), then average over time.
tmp_masked = tmp.where(mask)
tmp_mean = tmp_masked.mean('time0').compute()

Now we plot the all-time (1979-2020) average temperature over Australia. This time we use only xarray's built-in plotting tools, on top of a cartopy projection.


In [ ]:
ax = plt.axes(projection=ccrs.Orthographic(130, -20))
tmp_mean.plot.contourf(ax=ax, transform=ccrs.PlateCarree())
ax.set_global()
ax.coastlines();
plt.draw()

Now we compute the yearly average temperature over the Australian land area.


In [ ]:
yearly_tmp_AU = tmp_masked.groupby('time0.year').mean('time0').mean(dim=['lon','lat'])
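
The same groupby pattern works for other groupings as well; for example, a monthly climatology (an illustrative variant, not part of the original analysis) only needs a different group key:


In [ ]:
# Mean annual cycle of 2 m temperature over the Australian land area.
monthly_clim_AU = tmp_masked.groupby('time0.month').mean('time0').mean(dim=['lon', 'lat'])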

In [ ]:
f, ax = plt.subplots(1, 1)
yearly_tmp_AU.plot.line();
plt.draw()

In conclusion, this was a simple example of how to use Zarr data. We read in over 40 years of global data with one-hour temporal coverage, then performed some operations on it, such as selecting only the area of interest and computing averages. Zarr makes processing large amounts of data much faster than it used to be.

