Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 64 additions & 59 deletions aodncore/pipeline/handlerbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@ class HandlerBase(object):
ARCHIVE_URI.
:type archive_path_function: str, function

:param celery_task: A Celery task object, in order for the handler instance to derive runtime information such as
the current task name and UUID.

.. note:: If absent (e.g. when unit testing), the handler will revert to having no task information available,
and will log output to standard output.
:type celery_task: :py:class:`celery.Task`

:param check_params: A dict containing parameters passed directly to the check step (e.g. compliance checker
suites). The structure of the dict is defined by the :const:`CHECK_PARAMS_SCHEMA` object in the
:py:mod:`aodncore.pipeline.schema` module.
Expand Down Expand Up @@ -154,11 +147,31 @@ class HandlerBase(object):

:type include_regexes: list

:param logger: a Logger instance.

.. note:: If omitted(e.g. when run from unit tests, a basic root Logger will be created)

:type logger: :py:class:`Logger`

:param notify_params: A dict containing parameters passed directly to the :py:mod:`aodncore.pipeline.steps.notify`
step (e.g. owner/success/failure notify lists). The structure of the dict is defined by the
:py:const:`NOTIFY_PARAMS_SCHEMA` object in the :py:mod:`aodncore.pipeline.schema` module.
:type notify_params: :py:class:`dict`

:param pipeline_name: name of the pipeline executing the handler instance
:type pipeline_name: :py:class:`str`

:param state_change_callback: callable object which will be executed on every state change with the new state
(`self.state`) as the parameter
:type notify_params: :py:class:`callable`

:param task_id: A task ID, in order for the handler instance to derive runtime information such as
the current task name and UUID.

.. note:: If absent (e.g. when unit testing), the handler will revert to having no task information available,
and will log output to standard output.
:type task_id: :py:class:`str`

:param upload_path: A string attribute to hold the original upload path of the :py:attr:`input_file`.

