diff --git a/aodncore/pipeline/files.py b/aodncore/pipeline/files.py index fb8e9ed7..7717f318 100644 --- a/aodncore/pipeline/files.py +++ b/aodncore/pipeline/files.py @@ -1,6 +1,7 @@ import abc import mimetypes import os +import warnings from collections import Counter, MutableSet, OrderedDict from .common import (FileType, PipelineFilePublishType, PipelineFileCheckType, validate_addition_publishtype, @@ -8,10 +9,10 @@ validate_settable_checktype) from .exceptions import AttributeValidationError, DuplicatePipelineFileError, MissingFileError from .schema import validate_check_params -from ..util import (IndexedSet, ensure_regex_list, format_exception, get_file_checksum, iter_public_attributes, - matches_regexes, rm_f, slice_sequence, validate_bool, validate_callable, validate_int, - validate_mapping, validate_nonstring_iterable, validate_regexes, validate_relative_path_attr, - validate_string, validate_type) +from ..util import (IndexedSet, classproperty, ensure_regex_list, format_exception, get_file_checksum, + iter_public_attributes, matches_regexes, rm_f, slice_sequence, validate_bool, validate_callable, + validate_int, validate_mapping, validate_nonstring_iterable, validate_regexes, + validate_relative_path_attr, validate_string, validate_type) __all__ = [ 'PipelineFileCollection', @@ -207,29 +208,31 @@ class PipelineFile(PipelineFileBase): :type late_deletion: :py:class:`bool` :param file_update_callback: optional callback to call when a file property is updated :type file_update_callback: :py:class:`callable` + :param check_type: check type assigned to the file + :type check_type: PipelineFileCheckType + :param publish_type: publish type assigned to the file + :type publish_type: PipelineFilePublishType """ __slots__ = ['_archive_path', '_file_update_callback', '_check_type', '_is_deletion', '_late_deletion', '_publish_type', '_should_archive', '_should_harvest', '_should_store', '_should_undo', '_is_checked', '_is_archived', '_is_harvested', '_is_overwrite', '_is_stored', '_is_harvest_undone', '_is_upload_undone', '_check_result', '_mime_type'] - def __init__(self, local_path, name=None, archive_path=None, dest_path=None, is_deletion=False, late_deletion=False, - file_update_callback=None): + def __init__(self, local_path, name=None, archive_path=None, dest_path=None, is_deletion=False, + late_deletion=False, file_update_callback=None, check_type=None, publish_type=None): super().__init__(local_path, dest_path) - self._name = name if name is not None else os.path.basename(local_path) - + # general file attributes, set from parameters self._archive_path = archive_path + self._is_deletion = is_deletion + self._late_deletion = late_deletion + self._name = name if name is not None else os.path.basename(local_path) - self._file_update_callback = None - if file_update_callback is not None: - self.file_update_callback = file_update_callback + # general file attributes, *not* set from parameters + self._check_result = None + self._mime_type = None # processing flags - these express the *intended actions* for the file - self._check_type = PipelineFileCheckType.UNSET - self._is_deletion = is_deletion - self._late_deletion = late_deletion - self._publish_type = PipelineFilePublishType.UNSET self._should_archive = False self._should_harvest = False self._should_store = False @@ -244,8 +247,20 @@ def __init__(self, local_path, name=None, archive_path=None, dest_path=None, is_ self._is_harvest_undone = False self._is_upload_undone = False - self._check_result = None - self._mime_type = None + # attributes which must be assigned by the property setter for validation. The backing variable is intentionally + # initialised to a safe default, before the setter is called if the calling code has supplied a value for the + # corresponding parameters + self._file_update_callback = None + if file_update_callback is not None: + self.file_update_callback = file_update_callback + + self._check_type = PipelineFileCheckType.UNSET + if check_type is not None: + self.check_type = check_type + + self._publish_type = PipelineFilePublishType.UNSET + if publish_type is not None: + self.publish_type = publish_type @classmethod def from_remotepipelinefile(cls, remotepipelinefile, is_deletion=False): @@ -562,7 +577,7 @@ def _post_property_update(self, properties, include_values=True): message="{properties}".format(properties=log_output)) -class PipelineFileCollectionBase(MutableSet): +class PipelineFileCollectionBase(MutableSet, metaclass=abc.ABCMeta): """A collection base class which implements the MutableSet abstract base class to allow clean set operations, but limited to containing only :py:class:`PipelineFile` or :py:class:`RemotePipelineFile`elements and providing specific functionality for handling a collection of them (e.g. filtering, generating tabular data, etc.) @@ -571,30 +586,45 @@ class PipelineFileCollectionBase(MutableSet): path, or an :py:class:`Iterable` whose elements are :py:class:`PipelineFile` instances or file paths :type data: :py:class:`PipelineFile`, :py:class:`RemotePipelineFile`, :py:class:`str`, :py:class:`Iterable` """ - __slots__ = ['_s', 'member_class', 'member_validator', 'member_from_string_method', 'unique_attributes'] + __slots__ = ['_s'] - def __init__(self, data=None, member_class=PipelineFile, member_validator=None, member_from_string_method=None, - unique_attributes=()): + def __init__(self, data=None): super().__init__() self._s = IndexedSet() - self.member_class = member_class - self.member_validator = member_validator or validate_pipelinefile_or_string - self.member_from_string_method = getattr(self, member_from_string_method) or self.get_pipelinefile_from_src_path - self.unique_attributes = unique_attributes - if data is not None: if isinstance(data, (self.member_class, str)): data = [data] for f in data: self.add(f) + @property + @abc.abstractmethod + def member_class(cls): + raise NotImplementedError + + @property + @abc.abstractmethod + def member_from_string_method(self): + raise NotImplementedError + + @property + @abc.abstractmethod + def member_validator(cls): + raise NotImplementedError + + @property + @abc.abstractmethod + def unique_attributes(cls): + raise NotImplementedError + def __bool__(self): return bool(self._s) def __contains__(self, v): - return v in self._s + element = v if isinstance(v, self.member_class) else self.member_from_string_method(v) + return element in self._s def __getitem__(self, index): result = self._s[index] @@ -948,20 +978,27 @@ def validate_attribute_uniqueness(self, attribute): duplicates=duplicates)) +validate_remotepipelinefile_or_string = validate_type((RemotePipelineFile, str)) + + class RemotePipelineFileCollection(PipelineFileCollectionBase): """A PipelineFileCollectionBase subclass to hold a set of RemotePipelineFile instances """ + @classproperty + def member_class(cls): + return RemotePipelineFile - def __init__(self, *args, **kwargs): - kwargs['member_class'] = RemotePipelineFile - kwargs['member_validator'] = validate_remotepipelinefile_or_string - kwargs['member_from_string_method'] = 'get_pipelinefile_from_dest_path' - kwargs['unique_attributes'] = {'local_path', 'dest_path'} - super().__init__(*args, **kwargs) + @classproperty + def member_validator(cls): + return validate_remotepipelinefile_or_string - def __contains__(self, v): - element = v if isinstance(v, self.member_class) else self.get_pipelinefile_from_dest_path(v) - return element in self._s + @property + def member_from_string_method(self): + return self.get_pipelinefile_from_dest_path + + @classproperty + def unique_attributes(cls): + return 'local_path', 'dest_path' @classmethod def from_pipelinefilecollection(cls, pipelinefilecollection): @@ -974,6 +1011,8 @@ def download(self, broker, local_path): :param local_path: local path into which files are downloaded :return: None """ + warnings.warn("This method will be removed in a future version. From a pipeline handler, you should use " + "`self.state_query.download` instead.", DeprecationWarning) broker.download(self, local_path) def keys(self): @@ -981,20 +1020,27 @@ def keys(self): return self.get_attribute_list('dest_path') +validate_pipelinefile_or_string = validate_type((PipelineFile, str)) + + class PipelineFileCollection(PipelineFileCollectionBase): """A PipelineFileCollectionBase subclass to hold a set of PipelineFile instances """ + @classproperty + def member_class(cls): + return PipelineFile - def __init__(self, *args, **kwargs): - kwargs['member_class'] = PipelineFile - kwargs['member_validator'] = validate_pipelinefile_or_string - kwargs['member_from_string_method'] = 'get_pipelinefile_from_src_path' - kwargs['unique_attributes'] = {'archive_path', 'dest_path'} - super().__init__(*args, **kwargs) + @classproperty + def member_validator(cls): + return validate_pipelinefile_or_string - def __contains__(self, v): - element = v if isinstance(v, self.member_class) else self.get_pipelinefile_from_src_path(v) - return element in self._s + @property + def member_from_string_method(self): + return self.get_pipelinefile_from_src_path + + @classproperty + def unique_attributes(cls): + return 'archive_path', 'dest_path' @classmethod def from_remotepipelinefilecollection(cls, remotepipelinefilecollection, are_deletions=False): @@ -1134,9 +1180,9 @@ def set_publish_types_from_regexes(self, include_regexes, exclude_regexes, addit validate_pipelinefilecollection = validate_type(PipelineFileCollection) validate_pipelinefile_or_pipelinefilecollection = validate_type((PipelineFile, PipelineFileCollection)) -validate_pipelinefile_or_string = validate_type((PipelineFile, str)) + validate_remotepipelinefilecollection = validate_type(RemotePipelineFileCollection) validate_remotepipelinefile_or_remotepipelinefilecollection = validate_type((RemotePipelineFile, RemotePipelineFileCollection)) -validate_remotepipelinefile_or_string = validate_type((RemotePipelineFile, str)) + diff --git a/aodncore/pipeline/handlerbase.py b/aodncore/pipeline/handlerbase.py index 016d2306..a0ecadb1 100644 --- a/aodncore/pipeline/handlerbase.py +++ b/aodncore/pipeline/handlerbase.py @@ -12,7 +12,7 @@ from .destpath import get_path_function from .exceptions import (PipelineProcessingError, HandlerAlreadyRunError, InvalidConfigError, InvalidInputFileError, InvalidFileFormatError, MissingConfigParameterError, UnmatchedFilesError) -from .files import PipelineFile, PipelineFileCollection, ensure_remotepipelinefilecollection +from .files import PipelineFile, PipelineFileCollection from .log import SYSINFO, get_pipeline_logger from .schema import (validate_check_params, validate_custom_params, validate_harvest_params, validate_notify_params, validate_resolve_params) @@ -20,7 +20,7 @@ from .steps import (get_check_runner, get_harvester_runner, get_notify_runner, get_resolve_runner, get_store_runner) from ..util import (ensure_regex_list, ensure_writeonceordereddict, format_exception, get_file_checksum, iter_public_attributes, lazyproperty, matches_regexes, merge_dicts, - validate_relative_path_attr, TemporaryDirectory, DEFAULT_WFS_VERSION) + validate_relative_path_attr, TemporaryDirectory, WfsBroker, DEFAULT_WFS_VERSION) from ..version import __version__ as _aodncore_version __all__ = [ @@ -586,9 +586,9 @@ def state_query(self): :return: StateQuery instance :rtype: :py:class:`StateQuery` """ - return StateQuery(storage_broker=self._upload_store_runner.broker, - wfs_url=self.config.pipeline_config['global'].get('wfs_url'), - wfs_version=self.config.pipeline_config['global'].get('wfs_version', DEFAULT_WFS_VERSION)) + wfs_broker = WfsBroker(self.config.pipeline_config['global'].get('wfs_url'), + version=self.config.pipeline_config['global'].get('wfs_version', DEFAULT_WFS_VERSION)) + return StateQuery(storage_broker=self._upload_store_runner.broker, wfs_broker=wfs_broker) @property def default_addition_publish_type(self): @@ -1022,19 +1022,15 @@ def postprocess(self): # pragma: no cover # "public" methods # - def download_remotepipelinefilecollection(self, remotepipelinefilecollection, local_path=None): - """Helper method to download a RemotePipelineFileCollection or RemotePipelineFile using the handler's internal - storage broker + def add_pipelinefile(self, pipeline_file): + """Helper method to add a PipelineFile to the handler's file_collection, with the handler instance's + file_update_callback method assigned - :param remotepipelinefilecollection: RemotePipelineFileCollection to download - :param local_path: local path where files will be downloaded. Defaults to the handler's :attr:`temp_dir` value. + :param pipeline_file: PipelineFile object :return: None """ - remotepipelinefilecollection = ensure_remotepipelinefilecollection(remotepipelinefilecollection) - if local_path is None: - local_path = self.temp_dir - - remotepipelinefilecollection.download(self._upload_store_runner.broker, local_path) + self.file_collection.add(pipeline_file) + pipeline_file.file_update_callback = self._file_update_callback def run(self): """The entry point to the handler instance. Executes the automatic state machine transitions, and populates the diff --git a/aodncore/pipeline/statequery.py b/aodncore/pipeline/statequery.py index 2f4850fa..49c45658 100644 --- a/aodncore/pipeline/statequery.py +++ b/aodncore/pipeline/statequery.py @@ -1,4 +1,4 @@ -from ..util import WfsBroker, DEFAULT_WFS_VERSION +from .files import RemotePipelineFileCollection __all__ = [ 'StateQuery' @@ -9,22 +9,9 @@ class StateQuery(object): """Simple state query interface, to provide user friendly access for querying existing Pipeline state """ - def __init__(self, storage_broker, wfs_url, wfs_version=DEFAULT_WFS_VERSION): + def __init__(self, storage_broker, wfs_broker): self._storage_broker = storage_broker - self._wfs_url = wfs_url - self._wfs_version = wfs_version - - self._wfs_broker_object = None - - @property - def _wfs_broker(self): - if not self._wfs_url: - raise AttributeError('WFS querying unavailable: no wfs_url configured?') - - # lazy instantiation of broker to avoid any WFS activity unless a handler explicitly calls it - if self._wfs_broker_object is None: - self._wfs_broker_object = WfsBroker(self._wfs_url, version=self._wfs_version) - return self._wfs_broker_object + self._wfs_broker = wfs_broker @property def wfs(self): @@ -55,9 +42,9 @@ def query_wfs_urls_for_layer(self, layer, **kwargs): # pragma: no cover :param layer: layer name supplied to GetFeature typename parameter :param kwargs: keyword arguments passed to underlying broker method - :return: list of files for the layer + :return: RemotePipelineFileCollection containing list of files for the layer """ - return self._wfs_broker.query_urls_for_layer(layer, **kwargs) + return RemotePipelineFileCollection(self._wfs_broker.query_urls_for_layer(layer, **kwargs)) def query_wfs_url_exists(self, layer, name): # pragma: no cover """Returns a bool representing whether a given 'file_url' is present in a layer @@ -67,3 +54,13 @@ def query_wfs_url_exists(self, layer, name): # pragma: no cover :return: list of files for the layer """ return self._wfs_broker.query_url_exists_for_layer(layer, name) + + def download(self, remotepipelinefilecollection, local_path): + """Helper method to download a RemotePipelineFileCollection or RemotePipelineFile using the handler's internal + storage broker + + :param remotepipelinefilecollection: RemotePipelineFileCollection to download + :param local_path: local path where files will be downloaded. Defaults to the handler's :attr:`temp_dir` value. + :return: None + """ + self._storage_broker.download(remotepipelinefilecollection, local_path) diff --git a/aodncore/util/wfs.py b/aodncore/util/wfs.py index f828abdb..d886bce8 100644 --- a/aodncore/util/wfs.py +++ b/aodncore/util/wfs.py @@ -2,7 +2,7 @@ from collections import OrderedDict from owslib.etree import etree -from owslib.fes import PropertyIsEqualTo +from owslib.fes import PropertyIsEqualTo, OgcExpression from owslib.wfs import WebFeatureService from ..util import IndexedSet @@ -18,23 +18,24 @@ def ogc_filter_to_string(ogc_filter): - """Convert an OGC filter object into it's XML string representation + """Convert an OGC filter object into its XML string representation - :param ogc_filter: OGC filter object + :param ogc_filter: OGC filter object (or XML representation) :return: XML string """ - return etree.tostring(ogc_filter.toXML()).decode('utf-8') - + if isinstance(ogc_filter, OgcExpression): + return etree.tostring(ogc_filter.toXML()).decode('utf-8') + else: + return ogc_filter def get_filter_for_file_url(file_url, property_name='url'): """Return OGC filter XML to query for a single file_url :param file_url: URL string :param property_name: URL property name to filter on - :return: OGC XML filter string + :return: OGC filter expression """ - file_url_filter = PropertyIsEqualTo(propertyname=property_name, literal=file_url) - return ogc_filter_to_string(file_url_filter) + return PropertyIsEqualTo(propertyname=property_name, literal=file_url) class WfsBroker(object): @@ -57,12 +58,15 @@ def wfs(self): """ return self._wfs - def getfeature_dict(self, **kwargs): + def getfeature_dict(self, ogc_filter=None, **kwargs): """Make a GetFeature request, and return the response in a native dict + :param ogc_filter: OGC filter expression. If omitted, all features are returned. :param kwargs: keyword arguments passed to the underlying WebFeatureService.getfeature method :return: dict containing the parsed GetFeature response """ + if ogc_filter: + kwargs['filter'] = ogc_filter_to_string(ogc_filter) kwargs.pop('outputFormat', None) response = self.wfs.getfeature(outputFormat='json', **kwargs) response_body = response.getvalue() @@ -88,7 +92,7 @@ def query_urls_for_layer(self, layer, ogc_filter=None, url_property_name=None): """Return an IndexedSet of files for a given layer :param layer: layer name supplied to GetFeature typename parameter - :param ogc_filter: XML string represenation of an OGC filter expression. If omitted, all URLs are returned. + :param ogc_filter: OGC filter expression. If omitted, all URLs are returned. :param url_property_name: property name for file URL. If omitted, property name is determined from layer schema :return: list of files for the layer """ @@ -101,10 +105,7 @@ def query_urls_for_layer(self, layer, ogc_filter=None, url_property_name=None): 'propertyname': url_property_name } - if ogc_filter: - getfeature_kwargs['filter'] = ogc_filter - - parsed_response = self.getfeature_dict(**getfeature_kwargs) + parsed_response = self.getfeature_dict(ogc_filter=ogc_filter, **getfeature_kwargs) file_urls = IndexedSet(f['properties'][url_property_name] for f in parsed_response['features']) return file_urls diff --git a/test_aodncore/pipeline/test_dummyHandler.py b/test_aodncore/pipeline/test_dummyHandler.py index b862d55c..204e16ae 100644 --- a/test_aodncore/pipeline/test_dummyHandler.py +++ b/test_aodncore/pipeline/test_dummyHandler.py @@ -5,14 +5,13 @@ from jsonschema import ValidationError -from aodncore.pipeline import (PipelineFile, PipelineFileCheckType, PipelineFilePublishType, RemotePipelineFile, - RemotePipelineFileCollection, HandlerResult) +from aodncore.pipeline import PipelineFile, PipelineFileCheckType, PipelineFilePublishType, HandlerResult from aodncore.pipeline.exceptions import (AttributeValidationError, ComplianceCheckFailedError, HandlerAlreadyRunError, InvalidCheckSuiteError, InvalidInputFileError, InvalidFileFormatError, InvalidRecipientError, UnmatchedFilesError) from aodncore.pipeline.statequery import StateQuery from aodncore.pipeline.steps import NotifyList -from aodncore.testlib import DummyHandler, HandlerTestCase, NullStorageBroker, dest_path_testing, get_nonexistent_path +from aodncore.testlib import DummyHandler, HandlerTestCase, dest_path_testing, get_nonexistent_path from aodncore.util import WriteOnceOrderedDict from test_aodncore import TESTDATA_DIR @@ -403,7 +402,8 @@ def test_opendap_root(self): handler = self.run_handler(self.temp_nc_file) self.assertEqual(handler.opendap_root, 'http://opendap.example.com') - def test_state_query(self): + @patch('aodncore.util.wfs.WebFeatureService') + def test_state_query(self, mock_webfeatureservice): handler = self.handler_class(self.temp_nc_file) self.assertIsInstance(handler.state_query, StateQuery) @@ -412,36 +412,18 @@ def test_platform_vocab_helper(self): self.assertIsInstance(handler.platform_vocab_helper.platform_type_uris_by_category(), dict) self.assertEqual(handler.platform_vocab_helper.platform_altlabels_per_preflabel('Vessel')['9VUU'], 'Anro Asia') - def test_download_remotepipelinefilecollection_collection(self): - local_path = os.path.join(self.temp_dir, 'local_download_path') - broker = NullStorageBroker('') - - remote_collection = RemotePipelineFileCollection([ - RemotePipelineFile('dest/path/1.nc', name='1.nc'), - RemotePipelineFile('dest/path/2.nc', name='2.nc') - ]) - + def test_add_pipelinefile(self): + pf = PipelineFile(self.temp_nc_file) handler = self.handler_class(self.temp_nc_file) - handler._upload_store_runner.broker = broker - - handler.download_remotepipelinefilecollection(remote_collection, local_path) - local_paths = remote_collection.get_attribute_list('local_path') - expected = [os.path.join(local_path, rf.dest_path) for rf in remote_collection] - - broker.assert_download_call_count(1) - self.assertCountEqual(local_paths, expected) - - def test_download_remotepipelinefilecollection_file(self): - local_path = os.path.join(self.temp_dir, 'local_download_path') - broker = NullStorageBroker('') - remote_file = RemotePipelineFile('dest/path/1.nc', name='1.nc') + def _preprocess(self_): + self_.add_pipelinefile(pf) - handler = self.handler_class(self.temp_nc_file) - handler._upload_store_runner.broker = broker + handler.preprocess = partial(_preprocess, self_=handler) + handler.run() - handler.download_remotepipelinefilecollection(remote_file, local_path) - expected = os.path.join(local_path, remote_file.dest_path) + self.assertEqual(handler._file_update_callback, pf.file_update_callback) + self.assertIn(pf, handler.file_collection) - broker.assert_download_call_count(1) - self.assertEqual(remote_file.local_path, expected) + with self.assertRaises(TypeError): + handler.add_pipelinefile(1) diff --git a/test_aodncore/pipeline/test_files.py b/test_aodncore/pipeline/test_files.py index 1df58b87..cf0f6c75 100644 --- a/test_aodncore/pipeline/test_files.py +++ b/test_aodncore/pipeline/test_files.py @@ -91,6 +91,9 @@ def test_property_check_type(self): self.pipelinefile.check_type = test_value self.assertIs(self.pipelinefile.check_type, test_value) + pf = PipelineFile(GOOD_NC, check_type=PipelineFileCheckType.NC_COMPLIANCE_CHECK) + self.assertIs(pf.check_type, PipelineFileCheckType.NC_COMPLIANCE_CHECK) + with self.assertRaises(ValueError): self.pipelinefile.check_type = 'invalid' @@ -112,6 +115,9 @@ def test_property_publish_type(self): self.pipelinefile.publish_type = test_value self.assertIs(self.pipelinefile.publish_type, test_value) + pf = PipelineFile(GOOD_NC, publish_type=PipelineFilePublishType.ARCHIVE_ONLY) + self.assertIs(pf.publish_type, PipelineFilePublishType.ARCHIVE_ONLY) + with self.assertRaises(ValueError): self.pipelinefile.publish_type = 'invalid' diff --git a/test_aodncore/pipeline/test_statequery.py b/test_aodncore/pipeline/test_statequery.py index 24a38998..ac6c6b6f 100644 --- a/test_aodncore/pipeline/test_statequery.py +++ b/test_aodncore/pipeline/test_statequery.py @@ -1,14 +1,32 @@ +import json +import os +from unittest.mock import patch + +from aodncore.pipeline import RemotePipelineFileCollection, RemotePipelineFile, PipelineFile from aodncore.pipeline.statequery import StateQuery from aodncore.pipeline.storage import get_storage_broker from aodncore.testlib import BaseTestCase +from aodncore.util.wfs import WfsBroker +from test_aodncore import TESTDATA_DIR + +GOOD_NC = os.path.join(TESTDATA_DIR, 'good.nc') class TestStateQuery(BaseTestCase): - def setUp(self): - self.storage_broker = get_storage_broker(self.config.pipeline_config['global']['error_uri']) + @patch('aodncore.util.wfs.WebFeatureService') + def setUp(self, mock_webfeatureservice): + self.storage_broker = get_storage_broker(self.config.pipeline_config['global']['upload_uri']) + + self.wfs_broker = WfsBroker(self.config.pipeline_config['global']['wfs_url']) + + with open(os.path.join(TESTDATA_DIR, 'wfs/GetFeature.json')) as f: + self.wfs_broker.wfs.getfeature().getvalue.return_value = f.read() + + with open(os.path.join(TESTDATA_DIR, 'wfs/get_schema.json')) as f: + self.wfs_broker.wfs.get_schema.return_value = json.load(f) def test_no_wfs(self): - state_query = StateQuery(storage_broker=self.storage_broker, wfs_url=None) + state_query = StateQuery(storage_broker=self.storage_broker, wfs_broker=None) with self.assertRaises(AttributeError): _ = state_query.wfs @@ -18,3 +36,26 @@ def test_no_wfs(self): with self.assertRaises(AttributeError): _ = state_query.query_wfs_url_exists('', '') + + def test_wfs(self): + state_query = StateQuery(storage_broker=self.storage_broker, wfs_broker=self.wfs_broker) + response = state_query.query_wfs_urls_for_layer('anmn_velocity_timeseries_map') + + expected = RemotePipelineFileCollection([ + RemotePipelineFile( + 'IMOS/ANMN/QLD/GBROTE/Velocity/IMOS_ANMN-QLD_AETVZ_20140408T102930Z_GBROTE_FV01_GBROTE-1404-AWAC-13_END-20141022T052930Z_C-20150215T063708Z.nc'), + RemotePipelineFile( + 'IMOS/ANMN/NRS/NRSYON/Velocity/IMOS_ANMN-NRS_AETVZ_20110413T025900Z_NRSYON_FV01_NRSYON-1104-Workhorse-ADCP-27_END-20111014T222900Z_C-20150306T004801Z.nc') + ]) + + self.assertEqual(expected, response) + + def test_download_remotepipelinefilecollection(self): + state_query = StateQuery(storage_broker=self.storage_broker, wfs_broker=self.wfs_broker) + pipeline_file = PipelineFile(GOOD_NC, dest_path='dest/path/1.nc') + self.storage_broker.upload(pipeline_file) + + remote_file = RemotePipelineFile.from_pipelinefile(pipeline_file) + state_query.download(remote_file, local_path=self.temp_dir) + + self.assertTrue(os.path.exists(remote_file.local_path)) diff --git a/test_aodncore/util/test_wfs.py b/test_aodncore/util/test_wfs.py index 4b635365..5a1077c2 100644 --- a/test_aodncore/util/test_wfs.py +++ b/test_aodncore/util/test_wfs.py @@ -3,6 +3,7 @@ from unittest.mock import patch from owslib.etree import etree +from owslib.fes import PropertyIsEqualTo from aodncore.testlib import BaseTestCase from aodncore.util import IndexedSet @@ -13,14 +14,11 @@ class TestPipelineWfs(BaseTestCase): def test_get_filter_for_file_url(self): file_url = 'IMOS/test/file/url' - xml_filter = get_filter_for_file_url(file_url, property_name='file_url') + ogc_filter = get_filter_for_file_url(file_url, property_name='file_url') - root = etree.fromstring(xml_filter) - property_name = root.findtext('ogc:PropertyName', namespaces=root.nsmap) - literal = root.findtext('ogc:Literal', namespaces=root.nsmap) - - self.assertEqual(property_name, 'file_url') - self.assertEqual(literal, file_url) + self.assertIsInstance(ogc_filter, PropertyIsEqualTo) + self.assertEqual(ogc_filter.propertyname, 'file_url') + self.assertEqual(ogc_filter.literal, file_url) class TestWfsBroker(BaseTestCase):