diff --git a/logstash_async/database.py b/logstash_async/database.py index 7860976..0de52e8 100644 --- a/logstash_async/database.py +++ b/logstash_async/database.py @@ -11,7 +11,6 @@ from logstash_async.constants import constants from logstash_async.utils import ichunked - DATABASE_SCHEMA_STATEMENTS = [ ''' CREATE TABLE IF NOT EXISTS `event` ( @@ -29,6 +28,10 @@ class DatabaseLockedError(Exception): pass +class DatabaseDiskIOError(Exception): + pass + + class DatabaseCache(Cache): """ Backend implementation for python-logstash-async. Keeps messages on disk in a SQL-lite DB @@ -47,8 +50,8 @@ def __init__(self, path, event_ttl=None): @contextmanager def _connect(self): - self._open() try: + self._open() with self._connection as connection: yield connection except sqlite3.OperationalError: @@ -96,6 +99,12 @@ def _handle_sqlite_error(self): _, exc, _ = sys.exc_info() if str(exc) == 'database is locked': raise DatabaseLockedError from exc + if str(exc) == 'disk I/O error': + raise DatabaseDiskIOError from exc + if str(exc) == "unable to open database file": + raise DatabaseDiskIOError from exc + if str(exc) == "attempt to write a readonly database": + raise DatabaseDiskIOError from exc # ---------------------------------------------------------------------- def get_queued_events(self): diff --git a/logstash_async/worker.py b/logstash_async/worker.py index 4677b3b..6826105 100644 --- a/logstash_async/worker.py +++ b/logstash_async/worker.py @@ -14,7 +14,7 @@ from limits.strategies import FixedWindowRateLimiter from logstash_async.constants import constants -from logstash_async.database import DatabaseCache, DatabaseLockedError +from logstash_async.database import DatabaseCache, DatabaseLockedError, DatabaseDiskIOError from logstash_async.memory_cache import MemoryCache from logstash_async.utils import safe_log_via_print @@ -128,7 +128,7 @@ def _fetch_events(self): self._flush_queued_events(force=force_flush) self._delay_processing() self._expire_events() - except (DatabaseLockedError, ProcessingError): + except (DatabaseLockedError, ProcessingError, DatabaseDiskIOError): if self._shutdown_requested(): return @@ -150,6 +150,13 @@ def _process_event(self): self._queue.qsize(), exc=exc) raise + except DatabaseDiskIOError as exc: + self._safe_log( + 'debug', + 'Disk I/O error, will try again later (queue length %d)', + self._queue.qsize(), + exc=exc) + raise except Exception as exc: self._log_processing_error(exc) raise ProcessingError from exc @@ -160,7 +167,7 @@ def _process_event(self): def _expire_events(self): try: self._database.expire_events() - except DatabaseLockedError: + except (DatabaseLockedError, DatabaseDiskIOError): # Nothing to handle, if it fails, we will either successfully publish # these messages next time or we will delete them on the next pass. pass @@ -242,6 +249,13 @@ def _fetch_queued_events_for_flush(self): 'Database is locked, will try again later (queue length %d)', self._queue.qsize(), exc=exc) + except DatabaseDiskIOError as exc: + self._safe_log( + 'debug', + 'Disk I/O error, will try again later (queue length %d)', + self._queue.qsize(), + exc=exc) + raise except Exception as exc: # just log the exception and hope we can recover from the error self._safe_log('exception', 'Error retrieving queued events: %s', exc, exc=exc) @@ -252,7 +266,7 @@ def _fetch_queued_events_for_flush(self): def _delete_queued_events_from_database(self): try: self._database.delete_queued_events() - except DatabaseLockedError: + except (DatabaseLockedError, DatabaseDiskIOError): pass # nothing to handle, if it fails, we delete those events in a later run # ---------------------------------------------------------------------- diff --git a/tests/database_test.py b/tests/database_test.py index 52fbdc4..bd14ba8 100644 --- a/tests/database_test.py +++ b/tests/database_test.py @@ -7,16 +7,16 @@ import sqlite3 import time import unittest +from stat import S_IREAD, S_IRGRP, S_IROTH, S_IWUSR from logstash_async.constants import constants -from logstash_async.database import DATABASE_SCHEMA_STATEMENTS, DatabaseCache +from logstash_async.database import DATABASE_SCHEMA_STATEMENTS, DatabaseCache, DatabaseDiskIOError # pylint: disable=protected-access class DatabaseCacheTest(unittest.TestCase): - TEST_DB_FILENAME = "test.db" _connection = None @@ -63,6 +63,15 @@ def close_connection(cls): cls._connection.close() cls._connection = None + # ---------------------------------------------------------------------- + def testIOException(self): + self.cache.add_event("message") + with self.assertRaises(DatabaseDiskIOError): + # change permissions to produce error + os.chmod(os.path.abspath("test.db"), S_IREAD | S_IRGRP | S_IROTH) + self.cache.add_event("message") + os.chmod(os.path.abspath("test.db"), S_IWUSR | S_IREAD) + # ---------------------------------------------------------------------- def test_add_event(self): self.cache.add_event("message")