From 62f48f06bfb6909def22191225e83b06127169b1 Mon Sep 17 00:00:00 2001 From: David Hassell Date: Mon, 21 Mar 2022 22:32:11 +0000 Subject: [PATCH 1/3] dev --- cf/constants.py | 15 ++++++---- cf/data/data.py | 11 ++++--- cf/functions.py | 80 +++++++++++++++++++++++++++++-------------------- 3 files changed, 64 insertions(+), 42 deletions(-) diff --git a/cf/constants.py b/cf/constants.py index 07fc689ee1..3868c023b7 100644 --- a/cf/constants.py +++ b/cf/constants.py @@ -3,6 +3,8 @@ from enum import Enum, auto from tempfile import gettempdir +from dask import config +from dask.utils import parse_bytes from numpy.ma import masked as numpy_ma_masked from psutil import virtual_memory @@ -120,12 +122,13 @@ else: CONSTANTS["MIN_TOTAL_MEMORY"] = CONSTANTS["TOTAL_MEMORY"] -CONSTANTS["CHUNKSIZE"] = ( - CONSTANTS["FREE_MEMORY_FACTOR"] * CONSTANTS["MIN_TOTAL_MEMORY"] -) / ( - mpi_size * CONSTANTS["WORKSPACE_FACTOR_1"] - + CONSTANTS["WORKSPACE_FACTOR_2"] -) +# CONSTANTS["CHUNKSIZE"] = ( +# CONSTANTS["FREE_MEMORY_FACTOR"] * CONSTANTS["MIN_TOTAL_MEMORY"] +# ) / ( +# mpi_size * CONSTANTS["WORKSPACE_FACTOR_1"] +# + CONSTANTS["WORKSPACE_FACTOR_2"] +# ) +CONSTANTS["CHUNKSIZE"] = parse_bytes(config.get("array.chunk-size")) masked = numpy_ma_masked # nomask = numpy_ma_nomask diff --git a/cf/data/data.py b/cf/data/data.py index 68523323d5..92ea70cdde 100644 --- a/cf/data/data.py +++ b/cf/data/data.py @@ -13448,7 +13448,7 @@ def var( def section( self, axes, stop=None, chunks=False, min_step=1, mode="dictionary" ): - """Returns a dictionary of sections of the Data object. + """Returns a dictionary of sections of the `Data` object. Specifically, returns a dictionary of Data objects which are the m-dimensional sections of this n-dimensional Data object, where @@ -13479,6 +13479,8 @@ def section( size that will fit in one chunk of memory instead of sectioning into slices of size 1 along the dimensions that are being sectioned. + + Deprecated at version TODODASK. Use `rechunk` instead. 
min_step: `int`, optional The minimum step size when making chunks. By default this @@ -13498,9 +13500,10 @@ def section( >>> d.section((0, 1)) """ - return _section( - self, axes, data=True, stop=stop, chunks=chunks, min_step=min_step - ) + if chunks: + print ("deprecated TODODASK") + + return _section(self, axes, stop=stop, min_step=min_step) # ---------------------------------------------------------------- # Alias diff --git a/cf/functions.py b/cf/functions.py index 070f30f84e..40df9d766a 100644 --- a/cf/functions.py +++ b/cf/functions.py @@ -11,6 +11,7 @@ import warnings from collections.abc import Iterable from hashlib import md5 as hashlib_md5 +from itertools import product from marshal import dumps as marshal_dumps from math import ceil as math_ceil from numbers import Integral @@ -26,6 +27,8 @@ # import cPickle import netCDF4 +from dask import config +from dask.utils import parse_bytes from numpy import __file__ as _numpy__file__ from numpy import __version__ as _numpy__version__ from numpy import all as _numpy_all @@ -799,27 +802,21 @@ def _parse(cls, arg): class chunksize(ConstantAccess): - r"""Set the chunksize used by LAMA for partitioning the data array. + """Set the default chunksize used by `dask` arrays. - This must be smaller than an upper limit determined by the free - memory factor, which is the fraction of memory kept free as a - temporary workspace, otherwise an error is raised. If called with - None as the argument then the chunksize is set to its upper - limit. If called without any arguments the existing chunksize is + If called without any arguments then the existing chunksize is returned. - The upper limit to the chunksize is given by: - - .. math:: upper\_chunksize = \dfrac{f \cdot total\_memory}{mpi\_size - \cdot w_1 + w_2} + :Parameters: - where :math:`f` is the *free memory factor* and :math:`w_1` and - :math:`w_2` the *workspace factors* *1* and *2* respectively. 
+ arg: `float` or `str` or `Constant`, optional + The chunksize in bytes. Any size accepted by + `dask.utils.parse_bytes` is accepted. - :Parameters: + *Parameter example:* + A chunksize of 2 MiB may be specified as ``2097152`` or + ``'2 MiB'`` - arg: `float` or `Constant`, optional - The chunksize in bytes. :Returns: @@ -850,20 +847,8 @@ def _parse(cls, arg): into the `CONSTANTS` dictionary. """ - upper_chunksize = (free_memory_factor() * min_total_memory()) / ( - (mpi_size * _WORKSPACE_FACTOR_1()) + _WORKSPACE_FACTOR_2() - ) - - arg = float(arg) - if arg > upper_chunksize and mpi_size > 1: - raise ValueError( - f"Specified chunk size ({arg}) is too large for the given " - f"free memory factor ({upper_chunksize})" - ) - elif arg <= 0: - raise ValueError(f"Chunk size ({arg}) must be positive") - - return arg + config.set({"array.chunk-size": arg}) + return parse_bytes(arg) class tempdir(ConstantAccess): @@ -2838,9 +2823,7 @@ def allclose(x, y, rtol=None, atol=None): return _numpy_allclose(x, y, rtol=rtol, atol=atol) -def _section( - x, axes=None, data=False, stop=None, chunks=False, min_step=1, **kwargs -): +def _section(x, axes=None, stop=None, chunks=False, min_step=1): """Return a list of m dimensional sections of a Field of n dimensions or a dictionary of m dimensional sections of a Data object of n dimensions, where m <= n. 
@@ -2913,6 +2896,39 @@ def _section( >>> _section(f, ['latitude', 'longitude'], exact=True) """ + if axes is None: + axes = list(range(x.ndim)) + + axes = x.data._parse_axes(axes) + + ndim = x.ndim + shape = x.shape + if stop is None: + stop = x.size + + axes = [i for i in range(ndim) if i not in axes] + + indices = [[slice(None)]] * ndim + for i in axes: + indices[i] = [ + slice(j, j + min_step) for j in range(0, shape[i], min_step) + ] + + keys = [[None]] * ndim + for i in axes: + keys[i] = range(0, shape[i], min_step) + + if chunks: + + + out = {} + for j, (key, index) in enumerate(zip(product(*keys), product(*indices))): + if j >= stop: + return + + out[key] = x[index] + + return out def loop_over_index(x, current_index, axis_indices, indices): """Expects an index to loop over in the list indices. From 161acf470103ce11155b5e5d44e691c2744c7d78 Mon Sep 17 00:00:00 2001 From: David Hassell Date: Tue, 22 Mar 2022 11:28:04 +0000 Subject: [PATCH 2/3] section, chunksize --- cf/constants.py | 31 ++---- cf/data/data.py | 184 +++++++++----------------------- cf/data/mixin/deprecations.py | 127 ++++++++++++++++++++-- cf/docstring/docstring.py | 5 +- cf/field.py | 25 ++++- cf/functions.py | 191 +++++++++------------------------- cf/test/test_Data.py | 34 ++++-- 7 files changed, 279 insertions(+), 318 deletions(-) diff --git a/cf/constants.py b/cf/constants.py index 3868c023b7..46445a6891 100644 --- a/cf/constants.py +++ b/cf/constants.py @@ -8,11 +8,6 @@ from numpy.ma import masked as numpy_ma_masked from psutil import virtual_memory -from . import mpi_on, mpi_size - -if mpi_on: - from . 
import mpi_comm - from .units import Units # platform = sys.platform @@ -23,6 +18,10 @@ # Find the total amount of memory, in bytes # -------------------------------------------------------------------- _TOTAL_MEMORY = float(virtual_memory().total) + +_CHUNKSIZE = "128 MiB" +config.set({"array.chunk-size": _CHUNKSIZE}) + # if platform == 'darwin': # # MacOS # _MemTotal = float(virtual_memory().total) @@ -55,7 +54,7 @@ TOTAL_MEMORY: `float` Find the total amount of physical memory (in bytes). - CHUNKSIZE: `float` + CHUNKSIZE: `int` The chunk size (in bytes) for data storage and processing. FM_THRESHOLD: `float` @@ -80,9 +79,7 @@ disabled. FREE_MEMORY_FACTOR: `int` - Factor to divide the free memory by. If MPI is on this is equal - to the number of PEs. Otherwise it is equal to 1 and is ignored - in any case. + Factor to divide the free memory by. COLLAPSE_PARALLEL_MODE: `int` The mode to use when parallelising collapse. By default this is @@ -109,26 +106,14 @@ "RELAXED_IDENTITIES": False, "LOG_LEVEL": logging.getLevelName(logging.getLogger().level), "BOUNDS_COMBINATION_MODE": "AND", + "CHUNKSIZE": parse_bytes(_CHUNKSIZE), } CONSTANTS["FM_THRESHOLD"] = ( CONSTANTS["FREE_MEMORY_FACTOR"] * CONSTANTS["TOTAL_MEMORY"] ) -if mpi_on: - CONSTANTS["MIN_TOTAL_MEMORY"] = min( - mpi_comm.allgather(CONSTANTS["TOTAL_MEMORY"]) - ) -else: - CONSTANTS["MIN_TOTAL_MEMORY"] = CONSTANTS["TOTAL_MEMORY"] - -# CONSTANTS["CHUNKSIZE"] = ( -# CONSTANTS["FREE_MEMORY_FACTOR"] * CONSTANTS["MIN_TOTAL_MEMORY"] -# ) / ( -# mpi_size * CONSTANTS["WORKSPACE_FACTOR_1"] -# + CONSTANTS["WORKSPACE_FACTOR_2"] -# ) -CONSTANTS["CHUNKSIZE"] = parse_bytes(config.get("array.chunk-size")) +CONSTANTS["MIN_TOTAL_MEMORY"] = CONSTANTS["TOTAL_MEMORY"] masked = numpy_ma_masked # nomask = numpy_ma_nomask diff --git a/cf/data/data.py b/cf/data/data.py index 92ea70cdde..268f09b8a4 100644 --- a/cf/data/data.py +++ b/cf/data/data.py @@ -30,7 +30,6 @@ ) from ..functions import ( _DEPRECATION_ERROR_KWARGS, - 
_DEPRECATION_ERROR_METHOD, _numpy_isclose, _section, abspath, @@ -2293,10 +2292,11 @@ def percentile( if interpolation is not None: _DEPRECATION_ERROR_KWARGS( self, - "interpolation", + "percentile", {"interpolation": None}, message="Use the 'method' parameter instead.", - version="4.0.0", + version="TODODASK", + removed_at="5.0.0", ) # pragma: no cover d = _inplace_enabled_define_and_cleanup(self) @@ -3347,15 +3347,12 @@ def rechunk( `dask.array.rechunk` for details. block_size_limit: `int`, optional - The maximum block size (in bytes) we want to produce. - Defaults to the configuration value - ``dask.config.get('array.chunk-size')``. See - `dask.array.rechunk` for details. + The maximum block size (in bytes) we want to produce, + as defined by the `cf.chunksize` function. balance: `bool`, optional If True, try to make each chunk the same size. By - default this is not attempted. See - `dask.array.rechunk` for details. + default this is not attempted. This means ``balance=True`` will remove any small leftover chunks, so using ``d.rechunk(chunks=len(d) // @@ -3387,15 +3384,15 @@ def rechunk( >>> y = x.rechunk({0: -1, 1: 'auto'}, block_size_limit=1e8) - If a chunk size does not divide the dimension then rechunk will - leave any unevenness to the last chunk. + If a chunk size does not divide the dimension then rechunk + will leave any unevenness to the last chunk. >>> x.rechunk(chunks=(400, -1)).chunks ((400, 400, 200), (1000,)) - However if you want more balanced chunks, and don't mind Dask - choosing a different chunksize for you then you can use the - ``balance=True`` option. + However if you want more balanced chunks, and don't mind + `dask` choosing a different chunksize for you then you can use + the ``balance=True`` option. 
>>> x.rechunk(chunks=(400, -1), balance=True).chunks ((500, 500), (1000,)) @@ -7592,7 +7589,6 @@ def reconstruct_sectioned_data(cls, sections, cyclic=(), hardmask=None): out.hardmask = hardmask return out - # --- End: if if keys[0][i] is not None: new_sections = {} @@ -7607,11 +7603,9 @@ def reconstruct_sectioned_data(cls, sections, cyclic=(), hardmask=None): ) new_key = k[:i] data_list = [sections[k]] - # --- End: for new_sections[new_key] = cls.concatenate_data(data_list, i) sections = new_sections - # --- End: for def argmax(self, axis=None, unravel=False): """Return the indices of the maximum values along an axis. @@ -11060,86 +11054,6 @@ def flip(self, axes=None, inplace=False, i=False): return d - @daskified(_DASKIFIED_VERBOSE) - def HDF_chunks(self, *chunks): - """Get or set HDF chunk sizes. - - The HDF chunk sizes may be used by external code that allows - `Data` objects to be written to netCDF files. - - Deprecated at version TODODASK and is no longer available. Use - the methods `nc_clear_hdf5_chunksizes`, `nc_hdf5_chunksizes`, - and `nc_set_hdf5_chunksizes` instead. - - .. seealso:: `nc_clear_hdf5_chunksizes`, `nc_hdf5_chunksizes`, - `nc_set_hdf5_chunksizes` - - :Parameters: - - chunks: `dict` or `None`, *optional* - Specify HDF chunk sizes. - - When no positional argument is provided, the HDF chunk - sizes are unchanged. - - If `None` then the HDF chunks sizes for each dimension - are cleared, so that the HDF default chunk size value - will be used when writing data to disk. - - If a `dict` then it defines for a subset of the - dimensions, defined by their integer positions, the - corresponding HDF chunk sizes. The HDF chunk sizes are - set as a number of elements along the dimension. - - :Returns: - - `dict` - The HDF chunks for each dimension prior to the change, - or the current HDF chunks if no new values are - specified. A value of `None` is an indication that the - default chunk size should be used for that dimension. 
- - **Examples** - - >>> d = cf.Data(np.arange(30).reshape(5, 6)) - >>> d.HDF_chunks() - {0: None, 1: None} - >>> d.HDF_chunks({1: 2}) - {0: None, 1: None} - >>> d.HDF_chunks() - {0: None, 1: 2} - >>> d.HDF_chunks({1:None}) - {0: None, 1: 2} - >>> d.HDF_chunks() - {0: None, 1: None} - >>> d.HDF_chunks({0: 3, 1: 6}) - {0: None, 1: None} - >>> d.HDF_chunks() - {0: 3, 1: 6} - >>> d.HDF_chunks({1: 4}) - {0: 3, 1: 6} - >>> d.HDF_chunks() - {0: 3, 1: 4} - >>> d.HDF_chunks({1: 999}) - {0: 3, 1: 4} - >>> d.HDF_chunks() - {0: 3, 1: 999} - >>> d.HDF_chunks(None) - {0: 3, 1: 999} - >>> d.HDF_chunks() - {0: None, 1: None} - - """ - _DEPRECATION_ERROR_METHOD( - self, - "HDF_chunks", - message="Use the methods 'nc_clear_hdf5_chunksizes', " - "'nc_hdf5_chunksizes', and 'nc_set_hdf5_chunksizes' " - "instead.", - version="TODODASK", - removed_at="5.0.0", - ) - def inspect(self): """Inspect the object for debugging. @@ -11681,35 +11595,6 @@ def fits_in_memory(self, itemsize): # ------------------------------------------------------------ return self.size * (itemsize + 1) <= free_memory() - cf_fm_threshold() - def fits_in_one_chunk_in_memory(self, itemsize): - """Return True if the master array is small enough to be - retained in memory. - - :Parameters: - - itemsize: `int` - The number of bytes per word of the master data array. 
- - :Returns: - - `bool` - - **Examples:** - - >>> print(d.fits_one_chunk_in_memory(8)) - False - - """ - # ------------------------------------------------------------ - # Note that self._size*(itemsize+1) is the array size in bytes - # including space for a full boolean mask - # ------------------------------------------------------------ - return ( - cf_chunksize() - >= self._size * (itemsize + 1) - <= free_memory() - cf_fm_threshold() - ) - @_deprecated_kwarg_check("i") @_inplace_enabled(default=False) @_manage_log_level_via_verbosity @@ -13445,6 +13330,7 @@ def var( _preserve_partitions=_preserve_partitions, ) + @daskified(_DASKIFIED_VERBOSE) def section( self, axes, stop=None, chunks=False, min_step=1, mode="dictionary" ): @@ -13471,15 +13357,20 @@ def section( sectioned. stop: `int`, optional + Deprecated at version TODODASK. + Stop after this number of sections and return. If stop is None all sections are taken. chunks: `bool`, optional + Deprecated at version TODODASK. Consider using + `cf.Data.rechunk` instead. + If True return sections that are of the maximum possible size that will fit in one chunk of memory instead of sectioning into slices of size 1 along the dimensions that are being sectioned. - + Deprecated at version TODODASK. Use `rechunk` instead. min_step: `int`, optional @@ -13493,17 +13384,42 @@ def section( The dictionary of m dimensional sections of the Data object. 
- **Examples:** - - Section a Data object into 2D slices: + **Examples** - >>> d.section((0, 1)) + >>> d = cf.Data(np.arange(120).reshape(2, 6, 10)) + >>> d + + >>> d.section([1, 2]) + {(0, None, None): , + (1, None, None): } + >>> d.section([0, 1], min_step=2) + {(None, None, 0): , + (None, None, 2): , + (None, None, 4): , + (None, None, 6): , + (None, None, 8): } """ if chunks: - print ("deprecated TODODASK") - - return _section(self, axes, stop=stop, min_step=min_step) + _DEPRECATION_ERROR_KWARGS( + self, + "section", + {"chunks": chunks}, + message="Consider using Data.rechunk() instead.", + version="TODODASK", + removed_at="5.0.0", + ) # pragma: no cover + + if stop is not None: + _DEPRECATION_ERROR_KWARGS( + self, + "section", + {"stop": stop}, + version="TODODASK", + removed_at="5.0.0", + ) # pragma: no cover + + return _section(self, axes, min_step=min_step) # ---------------------------------------------------------------- # Alias diff --git a/cf/data/mixin/deprecations.py b/cf/data/mixin/deprecations.py index 205e016333..62f9c15ff8 100644 --- a/cf/data/mixin/deprecations.py +++ b/cf/data/mixin/deprecations.py @@ -46,6 +46,34 @@ def expand_dims(self, position=0, i=False): version="3.0.0", ) # pragma: no cover + def fits_in_one_chunk_in_memory(self, itemsize): + """Return True if the master array is small enough to be + retained in memory. + + Deprecated at version TODODASK. + + :Parameters: + + itemsize: `int` + The number of bytes per word of the master data array. + + :Returns: + + `bool` + + **Examples** + + >>> print(d.fits_one_chunk_in_memory(8)) + False + + """ + _DEPRECATION_ERROR_METHOD( + self, + "fits_in_one_chunk_in_memory", + version="TODODASK", + removed_at="5.0.0", + ) # pragma: no cover + @property def ispartitioned(self): """True if the data array is partitioned. 
@@ -63,7 +91,7 @@ def ispartitioned(self): False """ - _DEPRECATION_ERROR_METHOD("TODODASK") + _DEPRECATION_ERROR_METHOD("TODODASK") # pragma: no cover def chunk(self, chunksize=None, total=None, omit_axes=None, pmshape=None): """Partition the data array. @@ -92,7 +120,88 @@ def chunk(self, chunksize=None, total=None, omit_axes=None, pmshape=None): >>> d.chunk(100000, omit_axes=[3, 4]) """ - _DEPRECATION_ERROR_METHOD("TODODASK. Use 'rechunk' instead") + _DEPRECATION_ERROR_METHOD( + "TODODASK. Use 'rechunk' instead" + ) # pragma: no cover + + def HDF_chunks(self, *chunks): + """Get or set HDF chunk sizes. + + The HDF chunk sizes may be used by external code that allows + `Data` objects to be written to netCDF files. + + Deprecated at version TODODASK and is no longer available. Use + the methods `nc_clear_hdf5_chunksizes`, `nc_hdf5_chunksizes`, + and `nc_set_hdf5_chunksizes` instead. + + .. seealso:: `nc_clear_hdf5_chunksizes`, `nc_hdf5_chunksizes`, + `nc_set_hdf5_chunksizes` + + :Parameters: + + chunks: `dict` or `None`, *optional* + Specify HDF chunk sizes. + + When no positional argument is provided, the HDF chunk + sizes are unchanged. + + If `None` then the HDF chunks sizes for each dimension + are cleared, so that the HDF default chunk size value + will be used when writing data to disk. + + If a `dict` then it defines for a subset of the + dimensions, defined by their integer positions, the + corresponding HDF chunk sizes. The HDF chunk sizes are + set as a number of elements along the dimension. + + :Returns: + + `dict` + The HDF chunks for each dimension prior to the change, + or the current HDF chunks if no new values are + specified. A value of `None` is an indication that the + default chunk size should be used for that dimension. 
+ + **Examples** + + >>> d = cf.Data(np.arange(30).reshape(5, 6)) + >>> d.HDF_chunks() + {0: None, 1: None} + >>> d.HDF_chunks({1: 2}) + {0: None, 1: None} + >>> d.HDF_chunks() + {0: None, 1: 2} + >>> d.HDF_chunks({1:None}) + {0: None, 1: 2} + >>> d.HDF_chunks() + {0: None, 1: None} + >>> d.HDF_chunks({0: 3, 1: 6}) + {0: None, 1: None} + >>> d.HDF_chunks() + {0: 3, 1: 6} + >>> d.HDF_chunks({1: 4}) + {0: 3, 1: 6} + >>> d.HDF_chunks() + {0: 3, 1: 4} + >>> d.HDF_chunks({1: 999}) + {0: 3, 1: 4} + >>> d.HDF_chunks() + {0: 3, 1: 999} + >>> d.HDF_chunks(None) + {0: 3, 1: 999} + >>> d.HDF_chunks() + {0: None, 1: None} + + """ + _DEPRECATION_ERROR_METHOD( + self, + "HDF_chunks", + message="Use the methods 'nc_clear_hdf5_chunksizes', " + "'nc_hdf5_chunksizes', and 'nc_set_hdf5_chunksizes' " + "instead.", + version="TODODASK", + removed_at="5.0.0", + ) # pragma: no cover @property def ismasked(self): @@ -110,7 +219,9 @@ def ismasked(self): True """ - _DEPRECATION_ERROR_METHOD("TODODASK use is_masked instead") + _DEPRECATION_ERROR_METHOD( + "TODODASK use is_masked instead" + ) # pragma: no cover @property def varray(self): @@ -133,7 +244,7 @@ def varray(self): array([999, 1, 2, 3, 4]) """ - _DEPRECATION_ERROR_METHOD("TODODASK") + _DEPRECATION_ERROR_METHOD("TODODASK") # pragma: no cover def add_partitions(self, extra_boundaries, pdim): """Add partition boundaries. 
@@ -155,7 +266,9 @@ def add_partitions(self, extra_boundaries, pdim): >>> d.add_partitions( ) """ - _DEPRECATION_ERROR_METHOD("TODODASK Consider using rechunk instead") + _DEPRECATION_ERROR_METHOD( + "TODODASK Consider using rechunk instead" + ) # pragma: no cover def partition_boundaries(self): """Return the partition boundaries for each partition matrix @@ -168,4 +281,6 @@ def partition_boundaries(self): **Examples:** """ - _DEPRECATION_ERROR_METHOD("TODODASK - consider using 'chunks' instead") + _DEPRECATION_ERROR_METHOD( + "TODODASK - consider using 'chunks' instead" + ) # pragma: no cover diff --git a/cf/docstring/docstring.py b/cf/docstring/docstring.py index 6902931c0e..2016cbe048 100644 --- a/cf/docstring/docstring.py +++ b/cf/docstring/docstring.py @@ -200,9 +200,8 @@ By default, ``"auto"`` is used to specify the array chunking, which uses a chunk size in bytes defined by - the configuration value - ``dask.config.get("array.chunk-size")``, - prefering square-like chunk shapes. + the `cf.chunksize` function, prefering square-like + chunk shapes. *Parameter example:* A blocksize like ``1000``. diff --git a/cf/field.py b/cf/field.py index c70e4390f8..99e5ee3a82 100644 --- a/cf/field.py +++ b/cf/field.py @@ -15334,7 +15334,7 @@ def subspace(self): """ return SubspaceField(self) - def section(self, axes=None, stop=None, **kwargs): + def section(self, axes=None, stop=None, min_step=1, **kwargs): """Return a FieldList of m dimensional sections of a Field of n dimensions, where M <= N. @@ -15385,7 +15385,28 @@ def section(self, axes=None, stop=None, **kwargs): ] """ - return FieldList(_section(self, axes, data=False, stop=stop, **kwargs)) + + # TODODASK: This still need some attention, keyword checking, + # testing, docs, etc., but has been partially + # already updated due to changes already happening + # in `cf.functions._section` that might be + # overlooked/obscured later. 
See the daskification + # of `cf.functions._section` and `cf.Data.section` + # for more details. + + axes = [self.domain_axis(axis, key=True) for axis in axes] + axis_indices = [] + for key in axes: + try: + axis_indices.append(self.get_data_axes().index(key)) + except ValueError: + pass + + axes = axis_indices + + return FieldList( + tuple(_section(self, axes, min_step=min_step).values()) + ) @_deprecated_kwarg_check("i") @_inplace_enabled(default=False) diff --git a/cf/functions.py b/cf/functions.py index 40df9d766a..ce87473f5f 100644 --- a/cf/functions.py +++ b/cf/functions.py @@ -13,7 +13,6 @@ from hashlib import md5 as hashlib_md5 from itertools import product from marshal import dumps as marshal_dumps -from math import ceil as math_ceil from numbers import Integral from os import getpid, listdir, mkdir from os.path import abspath as _os_path_abspath @@ -46,7 +45,7 @@ from numpy.ma import take as _numpy_ma_take from psutil import Process, virtual_memory -from . import __file__, __version__, mpi_size +from . import __file__, __version__ from .constants import ( CONSTANTS, OperandBoundsCombination, @@ -807,6 +806,11 @@ class chunksize(ConstantAccess): If called without any arguments then the existing chunksize is returned. + .. note:: Setting the chunksize will change the `dask` global + configuration value ``'array.chunk-size'``. If + `chunksize` is used as a context manager then the `dask` + configuration value is only altered within that context. + :Parameters: arg: `float` or `str` or `Constant`, optional @@ -817,7 +821,6 @@ class chunksize(ConstantAccess): A chunksize of 2 MiB may be specified as ``2097152`` or ``'2 MiB'`` - :Returns: `Constant` @@ -1023,6 +1026,8 @@ class free_memory_factor(ConstantAccess): """ + # TODODASK: Review how all this free memory stuff works with dask + _name = "FREE_MEMORY_FACTOR" def _parse(cls, arg): @@ -2862,10 +2867,15 @@ def _section(x, axes=None, stop=None, chunks=False, min_step=1): passed. By default it is False. 
stop: `int`, optional + Deprecated at version TODODASK. + Stop after taking this number of sections and return. If stop is None all sections are taken. chunks: `bool`, optional + Deprecated at version TODODASK. Consider using + `cf.Data.rechunk` instead. + If True return sections that are of the maximum possible size that will fit in one chunk of memory instead of sectioning into slices of size 1 along the dimensions that @@ -2882,20 +2892,32 @@ def _section(x, axes=None, stop=None, chunks=False, min_step=1): The list of m dimensional sections of the Field or the dictionary of m dimensional sections of the Data object. - **Examples:** - - Section a field into 2D longitude/time slices, checking the units: + **Examples** - >>> _section(f, {None: 'longitude', units: 'radians'}, - ... {None: 'time', - ... 'units': 'days since 2006-01-01 00:00:00'}) + >>> d = cf.Data(np.arange(120).reshape(2, 6, 10)) + >>> d + + >>> d.section([0, 1], min_step=2) + {(None, None, 0): , + (None, None, 2): , + (None, None, 4): , + (None, None, 6): , + (None, None, 8): } - Section a field into 2D longitude/latitude slices, requiring exact - names: + """ + if stop is not None: + raise DeprecationError( + "The 'stop' keyword of cf._section() was deprecated at " + "version TODODASK and is no longer available" + ) - >>> _section(f, ['latitude', 'longitude'], exact=True) + if chunks: + raise DeprecationError( + "The 'chunks' keyword of cf._section() was deprecated at " + "version TODODASK and is no longer available Consider using " + "cf.Data.rechunk instead." + ) - """ if axes is None: axes = list(range(x.ndim)) @@ -2903,139 +2925,30 @@ def _section(x, axes=None, stop=None, chunks=False, min_step=1): ndim = x.ndim shape = x.shape - if stop is None: - stop = x.size + # TODODASK: For v4.0.0, redefine axes by removing the next + # line. I.e. the specified axes would be those that you + # want to be chopped, not those that you want to remain + # whole. 
axes = [i for i in range(ndim) if i not in axes] - indices = [[slice(None)]] * ndim - for i in axes: - indices[i] = [ - slice(j, j + min_step) for j in range(0, shape[i], min_step) - ] - - keys = [[None]] * ndim - for i in axes: - keys[i] = range(0, shape[i], min_step) + indices = [ + (slice(j, j + min_step) for j in range(0, n, min_step)) + if i in axes + else [slice(None)] + for i, n in enumerate(shape) + ] - if chunks: - - - out = {} - for j, (key, index) in enumerate(zip(product(*keys), product(*indices))): - if j >= stop: - return - - out[key] = x[index] + keys = [ + range(0, n, min_step) if i in axes else [None] + for i, n in enumerate(shape) + ] + out = { + key: x[index] for key, index in zip(product(*keys), product(*indices)) + } return out - def loop_over_index(x, current_index, axis_indices, indices): - """Expects an index to loop over in the list indices. - - If this is less than 0 the horizontal slice defined by indices - is appended to the FieldList fl, if it is the specified axis - indices the value in indices is left as slice(None) and it calls - itself recursively with the next index, otherwise each index is - looped over. In this loop the routine is called recursively with - the next index. If the count of the number of slices taken is - greater than or equal to stop it returns before taking any more - slices. 
- - """ - if current_index < 0: - if data: - d[tuple([x.start for x in indices])] = x[tuple(indices)] - else: - fl.append(x[tuple(indices)]) - - nl_vars["count"] += 1 - return - - if current_index in axis_indices: - loop_over_index(x, current_index - 1, axis_indices, indices) - return - - for i in range(0, sizes[current_index], steps[current_index]): - if stop is not None and nl_vars["count"] >= stop: - return - - indices[current_index] = slice(i, i + steps[current_index]) - loop_over_index(x, current_index - 1, axis_indices, indices) - - # Retrieve the index of each axis defining the sections - if data: - if isinstance(axes, int): - axes = (axes,) - - if not axes: - axis_indices = tuple(range(x.ndim)) - else: - axis_indices = axes - else: - axis_keys = [x.domain_axis(axis, key=True) for axis in axes] - axis_indices = list() - for key in axis_keys: - try: - axis_indices.append(x.get_data_axes().index(key)) - except ValueError: - pass - - # find the size of each dimension - sizes = x.shape - - if chunks: - steps = list(sizes) - - # Define the factor which, when multiplied by the size of the - # data array, determines how many chunks are in the data - # array. - # - # I.e. factor = 1/(the number of words per chunk) - factor = (x.dtype.itemsize + 1.0) / chunksize() - - # n_chunks = number of equal sized bits the partition needs to - # be split up into so that each bit's size is less - # than the chunk size. 
- n_chunks = int(math_ceil(x.size * factor)) - - for (index, axis_size) in enumerate(sizes): - if index in axis_indices: - # Do not attempt to "chunk" non-sectioned axes - continue - - if int(math_ceil(float(axis_size) / min_step)) <= n_chunks: - n_chunks = int( - math_ceil(n_chunks / float(axis_size) * min_step) - ) - steps[index] = min_step - - else: - steps[index] = int(axis_size / n_chunks) - break - else: - steps = [ - size if i in axis_indices else 1 for i, size in enumerate(sizes) - ] - - # Use recursion to slice out each section - if data: - d = dict() - else: - fl = [] - - indices = [slice(None)] * len(sizes) - - nl_vars = {"count": 0} - - current_index = len(sizes) - 1 - loop_over_index(x, current_index, axis_indices, indices) - - if data: - return d - else: - return fl - def _get_module_info(module, try_except=False): """Helper function for processing modules for cf.environment.""" diff --git a/cf/test/test_Data.py b/cf/test/test_Data.py index 7c25131a7d..3f57d388ef 100644 --- a/cf/test/test_Data.py +++ b/cf/test/test_Data.py @@ -3328,21 +3328,33 @@ def test_Data_dumpd_loadd_dumps(self): d.to_disk() self.assertTrue(d.equals(cf.Data(loadd=dumpd), verbose=2)) - @unittest.skipIf(TEST_DASKIFIED_ONLY, "hits unexpected kwarg 'select'") def test_Data_section(self): + d = cf.Data(np.arange(24).reshape(2, 3, 4)) + + e = d.section(-1) + self.assertIsInstance(e, dict) + self.assertEqual(len(e), 6) + + e = d.section([0, 2], min_step=2) + self.assertEqual(len(e), 2) + f = e[(None, 0, None)] + self.assertEqual(f.shape, (2, 2, 4)) + f = e[(None, 2, None)] + self.assertEqual(f.shape, (2, 1, 4)) + + e = d.section([0, 1, 2]) + self.assertEqual(len(e), 1) + key, value = e.popitem() + self.assertEqual(key, (None, None, None)) + self.assertTrue(value.equals(d)) + + @unittest.skipIf(TEST_DASKIFIED_ONLY, "Needs reconstruct_sectioned_data") + def test_Data_reconstruct_sectioned_data(self): if self.test_only and inspect.stack()[0][3] not in self.test_only: return - f = 
cf.read(self.filename6)[0] - self.assertEqual( - list(sorted(f.data.section((1, 2)).keys())), - [(x, None, None) for x in range(1800)], - ) - d = cf.Data(np.arange(120).reshape(2, 3, 4, 5)) - x = d.section([1, 3]) - self.assertEqual(len(x), 8) - e = cf.Data.reconstruct_sectioned_data(x) - self.assertTrue(e.equals(d)) + # TODODASK: Write when Data.reconstruct_sectioned_data is + # daskified @unittest.skipIf(TEST_DASKIFIED_ONLY, "no attr. 'partition_configuration'") def test_Data_count(self): From 2f79336c2d78655023f2c6492cc58bbdec5c2f91 Mon Sep 17 00:00:00 2001 From: David Hassell Date: Tue, 5 Apr 2022 08:45:45 +0100 Subject: [PATCH 3/3] Typos Co-authored-by: Sadie L. Bartholomew --- cf/data/data.py | 1 - cf/data/mixin/deprecations.py | 2 +- cf/docstring/docstring.py | 2 +- cf/functions.py | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/cf/data/data.py b/cf/data/data.py index e561f9fdbe..d7474b7bd6 100644 --- a/cf/data/data.py +++ b/cf/data/data.py @@ -13437,7 +13437,6 @@ def section( sectioning into slices of size 1 along the dimensions that are being sectioned. - Deprecated at version TODODASK. Use `rechunk` instead. min_step: `int`, optional The minimum step size when making chunks. By default this diff --git a/cf/data/mixin/deprecations.py b/cf/data/mixin/deprecations.py index 8ba68e119c..0c6c2f1b17 100644 --- a/cf/data/mixin/deprecations.py +++ b/cf/data/mixin/deprecations.py @@ -145,7 +145,7 @@ def HDF_chunks(self, *chunks): When no positional argument is provided, the HDF chunk sizes are unchanged. - If `None` then the HDF chunks sizes for each dimension + If `None` then the HDF chunk sizes for each dimension are cleared, so that the HDF default chunk size value will be used when writing data to disk. 
diff --git a/cf/docstring/docstring.py b/cf/docstring/docstring.py index 2016cbe048..188d454dbd 100644 --- a/cf/docstring/docstring.py +++ b/cf/docstring/docstring.py @@ -200,7 +200,7 @@ By default, ``"auto"`` is used to specify the array chunking, which uses a chunk size in bytes defined by - the `cf.chunksize` function, prefering square-like + the `cf.chunksize` function, preferring square-like chunk shapes. *Parameter example:* diff --git a/cf/functions.py b/cf/functions.py index ce87473f5f..e7a5c94471 100644 --- a/cf/functions.py +++ b/cf/functions.py @@ -2914,7 +2914,7 @@ def _section(x, axes=None, stop=None, chunks=False, min_step=1): if chunks: raise DeprecationError( "The 'chunks' keyword of cf._section() was deprecated at " - "version TODODASK and is no longer available Consider using " + "version TODODASK and is no longer available. Consider using " "cf.Data.rechunk instead." )