Database Connection Pooling and Optimization

Database connection pool optimization with SQLAlchemy and asyncpg: connection pooling, performance tuning, monitoring, and production best practices.

Database connections are among the most critical and expensive resources in a web application. Every new connection requires costly operations such as a TCP handshake, authentication, and resource allocation. Connection pooling reduces this cost and significantly improves application performance. In this post, we cover the concept of connection pooling, how to implement it in Python, and techniques for optimizing it.

What Is Connection Pooling?

Connection pooling is a technique in which database connections are kept and managed in a reusable pool. Instead of opening a new connection for every request, an existing connection is borrowed from the pool and returned to it when the work is done.
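
To make the mechanism concrete, here is a minimal, hand-rolled pool sketch built on Python's queue.Queue. It is for illustration only; real pools such as SQLAlchemy's QueuePool also handle overflow, recycling, and health checks:

import queue
import sqlite3

class SimpleConnectionPool:
    """Toy connection pool - illustration only, not production code."""

    def __init__(self, size=5):
        self._pool = queue.Queue(maxsize=size)
        for _ in range(size):
            # Connections are opened once, up front
            self._pool.put(sqlite3.connect(':memory:', check_same_thread=False))

    def acquire(self, timeout=30):
        # Blocks until a connection is free, like pool_timeout
        return self._pool.get(timeout=timeout)

    def release(self, conn):
        # Return the connection to the pool instead of closing it
        self._pool.put(conn)

# Usage: borrow, use, return
pool = SimpleConnectionPool(size=2)
conn = pool.acquire()
print(conn.execute('SELECT 1').fetchone())
pool.release(conn)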

Advantages of Connection Pooling

  • Better performance: eliminates the connection setup cost (often 5-10x faster for short queries)
  • Resource management: caps the number of connections open against the database at any one time
  • Scalability: better behavior under high traffic
  • Connection reuse: avoids repeated TCP and SSL handshake costs
  • Timeout management: idle connections are cleaned up automatically

SQLAlchemy Connection Pool Structure

Connection Pooling vs Direct Connection

# WITHOUT Connection Pool (BAD!)
import psycopg2

def get_user_without_pool(user_id):
    """A new connection on every request - SLOW"""
    # TCP handshake, auth, resource allocation on every call
    conn = psycopg2.connect(
        host="localhost",
        database="mydb",
        user="user",
        password="pass"
    )

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    user = cursor.fetchone()

    cursor.close()
    conn.close()  # Close the connection

    return user

# WITH Connection Pool (GOOD!)
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# The pool is created once
engine = create_engine(
    'postgresql://user:pass@localhost/mydb',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20
)

def get_user_with_pool(user_id):
    """Borrow a connection from the pool - FAST"""
    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT * FROM users WHERE id = :id"),
            {"id": user_id}
        )
        return result.fetchone()
    # The connection returns to the pool instead of being closed!

Connection Pooling with SQLAlchemy

SQLAlchemy is the most widely used ORM and database toolkit in Python.

Basic Pool Configuration

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool, NullPool, StaticPool

# QueuePool (default) - the most commonly used pool
engine = create_engine(
    'postgresql://user:password@localhost:5432/dbname',

    # Pool parameters
    poolclass=QueuePool,
    pool_size=10,              # Connections kept open in the pool
    max_overflow=20,           # Extra connections that may be opened temporarily
    pool_timeout=30,           # Max wait when borrowing a connection (seconds)
    pool_recycle=3600,         # Recycle connections after this lifetime (1 hour)
    pool_pre_ping=True,        # Health-check each connection before use

    # Connection parameters
    echo=False,                # Disable SQL logging
    echo_pool=False,           # Disable pool logging
    connect_args={
        'connect_timeout': 10,
        'application_name': 'my_app',
    }
)

# Example usage
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)

def get_users():
    """Working with a session"""
    session = Session()
    try:
        users = session.query(User).all()
        return users
    finally:
        session.close()  # The connection returns to the pool

# Cleaner with a context manager
from contextlib import contextmanager

@contextmanager
def get_session():
    """Session context manager"""
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Usage
with get_session() as session:
    users = session.query(User).filter(User.is_active == True).all()

Pool Types

from sqlalchemy.pool import (
    QueuePool,      # Default; thread-safe connection queue
    NullPool,       # No pooling; a new connection every time
    StaticPool,     # A single shared connection
    SingletonThreadPool,  # One connection per thread
    AssertionPool   # For development; catches connection leaks
)

# QueuePool - production use
queue_engine = create_engine(
    'postgresql://localhost/db',
    poolclass=QueuePool,
    pool_size=20,
    max_overflow=10
)

# NullPool - for serverless/Lambda
nullpool_engine = create_engine(
    'postgresql://localhost/db',
    poolclass=NullPool
)

# StaticPool - single-threaded SQLite
static_engine = create_engine(
    'sqlite:///db.sqlite',
    poolclass=StaticPool,
    connect_args={'check_same_thread': False}
)

# AssertionPool - development/testing
assertion_engine = create_engine(
    'postgresql://localhost/db',
    poolclass=AssertionPool
)

Pool Size Optimization

Choosing the pool size is a critical decision. A pool that is too small makes requests wait for connections; one that is too large wastes resources on both the application and the database server.
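
As a complementary back-of-the-envelope check (my own rule of thumb here, not from the SQLAlchemy docs), Little's law says the number of busy connections under steady load is roughly the query arrival rate times the average query duration, so you can sanity-check a candidate pool size against expected traffic:

# Rough sizing via Little's law: busy_connections ≈ qps * avg_query_seconds
def estimate_busy_connections(qps, avg_query_seconds):
    """Estimate how many connections are in use at once under steady load."""
    return qps * avg_query_seconds

# Example: 500 queries/sec at 20 ms each keeps ~10 connections busy,
# so pool_size=10 with some overflow headroom is a reasonable start.
print(estimate_busy_connections(500, 0.020))  # -> 10.0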


Calculating an Optimal Pool Size

import multiprocessing

def calculate_optimal_pool_size():
    """
    Formula: connections = (core_count * 2) + effective_spindle_count

    - core_count: number of CPU cores
    - effective_spindle_count: number of disks (1 for an SSD)
    """
    core_count = multiprocessing.cpu_count()
    disk_count = 1  # for an SSD

    optimal_size = (core_count * 2) + disk_count

    return {
        'pool_size': optimal_size,
        'max_overflow': optimal_size,  # 100% overflow
        'recommendation': f'Use pool_size={optimal_size}, max_overflow={optimal_size}'
    }

# Example output
# {'pool_size': 9, 'max_overflow': 9, 'recommendation': 'Use pool_size=9, max_overflow=9'}

# Settings for different scenarios
POOL_CONFIGS = {
    'low_traffic': {
        'pool_size': 5,
        'max_overflow': 5,
        'pool_timeout': 30,
        'pool_recycle': 3600,
    },
    'medium_traffic': {
        'pool_size': 10,
        'max_overflow': 20,
        'pool_timeout': 30,
        'pool_recycle': 3600,
    },
    'high_traffic': {
        'pool_size': 20,
        'max_overflow': 30,
        'pool_timeout': 10,  # shorter timeout
        'pool_recycle': 1800,  # recycle more often
    },
    'burst_traffic': {
        'pool_size': 10,
        'max_overflow': 50,  # high burst capacity
        'pool_timeout': 5,
        'pool_recycle': 3600,
    }
}

def create_engine_for_scenario(scenario='medium_traffic'):
    """Create an engine for the given scenario"""
    config = POOL_CONFIGS.get(scenario, POOL_CONFIGS['medium_traffic'])

    return create_engine(
        'postgresql://user:pass@localhost/db',
        **config,
        pool_pre_ping=True
    )

AsyncIO and Async Connection Pooling

For modern async applications, asyncpg and SQLAlchemy's async support:

# A native async pool with asyncpg
import asyncpg
import asyncio

class AsyncDatabasePool:
    """Async PostgreSQL pool management"""

    def __init__(self, dsn, min_size=10, max_size=20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None

    async def connect(self):
        """Create the pool"""
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60,
            max_queries=50000,  # Each connection is replaced after 50k queries
            max_inactive_connection_lifetime=300  # Close connections idle for 5 minutes
        )
        print(f"Pool created: {self.min_size}-{self.max_size} connections")

    async def close(self):
        """Close the pool"""
        if self.pool:
            await self.pool.close()
            print("Pool closed")

    async def fetch_user(self, user_id):
        """Fetch a single user"""
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                'SELECT * FROM users WHERE id = $1',
                user_id
            )
            return dict(row) if row else None

    async def fetch_users_batch(self, user_ids):
        """Fetch users in bulk"""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                'SELECT * FROM users WHERE id = ANY($1::int[])',
                user_ids
            )
            return [dict(row) for row in rows]

    async def execute_transaction(self):
        """Transaction example"""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Several queries inside one transaction
                await conn.execute(
                    'INSERT INTO users (username, email) VALUES ($1, $2)',
                    'newuser', 'newuser@example.com'
                )
                await conn.execute(
                    'INSERT INTO audit_log (action) VALUES ($1)',
                    'user_created'
                )
                # Rolled back automatically on error

# Usage
async def main():
    pool = AsyncDatabasePool(
        dsn='postgresql://user:pass@localhost/db',
        min_size=10,
        max_size=30
    )

    await pool.connect()

    try:
        # Single query
        user = await pool.fetch_user(1)
        print(f"User: {user}")

        # Batch query
        users = await pool.fetch_users_batch([1, 2, 3, 4, 5])
        print(f"Found {len(users)} users")

        # Parallel queries
        tasks = [pool.fetch_user(i) for i in range(1, 11)]
        results = await asyncio.gather(*tasks)
        print(f"Parallel fetched: {len(results)} users")

    finally:
        await pool.close()

# asyncio.run(main())
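
As a side note, asyncpg's Pool also exposes fetch/fetchrow/execute directly, acquiring and releasing a connection for you. The explicit acquire() form above is still preferable when several statements must run on the same connection:

# Shorthand: the pool acquires and releases the connection internally
async def fetch_user_short(pool, user_id):
    return await pool.fetchrow('SELECT * FROM users WHERE id = $1', user_id)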

SQLAlchemy 2.0 Async Support

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

# Create the async engine
async_engine = create_async_engine(
    'postgresql+asyncpg://user:pass@localhost/db',

    # Pool settings
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=3600,

    # Async parameters
    echo=False,
)

# Async session factory
AsyncSessionLocal = sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

# Model (example)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = 'users'

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str]
    email: Mapped[str]
    is_active: Mapped[bool] = mapped_column(default=True)

# Async CRUD operations
class AsyncUserRepository:
    """Async user repository"""

    @staticmethod
    async def get_by_id(user_id: int) -> User:
        """Fetch a user by ID"""
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(User).where(User.id == user_id)
            )
            return result.scalar_one_or_none()

    @staticmethod
    async def get_active_users() -> list[User]:
        """Fetch active users"""
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(User).where(User.is_active == True)
            )
            return result.scalars().all()

    @staticmethod
    async def create_user(username: str, email: str) -> User:
        """Create a user"""
        async with AsyncSessionLocal() as session:
            user = User(username=username, email=email)
            session.add(user)
            await session.commit()
            await session.refresh(user)
            return user

    @staticmethod
    async def bulk_create_users(users_data: list[dict]) -> int:
        """Create users in bulk"""
        async with AsyncSessionLocal() as session:
            users = [User(**data) for data in users_data]
            session.add_all(users)
            await session.commit()
            return len(users)

# FastAPI integration
from fastapi import FastAPI, Depends, HTTPException

app = FastAPI()

async def get_db() -> AsyncSession:
    """Database session dependency"""
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    """Fetch-user endpoint"""
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()

    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return {
        'id': user.id,
        'username': user.username,
        'email': user.email
    }

@app.post("/users/")
async def create_user(
    username: str,
    email: str,
    db: AsyncSession = Depends(get_db)
):
    """Create-user endpoint"""
    user = User(username=username, email=email)
    db.add(user)
    await db.commit()
    await db.refresh(user)

    return {'id': user.id, 'username': user.username}

Connection Pool Monitoring and Debugging

Watching the pool's state is critical for diagnosing performance problems:

import logging
from sqlalchemy import event
from sqlalchemy.pool import Pool

# Verbose pool logging
logging.basicConfig()
logging.getLogger('sqlalchemy.pool').setLevel(logging.DEBUG)

class PoolMonitor:
    """Connection pool monitoring"""

    def __init__(self, engine):
        self.engine = engine
        self.pool = engine.pool

        # Register event listeners
        event.listen(Pool, 'connect', self.on_connect)
        event.listen(Pool, 'checkout', self.on_checkout)
        event.listen(Pool, 'checkin', self.on_checkin)

    def on_connect(self, dbapi_conn, connection_record):
        """Fired when a new connection is created"""
        print(f"[POOL] New connection created: {id(dbapi_conn)}")

    def on_checkout(self, dbapi_conn, connection_record, connection_proxy):
        """Fired when a connection is borrowed from the pool"""
        print(f"[POOL] Connection checked out: {id(dbapi_conn)}")
        self.log_pool_status()

    def on_checkin(self, dbapi_conn, connection_record):
        """Fired when a connection returns to the pool"""
        print(f"[POOL] Connection checked in: {id(dbapi_conn)}")
        self.log_pool_status()

    def log_pool_status(self):
        """Log the pool's current status"""
        print(f"""
        Pool Status:
        - Size: {self.pool.size()}
        - Checked out: {self.pool.checkedout()}
        - Overflow: {self.pool.overflow()}
        - Checked in: {self.pool.checkedin()}
        """)

    def get_pool_stats(self):
        """Pool statistics (_timeout and _recycle are private attributes)"""
        return {
            'size': self.pool.size(),
            'checked_out': self.pool.checkedout(),
            'overflow': self.pool.overflow(),
            'checked_in': self.pool.checkedin(),
            'timeout': self.pool._timeout,
            'recycle': self.pool._recycle,
        }

# Usage
engine = create_engine('postgresql://localhost/db', echo_pool=True)
monitor = PoolMonitor(engine)

# Check pool status
stats = monitor.get_pool_stats()
print(f"Pool Stats: {stats}")

# Custom pool monitoring endpoint (FastAPI)
@app.get("/health/pool")
async def pool_health():
    """Pool health-check endpoint"""
    pool = engine.pool

    return {
        'status': 'healthy',
        'pool': {
            'size': pool.size(),
            'checked_out': pool.checkedout(),
            'overflow': pool.overflow(),
            'checked_in': pool.checkedin(),
        },
        'config': {
            # These read private attributes and may change between versions
            'pool_size': pool._pool.maxsize,
            'max_overflow': pool._max_overflow,
            'timeout': pool._timeout,
            'recycle': pool._recycle,
        }
    }
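
If you already run Prometheus, the same numbers can be exported as metrics. A minimal sketch with the prometheus_client package; the metric names here are my own choice, not a standard:

from prometheus_client import Gauge, start_http_server

# Gauges for the pool numbers collected above
POOL_CHECKED_OUT = Gauge('db_pool_checked_out', 'Connections currently in use')
POOL_CHECKED_IN = Gauge('db_pool_checked_in', 'Idle connections in the pool')
POOL_OVERFLOW = Gauge('db_pool_overflow', 'Current overflow connections')

def export_pool_metrics(engine):
    """Copy pool stats into Prometheus gauges; call periodically."""
    pool = engine.pool
    POOL_CHECKED_OUT.set(pool.checkedout())
    POOL_CHECKED_IN.set(pool.checkedin())
    POOL_OVERFLOW.set(pool.overflow())

# start_http_server(9100)  # expose /metrics on port 9100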

Connection Pool Best Practices

1. Pool Pre-Ping

# Test each connection before using it
engine = create_engine(
    'postgresql://localhost/db',
    pool_pre_ping=True  # Important: guards against stale connections
)

# Custom setup on connect (runs once per new connection)
from sqlalchemy import event

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    """Custom setup when a connection is created"""
    # Set the timezone
    cursor = dbapi_conn.cursor()
    cursor.execute("SET timezone='UTC'")
    cursor.close()

2. Connection Recycle

# Recycle long-lived connections
engine = create_engine(
    'postgresql://localhost/db',
    pool_recycle=3600,  # recycle after 1 hour
    pool_pre_ping=True
)

# Recycle before the database's own timeout kicks in
# MySQL: wait_timeout is typically 8 hours
# PostgreSQL: idle_in_transaction_session_timeout
engine = create_engine(
    'mysql://localhost/db',
    pool_recycle=7200  # 2 hours (well before the 8-hour limit)
)
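
To find the value you need to stay under, you can ask the server directly. A quick check, assuming the MySQL engine defined just above:

from sqlalchemy import text

# Read MySQL's idle-connection timeout so pool_recycle can stay below it
with engine.connect() as conn:
    row = conn.execute(text("SHOW VARIABLES LIKE 'wait_timeout'")).fetchone()
    print(row)  # e.g. ('wait_timeout', '28800') -> 28800 seconds = 8 hours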

3. Graceful Shutdown

import signal
import sys

engine = create_engine('postgresql://localhost/db')

def graceful_shutdown(signum, frame):
    """Close the pool gracefully"""
    print("Shutting down gracefully...")

    # Close all connections held in the pool
    engine.dispose()

    print("All connections closed")
    sys.exit(0)

signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)
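
For the FastAPI setup from earlier, the idiomatic equivalent is disposing the engine in a lifespan handler. A sketch, assuming a recent FastAPI version and the async_engine defined in the SQLAlchemy 2.0 section:

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    yield  # the application runs here
    # On shutdown, close every pooled connection
    await async_engine.dispose()

app = FastAPI(lifespan=lifespan)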

4. Connection Timeout Handling

from sqlalchemy import text
from sqlalchemy.exc import TimeoutError, DBAPIError

def query_with_timeout_handling():
    """Timeout-handling example"""
    try:
        with engine.connect() as conn:
            result = conn.execute(text("SELECT * FROM users"))
            return result.fetchall()

    except TimeoutError:
        # Pool timeout - every connection is in use
        print("Pool timeout: All connections busy")
        # Retry, fall back to a cache, etc. (see the sketch below)
        return []

    except DBAPIError as e:
        # Database-level error
        print(f"Database error: {e}")
        return []
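
The "retry" branch above can be as simple as a bounded retry with exponential backoff. A minimal sketch; the delays are arbitrary:

import time
from sqlalchemy.exc import TimeoutError

def query_with_retry(run_query, attempts=3, base_delay=0.1):
    """Retry a pool-timeout-prone callable with exponential backoff."""
    for attempt in range(attempts):
        try:
            return run_query()
        except TimeoutError:
            if attempt == attempts - 1:
                raise  # out of attempts, surface the error
            time.sleep(base_delay * (2 ** attempt))  # 0.1s, 0.2s, 0.4s...

# Usage: query_with_retry(query_with_timeout_handling)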

Multi-Database Connection Pooling

Working with more than one database:

from datetime import datetime
import json

from sqlalchemy import create_engine, text

class MultiDatabaseManager:
    """Managing multiple databases"""

    def __init__(self):
        # PostgreSQL - primary database
        self.pg_engine = create_engine(
            'postgresql://localhost/maindb',
            pool_size=20,
            max_overflow=10
        )

        # MySQL - analytics database
        self.mysql_engine = create_engine(
            'mysql://localhost/analytics',
            pool_size=10,
            max_overflow=5
        )

        # MongoDB - logs (pymongo pool)
        from pymongo import MongoClient
        self.mongo_client = MongoClient(
            'mongodb://localhost:27017/',
            maxPoolSize=50,
            minPoolSize=10
        )

        # Redis - cache (redis-py pool)
        import redis
        self.redis_pool = redis.ConnectionPool(
            host='localhost',
            port=6379,
            max_connections=20
        )
        self.redis_client = redis.Redis(connection_pool=self.redis_pool)

    def get_user_from_pg(self, user_id):
        """User from PostgreSQL"""
        with self.pg_engine.connect() as conn:
            result = conn.execute(
                text("SELECT * FROM users WHERE id = :id"),
                {"id": user_id}
            )
            return result.mappings().fetchone()

    def get_analytics_from_mysql(self, user_id):
        """Analytics from MySQL"""
        with self.mysql_engine.connect() as conn:
            result = conn.execute(
                text("SELECT * FROM user_analytics WHERE user_id = :id"),
                {"id": user_id}
            )
            return result.mappings().fetchone()

    def log_to_mongo(self, event):
        """Log to MongoDB"""
        db = self.mongo_client['logs']
        collection = db['events']
        collection.insert_one(event)

    def cache_user(self, user_id, user_data):
        """Cache in Redis"""
        self.redis_client.setex(
            f'user:{user_id}',
            3600,  # 1-hour TTL
            json.dumps(user_data)
        )

    def get_cached_user(self, user_id):
        """Read from the Redis cache"""
        data = self.redis_client.get(f'user:{user_id}')
        return json.loads(data) if data else None

    def dispose_all(self):
        """Close all pools"""
        self.pg_engine.dispose()
        self.mysql_engine.dispose()
        self.mongo_client.close()
        self.redis_client.connection_pool.disconnect()

# Usage
db_manager = MultiDatabaseManager()

# Composite query - more than one database
def get_user_complete_info(user_id):
    """Collect data from every database"""
    # Check the cache first
    cached = db_manager.get_cached_user(user_id)
    if cached:
        return cached

    # Cache miss; read from the databases
    user = db_manager.get_user_from_pg(user_id)
    analytics = db_manager.get_analytics_from_mysql(user_id)

    complete_info = {
        'user': dict(user),
        'analytics': dict(analytics)
    }

    # Save to the cache
    db_manager.cache_user(user_id, complete_info)

    # Activity log
    db_manager.log_to_mongo({
        'event': 'user_info_fetched',
        'user_id': user_id,
        'timestamp': datetime.utcnow()
    })

    return complete_info

Performance Testing and Benchmarking

import time
import statistics
from concurrent.futures import ThreadPoolExecutor

from sqlalchemy import create_engine, text
from sqlalchemy.pool import NullPool

def benchmark_pool_performance():
    """Pool performance test"""
    # With a pool
    engine_with_pool = create_engine(
        'postgresql://localhost/db',
        pool_size=20,
        max_overflow=10
    )

    # Without a pool (NullPool)
    engine_without_pool = create_engine(
        'postgresql://localhost/db',
        poolclass=NullPool
    )

    def execute_query(engine, query_count=100):
        """Run a fixed number of queries"""
        times = []

        for _ in range(query_count):
            start = time.time()
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            times.append(time.time() - start)

        return times

    # Test with pool
    print("Testing WITH pool...")
    with_pool_times = execute_query(engine_with_pool)

    # Test without pool
    print("Testing WITHOUT pool...")
    without_pool_times = execute_query(engine_without_pool)

    # Results
    print("\n=== RESULTS ===")
    print(f"With Pool:")
    print(f"  Avg: {statistics.mean(with_pool_times)*1000:.2f}ms")
    print(f"  Min: {min(with_pool_times)*1000:.2f}ms")
    print(f"  Max: {max(with_pool_times)*1000:.2f}ms")

    print(f"\nWithout Pool:")
    print(f"  Avg: {statistics.mean(without_pool_times)*1000:.2f}ms")
    print(f"  Min: {min(without_pool_times)*1000:.2f}ms")
    print(f"  Max: {max(without_pool_times)*1000:.2f}ms")

    speedup = statistics.mean(without_pool_times) / statistics.mean(with_pool_times)
    print(f"\nSpeedup: {speedup:.2f}x faster with pool")

# Concurrent load test
def concurrent_load_test(engine, workers=10, queries_per_worker=100):
    """Concurrent load test"""
    def worker_task():
        times = []
        for _ in range(queries_per_worker):
            start = time.time()
            try:
                with engine.connect() as conn:
                    conn.execute(text("SELECT pg_sleep(0.01)"))  # 10 ms query
            except Exception as e:
                print(f"Error: {e}")
            times.append(time.time() - start)
        return times

    print(f"Running {workers} workers, {queries_per_worker} queries each...")
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(worker_task) for _ in range(workers)]
        results = [f.result() for f in futures]

    elapsed = time.time() - start_time
    all_times = [t for worker_times in results for t in worker_times]

    print(f"\nTotal queries: {len(all_times)}")
    print(f"Total time: {elapsed:.2f}s")
    print(f"Throughput: {len(all_times)/elapsed:.2f} queries/sec")
    print(f"Avg latency: {statistics.mean(all_times)*1000:.2f}ms")

# benchmark_pool_performance()

Conclusion

Connection pooling is a critical optimization technique for production-grade applications. A well-configured connection pool:

  • Cuts per-query response time dramatically (often 5-10x for short queries)
  • Keeps the load on the database under control
  • Optimizes resource usage
  • Keeps the application stable under high traffic

Topics covered in this post:

  • Connection pooling fundamentals and benefits
  • Pool configuration with SQLAlchemy
  • Calculating an optimal pool size
  • AsyncIO and async connection pooling
  • Pool monitoring and debugging
  • Best practices and error handling
  • Multi-database connection management
  • Performance testing and benchmarking

Fine-tuning your connection pool settings to your application's needs is essential for both performance and stability.


This post is licensed under CC BY 4.0.