Veritabanı bağlantıları, web uygulamalarının en kritik ve pahalı kaynaklarından biridir. Her yeni bağlantı açmak TCP handshake, authentication ve resource allocation gibi maliyetli işlemler gerektirir. Connection pooling, bu maliyeti azaltarak uygulama performansını önemli ölçüde artırır. Bu yazıda, connection pooling kavramını, Python ile implementasyonunu ve optimizasyon tekniklerini ele alacağız.
Connection Pooling Nedir?
Connection pooling, veritabanı bağlantılarının yeniden kullanılabilir bir havuzda saklanması ve yönetilmesi tekniğidir. Her request için yeni bağlantı açmak yerine, mevcut bağlantılar havuzdan alınır ve işlem bitince havuza geri döner.
Connection Pooling Avantajları
- Performans Artışı: Bağlantı açma maliyetini elimine eder (5-10x daha hızlı)
- Resource Yönetimi: Veritabanı üzerinde aynı anda açık bağlantı sayısını kontrol eder
- Ölçeklenebilirlik: Yüksek trafikte daha iyi performans
- Connection Reuse: TCP ve SSL handshake maliyetlerinden kaçınır
- Timeout Yönetimi: Kullanılmayan bağlantıları otomatik temizler
Connection Pooling vs Direct Connection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| # WITHOUT Connection Pool (BAD!)
import psycopg2
def get_user_without_pool(user_id):
"""Her requestte yeni bağlantı - YAVAŞ"""
# TCP handshake, auth, resource allocation
conn = psycopg2.connect(
host="localhost",
database="mydb",
user="user",
password="pass"
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
cursor.close()
conn.close() # Bağlantıyı kapat
return user
# WITH Connection Pool (GOOD!)
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# Pool bir kez oluşturulur
engine = create_engine(
'postgresql://user:pass@localhost/mydb',
poolclass=QueuePool,
pool_size=10,
max_overflow=20
)
def get_user_with_pool(user_id):
"""Havuzdan bağlantı al - HIZLI"""
with engine.connect() as conn:
result = conn.execute(
"SELECT * FROM users WHERE id = %s",
(user_id,)
)
return result.fetchone()
# Bağlantı havuza geri döner, kapanmaz!
|
SQLAlchemy ile Connection Pooling
SQLAlchemy, Python’da en yaygın kullanılan ORM ve veritabanı toolkit’idir.
Temel Pool Konfigürasyonu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
| from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool, NullPool, StaticPool
# QueuePool (Default) - En yaygın kullanılan
engine = create_engine(
'postgresql://user:password@localhost:5432/dbname',
# Pool parametreleri
poolclass=QueuePool,
pool_size=10, # Havuzda sürekli açık bağlantı sayısı
max_overflow=20, # Geçici olarak ekstra açılabilecek bağlantı
pool_timeout=30, # Havuzdan bağlantı alırken max bekleme (saniye)
pool_recycle=3600, # Bağlantıların yeniden kullanım süresi (1 saat)
pool_pre_ping=True, # Bağlantı kullanmadan önce health check
# Connection parametreleri
echo=False, # SQL loglamayı kapat
echo_pool=False, # Pool loglamayı kapat
connect_args={
'connect_timeout': 10,
'application_name': 'my_app',
}
)
# Örnek kullanım
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
def get_users():
"""Session ile kullanım"""
session = Session()
try:
users = session.query(User).all()
return users
finally:
session.close() # Bağlantı havuza döner
# Context manager ile daha temiz
from contextlib import contextmanager
@contextmanager
def get_session():
"""Session context manager"""
session = Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# Kullanım
with get_session() as session:
users = session.query(User).filter(User.is_active == True).all()
|
Pool Türleri
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| from sqlalchemy.pool import (
QueuePool, # Default, thread-safe queue
NullPool, # Pool yok, her seferinde yeni connection
StaticPool, # Tek connection, thread-safe değil
SingletonThreadPool, # Thread başına bir connection
AssertionPool # Development için, memory leak tespiti
)
# QueuePool - Production kullanımı
queue_engine = create_engine(
'postgresql://localhost/db',
poolclass=QueuePool,
pool_size=20,
max_overflow=10
)
# NullPool - Serverless/Lambda için
nullpool_engine = create_engine(
'postgresql://localhost/db',
poolclass=NullPool
)
# StaticPool - SQLite single-threaded
static_engine = create_engine(
'sqlite:///db.sqlite',
poolclass=StaticPool,
connect_args={'check_same_thread': False}
)
# AssertionPool - Development/testing
assertion_engine = create_engine(
'postgresql://localhost/db',
poolclass=AssertionPool
)
|
Pool Size Optimizasyonu
Pool size belirlemek critical bir karardır. Çok küçük pool connection beklemelerine, çok büyük pool gereksiz resource kullanımına neden olur.
Optimal Pool Size Hesaplama
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
| import multiprocessing
def calculate_optimal_pool_size():
"""
Formül: connections = ((core_count * 2) + effective_spindle_count)
- core_count: CPU çekirdek sayısı
- effective_spindle_count: Disk sayısı (SSD için 1)
"""
core_count = multiprocessing.cpu_count()
disk_count = 1 # SSD için
optimal_size = (core_count * 2) + disk_count
return {
'pool_size': optimal_size,
'max_overflow': optimal_size, # %100 overflow
'recommendation': f'Use pool_size={optimal_size}, max_overflow={optimal_size}'
}
# Örnek çıktı
# {'pool_size': 9, 'max_overflow': 9, 'recommendation': 'Use pool_size=9, max_overflow=9'}
# Farklı senaryolar için ayarlar
POOL_CONFIGS = {
'low_traffic': {
'pool_size': 5,
'max_overflow': 5,
'pool_timeout': 30,
'pool_recycle': 3600,
},
'medium_traffic': {
'pool_size': 10,
'max_overflow': 20,
'pool_timeout': 30,
'pool_recycle': 3600,
},
'high_traffic': {
'pool_size': 20,
'max_overflow': 30,
'pool_timeout': 10, # Daha kısa timeout
'pool_recycle': 1800, # Daha sık recycle
},
'burst_traffic': {
'pool_size': 10,
'max_overflow': 50, # Yüksek burst capacity
'pool_timeout': 5,
'pool_recycle': 3600,
}
}
def create_engine_for_scenario(scenario='medium_traffic'):
"""Senaryoya göre engine oluştur"""
config = POOL_CONFIGS.get(scenario, POOL_CONFIGS['medium_traffic'])
return create_engine(
'postgresql://user:pass@localhost/db',
**config,
pool_pre_ping=True
)
|
AsyncIO ve Async Connection Pooling
Modern async uygulamalar için asyncpg ve SQLAlchemy async desteği:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
| # asyncpg ile native async pool
import asyncpg
import asyncio
class AsyncDatabasePool:
"""Async PostgreSQL pool yönetimi"""
def __init__(self, dsn, min_size=10, max_size=20):
self.dsn = dsn
self.min_size = min_size
self.max_size = max_size
self.pool = None
async def connect(self):
"""Pool oluştur"""
self.pool = await asyncpg.create_pool(
self.dsn,
min_size=self.min_size,
max_size=self.max_size,
command_timeout=60,
max_queries=50000, # Her connection 50k query sonra yenilenir
max_inactive_connection_lifetime=300 # 5 dakika idle connection timeout
)
print(f"Pool created: {self.min_size}-{self.max_size} connections")
async def close(self):
"""Pool'u kapat"""
if self.pool:
await self.pool.close()
print("Pool closed")
async def fetch_user(self, user_id):
"""Kullanıcı getir"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT * FROM users WHERE id = $1',
user_id
)
return dict(row) if row else None
async def fetch_users_batch(self, user_ids):
"""Toplu kullanıcı getirme"""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
'SELECT * FROM users WHERE id = ANY($1::int[])',
user_ids
)
return [dict(row) for row in rows]
async def execute_transaction(self):
"""Transaction örneği"""
async with self.pool.acquire() as conn:
async with conn.transaction():
# Transaction içinde birden fazla query
await conn.execute(
'INSERT INTO users (username, email) VALUES ($1, $2)',
'newuser', '[email protected]'
)
await conn.execute(
'INSERT INTO audit_log (action) VALUES ($1)',
'user_created'
)
# Hata olursa otomatik rollback
# Kullanım
async def main():
pool = AsyncDatabasePool(
dsn='postgresql://user:pass@localhost/db',
min_size=10,
max_size=30
)
await pool.connect()
try:
# Tek query
user = await pool.fetch_user(1)
print(f"User: {user}")
# Batch query
users = await pool.fetch_users_batch([1, 2, 3, 4, 5])
print(f"Found {len(users)} users")
# Paralel queries
tasks = [pool.fetch_user(i) for i in range(1, 11)]
results = await asyncio.gather(*tasks)
print(f"Parallel fetched: {len(results)} users")
finally:
await pool.close()
# asyncio.run(main())
|
SQLAlchemy 2.0 Async Support
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
| from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
# Async engine oluştur
async_engine = create_async_engine(
'postgresql+asyncpg://user:pass@localhost/db',
# Pool ayarları
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
pool_recycle=3600,
# Async parametreler
echo=False,
)
# Async session factory
AsyncSessionLocal = sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False
)
# Model (örnek)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str]
email: Mapped[str]
is_active: Mapped[bool] = mapped_column(default=True)
# Async CRUD operations
class AsyncUserRepository:
"""Async user repository"""
@staticmethod
async def get_by_id(user_id: int) -> User:
"""ID ile kullanıcı getir"""
async with AsyncSessionLocal() as session:
result = await session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
@staticmethod
async def get_active_users() -> list[User]:
"""Aktif kullanıcıları getir"""
async with AsyncSessionLocal() as session:
result = await session.execute(
select(User).where(User.is_active == True)
)
return result.scalars().all()
@staticmethod
async def create_user(username: str, email: str) -> User:
"""Kullanıcı oluştur"""
async with AsyncSessionLocal() as session:
user = User(username=username, email=email)
session.add(user)
await session.commit()
await session.refresh(user)
return user
@staticmethod
async def bulk_create_users(users_data: list[dict]) -> int:
"""Toplu kullanıcı oluşturma"""
async with AsyncSessionLocal() as session:
users = [User(**data) for data in users_data]
session.add_all(users)
await session.commit()
return len(users)
# FastAPI entegrasyonu
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
app = FastAPI()
async def get_db() -> AsyncSession:
"""Database session dependency"""
async with AsyncSessionLocal() as session:
yield session
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
"""Kullanıcı getir endpoint"""
result = await db.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return {
'id': user.id,
'username': user.username,
'email': user.email
}
@app.post("/users/")
async def create_user(
username: str,
email: str,
db: AsyncSession = Depends(get_db)
):
"""Kullanıcı oluştur endpoint"""
user = User(username=username, email=email)
db.add(user)
await db.commit()
await db.refresh(user)
return {'id': user.id, 'username': user.username}
|
Connection Pool Monitoring ve Debugging
Pool durumunu izlemek performance sorunlarını tespit için kritiktir:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
| import logging
from sqlalchemy import event
from sqlalchemy.pool import Pool
# Detaylı pool logging
logging.basicConfig()
logging.getLogger('sqlalchemy.pool').setLevel(logging.DEBUG)
class PoolMonitor:
"""Connection pool monitoring"""
def __init__(self, engine):
self.engine = engine
self.pool = engine.pool
# Event listeners ekle
event.listen(Pool, 'connect', self.on_connect)
event.listen(Pool, 'checkout', self.on_checkout)
event.listen(Pool, 'checkin', self.on_checkin)
def on_connect(self, dbapi_conn, connection_record):
"""Yeni bağlantı oluşturulduğunda"""
print(f"[POOL] New connection created: {id(dbapi_conn)}")
def on_checkout(self, dbapi_conn, connection_record, connection_proxy):
"""Bağlantı havuzdan alındığında"""
print(f"[POOL] Connection checked out: {id(dbapi_conn)}")
self.log_pool_status()
def on_checkin(self, dbapi_conn, connection_record):
"""Bağlantı havuza döndüğünde"""
print(f"[POOL] Connection checked in: {id(dbapi_conn)}")
self.log_pool_status()
def log_pool_status(self):
"""Pool durumunu logla"""
print(f"""
Pool Status:
- Size: {self.pool.size()}
- Checked out: {self.pool.checkedout()}
- Overflow: {self.pool.overflow()}
- Checked in: {self.pool.checkedin()}
""")
def get_pool_stats(self):
"""Pool istatistikleri"""
return {
'size': self.pool.size(),
'checked_out': self.pool.checkedout(),
'overflow': self.pool.overflow(),
'checked_in': self.pool.checkedin(),
'timeout': self.pool._timeout,
'recycle': self.pool._recycle,
}
# Kullanım
engine = create_engine('postgresql://localhost/db', echo_pool=True)
monitor = PoolMonitor(engine)
# Pool durumunu kontrol et
stats = monitor.get_pool_stats()
print(f"Pool Stats: {stats}")
# Custom pool monitoring endpoint (FastAPI)
@app.get("/health/pool")
async def pool_health():
"""Pool health check endpoint"""
pool = engine.pool
return {
'status': 'healthy',
'pool': {
'size': pool.size(),
'checked_out': pool.checkedout(),
'overflow': pool.overflow(),
'checked_in': pool.checkedin(),
},
'config': {
'pool_size': pool._pool.maxsize,
'max_overflow': pool._max_overflow,
'timeout': pool._timeout,
'recycle': pool._recycle,
}
}
|
Connection Pool Best Practices
1. Pool Pre-Ping
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| # Her bağlantıyı kullanmadan önce test et
engine = create_engine(
'postgresql://localhost/db',
pool_pre_ping=True # Önemli: Stale connection'ları önler
)
# Custom pre-ping logic
from sqlalchemy import event
@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
"""Connection oluşturulduğunda özel setup"""
# Timezone ayarla
cursor = dbapi_conn.cursor()
cursor.execute("SET timezone='UTC'")
cursor.close()
|
2. Connection Recycle
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| # Uzun süreli bağlantıları yenile
engine = create_engine(
'postgresql://localhost/db',
pool_recycle=3600, # 1 saat sonra yenile
pool_pre_ping=True
)
# Database timeout'tan önce recycle et
# MySQL: wait_timeout genelde 8 saat
# PostgreSQL: idle_in_transaction_session_timeout
engine = create_engine(
'mysql://localhost/db',
pool_recycle=7200 # 2 saat (8 saatten önce)
)
|
3. Graceful Shutdown
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| import signal
import sys
engine = create_engine('postgresql://localhost/db')
def graceful_shutdown(signum, frame):
"""Graceful pool kapatma"""
print("Shutting down gracefully...")
# Yeni connection'ları engelle
engine.dispose()
print("All connections closed")
sys.exit(0)
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)
|
4. Connection Timeout Handling
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| from sqlalchemy.exc import TimeoutError, DBAPIError
def query_with_timeout_handling():
"""Timeout handling örneği"""
try:
with engine.connect() as conn:
result = conn.execute("SELECT * FROM users")
return result.fetchall()
except TimeoutError:
# Pool timeout - tüm connections kullanımda
print("Pool timeout: All connections busy")
# Retry logic, cache'den dön, vs.
return []
except DBAPIError as e:
# Database-level error
print(f"Database error: {e}")
return []
|
Multi-Database Connection Pooling
Birden fazla veritabanı ile çalışma:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
| from sqlalchemy.orm import Session
class MultiDatabaseManager:
"""Çoklu veritabanı yönetimi"""
def __init__(self):
# PostgreSQL - Ana veritabanı
self.pg_engine = create_engine(
'postgresql://localhost/maindb',
pool_size=20,
max_overflow=10
)
# MySQL - Analytics veritabanı
self.mysql_engine = create_engine(
'mysql://localhost/analytics',
pool_size=10,
max_overflow=5
)
# MongoDB - Logs (pymongo pool)
from pymongo import MongoClient
self.mongo_client = MongoClient(
'mongodb://localhost:27017/',
maxPoolSize=50,
minPoolSize=10
)
# Redis - Cache (redis-py pool)
import redis
self.redis_pool = redis.ConnectionPool(
host='localhost',
port=6379,
max_connections=20
)
self.redis_client = redis.Redis(connection_pool=self.redis_pool)
def get_user_from_pg(self, user_id):
"""PostgreSQL'den kullanıcı"""
with self.pg_engine.connect() as conn:
result = conn.execute(
"SELECT * FROM users WHERE id = %s",
(user_id,)
)
return result.fetchone()
def get_analytics_from_mysql(self, user_id):
"""MySQL'den analytics"""
with self.mysql_engine.connect() as conn:
result = conn.execute(
"SELECT * FROM user_analytics WHERE user_id = %s",
(user_id,)
)
return result.fetchone()
def log_to_mongo(self, event):
"""MongoDB'ye log"""
db = self.mongo_client['logs']
collection = db['events']
collection.insert_one(event)
def cache_user(self, user_id, user_data):
"""Redis'e cache"""
self.redis_client.setex(
f'user:{user_id}',
3600, # 1 saat TTL
json.dumps(user_data)
)
def get_cached_user(self, user_id):
"""Redis'ten cache oku"""
data = self.redis_client.get(f'user:{user_id}')
return json.loads(data) if data else None
def dispose_all(self):
"""Tüm pool'ları kapat"""
self.pg_engine.dispose()
self.mysql_engine.dispose()
self.mongo_client.close()
self.redis_client.connection_pool.disconnect()
# Kullanım
db_manager = MultiDatabaseManager()
# Composite query - birden fazla database
def get_user_complete_info(user_id):
"""Tüm veritabanlarından veri topla"""
# Önce cache kontrol et
cached = db_manager.get_cached_user(user_id)
if cached:
return cached
# Cache'de yok, veritabanlarından çek
user = db_manager.get_user_from_pg(user_id)
analytics = db_manager.get_analytics_from_mysql(user_id)
complete_info = {
'user': dict(user),
'analytics': dict(analytics)
}
# Cache'e kaydet
db_manager.cache_user(user_id, complete_info)
# Activity log
db_manager.log_to_mongo({
'event': 'user_info_fetched',
'user_id': user_id,
'timestamp': datetime.utcnow()
})
return complete_info
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
| import time
import statistics
from concurrent.futures import ThreadPoolExecutor
def benchmark_pool_performance():
"""Pool performance testi"""
# With pool
engine_with_pool = create_engine(
'postgresql://localhost/db',
pool_size=20,
max_overflow=10
)
# Without pool (NullPool)
engine_without_pool = create_engine(
'postgresql://localhost/db',
poolclass=NullPool
)
def execute_query(engine, query_count=100):
"""Belirli sayıda query çalıştır"""
times = []
for _ in range(query_count):
start = time.time()
with engine.connect() as conn:
conn.execute("SELECT 1")
times.append(time.time() - start)
return times
# Test with pool
print("Testing WITH pool...")
with_pool_times = execute_query(engine_with_pool)
# Test without pool
print("Testing WITHOUT pool...")
without_pool_times = execute_query(engine_without_pool)
# Results
print("\n=== RESULTS ===")
print(f"With Pool:")
print(f" Avg: {statistics.mean(with_pool_times)*1000:.2f}ms")
print(f" Min: {min(with_pool_times)*1000:.2f}ms")
print(f" Max: {max(with_pool_times)*1000:.2f}ms")
print(f"\nWithout Pool:")
print(f" Avg: {statistics.mean(without_pool_times)*1000:.2f}ms")
print(f" Min: {min(without_pool_times)*1000:.2f}ms")
print(f" Max: {max(without_pool_times)*1000:.2f}ms")
speedup = statistics.mean(without_pool_times) / statistics.mean(with_pool_times)
print(f"\nSpeedup: {speedup:.2f}x faster with pool")
# Concurrent load test
def concurrent_load_test(engine, workers=10, queries_per_worker=100):
"""Concurrent yük testi"""
def worker_task():
times = []
for _ in range(queries_per_worker):
start = time.time()
try:
with engine.connect() as conn:
conn.execute("SELECT pg_sleep(0.01)") # 10ms query
except Exception as e:
print(f"Error: {e}")
times.append(time.time() - start)
return times
print(f"Running {workers} workers, {queries_per_worker} queries each...")
start_time = time.time()
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = [executor.submit(worker_task) for _ in range(workers)]
results = [f.result() for f in futures]
elapsed = time.time() - start_time
all_times = [t for worker_times in results for t in worker_times]
print(f"\nTotal queries: {len(all_times)}")
print(f"Total time: {elapsed:.2f}s")
print(f"Throughput: {len(all_times)/elapsed:.2f} queries/sec")
print(f"Avg latency: {statistics.mean(all_times)*1000:.2f}ms")
# benchmark_pool_performance()
|
Sonuç
Connection pooling, production-grade uygulamalar için kritik bir optimizasyon tekniğidir. Doğru konfigüre edilmiş bir connection pool:
- Response time’ı 5-10x azaltır
- Veritabanı üzerindeki yükü kontrol eder
- Resource kullanımını optimize eder
- Yüksek trafikte application stability sağlar
Bu yazıda ele aldığımız konular:
- Connection pooling temel kavramları ve avantajları
- SQLAlchemy ile pool konfigürasyonu
- Optimal pool size hesaplama
- AsyncIO ve async connection pooling
- Pool monitoring ve debugging
- Best practices ve error handling
- Multi-database connection management
- Performance testing ve benchmarking
Connection pool ayarlarınızı application ihtiyaçlarına göre fine-tune etmek, performans ve stability için elzemdir.
Kaynaklar: