Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 10 additions & 22 deletions cf/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
from enum import Enum, auto
from tempfile import gettempdir

from dask import config
from dask.utils import parse_bytes
from numpy.ma import masked as numpy_ma_masked
from psutil import virtual_memory

from . import mpi_on, mpi_size

if mpi_on:
from . import mpi_comm

from .units import Units

# platform = sys.platform
Expand All @@ -21,6 +18,10 @@
# Find the total amount of memory, in bytes
# --------------------------------------------------------------------
_TOTAL_MEMORY = float(virtual_memory().total)

_CHUNKSIZE = "128 MiB"
config.set({"array.chunk-size": _CHUNKSIZE})

# if platform == 'darwin':
# # MacOS
# _MemTotal = float(virtual_memory().total)
Expand Down Expand Up @@ -53,7 +54,7 @@
TOTAL_MEMORY: `float`
Find the total amount of physical memory (in bytes).

CHUNKSIZE: `float`
CHUNKSIZE: `int`
The chunk size (in bytes) for data storage and processing.

FM_THRESHOLD: `float`
Expand All @@ -78,9 +79,7 @@
disabled.

FREE_MEMORY_FACTOR: `int`
Factor to divide the free memory by. If MPI is on this is equal
to the number of PEs. Otherwise it is equal to 1 and is ignored
in any case.
Factor to divide the free memory by.

COLLAPSE_PARALLEL_MODE: `int`
The mode to use when parallelising collapse. By default this is
Expand All @@ -107,25 +106,14 @@
"RELAXED_IDENTITIES": False,
"LOG_LEVEL": logging.getLevelName(logging.getLogger().level),
"BOUNDS_COMBINATION_MODE": "AND",
"CHUNKSIZE": parse_bytes(_CHUNKSIZE),
}

CONSTANTS["FM_THRESHOLD"] = (
CONSTANTS["FREE_MEMORY_FACTOR"] * CONSTANTS["TOTAL_MEMORY"]
)

if mpi_on:
CONSTANTS["MIN_TOTAL_MEMORY"] = min(
mpi_comm.allgather(CONSTANTS["TOTAL_MEMORY"])
)
else:
CONSTANTS["MIN_TOTAL_MEMORY"] = CONSTANTS["TOTAL_MEMORY"]

CONSTANTS["CHUNKSIZE"] = (
CONSTANTS["FREE_MEMORY_FACTOR"] * CONSTANTS["MIN_TOTAL_MEMORY"]
) / (
mpi_size * CONSTANTS["WORKSPACE_FACTOR_1"]
+ CONSTANTS["WORKSPACE_FACTOR_2"]
)
CONSTANTS["MIN_TOTAL_MEMORY"] = CONSTANTS["TOTAL_MEMORY"]

