Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions logstash_async/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from logstash_async.constants import constants
from logstash_async.utils import ichunked


DATABASE_SCHEMA_STATEMENTS = [
'''
CREATE TABLE IF NOT EXISTS `event` (
Expand All @@ -29,6 +28,10 @@ class DatabaseLockedError(Exception):
pass


class DatabaseDiskIOError(Exception):
pass


class DatabaseCache(Cache):
"""
Backend implementation for python-logstash-async. Keeps messages on disk in a SQL-lite DB
Expand All @@ -47,8 +50,8 @@ def __init__(self, path, event_ttl=None):

@contextmanager
def _connect(self):
self._open()
try:
self._open()
with self._connection as connection:
yield connection
except sqlite3.OperationalError:
Expand Down Expand Up @@ -96,6 +99,12 @@ def _handle_sqlite_error(self):
_, exc, _ = sys.exc_info()
if str(exc) == 'database is locked':
raise DatabaseLockedError from exc
if str(exc) == 'disk I/O error':
Comment thread
eht16 marked this conversation as resolved.
raise DatabaseDiskIOError from exc
if str(exc) == "unable to open database file":
raise DatabaseDiskIOError from exc
if str(exc) == "attempt to write a readonly database":
raise DatabaseDiskIOError from exc

# ----------------------------------------------------------------------
def get_queued_events(self):
Expand Down
22 changes: 18 additions & 4 deletions logstash_async/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from limits.strategies import FixedWindowRateLimiter

from logstash_async.constants import constants
from logstash_async.database import DatabaseCache, DatabaseLockedError
from logstash_async.database import DatabaseCache, DatabaseLockedError, DatabaseDiskIOError
from logstash_async.memory_cache import MemoryCache
from logstash_async.utils import safe_log_via_print

Expand Down Expand Up @@ -128,7 +128,7 @@ def _fetch_events(self):
self._flush_queued_events(force=force_flush)
self._delay_processing()
self._expire_events()
except (DatabaseLockedError, ProcessingError):
except (DatabaseLockedError, ProcessingError, DatabaseDiskIOError):
if self._shutdown_requested():
return

Expand All @@ -150,6 +150,13 @@ def _process_event(self):
self._queue.qsize(),
exc=exc)
raise
except DatabaseDiskIOError as exc:
self._safe_log(
'debug',
'Disk I/O error, will try again later (queue length %d)',
self._queue.qsize(),
exc=exc)
raise
except Exception as exc:
self._log_processing_error(exc)
raise ProcessingError from exc
Expand All @@ -160,7 +167,7 @@ def _process_event(self):
def _expire_events(self):
try:
self._database.expire_events()
except DatabaseLockedError:
except (DatabaseLockedError, DatabaseDiskIOError):
# Nothing to handle, if it fails, we will either successfully publish
# these messages next time or we will delete them on the next pass.
pass
Expand Down Expand Up @@ -242,6 +249,13 @@ def _fetch_queued_events_for_flush(self):
'Database is locked, will try again later (queue length %d)',
self._queue.qsize(),
exc=exc)
except DatabaseDiskIOError as exc:
self._safe_log(
'debug',
'Disk I/O error, will try again later (queue length %d)',
self._queue.qsize(),
exc=exc)
raise
except Exception as exc:
# just log the exception and hope we can recover from the error
self._safe_log('exception', 'Error retrieving queued events: %s', exc, exc=exc)
Expand All @@ -252,7 +266,7 @@ def _fetch_queued_events_for_flush(self):
def _delete_queued_events_from_database(self):
try:
self._database.delete_queued_events()
except DatabaseLockedError:
except (DatabaseLockedError, DatabaseDiskIOError):
pass # nothing to handle, if it fails, we delete those events in a later run

# ----------------------------------------------------------------------
Expand Down
13 changes: 11 additions & 2 deletions tests/database_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
import sqlite3
import time
import unittest
from stat import S_IREAD, S_IRGRP, S_IROTH, S_IWUSR

from logstash_async.constants import constants
from logstash_async.database import DATABASE_SCHEMA_STATEMENTS, DatabaseCache
from logstash_async.database import DATABASE_SCHEMA_STATEMENTS, DatabaseCache, DatabaseDiskIOError


# pylint: disable=protected-access


class DatabaseCacheTest(unittest.TestCase):

TEST_DB_FILENAME = "test.db"
_connection = None

Expand Down Expand Up @@ -63,6 +63,15 @@ def close_connection(cls):
cls._connection.close()
cls._connection = None

# ----------------------------------------------------------------------
def testIOException(self):
Comment thread
eht16 marked this conversation as resolved.
self.cache.add_event("message")
with self.assertRaises(DatabaseDiskIOError):
# change permissions to produce error
os.chmod(os.path.abspath("test.db"), S_IREAD | S_IRGRP | S_IROTH)
self.cache.add_event("message")
os.chmod(os.path.abspath("test.db"), S_IWUSR | S_IREAD)

# ----------------------------------------------------------------------
def test_add_event(self):
self.cache.add_event("message")
Expand Down