140 changes: 93 additions & 47 deletions aodncore/pipeline/files.py
@@ -1,17 +1,18 @@
import abc
import mimetypes
import os
import warnings
from collections import Counter, MutableSet, OrderedDict

from .common import (FileType, PipelineFilePublishType, PipelineFileCheckType, validate_addition_publishtype,
validate_checkresult, validate_deletion_publishtype, validate_publishtype,
validate_settable_checktype)
from .exceptions import AttributeValidationError, DuplicatePipelineFileError, MissingFileError
from .schema import validate_check_params
from ..util import (IndexedSet, ensure_regex_list, format_exception, get_file_checksum, iter_public_attributes,
matches_regexes, rm_f, slice_sequence, validate_bool, validate_callable, validate_int,
validate_mapping, validate_nonstring_iterable, validate_regexes, validate_relative_path_attr,
validate_string, validate_type)
from ..util import (IndexedSet, classproperty, ensure_regex_list, format_exception, get_file_checksum,
iter_public_attributes, matches_regexes, rm_f, slice_sequence, validate_bool, validate_callable,
validate_int, validate_mapping, validate_nonstring_iterable, validate_regexes,
validate_relative_path_attr, validate_string, validate_type)

__all__ = [
'PipelineFileCollection',
@@ -207,29 +208,31 @@ class PipelineFile(PipelineFileBase):
:type late_deletion: :py:class:`bool`
:param file_update_callback: optional callback to call when a file property is updated
:type file_update_callback: :py:class:`callable`
:param check_type: check type assigned to the file
:type check_type: PipelineFileCheckType
:param publish_type: publish type assigned to the file
:type publish_type: PipelineFilePublishType
"""
__slots__ = ['_archive_path', '_file_update_callback', '_check_type', '_is_deletion', '_late_deletion',
'_publish_type', '_should_archive', '_should_harvest', '_should_store', '_should_undo', '_is_checked',
'_is_archived', '_is_harvested', '_is_overwrite', '_is_stored', '_is_harvest_undone',
'_is_upload_undone', '_check_result', '_mime_type']

def __init__(self, local_path, name=None, archive_path=None, dest_path=None, is_deletion=False, late_deletion=False,
file_update_callback=None):
def __init__(self, local_path, name=None, archive_path=None, dest_path=None, is_deletion=False,
late_deletion=False, file_update_callback=None, check_type=None, publish_type=None):
super().__init__(local_path, dest_path)

self._name = name if name is not None else os.path.basename(local_path)

# general file attributes, set from parameters
self._archive_path = archive_path
self._is_deletion = is_deletion
self._late_deletion = late_deletion
self._name = name if name is not None else os.path.basename(local_path)

self._file_update_callback = None
if file_update_callback is not None:
self.file_update_callback = file_update_callback
# general file attributes, *not* set from parameters
self._check_result = None
self._mime_type = None

# processing flags - these express the *intended actions* for the file
self._check_type = PipelineFileCheckType.UNSET
self._is_deletion = is_deletion
self._late_deletion = late_deletion
self._publish_type = PipelineFilePublishType.UNSET
self._should_archive = False
self._should_harvest = False
self._should_store = False
@@ -244,8 +247,20 @@ def __init__(self, local_path, name=None, archive_path=None, dest_path=None, is_
self._is_harvest_undone = False
self._is_upload_undone = False

self._check_result = None
self._mime_type = None
# attributes which must be assigned via their property setters so that validation is applied. Each backing
# variable is intentionally initialised to a safe default first; the setter is then only called if the calling
# code has supplied a value for the corresponding parameter
self._file_update_callback = None
if file_update_callback is not None:
self.file_update_callback = file_update_callback

self._check_type = PipelineFileCheckType.UNSET
if check_type is not None:
self.check_type = check_type

self._publish_type = PipelineFilePublishType.UNSET
if publish_type is not None:
self.publish_type = publish_type

@classmethod
def from_remotepipelinefile(cls, remotepipelinefile, is_deletion=False):
@@ -562,7 +577,7 @@ def _post_property_update(self, properties, include_values=True):
message="{properties}".format(properties=log_output))


class PipelineFileCollectionBase(MutableSet):
class PipelineFileCollectionBase(MutableSet, metaclass=abc.ABCMeta):
"""A collection base class which implements the MutableSet abstract base class to allow clean set operations, but
limited to containing only :py:class:`PipelineFile` or :py:class:`RemotePipelineFile` elements and providing specific
functionality for handling a collection of them (e.g. filtering, generating tabular data, etc.)
@@ -571,30 +586,45 @@ class PipelineFileCollectionBase(MutableSet):
path, or an :py:class:`Iterable` whose elements are :py:class:`PipelineFile` instances or file paths
:type data: :py:class:`PipelineFile`, :py:class:`RemotePipelineFile`, :py:class:`str`, :py:class:`Iterable`
"""
__slots__ = ['_s', 'member_class', 'member_validator', 'member_from_string_method', 'unique_attributes']
__slots__ = ['_s']

def __init__(self, data=None, member_class=PipelineFile, member_validator=None, member_from_string_method=None,
unique_attributes=()):
def __init__(self, data=None):
super().__init__()

self._s = IndexedSet()

self.member_class = member_class
self.member_validator = member_validator or validate_pipelinefile_or_string
self.member_from_string_method = getattr(self, member_from_string_method) or self.get_pipelinefile_from_src_path
self.unique_attributes = unique_attributes

if data is not None:
if isinstance(data, (self.member_class, str)):
data = [data]
for f in data:
self.add(f)

@property
@abc.abstractmethod
def member_class(cls):
raise NotImplementedError

@property
@abc.abstractmethod
def member_from_string_method(self):
raise NotImplementedError

@property
@abc.abstractmethod
def member_validator(cls):
raise NotImplementedError

@property
@abc.abstractmethod
def unique_attributes(cls):
raise NotImplementedError

def __bool__(self):
return bool(self._s)

def __contains__(self, v):
return v in self._s
element = v if isinstance(v, self.member_class) else self.member_from_string_method(v)
return element in self._s

def __getitem__(self, index):
result = self._s[index]
@@ -948,20 +978,27 @@ def validate_attribute_uniqueness(self, attribute):
duplicates=duplicates))


validate_remotepipelinefile_or_string = validate_type((RemotePipelineFile, str))


class RemotePipelineFileCollection(PipelineFileCollectionBase):
"""A PipelineFileCollectionBase subclass to hold a set of RemotePipelineFile instances
"""
@classproperty
def member_class(cls):
return RemotePipelineFile

def __init__(self, *args, **kwargs):
kwargs['member_class'] = RemotePipelineFile
kwargs['member_validator'] = validate_remotepipelinefile_or_string
kwargs['member_from_string_method'] = 'get_pipelinefile_from_dest_path'
kwargs['unique_attributes'] = {'local_path', 'dest_path'}
super().__init__(*args, **kwargs)
@classproperty
def member_validator(cls):
return validate_remotepipelinefile_or_string

def __contains__(self, v):
element = v if isinstance(v, self.member_class) else self.get_pipelinefile_from_dest_path(v)
return element in self._s
@property
def member_from_string_method(self):
return self.get_pipelinefile_from_dest_path

@classproperty
def unique_attributes(cls):
return 'local_path', 'dest_path'

@classmethod
def from_pipelinefilecollection(cls, pipelinefilecollection):
Expand All @@ -974,27 +1011,36 @@ def download(self, broker, local_path):
:param local_path: local path into which files are downloaded
:return: None
"""
warnings.warn("This method will be removed in a future version. From a pipeline handler, you should use "
"`self.state_query.download` instead.", DeprecationWarning)
broker.download(self, local_path)

def keys(self):
# backwards compatibility for code expecting the broker query method to return a dict keyed by 'dest_path'
return self.get_attribute_list('dest_path')

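As a small aid to the backwards-compatibility note above, the sketch below shows keys() in use; the helper function is hypothetical and not part of the diff.

# Illustrative sketch only; the helper function is hypothetical.
from aodncore.pipeline.files import RemotePipelineFileCollection


def print_dest_paths(remote_collection: RemotePipelineFileCollection) -> None:
    # keys() mirrors the old dict-style broker query result by returning the
    # dest_path of each member, so callers that iterate over keys() keep working
    for dest_path in remote_collection.keys():
        print(dest_path)
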

validate_pipelinefile_or_string = validate_type((PipelineFile, str))


class PipelineFileCollection(PipelineFileCollectionBase):
"""A PipelineFileCollectionBase subclass to hold a set of PipelineFile instances
"""
@classproperty
def member_class(cls):
return PipelineFile

def __init__(self, *args, **kwargs):
kwargs['member_class'] = PipelineFile
kwargs['member_validator'] = validate_pipelinefile_or_string
kwargs['member_from_string_method'] = 'get_pipelinefile_from_src_path'
kwargs['unique_attributes'] = {'archive_path', 'dest_path'}
super().__init__(*args, **kwargs)
@classproperty
def member_validator(cls):
return validate_pipelinefile_or_string

def __contains__(self, v):
element = v if isinstance(v, self.member_class) else self.get_pipelinefile_from_src_path(v)
return element in self._s
@property
def member_from_string_method(self):
return self.get_pipelinefile_from_src_path

@classproperty
def unique_attributes(cls):
return 'archive_path', 'dest_path'

@classmethod
def from_remotepipelinefilecollection(cls, remotepipelinefilecollection, are_deletions=False):
@@ -1134,9 +1180,9 @@ def set_publish_types_from_regexes(self, include_regexes, exclude_regexes, addit

validate_pipelinefilecollection = validate_type(PipelineFileCollection)
validate_pipelinefile_or_pipelinefilecollection = validate_type((PipelineFile, PipelineFileCollection))
validate_pipelinefile_or_string = validate_type((PipelineFile, str))


validate_remotepipelinefilecollection = validate_type(RemotePipelineFileCollection)
validate_remotepipelinefile_or_remotepipelinefilecollection = validate_type((RemotePipelineFile,
RemotePipelineFileCollection))
validate_remotepipelinefile_or_string = validate_type((RemotePipelineFile, str))

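To make the files.py changes concrete, here is a small usage sketch (not part of the diff). It assumes that enum members named PipelineFileCheckType.FORMAT_CHECK and PipelineFilePublishType.HARVEST_UPLOAD exist with these names; the paths are created on the fly so the snippet can run in isolation.

# Illustrative sketch only; enum member names are assumed, paths are temporary.
import os
import tempfile

from aodncore.pipeline.common import PipelineFileCheckType, PipelineFilePublishType
from aodncore.pipeline.files import PipelineFile, PipelineFileCollection

src = os.path.join(tempfile.mkdtemp(), 'example.nc')
with open(src, 'w'):
    pass  # create an empty placeholder file

# check_type and publish_type are routed through the existing property setters,
# so invalid values are rejected at construction time rather than later
pf = PipelineFile(src,
                  dest_path='IMOS/example/example.nc',
                  check_type=PipelineFileCheckType.FORMAT_CHECK,
                  publish_type=PipelineFilePublishType.HARVEST_UPLOAD)

collection = PipelineFileCollection(pf)

# membership can now be tested with a plain source path as well as with a
# PipelineFile instance; strings are resolved via get_pipelinefile_from_src_path
print(src in collection)
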
26 changes: 11 additions & 15 deletions aodncore/pipeline/handlerbase.py
@@ -12,15 +12,15 @@
from .destpath import get_path_function
from .exceptions import (PipelineProcessingError, HandlerAlreadyRunError, InvalidConfigError, InvalidInputFileError,
InvalidFileFormatError, MissingConfigParameterError, UnmatchedFilesError)
from .files import PipelineFile, PipelineFileCollection, ensure_remotepipelinefilecollection
from .files import PipelineFile, PipelineFileCollection
from .log import SYSINFO, get_pipeline_logger
from .schema import (validate_check_params, validate_custom_params, validate_harvest_params, validate_notify_params,
validate_resolve_params)
from .statequery import StateQuery
from .steps import (get_check_runner, get_harvester_runner, get_notify_runner, get_resolve_runner, get_store_runner)
from ..util import (ensure_regex_list, ensure_writeonceordereddict, format_exception,
get_file_checksum, iter_public_attributes, lazyproperty, matches_regexes, merge_dicts,
validate_relative_path_attr, TemporaryDirectory, DEFAULT_WFS_VERSION)
validate_relative_path_attr, TemporaryDirectory, WfsBroker, DEFAULT_WFS_VERSION)
from ..version import __version__ as _aodncore_version

__all__ = [
@@ -586,9 +586,9 @@ def state_query(self):
:return: StateQuery instance
:rtype: :py:class:`StateQuery`
"""
return StateQuery(storage_broker=self._upload_store_runner.broker,
wfs_url=self.config.pipeline_config['global'].get('wfs_url'),
wfs_version=self.config.pipeline_config['global'].get('wfs_version', DEFAULT_WFS_VERSION))
wfs_broker = WfsBroker(self.config.pipeline_config['global'].get('wfs_url'),
version=self.config.pipeline_config['global'].get('wfs_version', DEFAULT_WFS_VERSION))
return StateQuery(storage_broker=self._upload_store_runner.broker, wfs_broker=wfs_broker)

@property
def default_addition_publish_type(self):
@@ -1022,19 +1022,15 @@ def postprocess(self): # pragma: no cover
# "public" methods
#

def download_remotepipelinefilecollection(self, remotepipelinefilecollection, local_path=None):
"""Helper method to download a RemotePipelineFileCollection or RemotePipelineFile using the handler's internal
storage broker
def add_pipelinefile(self, pipeline_file):
"""Helper method to add a PipelineFile to the handler's file_collection, with the handler instance's
file_update_callback method assigned

:param remotepipelinefilecollection: RemotePipelineFileCollection to download
:param local_path: local path where files will be downloaded. Defaults to the handler's :attr:`temp_dir` value.
:param pipeline_file: PipelineFile object
:return: None
"""
remotepipelinefilecollection = ensure_remotepipelinefilecollection(remotepipelinefilecollection)
if local_path is None:
local_path = self.temp_dir

remotepipelinefilecollection.download(self._upload_store_runner.broker, local_path)
self.file_collection.add(pipeline_file)
pipeline_file.file_update_callback = self._file_update_callback

def run(self):
"""The entry point to the handler instance. Executes the automatic state machine transitions, and populates the
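
The handlerbase.py changes are easiest to read together: download_remotepipelinefilecollection is removed in favour of the broker-backed StateQuery.download, and the new add_pipelinefile helper wires the handler's file_update_callback onto files added to file_collection. A sketch of how a handler might use both follows; the handler subclass, layer name and paths are hypothetical.

# Illustrative sketch only; the handler subclass, layer name and paths are hypothetical.
from aodncore.pipeline.files import PipelineFile
from aodncore.pipeline.handlerbase import HandlerBase


class ExampleHandler(HandlerBase):
    def process(self):
        # add_pipelinefile adds the file to file_collection and assigns the
        # handler's file_update_callback to it
        derived = PipelineFile('/tmp/derived/summary.csv', dest_path='IMOS/example/summary.csv')
        self.add_pipelinefile(derived)

        # downloads previously done via download_remotepipelinefilecollection now
        # go through the StateQuery helper
        previous = self.state_query.query_wfs_urls_for_layer('imos:example_layer')
        self.state_query.download(previous, self.temp_dir)
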
33 changes: 15 additions & 18 deletions aodncore/pipeline/statequery.py
@@ -1,4 +1,4 @@
from ..util import WfsBroker, DEFAULT_WFS_VERSION
from .files import RemotePipelineFileCollection

__all__ = [
'StateQuery'
@@ -9,22 +9,9 @@ class StateQuery(object):
"""Simple state query interface, to provide user friendly access for querying existing Pipeline state
"""

def __init__(self, storage_broker, wfs_url, wfs_version=DEFAULT_WFS_VERSION):
def __init__(self, storage_broker, wfs_broker):
self._storage_broker = storage_broker
self._wfs_url = wfs_url
self._wfs_version = wfs_version

self._wfs_broker_object = None

@property
def _wfs_broker(self):
if not self._wfs_url:
raise AttributeError('WFS querying unavailable: no wfs_url configured?')

# lazy instantiation of broker to avoid any WFS activity unless a handler explicitly calls it
if self._wfs_broker_object is None:
self._wfs_broker_object = WfsBroker(self._wfs_url, version=self._wfs_version)
return self._wfs_broker_object
self._wfs_broker = wfs_broker

@property
def wfs(self):
@@ -55,9 +42,9 @@ def query_wfs_urls_for_layer(self, layer, **kwargs): # pragma: no cover

:param layer: layer name supplied to GetFeature typename parameter
:param kwargs: keyword arguments passed to underlying broker method
:return: list of files for the layer
:return: RemotePipelineFileCollection containing the files for the layer
"""
return self._wfs_broker.query_urls_for_layer(layer, **kwargs)
return RemotePipelineFileCollection(self._wfs_broker.query_urls_for_layer(layer, **kwargs))

def query_wfs_url_exists(self, layer, name): # pragma: no cover
"""Returns a bool representing whether a given 'file_url' is present in a layer
@@ -67,3 +54,13 @@ def query_wfs_url_exists(self, layer, name): # pragma: no cover
:return: True if a file with the given name is present in the layer, otherwise False
"""
return self._wfs_broker.query_url_exists_for_layer(layer, name)

def download(self, remotepipelinefilecollection, local_path):
"""Helper method to download a RemotePipelineFileCollection or RemotePipelineFile using the handler's internal
storage broker

:param remotepipelinefilecollection: RemotePipelineFileCollection to download
:param local_path: local path into which the files will be downloaded
:return: None
"""
self._storage_broker.download(remotepipelinefilecollection, local_path)
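
The statequery.py changes amount to dependency injection: the WfsBroker is now built by the caller and passed in, rather than lazily constructed from a wfs_url inside StateQuery. Below is a minimal sketch of how a caller (such as HandlerBase.state_query, or a test) might assemble a StateQuery under that arrangement; the broker arguments are placeholders.

# Illustrative sketch only; broker arguments are placeholders.
from aodncore.pipeline.statequery import StateQuery
from aodncore.util import WfsBroker, DEFAULT_WFS_VERSION


def build_state_query(storage_broker, wfs_url, wfs_version=DEFAULT_WFS_VERSION):
    # the WfsBroker is constructed by the caller and injected, replacing the
    # previous lazy construction (and the "no wfs_url configured" guard) that
    # used to live inside StateQuery
    wfs_broker = WfsBroker(wfs_url, version=wfs_version)
    return StateQuery(storage_broker=storage_broker, wfs_broker=wfs_broker)

One consequence of this design is that the deferral described by the old comment ("lazy instantiation of broker to avoid any WFS activity unless a handler explicitly calls it") no longer applies: the broker is created as soon as state_query builds a StateQuery.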