.. note:: This is intended for information purposes only (e.g. to appear in notification templates), since there
Expand Down Expand Up @@ -277,7 +290,7 @@ def __init__(self, input_file,
allowed_regexes=None,
archive_input_file=False,
archive_path_function=None,
celery_task=None,
task_id=None,
check_params=None,
config=None,
custom_params=None,
Expand All @@ -287,9 +300,12 @@ def __init__(self, input_file,
harvest_params=None,
harvest_type='talend',
include_regexes=None,
logger=None,
notify_params=None,
upload_path=None,
resolve_params=None
pipeline_name='NO_PIPELINE',
resolve_params=None,
state_change_callback=None,
upload_path=None
):

# property backing variables
Expand All @@ -307,12 +323,14 @@ def __init__(self, input_file,
self._include_regexes = None
self._input_file_archive_path = None
self._instance_working_directory = None
self._notification_results = None
self._is_archived = False
self._logger = None
self._logger = logger
self._notification_results = None
self._pipeline_name = pipeline_name
self._result = HandlerResult.UNKNOWN
self._should_notify = None
self._start_time = datetime.now()
self._task_id = task_id

# public attributes
self.input_file = input_file
Expand All @@ -322,7 +340,6 @@ def __init__(self, input_file,
self.allowed_regexes = allowed_regexes
self.archive_input_file = archive_input_file
self.archive_path_function = archive_path_function
self.celery_task = celery_task
self.check_params = check_params
self.custom_params = custom_params
self.config = config
Expand All @@ -333,6 +350,7 @@ def __init__(self, input_file,
self.harvest_type = harvest_type
self.include_regexes = include_regexes
self.notify_params = notify_params
self.state_change_callback = state_change_callback
self.upload_path = upload_path
self.resolve_params = resolve_params

Expand All @@ -348,8 +366,8 @@ def __init__(self, input_file,
after_state_change='_after_state_change')

def __iter__(self):
ignored_attributes = {'celery_task', 'config', 'default_addition_publish_type', 'default_deletion_publish_type',
'input_file_object', 'logger', 'state', 'state_query', 'trigger'}
ignored_attributes = {'config', 'default_addition_publish_type', 'default_deletion_publish_type',
'input_file_object', 'logger', 'state', 'state_change_callback', 'state_query', 'trigger'}
ignored_attributes.update("is_{state}".format(state=s) for s in self.all_states)

return iter_public_attributes(self, ignored_attributes)
Expand All @@ -362,22 +380,22 @@ def __str__(self):
#

@property
def celery_task_id(self):
"""Read-only property to access Celery task ID
def task_id(self):
"""Read-only property to access task ID

:return: Celery task ID (if applicable)
:return: task ID (if applicable)
:rtype: :class:`str`, :class:`None`
"""
return self._celery_task_id
return self._task_id

@property
def celery_task_name(self):
"""Read-only property to access Celery task name
def pipeline_name(self):
"""Read-only property to access pipeline name

:return: Celery task name (if applicable)
:return: pipeline name (if applicable)
:rtype: :class:`str`, :class:`None`
"""
return self._celery_task_name
return self._pipeline_name

@property
def config(self):
Expand Down Expand Up @@ -499,7 +517,7 @@ def input_file_archive_path(self):
:rtype: :class:`str`
"""
if not self._input_file_archive_path:
self.input_file_archive_path = os.path.join(self._pipeline_name, os.path.basename(self.input_file))
self.input_file_archive_path = os.path.join(self.pipeline_name, os.path.basename(self.input_file))
return self._input_file_archive_path

@input_file_archive_path.setter
Expand All @@ -521,10 +539,12 @@ def input_file_object(self):
def logger(self):
"""Read-only property to access the instance Logger

If a logger parameter was not supplied (e.g. in a unittest or IDE), a basic default logger will be instantiated.

:return: :py:class:`Logger`
"""
if self._logger is None:
self._init_logger()
self._init_default_logger()
return self._logger

@lazyproperty
Expand Down Expand Up @@ -832,8 +852,8 @@ def _complete_with_errors(self):
def _after_state_change(self):
self.logger.sysinfo(
"{self.__class__.__name__} transitioned to state: {self.state}".format(self=self))
if self.celery_task is not None:
self.celery_task.update_state(state=self.state)
if self.state_change_callback is not None:
self.state_change_callback(self.state)

def _file_update_callback(self, **kwargs):
raw_name = kwargs.get('name')
Expand All @@ -853,38 +873,23 @@ def _check_input_file_name(self):
raise InvalidInputFileError("input file '{self.file_basename}' does not match any patterns "
"in the allowed_regexes list: {self.allowed_regexes}".format(self=self))

def _init_logger(self):
try:
celery_task_id = self.celery_task.request.id
celery_task_name = self.celery_task.name
pipeline_name = self.celery_task.pipeline_name
self._logger = self.celery_task.logger
except AttributeError as e:
# the absence of a celery task indicates we're in a unittest or IDE, so fall-back to basic logging config
celery_task_id = None
celery_task_name = 'NO_TASK'
pipeline_name = 'NO_PIPELINE'
logging.basicConfig(level=FALLBACK_LOG_LEVEL, format=FALLBACK_LOG_FORMAT)

logging_extra = {
'celery_task_id': celery_task_id,
'celery_task_name': celery_task_name,
'pipeline_name': pipeline_name
}
logger = get_pipeline_logger('', logging_extra)

# turn down logging for noisy libraries to WARN, unless overridden in pipeline config 'liblevel' key
liblevel = getattr(self.config, 'pipeline_config', {}).get('logging', {}).get('liblevel', 'WARN')
for lib in ('botocore', 'paramiko', 's3transfer', 'transitions'):
logging.getLogger(lib).setLevel(liblevel)

logger.warning('no logger parameter or celery task found, falling back to root logger')
logger.debug('_init_logging exception: {e}'.format(e=e))
self._logger = logger

self._celery_task_id = celery_task_id
self._celery_task_name = celery_task_name
self._pipeline_name = pipeline_name
def _init_default_logger(self):
logging.basicConfig(level=FALLBACK_LOG_LEVEL, format=FALLBACK_LOG_FORMAT)

logging_extra = {
'task_id': self.task_id,
'celery_task_id': self.task_id, # deprecated name, retained for backwards compatibility
'pipeline_name': self.pipeline_name
}
logger = get_pipeline_logger(None, extra=logging_extra)

# turn down logging for noisy libraries to WARN, unless overridden in pipeline config 'liblevel' key
liblevel = getattr(self.config, 'pipeline_config', {}).get('logging', {}).get('liblevel', 'WARN')
for lib in ('botocore', 'paramiko', 's3transfer', 'transitions'):
logging.getLogger(lib).setLevel(liblevel)

logger.warning('no logger attribute found, falling back to root logger')
self._logger = logger

def _init_working_directory(self):
for subdirectory in ('collection', 'products', 'temp'):
Expand Down
14 changes: 10 additions & 4 deletions aodncore/pipeline/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ def run(self, incoming_file):
try:
logging.config.dictConfig(config.get_worker_logging_config(task_name))
logging_extra = {
'celery_task_id': self.request.id,
'celery_task_name': task_name
'task_id': self.request.id,
'celery_task_id': self.request.id, # deprecated name, retained for backwards compatibility
'pipeline_name': pipeline_name
}
self.logger = get_pipeline_logger(task_name, extra=logging_extra, logger_function=get_task_logger)

Expand All @@ -209,8 +210,13 @@ def run(self, incoming_file):
file_state_manager.move_to_processing()

try:
handler = handler_class(file_state_manager.processing_path, celery_task=self, config=config,
upload_path=file_state_manager.relative_path, **kwargs)
handler = handler_class(file_state_manager.processing_path,
task_id=self.request.id,
config=config,
logger=self.logger,
state_change_callback=self.celery_task.update_state,
upload_path=file_state_manager.relative_path,
**kwargs)
except Exception as e:
file_state_manager.move_to_error()
self.logger.error("failed to instantiate handler class: {e}".format(e=format_exception(e)))
Expand Down
2 changes: 1 addition & 1 deletion aodncore/testlib/conf/pipeline.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
},
"logging": {
"level": "SYSINFO",
"pipeline_format": "%(asctime)s %(levelname)s %(name)s[%(celery_task_id)s] %(message)s",
"pipeline_format": "%(asctime)s %(levelname)s %(name)s[%(task_id)s] %(message)s",
"log_root": "",
"watchservice_format": "%(asctime)s %(levelname)s [%(name)s] %(message)s"
},
Expand Down
2 changes: 1 addition & 1 deletion test_aodncore/pipeline/test_configlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
},
'logging': {
'level': 'SYSINFO',
'pipeline_format': '%(asctime)s %(levelname)s %(name)s[%(celery_task_id)s] %(message)s',
'pipeline_format': '%(asctime)s %(levelname)s %(name)s[%(task_id)s] %(message)s',
'log_root': '',
'watchservice_format': '%(asctime)s %(levelname)s [%(name)s] %(message)s'
},
Expand Down