masked = numpy_ma_masked
# nomask = numpy_ma_nomask
Expand Down
184 changes: 51 additions & 133 deletions cf/data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
)
from ..functions import (
_DEPRECATION_ERROR_KWARGS,
_DEPRECATION_ERROR_METHOD,
_numpy_isclose,
_section,
abspath,
Expand Down Expand Up @@ -2242,10 +2241,11 @@ def percentile(
if interpolation is not None:
_DEPRECATION_ERROR_KWARGS(
self,
"interpolation",
"percentile",
{"interpolation": None},
message="Use the 'method' parameter instead.",
version="4.0.0",
version="TODODASK",
removed_at="5.0.0",
) # pragma: no cover

d = _inplace_enabled_define_and_cleanup(self)
Expand Down Expand Up @@ -3296,15 +3296,12 @@ def rechunk(
`dask.array.rechunk` for details.

block_size_limit: `int`, optional
The maximum block size (in bytes) we want to produce.
Defaults to the configuration value
``dask.config.get('array.chunk-size')``. See
`dask.array.rechunk` for details.
The maximum block size (in bytes) we want to produce,
as defined by the `cf.chunksize` function.

balance: `bool`, optional
If True, try to make each chunk the same size. By
default this is not attempted. See
`dask.array.rechunk` for details.
default this is not attempted.

This means ``balance=True`` will remove any small
leftover chunks, so using ``d.rechunk(chunks=len(d) //
Expand Down Expand Up @@ -3336,15 +3333,15 @@ def rechunk(

>>> y = x.rechunk({0: -1, 1: 'auto'}, block_size_limit=1e8)

If a chunk size does not divide the dimension then rechunk will
leave any unevenness to the last chunk.
If a chunk size does not divide the dimension then rechunk
will leave any unevenness to the last chunk.

>>> x.rechunk(chunks=(400, -1)).chunks
((400, 400, 200), (1000,))

However if you want more balanced chunks, and don't mind Dask
choosing a different chunksize for you then you can use the
``balance=True`` option.
However if you want more balanced chunks, and don't mind
`dask` choosing a different chunksize for you then you can use
the ``balance=True`` option.

>>> x.rechunk(chunks=(400, -1), balance=True).chunks
((500, 500), (1000,))
Expand Down Expand Up @@ -7556,7 +7553,6 @@ def reconstruct_sectioned_data(cls, sections, cyclic=(), hardmask=None):
out.hardmask = hardmask

return out
# --- End: if

if keys[0][i] is not None:
new_sections = {}
Expand All @@ -7571,11 +7567,9 @@ def reconstruct_sectioned_data(cls, sections, cyclic=(), hardmask=None):
)
new_key = k[:i]
data_list = [sections[k]]
# --- End: for

new_sections[new_key] = cls.concatenate_data(data_list, i)
sections = new_sections
# --- End: for

def argmax(self, axis=None, unravel=False):
"""Return the indices of the maximum values along an axis.
Expand Down Expand Up @@ -11096,86 +11090,6 @@ def flip(self, axes=None, inplace=False, i=False):

return d

@daskified(_DASKIFIED_VERBOSE)
def HDF_chunks(self, *chunks):
"""Get or set HDF chunk sizes.

The HDF chunk sizes may be used by external code that allows
`Data` objects to be written to netCDF files.

Deprecated at version TODODASK and is no longer available. Use
the methods `nc_clear_hdf5_chunksizes`, `nc_hdf5_chunksizes`,
and `nc_set_hdf5_chunksizes` instead.

.. seealso:: `nc_clear_hdf5_chunksizes`, `nc_hdf5_chunksizes`,
`nc_set_hdf5_chunksizes`

:Parameters:

chunks: `dict` or `None`, *optional*
Specify HDF chunk sizes.

When no positional argument is provided, the HDF chunk
sizes are unchanged.

If `None` then the HDF chunks sizes for each dimension
are cleared, so that the HDF default chunk size value
will be used when writing data to disk.

If a `dict` then it defines for a subset of the
dimensions, defined by their integer positions, the
corresponding HDF chunk sizes. The HDF chunk sizes are
set as a number of elements along the dimension.

:Returns:

`dict`
The HDF chunks for each dimension prior to the change,
or the current HDF chunks if no new values are
specified. A value of `None` is an indication that the
default chunk size should be used for that dimension.

**Examples**

>>> d = cf.Data(np.arange(30).reshape(5, 6))
>>> d.HDF_chunks()
{0: None, 1: None}
>>> d.HDF_chunks({1: 2})
{0: None, 1: None}
>>> d.HDF_chunks()
{0: None, 1: 2}
>>> d.HDF_chunks({1:None})
{0: None, 1: 2}
>>> d.HDF_chunks()
{0: None, 1: None}
>>> d.HDF_chunks({0: 3, 1: 6})
{0: None, 1: None}
>>> d.HDF_chunks()
{0: 3, 1: 6}
>>> d.HDF_chunks({1: 4})
{0: 3, 1: 6}
>>> d.HDF_chunks()
{0: 3, 1: 4}
>>> d.HDF_chunks({1: 999})
{0: 3, 1: 4}
>>> d.HDF_chunks()
{0: 3, 1: 999}
>>> d.HDF_chunks(None)
{0: 3, 1: 999}
>>> d.HDF_chunks()
{0: None, 1: None}

"""
_DEPRECATION_ERROR_METHOD(
self,
"HDF_chunks",
message="Use the methods 'nc_clear_hdf5_chunksizes', "
"'nc_hdf5_chunksizes', and 'nc_set_hdf5_chunksizes' "
"instead.",
version="TODODASK",
removed_at="5.0.0",
)

def inspect(self):
"""Inspect the object for debugging.

Expand Down Expand Up @@ -11712,35 +11626,6 @@ def fits_in_memory(self, itemsize):
# ------------------------------------------------------------
return self.size * (itemsize + 1) <= free_memory() - cf_fm_threshold()

def fits_in_one_chunk_in_memory(self, itemsize):
"""Return True if the master array is small enough to be
retained in memory.

:Parameters:

itemsize: `int`
The number of bytes per word of the master data array.

:Returns:

`bool`

**Examples:**

>>> print(d.fits_one_chunk_in_memory(8))
False

"""
# ------------------------------------------------------------
# Note that self._size*(itemsize+1) is the array size in bytes
# including space for a full boolean mask
# ------------------------------------------------------------
return (
cf_chunksize()
>= self._size * (itemsize + 1)
<= free_memory() - cf_fm_threshold()
)

@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
@_manage_log_level_via_verbosity
Expand Down Expand Up @@ -13483,10 +13368,11 @@ def var(
_preserve_partitions=_preserve_partitions,
)

@daskified(_DASKIFIED_VERBOSE)
def section(
self, axes, stop=None, chunks=False, min_step=1, mode="dictionary"
):
"""Returns a dictionary of sections of the Data object.
"""Returns a dictionary of sections of the `Data` object.

Specifically, returns a dictionary of Data objects which are the
m-dimensional sections of this n-dimensional Data object, where
Expand All @@ -13509,15 +13395,21 @@ def section(
sectioned.

stop: `int`, optional
Deprecated at version TODODASK.

Stop after this number of sections and return. If stop is
None all sections are taken.

chunks: `bool`, optional
Depreated at version TODODASK. Consider using
`cf.Data.rechunk` instead.

If True return sections that are of the maximum possible
size that will fit in one chunk of memory instead of
sectioning into slices of size 1 along the dimensions that
are being sectioned.


min_step: `int`, optional
The minimum step size when making chunks. By default this
is 1. Can be set higher to avoid size 1 dimensions, which
Expand All @@ -13531,14 +13423,40 @@ def section(

**Examples:**

Section a Data object into 2D slices:
>>> d = cf.Data(np.arange(120).reshape(2, 6, 10))
>>> d
<CF Data(2, 6, 10): [[[0, ..., 119]]]>
>>> d.section([1, 2])
{(0, None, None): <CF Data(1, 6, 10): [[[0, ..., 59]]]>,
(1, None, None): <CF Data(1, 6, 10): [[[60, ..., 119]]]>}
>>> d.section([0, 1], min_step=2)
{(None, None, 0): <CF Data(2, 6, 2): [[[0, ..., 111]]]>,
(None, None, 2): <CF Data(2, 6, 2): [[[2, ..., 113]]]>,
(None, None, 4): <CF Data(2, 6, 2): [[[4, ..., 115]]]>,
(None, None, 6): <CF Data(2, 6, 2): [[[6, ..., 117]]]>,
(None, None, 8): <CF Data(2, 6, 2): [[[8, ..., 119]]]>}

"""
if chunks:
_DEPRECATION_ERROR_KWARGS(
self,
"section",
{"chunks": chunks},
message="Consider using Data.rechunk() instead.",
version="TODODASK",
removed_at="5.0.0",
) # pragma: no cover

>>> d.section((0, 1))
if stop is not None:
_DEPRECATION_ERROR_KWARGS(
self,
"section",
{"stop": stop},
version="TODODASK",
removed_at="5.0.0",
) # pragma: no cover

"""
return _section(
self, axes, data=True, stop=stop, chunks=chunks, min_step=min_step
)
return _section(self, axes, min_step=min_step)

# ----------------------------------------------------------------
# Alias
Expand Down
Loading