Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
0227e9b
Merge pull request #655 from aperture-data/release-0.4.56
luisremis Apr 22, 2026
286bbd9
Merge pull request #660 from aperture-data/release-0.4.57
luisremis Apr 23, 2026
ab9dbd8
Merge pull request #664 from aperture-data/release-0.4.58
luisremis Apr 23, 2026
5389028
fix: update pre-commit CI to use python 3.12 (#676)
ad-claw000 May 18, 2026
893a4eb
test: add initial suite of tests for Images.py (#674)
ad-claw000 May 19, 2026
c1f511a
feat: Add ConnectionPool class for thread-safe connections
ad-claw000 May 4, 2026
ada265c
style: run pre-commit autopep8
ad-claw000 May 4, 2026
b57c59b
fix(tests): use create_connection factory for ConnectionPool tests
ad-claw000 May 4, 2026
cac3fdb
fix(tests): resolve pre-commit autopep8
ad-claw000 May 4, 2026
4c3c7bb
fix(tests): resolve failing ConnectionPool test setup
ad-claw000 May 4, 2026
b8717e8
style: run pre-commit autopep8
ad-claw000 May 4, 2026
5028cc1
test(ConnectionPool): remove invalid import and use unittest.TestCase
ad-claw000 May 4, 2026
710414b
style: autopep8 fixes
ad-claw000 May 4, 2026
156805e
style: autopep8 remaining test and example files
ad-claw000 May 4, 2026
b721fac
test: add sleep to connection pool tests to reduce flakiness
ad-claw000 May 6, 2026
d0df03f
style: autopep8 test_ConnectionPool.py
ad-claw000 May 6, 2026
823ddc7
test: increase connection pool sleep interval
ad-claw000 May 7, 2026
b6f7414
style: fix autopep8 empty line violation in test_ConnectionPool.py
ad-claw000 May 7, 2026
2f87610
test: increase connection pool sleep interval
ad-claw000 May 7, 2026
b01f44e
style: fix autopep8 empty line violation in test_ConnectionPool.py
ad-claw000 May 7, 2026
16a6069
test: further increase connection pool sleep interval
ad-claw000 May 7, 2026
0a2913b
style: fix autopep8 empty line violation in test_ConnectionPool.py
ad-claw000 May 7, 2026
5bda154
test: instantiate Connector using dbinfo for test_ConnectionPool
ad-claw000 May 7, 2026
4d0d51f
trigger ci
ad-claw000 May 19, 2026
3757d30
fix(ConnectionPool): address PR review comments
ad-claw000 May 19, 2026
6a6f537
style: fix autopep8 violations in ConnectionPool
ad-claw000 May 19, 2026
8b788b2
trigger ci
ad-claw000 May 19, 2026
37417bf
style: revert unrelated formatting changes per review
ad-claw000 May 19, 2026
ac22ace
refactor: remove unused lock
ad-claw000 May 19, 2026
4bca587
fix: Combined logger exception/warning (#684)
ad-claw000 May 19, 2026
72b21fb
docs: fixup reference links (#685)
ad-claw000 May 19, 2026
3f8767f
feat(connector): add boolean parameter to connect at construction (#681)
ad-claw000 May 19, 2026
5f44696
fix(ConnectionPool): Address PR review comments
ad-claw000 May 19, 2026
39c70f6
style: pre-commit autopep8 fixes
ad-claw000 May 19, 2026
fc52842
Merge branch 'develop' into feat/598-connection-pool
luisremis May 20, 2026
09b3b08
fix: address review comments on ConnectionPool
ad-claw000 May 20, 2026
f632f74
Fix pre-commit issues
ad-claw000 May 20, 2026
a3bd500
fix: report correct pool size and fix string formatting when draining
ad-claw000 May 20, 2026
ee1305d
Merge branch 'develop' into feat/598-connection-pool
luisremis May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions aperturedb/ConnectionPool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import threading
import queue
import logging
from contextlib import contextmanager

from aperturedb.CommonLibrary import create_connector

logger = logging.getLogger(__name__)


class ConnectionPool:
Comment thread
luisremis marked this conversation as resolved.
"""
A thread-safe connection pool for aperturedb.Connector.

This pool manages a fixed number of Connector instances, allowing multiple
threads to safely execute queries by borrowing and returning connections.
"""

def __init__(self, pool_size: int = 10, connection_factory=create_connector):
"""
Initializes the connection pool.

Args:
pool_size (int): The number of connections to keep in the pool.
connection_factory (callable): A factory function to create new connections.
"""
if pool_size <= 0:
raise ValueError("Pool size must be greater than 0.")

self._pool_size = pool_size
self._connection_factory = connection_factory
# A thread-safe queue to hold the available connections
self._pool = queue.Queue(maxsize=pool_size)

# Pre-populate the pool with connections
last_error = None
for _ in range(pool_size):
try:
connection = self._connection_factory()
if not connection:
raise ConnectionError("Failed to create a connection.")
self._pool.put(connection)
except Exception as e:
logger.error(
f"Failed to create a connection for the pool: {e}")
last_error = e
break
Comment thread
luisremis marked this conversation as resolved.

if self._pool.qsize() < pool_size:
created = self._pool.qsize()
# Drain any successfully created connections to prevent leaks
while not self._pool.empty():
try:
conn = self._pool.get_nowait()
if hasattr(conn, 'close'):
conn.close()
except queue.Empty:
Comment on lines +52 to +57
break
msg = (
f"Failed to initialize pool: expected {pool_size} "
f"connections, got {created}."
)
raise ConnectionError(msg) from last_error

def available(self) -> int:
"""Returns the number of available connections in the pool."""
return self._pool.qsize()

def total(self) -> int:
"""Returns the total number of connections in the pool."""
return self._pool_size
Comment thread
luisremis marked this conversation as resolved.
Comment thread
luisremis marked this conversation as resolved.

@contextmanager
def get_connection(self, timeout=None):
"""
A context manager to get a connection from the pool.
This is the recommended way to use a connection.
It automatically gets a connection and releases it back to the pool.

Args:
timeout (float, optional): How long to wait for a connection to become available before raising TimeoutError.

Note:
Callers are responsible for the connection's health. If an exception occurs
within the context manager, the connection is still returned to the pool.

Usage:
with pool.get_connection() as conn:
conn.query(...)
"""
try:
connection = self._pool.get(timeout=timeout)
except queue.Empty:
raise TimeoutError(
f"No connection available in the pool within the specified timeout ({timeout}s).")

try:
# Yield the connection for the user to use
yield connection
finally:
# This block is guaranteed to execute, ensuring the connection
# is always returned to the pool unless an exception occurred.
Comment on lines +101 to +102
self._pool.put(connection)
Comment on lines +97 to +103
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 09b3b08


def query(self, query, blobs: list = None):
"""
A convenience method to execute a query directly from the pool.

This method handles getting a connection, executing the query,
and returning the connection to the pool.

Args:
query: The query string or list to execute.
blobs (list): A list of blobs to include with the query.

Returns:
Response from the executed query.
Blobs
"""
if blobs is None:
blobs = []
with self.get_connection() as connection:
return connection.query(query, blobs)

def close(self):
"""
Closes all connections in the pool.
"""
while not self._pool.empty():
try:
conn = self._pool.get_nowait()
if hasattr(conn, 'close'):
conn.close()
except queue.Empty:
break
Comment on lines +125 to +135
86 changes: 86 additions & 0 deletions test/test_ConnectionPool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import unittest
import threading
from aperturedb.ConnectionPool import ConnectionPool
from aperturedb.Connector import Connector
try:
from . import dbinfo
except ImportError:
import dbinfo


def _make_connector():
return Connector(
host=dbinfo.DB_TCP_HOST,
port=dbinfo.DB_TCP_PORT,
user=dbinfo.DB_USER,
password=dbinfo.DB_PASSWORD,
use_ssl=True,
ca_cert=dbinfo.CA_CERT,
verify_hostname=dbinfo.VERIFY_HOSTNAME,
retry_max_attempts=3,
retry_interval_seconds=0
)


class TestConnectionPool(unittest.TestCase):
def setUp(self):
pass

Comment thread
luisremis marked this conversation as resolved.
Comment thread
luisremis marked this conversation as resolved.
def test_pool_initialization(self):
pool = ConnectionPool(
pool_size=3, connection_factory=_make_connector)
self.assertEqual(pool.total(), 3)
self.assertEqual(pool.available(), 3)
Comment on lines +25 to +33

def test_pool_borrow_return(self):
pool = ConnectionPool(
pool_size=2, connection_factory=_make_connector)
with pool.get_connection() as conn:
self.assertEqual(pool.available(), 1)
# Test a simple query to ensure the connection is real
response, _ = conn.query([{"GetStatus": {}}])
self.assertTrue(isinstance(response, list))

# Should be returned to the pool
self.assertEqual(pool.available(), 2)

def test_pool_convenience_query(self):
pool = ConnectionPool(
pool_size=1, connection_factory=_make_connector)
response, blobs = pool.query([{"GetStatus": {}}])
self.assertTrue(isinstance(response, list))
self.assertTrue(isinstance(blobs, list))

def test_pool_concurrency(self):
pool = ConnectionPool(
pool_size=5, connection_factory=_make_connector)
results = []

def worker():
try:
res, _ = pool.query([{"GetStatus": {}}])
results.append(res)
except Exception as e:
results.append(e)

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()

self.assertEqual(len(results), 10)
Comment thread
luisremis marked this conversation as resolved.
for r in results:
self.assertTrue(isinstance(r, list),
f"Worker failed with exception: {r}")

def test_pool_timeout(self):
pool = ConnectionPool(pool_size=1, connection_factory=_make_connector)
with pool.get_connection():
with self.assertRaises(TimeoutError):
with pool.get_connection(timeout=0.1):
pass


if __name__ == '__main__':
unittest.main()
1 change: 1 addition & 0 deletions test/test_Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ def mock_connect(self):
class MockConn:
def close(self): pass
self.conn = MockConn()

monkeypatch.setattr(Connector, "connect", mock_connect)

new_db = Connector(
Expand Down
Loading