Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions aperturedb/ConnectionPool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import threading
import queue
from contextlib import contextmanager

from aperturedb.CommonLibrary import create_connector


class ConnectionPool:
"""
A thread-safe connection pool for aperturedb.Connector.

This pool manages a fixed number of Connector instances, allowing multiple
threads to safely execute queries by borrowing and returning connections.
"""

def __init__(self, pool_size: int = 10, connection_factory=create_connector):
"""
Initializes the connection pool.

Args:
pool_size (int): The number of connections to keep in the pool.
connection_factory (callable): A factory function to create new connections.
"""
if pool_size <= 0:
raise ValueError("Pool size must be greater than 0.")

self._pool_size = pool_size
self._connection_factory = connection_factory
# A thread-safe queue to hold the available connections
self._pool = queue.Queue(maxsize=pool_size)

# A lock to ensure the initial population is thread-safe, just in case.
self._lock = threading.Lock()

Comment on lines +32 to +34
# Pre-populate the pool with connections
for _ in range(pool_size):
try:
connection = self._connection_factory()
if not connection:
raise ConnectionError("Failed to create a connection.")
self._pool.put(connection)
except Exception as e:
print(f"Failed to create a connection for the pool: {e}")
# Depending on requirements, you might want to raise an error
# if the pool cannot be fully populated.

if self.available() == 0:
raise ConnectionError(
"Failed to initialize any connections for the pool. "
"Please check connection parameters and network."
)

def available(self) -> int:
"""Returns the number of available connections in the pool."""
return self._pool.qsize()

def total(self) -> int:
"""Returns the total number of connections in the pool."""
return self._pool_size

@contextmanager
def get_connection(self):
"""
A context manager to get a connection from the pool.
This is the recommended way to use a connection.
It automatically gets a connection and releases it back to the pool.

Usage:
with pool.get_connection() as conn:
conn.query(...)
"""
# The get() call will block until a connection is available.
connection = self._pool.get()
try:
# Yield the connection for the user to use
yield connection
finally:
# This block is guaranteed to execute, ensuring the connection
# is always returned to the pool.
self._pool.put(connection)

def query(self, query: str, blobs: list = [], **kwargs):
"""
A convenience method to execute a query directly from the pool.

This method handles getting a connection, executing the query,
and returning the connection to the pool.

Args:
query (str): The query string to execute.
blobs (list): A list of blobs to include with the query.
**kwargs: Other arguments for the Connector's query method.

Returns:
Response from the executed query.
Blobs
"""
with self.get_connection() as connection:
return connection.query(query, blobs, **kwargs)
Comment on lines +82 to +99
43 changes: 43 additions & 0 deletions test/test_ConnectionPool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pytest
from unittest.mock import MagicMock
from aperturedb.ConnectionPool import ConnectionPool

def test_connection_pool_initialization():
mock_factory = MagicMock()
mock_connection = MagicMock()
mock_factory.return_value = mock_connection

pool = ConnectionPool(pool_size=3, connection_factory=mock_factory)

assert pool.total() == 3
assert pool.available() == 3
assert mock_factory.call_count == 3

def test_get_connection():
mock_factory = MagicMock()
mock_connection = MagicMock()
mock_factory.return_value = mock_connection

pool = ConnectionPool(pool_size=1, connection_factory=mock_factory)

assert pool.available() == 1

with pool.get_connection() as conn:
assert pool.available() == 0
assert conn == mock_connection

assert pool.available() == 1

def test_query():
mock_factory = MagicMock()
mock_connection = MagicMock()
mock_connection.query.return_value = ("response", [])
mock_factory.return_value = mock_connection

pool = ConnectionPool(pool_size=1, connection_factory=mock_factory)

response, blobs = pool.query("some query", [1, 2, 3], arg=True)

assert response == "response"
assert blobs == []
mock_connection.query.assert_called_once_with("some query", [1, 2, 3], arg=True)
Comment on lines +39 to +43
Loading