diff --git a/cf/__init__.py b/cf/__init__.py index e5a4aa0df3..4d421c3ab3 100644 --- a/cf/__init__.py +++ b/cf/__init__.py @@ -158,7 +158,7 @@ ) # Check the version of numpy -_minimum_vn = "1.15" +_minimum_vn = "1.22" if LooseVersion(numpy.__version__) < LooseVersion(_minimum_vn): raise RuntimeError( f"Bad numpy version: cf requires numpy>={_minimum_vn}. " diff --git a/cf/data/dask_utils.py b/cf/data/dask_utils.py index dc51ea9e67..d858b8ecf1 100644 --- a/cf/data/dask_utils.py +++ b/cf/data/dask_utils.py @@ -4,6 +4,8 @@ instance, as would be passed to `dask.array.map_blocks`. """ +from functools import reduce +from operator import mul import dask.array as da import numpy as np @@ -179,11 +181,140 @@ def cf_harden_mask(a): """ if np.ma.isMA(a): - a.harden_mask() + try: + a.harden_mask() + except AttributeError: + # Trap cases when the input array is not a numpy array + # (e.g. it might be numpy.ma.masked). + pass return a +def cf_percentile(a, q, axis, method, keepdims=False, mtol=1): + """Compute percentiles of the data along the specified axes. + + See `cf.Data.percentile` for further details. + + .. note:: This function correctly sets the mask hardness of the + output array. + + .. versionadded:: TODODASK + + .. seealso:: `cf.Data.percentile` + + :Parameters: + + a: `numpy.ndarray` + Input array. + + q: `numpy.ndarray` + Percentile or sequence of percentiles to compute, which + must be between 0 and 100 inclusive. + + axis: `tuple` of `int` + Axes along which the percentiles are computed. + + method: `str` + Specifies the interpolation method to use when the desired + percentile lies between two data points ``i < j``. + + keepdims: `bool`, optional + If this is set to True, the axes which are reduced are + left in the result as dimensions with size one. With this + option, the result will broadcast correctly against the + original array *a*. + + mtol: number, optional + Set an upper limit of the amount input data values which + are allowed to be missing data when contributing to + individual output percentile values. It is defined as a + fraction (between 0 and 1 inclusive) of the contributing + input data values. The default is 1, meaning that a + missing datum in the output array only occurs when all of + its contributing input array elements are missing data. A + value of 0 means that a missing datum in the output array + occurs whenever any of its contributing input array + elements are missing data. + + :Returns: + + `numpy.ndarray` + + """ + if np.ma.is_masked(a): + # ------------------------------------------------------------ + # Input array is masked: Replace missing values with NaNs and + # remask later. + # ------------------------------------------------------------ + if a.dtype != float: + # Can't assign NaNs to integer arrays + a = a.astype(float, copy=True) + + mask = None + if mtol < 1: + # Count the number of missing values that contribute to + # each output percentile value and make a corresponding + # mask + full_size = reduce( + mul, [size for i, size in enumerate(a.shape) if i in axis], 1 + ) + n_missing = full_size - np.ma.count( + a, axis=axis, keepdims=keepdims + ) + if n_missing.any(): + mask = np.where(n_missing >= mtol * full_size, True, False) + if q.ndim: + mask = np.expand_dims(mask, 0) + + a = np.ma.filled(a, np.nan) + + with np.testing.suppress_warnings() as sup: + sup.filter( + category=RuntimeWarning, + message=".*All-NaN slice encountered.*", + ) + sup.filter( + category=UserWarning, + message="Warning: 'partition' will ignore the 'mask' of the MaskedArray.*", + ) + p = np.nanpercentile( + a, + q, + axis=axis, + method=method, + keepdims=keepdims, + overwrite_input=True, + ) + + # Update the mask for NaN points + nan_mask = np.isnan(p) + if nan_mask.any(): + if mask is None: + mask = nan_mask + else: + mask = np.ma.where(nan_mask, True, mask) + + # Mask any NaNs and elements below the mtol threshold + if mask is not None: + p = np.ma.where(mask, np.ma.masked, p) + + else: + # ------------------------------------------------------------ + # Input array is not masked + # ------------------------------------------------------------ + p = np.percentile( + a, + q, + axis=axis, + method=method, + keepdims=keepdims, + overwrite_input=False, + ) + + return p + + def cf_soften_mask(a): """Soften the mask of a masked `numpy` array. @@ -205,7 +336,12 @@ def cf_soften_mask(a): """ if np.ma.isMA(a): - a.soften_mask() + try: + a.soften_mask() + except AttributeError: + # Trap cases when the input array is not a numpy array + # (e.g. it might be numpy.ma.masked). + pass return a diff --git a/cf/data/data.py b/cf/data/data.py index 78868d0336..2e46ded4c2 100644 --- a/cf/data/data.py +++ b/cf/data/data.py @@ -12,9 +12,11 @@ import cftime import dask.array as da import numpy as np - -# from dask.array.core import slices_from_chunks -from dask.base import is_dask_collection +from dask.array import Array +from dask.array.core import normalize_chunks +from dask.base import is_dask_collection, tokenize +from dask.core import flatten +from dask.highlevelgraph import HighLevelGraph from numpy.testing import suppress_warnings as numpy_testing_suppress_warnings from ..cfdatetime import dt as cf_dt @@ -27,7 +29,12 @@ _inplace_enabled_define_and_cleanup, _manage_log_level_via_verbosity, ) -from ..functions import _numpy_isclose, _section, abspath +from ..functions import ( + _DEPRECATION_ERROR_KWARGS, + _numpy_isclose, + _section, + abspath, +) from ..functions import atol as cf_atol from ..functions import broadcast_array from ..functions import chunksize as cf_chunksize @@ -99,6 +106,7 @@ from .dask_utils import ( _da_ma_allclose, cf_harden_mask, + cf_percentile, cf_soften_mask, cf_where, ) @@ -1971,6 +1979,8 @@ def digitize( return out + @daskified(_DASKIFIED_VERBOSE) + @_deprecated_kwarg_check("_preserve_partitions") def median( self, axes=None, @@ -1980,14 +1990,12 @@ def median( _preserve_partitions=False, ): """Compute the median of the values.""" - return self.percentile( 50, axes=axes, squeeze=squeeze, mtol=mtol, inplace=inplace, - _preserve_partitions=_preserve_partitions, ) @_inplace_enabled(default=False) @@ -2043,15 +2051,19 @@ def mean_of_upper_decile( return d + @daskified(_DASKIFIED_VERBOSE) + @_deprecated_kwarg_check("_preserve_partitions") + @_inplace_enabled(default=False) def percentile( self, ranks, axes=None, - interpolation="linear", + method="linear", squeeze=False, mtol=1, inplace=False, _preserve_partitions=False, + interpolation=None, ): """Compute percentiles of the data along the specified axes. @@ -2067,9 +2079,32 @@ def percentile( dimension is created so that percentiles can be stored for each percentile rank. + **Accuracy** + + The `percentile` method returns results that are consistent + with `numpy.percentile`, which may be different to those + created by `dask.percentile`. The dask method uses an + algorithm that calculates approximate percentiles which are + likely to be different from the correct values when there are + two or more dask chunks. + + >>> import numpy as np + >>> import dask.array as da + >>> import cf + >>> a = np.arange(101) + >>> dx = da.from_array(a, chunks=10) + >>> da.percentile(dx, [40, 60]).compute() + array([40.36]) + >>> np.percentile(a, 40) + array([40.]) + >>> d = cf.Data(a, chunks=10) + >>> d.percentile(40).array + array([40.]) + .. versionadded:: 3.0.4 - .. seealso:: `digitize`, `median`, `mean_of_upper_decile`, `where` + .. seealso:: `digitize`, `median`, `mean_of_upper_decile`, + `where` :Parameters: @@ -2084,23 +2119,36 @@ def percentile( By default, of *axes* is `None`, all axes are selected. - interpolation: `str`, optional - Specify the interpolation method to use when the desired - percentile lies between two data values ``i < j``: - - =============== ========================================= - *interpolation* Description - =============== ========================================= - ``'linear'`` ``i+(j-i)*fraction``, where ``fraction`` - is the fractional part of the index - surrounded by ``i`` and ``j`` - ``'lower'`` ``i`` - ``'higher'`` ``j`` - ``'nearest'`` ``i`` or ``j``, whichever is nearest - ``'midpoint'`` ``(i+j)/2`` - =============== ========================================= - - By default ``'linear'`` interpolation is used. + method: `str`, optional + Specify the interpolation method to use when the + desired percentile lies between two data values. The + methods are listed here, but their definitions must be + referenced from the documentation for + `numpy.percentile`. + + For the default ``'linear'`` method, if the percentile + lies between two adjacent data values ``i < j`` then + the percentile is calculated as ``i+(j-i)*fraction``, + where ``fraction`` is the fractional part of the index + surrounded by ``i`` and ``j``. + + =============================== + *method* + =============================== + ``'inverted_cdf'`` + ``'averaged_inverted_cdf'`` + ``'closest_observation'`` + ``'interpolated_inverted_cdf'`` + ``'hazen'`` + ``'weibull'`` + ``'linear'`` (default) + ``'median_unbiased'`` + ``'normal_unbiased'`` + ``'lower'`` + ``'higher'`` + ``'nearest'`` + ``'midpoint'`` + =============================== squeeze: `bool`, optional If True then all axes over which percentiles are @@ -2111,31 +2159,37 @@ def percentile( data. mtol: number, optional - Set the fraction of input data elements which is allowed - to contain missing data when contributing to an individual - output data element. Where this fraction exceeds *mtol*, - missing data is returned. The default is 1, meaning that a - missing datum in the output array occurs when its - contributing input array elements are all missing data. A - value of 0 means that a missing datum in the output array - occurs whenever any of its contributing input array - elements are missing data. Any intermediate value is - permitted. + Set an upper limit of the amount input data values + which are allowed to be missing data when contributing + to individual output percentile values. It is defined + as a fraction (between 0 and 1 inclusive) of the + contributing input data values. The default is 1, + meaning that a missing datum in the output array only + occurs when all of its contributing input array + elements are missing data. A value of 0 means that a + missing datum in the output array occurs whenever any + of its contributing input array elements are missing + data. *Parameter example:* To ensure that an output array element is a missing - datum if more than 25% of its input array elements are - missing data: ``mtol=0.25``. + value if more than 25% of its input array elements + are missing data: ``mtol=0.25``. {{inplace: `bool`, optional}} + interpolation: deprecated at version 4.0.0 + Use the *method* parameter instead. + + _preserve_partitions: deprecated at version 4.0.0 + :Returns: `Data` or `None` The percentiles of the original data, or `None` if the operation was in-place. - **Examples:** + **Examples** >>> d = cf.Data(numpy.arange(12).reshape(3, 4), 'm') >>> print(d.array) @@ -2196,99 +2250,103 @@ def percentile( [2 2 3 3]] """ - ranks = np.array(ranks).flatten() - ranks.sort() - - if ranks[0] < 0 or ranks[-1] > 100: - raise ValueError( - "Each percentile rank must be in the range [0, 100]. " - "Got {!r}".format(ranks) - ) - - n_ranks = ranks.size - if n_ranks == 1: - ranks = ranks.squeeze() + if interpolation is not None: + _DEPRECATION_ERROR_KWARGS( + self, + "interpolation", + {"interpolation": None}, + message="Use the 'method' parameter instead.", + version="4.0.0", + ) # pragma: no cover - if axes is None: - axes = list(range(self.ndim)) - else: - axes = sorted(self._parse_axes(axes)) - - # If the input data array 'fits' in one chunk of memory, then - # make sure that it has only one partition - if ( - not _preserve_partitions - and self._pmndim - and self.fits_in_one_chunk_in_memory(self.dtype.itemsize) - ): - self.varray + d = _inplace_enabled_define_and_cleanup(self) - org_chunksize = cf_chunksize(cf_chunksize() / n_ranks) - sections = self.section(axes, chunks=True) - cf_chunksize(org_chunksize) + # Parse percentile ranks + q = ranks + if not (isinstance(q, np.ndarray) or is_dask_collection(q)): + q = np.array(ranks) - for key, data in sections.items(): - array = data.array + if q.ndim > 1: + q = q.flatten() - masked = np.ma.is_masked(array) - if masked: - if array.dtype != _dtype_float: - # Can't assign NaNs to integer arrays - array = array.astype(float, copy=True) + if not np.issubdtype(d.dtype, np.number): + method = "nearest" - array = np.ma.filled(array, np.nan) - func = np.nanpercentile + if axes is None: + axes = tuple(range(d.ndim)) + else: + axes = tuple(sorted(d._parse_axes(axes))) - with numpy_testing_suppress_warnings() as sup: - sup.filter( - RuntimeWarning, message=".*All-NaN slice encountered" - ) - p = func( - array, - ranks, - axis=axes, - interpolation=interpolation, - keepdims=True, - overwrite_input=False, - ) + dx = d._get_dask() + dtype = dx.dtype + shape = dx.shape - # Replace NaNs with missing data - p = np.ma.masked_where(np.isnan(p), p, copy=False) - else: - func = np.percentile - p = func( - array, - ranks, - axis=axes, - interpolation=interpolation, - keepdims=True, - overwrite_input=False, - ) + # Rechunk the data so that the dimensions over which + # percentiles are being calculated all have one chunk. + # + # Make sure that no new chunks are larger (in bytes) than any + # original chunk. + new_chunks = normalize_chunks( + [-1 if i in axes else "auto" for i in range(dx.ndim)], + shape=shape, + dtype=dtype, + limit=dtype.itemsize * reduce(mul, map(max, dx.chunks), 1), + ) + dx = dx.rechunk(new_chunks) - sections[key] = type(self)( - p, units=self.Units, fill_value=self.fill_value + # Initialise the indices of each chunk of the result + # + # E.g. [(0, 0, 0), (0, 0, 1), (0, 1, 0), (0, 1, 1)] + keys = [key[1:] for key in flatten(dx.__dask_keys__())] + + keepdims = not squeeze + if not keepdims: + # Remove axes that will be dropped in the result + indices = [i for i in range(len(keys[0])) if i not in axes] + keys = [tuple([k[i] for i in indices]) for k in keys] + + if q.ndim: + # Insert a leading rank dimension for non-scalar input + # percentile ranks + keys = [(0,) + k for k in keys] + + # Create a new dask dictionary for the result + name = "cf-percentile-" + tokenize(dx, axes, q, method) + name = (name,) + dsk = { + name + + chunk_index: ( + cf_percentile, + dask_key, + q, + axes, + method, + keepdims, + mtol, ) - # --- End: for - - # Glue the sections back together again - out = self.reconstruct_sectioned_data(sections) + for chunk_index, dask_key in zip(keys, flatten(dx.__dask_keys__())) + } - if mtol < 1: - mask = self.sample_size(axes, mtol=mtol) == 0 - mask.filled(True, inplace=True) - if out.ndim == self.ndim + 1: - mask.insert_dimension(0, inplace=True) + # Define the chunks for the result + if q.ndim: + out_chunks = [(q.size,)] + else: + out_chunks = [] - out.where(mask, cf_masked, inplace=True) + for i, c in enumerate(dx.chunks): + if i in axes: + if keepdims: + out_chunks.append((1,)) + else: + out_chunks.append(c) - if squeeze: - out.squeeze(inplace=True) + name = name[0] + graph = HighLevelGraph.from_collections(name, dsk, dependencies=[dx]) + dx = Array(graph, name, chunks=out_chunks, dtype=float) - if inplace: - self.__dict__ = out.__dict__ - return + d._set_dask(dx, reset_mask_hardness=True) - return out + return d @_inplace_enabled(default=False) def persist(self, inplace=False): diff --git a/cf/field.py b/cf/field.py index fab4a0d7c9..c70e4390f8 100644 --- a/cf/field.py +++ b/cf/field.py @@ -14351,6 +14351,10 @@ def percentile( # ------------------------------------------------------------ # Initialize the output field with the percentile data # ------------------------------------------------------------ + + # TODODASK: Make sure that this is OK whaen `ranks` is a + # scalar + out = type(self)() out.set_properties(self.properties()) diff --git a/cf/test/test_Data.py b/cf/test/test_Data.py index 5d8e678c13..1a12302a88 100644 --- a/cf/test/test_Data.py +++ b/cf/test/test_Data.py @@ -2549,80 +2549,128 @@ def test_Data_max_min_sum_sum_of_squares(self): "\ne={}, \nb={}".format(h, axes, e.array, b), ) - @unittest.skipIf(TEST_DASKIFIED_ONLY, "no attr. 'partition_configuration'") - def test_Data_median(self): + def test_Data_percentile_median(self): if self.test_only and inspect.stack()[0][3] not in self.test_only: return - for pp in (True, False): - # unweighted, unmasked - d = cf.Data(self.a) - for axes in self.axes_combinations: - b = reshape_array(self.a, axes) - b = np.median(b, axis=-1) + # ranks: a sequence of percentile rank inputs. NOTE: must + # include 50 as the last input so that cf.Data.median is also + # tested correctly. + ranks = ([30, 60, 90], [90, 30], [20]) + ranks = ranks + (50,) - e = d.median(axes=axes, squeeze=True, _preserve_partitions=pp) - self.assertTrue( - e.allclose(b, rtol=1e-05, atol=1e-08), - "median, axis={}, unweighted, unmasked " - "\ne={}, \nb={}".format(axes, e.array, b), - ) + d = cf.Data(self.a, chunks=(2, 2, 3, 5)) - # unweighted, masked - d = cf.Data(self.ma) - for axes in self.axes_combinations: - b = reshape_array(self.ma, axes) - b = np.ma.filled(b, np.nan) - with np.testing.suppress_warnings() as sup: - sup.filter( - RuntimeWarning, message=".*All-NaN slice encountered" - ) - b = np.nanpercentile(b, 50, axis=-1) + for axis in [None] + self.axes_combinations: + for keepdims in (True, False): + for q in ranks: + a1 = np.percentile(d, q, axis=axis, keepdims=keepdims) + b1 = d.percentile(q, axes=axis, squeeze=not keepdims) + self.assertEqual(b1.shape, a1.shape) + self.assertTrue((b1.array == a1).all()) - b = np.ma.masked_where(np.isnan(b), b, copy=False) - b = np.ma.asanyarray(b) + # Masked data + a = self.ma + filled = np.ma.filled(a, np.nan) + d = cf.Data(self.ma, chunks=(2, 2, 3, 5)) - e = d.median(axes=axes, squeeze=True, _preserve_partitions=pp) + with np.testing.suppress_warnings() as sup: + sup.filter( + category=RuntimeWarning, + message=".*All-NaN slice encountered.*", + ) + sup.filter( + category=UserWarning, + message="Warning: 'partition' will ignore the 'mask' of the MaskedArray.*", + ) + for axis in [None] + self.axes_combinations: + for keepdims in (True, False): + for q in ranks: + a1 = np.nanpercentile( + filled, q, axis=axis, keepdims=keepdims + ) + mask = np.isnan(a1) + if mask.any(): + a1 = np.ma.masked_where(mask, a1, copy=False) - self.assertTrue( - (e.mask.array == b.mask).all(), - "median, axis={}, \ne.mask={}, " - "\nb.mask={}".format(axes, e.mask.array, b.mask), - ) + b1 = d.percentile(q, axes=axis, squeeze=not keepdims) + self.assertEqual(b1.shape, a1.shape) + self.assertTrue((b1.array == a1).all()) - self.assertTrue( - e.allclose(b, rtol=1e-05, atol=1e-08), - "median, axis={}, unweighted, masked " - "\ne={}, \nb={}".format(axes, e.array, b), - ) + # Test scalar input (not masked) + a = np.array(9) + d = cf.Data(a) + for keepdims in (True, False): + for q in ranks: + a1 = np.nanpercentile(a, q, keepdims=keepdims) + b1 = d.percentile(q, squeeze=not keepdims) + self.assertEqual(b1.shape, a1.shape) + self.assertTrue((b1.array == a1).all()) + + # Test scalar input (masked) + a = np.ma.array(9, mask=True) + filled = np.ma.filled(a.astype(float), np.nan) + d = cf.Data(a) - @unittest.skipIf(TEST_DASKIFIED_ONLY, "no attribute '_pmndim'") - def test_Data_percentile(self): - if self.test_only and inspect.stack()[0][3] not in self.test_only: - return + with np.testing.suppress_warnings() as sup: + sup.filter( + category=RuntimeWarning, + message=".*All-NaN slice encountered.*", + ) + sup.filter( + category=UserWarning, + message="Warning: 'partition' will ignore the 'mask' of the MaskedArray.*", + ) + for keepdims in (True, False): + for q in ranks: + a1 = np.nanpercentile(filled, q, keepdims=keepdims) + mask = np.isnan(a1) + if mask.any(): + a1 = np.ma.masked_where(mask, a1, copy=False) + + b1 = d.percentile(q, squeeze=not keepdims) + self.assertEqual(b1.shape, a1.shape) + self.assertTrue( + (b1.array == a1).all() in (True, np.ma.masked) + ) + # Test mtol=1 d = cf.Data(self.a) + d[...] = cf.masked # All masked + for axis in [None] + self.axes_combinations: + for q in ranks: + e = d.percentile(q, axes=axis, mtol=1) + self.assertFalse(np.ma.count(e.array, keepdims=True).any()) - # Percentiles taken across *all axes* - ranks = [[30, 60, 90], [20], 80] # include valid singular form - - for rank in ranks: - # Note: in cf the default is squeeze=False, but - # numpy has an inverse parameter called keepdims - # which is by default False also, one must be set - # to the non-default for equivalents. So first - # cases (n1, n1) are both squeezed, (n2, n2) are - # not: - a1 = np.percentile(d, rank) # keepdims=False default - b1 = d.percentile(rank, squeeze=True) - self.assertTrue(b1.allclose(a1, rtol=1e-05, atol=1e-08)) - a2 = np.percentile(d, rank, keepdims=True) - b2 = d.percentile(rank) # squeeze=False default - self.assertTrue(b2.shape, a2.shape) - self.assertTrue(b2.allclose(a2, rtol=1e-05, atol=1e-08)) - - # TODO: add loop to check get same shape and close enough data - # for every possible axes combo (as with test_Data_median above). + a = np.ma.arange(12).reshape(3, 4) + d = cf.Data(a) + d[1, -1] = cf.masked # 1 value masked + for q in ranks: + e = d.percentile(q, mtol=1) + self.assertTrue(np.ma.count(e.array, keepdims=True).all()) + + # Test mtol=0 + for q in ranks: + e = d.percentile(q, mtol=0) + self.assertFalse(np.ma.count(e.array, keepdims=True).any()) + + # Test mtol=0.1 + for q in ranks: + e = d.percentile(q, axes=0, mtol=0.1) + self.assertEqual(np.ma.count(e.array), 3 * e.shape[0]) + + for q in ranks[:-1]: # axis=1: exclude the non-sequence rank + e = d.percentile(q, axes=1, mtol=0.1) + self.assertEqual(np.ma.count(e.array), 2 * e.shape[0]) + + q = ranks[-1] # axis=1: test the non-sequence rank + e = d.percentile(q, axes=1, mtol=0.1) + self.assertEqual(np.ma.count(e.array), e.shape[0] - 1) + + # Check invalid ranks (those not in [0, 100]) + for q in (-9, [999], [50, 999], [999, 50]): + with self.assertRaises(ValueError): + d.percentile(q).array @unittest.skipIf(TEST_DASKIFIED_ONLY, "no attr. 'partition_configuration'") def test_Data_mean_of_upper_decile(self): diff --git a/requirements.txt b/requirements.txt index 69def82fe4..f1964f302d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ netCDF4>=1.5.4 cftime>=1.5.0 -numpy>=1.15 +numpy>=1.22 cfdm>=1.9.0.1, <1.9.1.0 psutil>=0.6.0 cfunits>=3.3.4