Webhook Handler ile Olay Güdümlü Mimari
FastAPI ile production-ready webhook handler sistemi. Event-driven mimari, signature verification, retry mechanism, idempotency ve async processing.
Modern uygulama mimarisinde, sistemler arası gerçek zamanlı iletişim kritik bir gereksinim haline geldi. Webhooklar, bu gereksinimi karşılamak için event-driven (olay güdümlü) mimari deseninde kullanılan güçlü bir araçtır. Bu yazıda, Python ve FastAPI kullanarak production-ready bir webhook handler sistemi geliştireceğiz.
Webhook Nedir?
Webhook, bir uygulamada gerçekleşen olayları diğer uygulamalara HTTP POST istekleriyle bildiren “ters API” mekanizmasıdır. Geleneksel API’larda istemci sürekli veri sorgulaması yaparken (polling), webhooklarda sunucu bir olay olduğunda proaktif olarak bildirim gönderir.
Webhook vs Polling
Polling Yaklaşımı:
1
2
3
4
5
6
7
8
9
10
import time
import requests
# Geleneksel polling - Verimsiz!
while True:
response = requests.get("https://api.example.com/orders/status")
if response.json()["status"] == "completed":
process_order(response.json())
break
time.sleep(10) # 10 saniye bekle ve tekrar dene
Webhook Yaklaşımı:
1
2
3
4
5
6
7
8
9
10
11
from fastapi import FastAPI, Request
app = FastAPI()
# Event-driven - Verimli!
@app.post("/webhooks/order-completed")
async def handle_order_completion(request: Request):
"""Sipariş tamamlandığında otomatik çağrılır"""
payload = await request.json()
await process_order(payload)
return {"status": "received"}
Webhook yaklaşımı gereksiz API çağrılarını ortadan kaldırarak bandwidth tasarrufu sağlar ve gerçek zamanlı tepki verme imkanı sunar.
FastAPI ile event-driven webhook sistemi
FastAPI ile Webhook Endpoint Oluşturma
Temel Webhook Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field, validator
from typing import Optional, Dict, Any
from datetime import datetime
import hmac
import hashlib
import json
app = FastAPI(title="Webhook Handler Service")
class WebhookPayload(BaseModel):
"""Webhook payload validasyonu"""
event_type: str = Field(..., description="Olay tipi")
event_id: str = Field(..., description="Benzersiz olay ID'si")
timestamp: datetime = Field(..., description="Olay zamanı")
data: Dict[str, Any] = Field(..., description="Olay verisi")
@validator('event_type')
def validate_event_type(cls, v):
"""Desteklenen event tiplerini kontrol et"""
allowed_events = [
'order.created',
'order.completed',
'payment.received',
'shipment.dispatched'
]
if v not in allowed_events:
raise ValueError(f"Unsupported event type: {v}")
return v
@app.post("/webhooks/receive")
async def receive_webhook(
request: Request,
background_tasks: BackgroundTasks
):
"""
Webhook alma endpoint'i
Bu endpoint:
1. Webhook imzasını doğrular
2. Payload'ı parse eder ve validate eder
3. İşlemi background task olarak zamanlar
4. Hızlıca 200 OK döner
"""
# 1. İmza doğrulama
signature = request.headers.get("X-Webhook-Signature")
if not signature:
raise HTTPException(status_code=401, detail="Missing signature")
body = await request.body()
if not verify_signature(body, signature):
raise HTTPException(status_code=401, detail="Invalid signature")
# 2. Payload parsing
try:
payload_dict = json.loads(body)
payload = WebhookPayload(**payload_dict)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid payload: {str(e)}")
# 3. Background processing
background_tasks.add_task(process_webhook, payload)
# 4. Hızlı response (webhook gönderen taraf timeout'a düşmesin)
return {
"status": "accepted",
"event_id": payload.event_id,
"received_at": datetime.utcnow().isoformat()
}
def verify_signature(body: bytes, signature: str) -> bool:
"""
HMAC-SHA256 ile webhook imzasını doğrula
Güvenlik için kritik! Webhook'un gerçekten beklenen
kaynaktan geldiğini garanti eder.
"""
secret = "your-webhook-secret-key" # Ortam değişkeninden al
expected_signature = hmac.new(
secret.encode(),
body,
hashlib.sha256
).hexdigest()
# Timing attack'lara karşı güvenli karşılaştırma
return hmac.compare_digest(expected_signature, signature)
async def process_webhook(payload: WebhookPayload):
"""
Webhook işleme logic'i
Bu fonksiyon background task olarak çalışır,
böylece webhook endpoint hızlı response dönebilir.
"""
print(f"Processing webhook: {payload.event_type}")
# Event tipine göre işlem yap
if payload.event_type == "order.created":
await handle_order_created(payload.data)
elif payload.event_type == "payment.received":
await handle_payment_received(payload.data)
# ... diğer event tipleri
Payload Validation
Webhook payloadları dış kaynaklardan geldiği için sıkı validation kritiktir:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from pydantic import BaseModel, Field, validator, root_validator
from typing import Literal, Union
from decimal import Decimal
class OrderCreatedPayload(BaseModel):
"""order.created eventi için özel payload modeli"""
order_id: str = Field(..., regex=r'^ORD-\d{8}$')
customer_email: str = Field(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$')
total_amount: Decimal = Field(..., gt=0, max_digits=10, decimal_places=2)
currency: Literal["USD", "EUR", "TRY"] = "USD"
items: list[dict] = Field(..., min_items=1)
@validator('items')
def validate_items(cls, items):
"""Her item'ın gerekli alanları içerdiğini doğrula"""
required_fields = {'product_id', 'quantity', 'price'}
for item in items:
if not all(field in item for field in required_fields):
raise ValueError(f"Item missing required fields: {required_fields}")
if item['quantity'] <= 0:
raise ValueError("Item quantity must be positive")
return items
@root_validator
def validate_total(cls, values):
"""Total amount ile item fiyatlarının toplamını karşılaştır"""
items = values.get('items', [])
calculated_total = sum(
Decimal(str(item['price'])) * item['quantity']
for item in items
)
if abs(calculated_total - values['total_amount']) > Decimal('0.01'):
raise ValueError("Total amount mismatch")
return values
# Event-specific handler
async def handle_order_created(data: dict):
"""Sipariş oluşturma eventi işle"""
try:
order = OrderCreatedPayload(**data)
# İş logic'i
print(f"New order: {order.order_id}")
await send_confirmation_email(order.customer_email)
await update_inventory(order.items)
await notify_warehouse(order)
except Exception as e:
# Hata loglama ve alerting
print(f"Error processing order.created: {e}")
await send_alert(f"Webhook processing failed: {e}")
Webhook retry ve error handling mekanizması
Retry Logic ve Hata Yönetimi
Webhook işleme sırasında hatalar oluşabilir. Production sistemlerinde robust retry mekanizması şarttır:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import asyncio
from functools import wraps
from typing import Callable
import aioredis
class RetryConfig:
"""Retry stratejisi konfigürasyonu"""
MAX_RETRIES = 5
INITIAL_DELAY = 1 # saniye
MAX_DELAY = 300 # 5 dakika
BACKOFF_FACTOR = 2 # Exponential backoff
def exponential_backoff_retry(
max_retries: int = RetryConfig.MAX_RETRIES,
initial_delay: int = RetryConfig.INITIAL_DELAY,
max_delay: int = RetryConfig.MAX_DELAY,
backoff_factor: int = RetryConfig.BACKOFF_FACTOR
):
"""
Exponential backoff retry decorator
Retry delay'leri: 1s, 2s, 4s, 8s, 16s...
Her başarısız denemeden sonra bekleme süresi katlanarak artar.
"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt == max_retries - 1:
# Son deneme de başarısız
raise
# Delay hesapla (exponential backoff)
delay = min(
initial_delay * (backoff_factor ** attempt),
max_delay
)
print(f"Attempt {attempt + 1} failed: {e}")
print(f"Retrying in {delay} seconds...")
await asyncio.sleep(delay)
raise last_exception
return wrapper
return decorator
@exponential_backoff_retry(max_retries=5)
async def process_webhook_with_retry(payload: WebhookPayload):
"""
Retry mekanizmasıyla webhook işleme
Geçici hatalar (network, rate limit) için otomatik retry yapar.
"""
# Harici API çağrısı (başarısız olabilir)
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.partner.com/notify",
json=payload.dict(),
timeout=aiohttp.ClientTimeout(total=10)
) as response:
response.raise_for_status()
return await response.json()
# Dead Letter Queue (DLQ) için
class DeadLetterQueue:
"""
Tekrar tekrar başarısız olan webhook'ları sakla
DLQ, manual investigation veya delayed retry için kullanılır.
"""
def __init__(self, redis_url: str):
self.redis = aioredis.from_url(redis_url)
async def add(self, payload: WebhookPayload, error: str):
"""Başarısız webhook'u DLQ'ya ekle"""
dlq_item = {
"payload": payload.dict(),
"error": error,
"failed_at": datetime.utcnow().isoformat(),
"retry_count": payload.dict().get("retry_count", 0)
}
# Redis sorted set'e ekle (timestamp'e göre sıralı)
await self.redis.zadd(
"webhook:dlq",
{json.dumps(dlq_item): datetime.utcnow().timestamp()}
)
# Alert gönder
await send_dlq_alert(payload.event_id, error)
async def get_failed_webhooks(self, limit: int = 100):
"""DLQ'daki başarısız webhook'ları getir"""
items = await self.redis.zrange("webhook:dlq", 0, limit - 1)
return [json.loads(item) for item in items]
async def retry_dlq_item(self, item_json: str):
"""DLQ'daki bir item'ı tekrar dene"""
item = json.loads(item_json)
payload = WebhookPayload(**item["payload"])
try:
await process_webhook_with_retry(payload)
# Başarılı olursa DLQ'dan çıkar
await self.redis.zrem("webhook:dlq", item_json)
except Exception as e:
# Hala başarısız, retry_count'u artır
item["retry_count"] = item.get("retry_count", 0) + 1
await self.redis.zadd(
"webhook:dlq",
{json.dumps(item): datetime.utcnow().timestamp()}
)
# Kullanım
dlq = DeadLetterQueue("redis://localhost:6379")
@app.post("/webhooks/receive")
async def receive_webhook_with_dlq(
request: Request,
background_tasks: BackgroundTasks
):
"""DLQ entegrasyonlu webhook handler"""
# ... signature validation ve parsing ...
try:
await process_webhook_with_retry(payload)
except Exception as e:
# Tüm retry'lar başarısız, DLQ'ya gönder
await dlq.add(payload, str(e))
return {"status": "accepted", "event_id": payload.event_id}
# DLQ monitoring endpoint
@app.get("/admin/dlq")
async def get_dlq_status():
"""DLQ durumunu getir (admin endpoint)"""
failed_items = await dlq.get_failed_webhooks()
return {
"count": len(failed_items),
"items": failed_items
}
@app.post("/admin/dlq/{item_index}/retry")
async def retry_dlq_item(item_index: int):
"""DLQ'daki belirli bir item'ı tekrar dene"""
items = await dlq.get_failed_webhooks()
if item_index >= len(items):
raise HTTPException(status_code=404, detail="Item not found")
item = items[item_index]
await dlq.retry_dlq_item(json.dumps(item))
return {"status": "retry_scheduled"}
Async Queue ile Yüksek Throughput
Yüksek trafikli webhook sistemlerinde async queue kullanımı performans için kritiktir:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import asyncio
from asyncio import Queue
from typing import List
import signal
class WebhookProcessor:
"""
Async queue tabanlı webhook işlemci
Multiple worker'larla paralel işleme sağlar.
"""
def __init__(self, num_workers: int = 10, queue_size: int = 1000):
self.queue: Queue[WebhookPayload] = Queue(maxsize=queue_size)
self.num_workers = num_workers
self.workers: List[asyncio.Task] = []
self.running = False
async def start(self):
"""Worker'ları başlat"""
self.running = True
self.workers = [
asyncio.create_task(self._worker(i))
for i in range(self.num_workers)
]
print(f"Started {self.num_workers} webhook workers")
async def stop(self):
"""Graceful shutdown"""
print("Stopping webhook processor...")
self.running = False
# Queue'daki kalan item'ları işle
await self.queue.join()
# Worker'ları durdur
for worker in self.workers:
worker.cancel()
await asyncio.gather(*self.workers, return_exceptions=True)
print("Webhook processor stopped")
async def enqueue(self, payload: WebhookPayload) -> bool:
"""
Webhook'u queue'ya ekle
Returns:
True if enqueued, False if queue is full
"""
try:
# Non-blocking put with timeout
await asyncio.wait_for(
self.queue.put(payload),
timeout=1.0
)
return True
except asyncio.TimeoutError:
# Queue full, webhook rejected
print(f"Queue full, rejecting webhook: {payload.event_id}")
return False
async def _worker(self, worker_id: int):
"""
Worker task - queue'dan webhook alıp işler
"""
print(f"Worker {worker_id} started")
while self.running:
try:
# Queue'dan al (timeout ile)
payload = await asyncio.wait_for(
self.queue.get(),
timeout=1.0
)
try:
# Webhook'u işle
await self._process_with_timeout(payload, timeout=30)
except Exception as e:
print(f"Worker {worker_id} error: {e}")
# DLQ'ya gönder
await dlq.add(payload, str(e))
finally:
# Queue task'ını işaretle
self.queue.task_done()
except asyncio.TimeoutError:
# Queue'da item yok, devam et
continue
except asyncio.CancelledError:
print(f"Worker {worker_id} cancelled")
break
print(f"Worker {worker_id} stopped")
async def _process_with_timeout(
self,
payload: WebhookPayload,
timeout: int = 30
):
"""
Webhook'u timeout ile işle
Uzun süren işlemlerin sistem'i bloklamasını önler.
"""
try:
await asyncio.wait_for(
process_webhook_with_retry(payload),
timeout=timeout
)
except asyncio.TimeoutError:
raise Exception(f"Webhook processing timeout: {payload.event_id}")
async def get_stats(self) -> dict:
"""Queue ve worker istatistikleri"""
return {
"queue_size": self.queue.qsize(),
"queue_maxsize": self.queue.maxsize,
"num_workers": self.num_workers,
"active_workers": sum(1 for w in self.workers if not w.done())
}
# Global processor instance
processor = WebhookProcessor(num_workers=10, queue_size=1000)
@app.on_event("startup")
async def startup_event():
"""FastAPI startup - worker'ları başlat"""
await processor.start()
# Graceful shutdown için signal handler
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
lambda: asyncio.create_task(shutdown())
)
@app.on_event("shutdown")
async def shutdown_event():
"""FastAPI shutdown - worker'ları durdur"""
await processor.stop()
async def shutdown():
"""Graceful shutdown handler"""
print("Received shutdown signal")
await processor.stop()
@app.post("/webhooks/receive")
async def receive_webhook_queued(request: Request):
"""
Queue-based webhook receiver
Webhook'u queue'ya ekler ve hızlıca döner.
Gerçek işlem background worker'larda yapılır.
"""
# Validation...
signature = request.headers.get("X-Webhook-Signature")
if not signature:
raise HTTPException(status_code=401, detail="Missing signature")
body = await request.body()
if not verify_signature(body, signature):
raise HTTPException(status_code=401, detail="Invalid signature")
payload = WebhookPayload(**json.loads(body))
# Queue'ya ekle
enqueued = await processor.enqueue(payload)
if not enqueued:
# Queue full, rate limit response
raise HTTPException(
status_code=429,
detail="Queue full, please retry later",
headers={"Retry-After": "60"}
)
return {
"status": "queued",
"event_id": payload.event_id,
"queue_position": processor.queue.qsize()
}
@app.get("/health/processor")
async def processor_health():
"""Processor health check endpoint"""
stats = await processor.get_stats()
# Queue %80 doluysa warning
queue_usage = stats["queue_size"] / stats["queue_maxsize"]
status = "healthy"
if queue_usage > 0.8:
status = "warning"
if queue_usage >= 1.0:
status = "critical"
return {
"status": status,
"stats": stats,
"queue_usage_percent": round(queue_usage * 100, 2)
}
Idempotency pattern ile duplicate webhook handling
Idempotency - Aynı Webhook’u Tekrar İşlememe
Webhook provider’lar bazen aynı eventi birden fazla gönderebilir (network retry, bug, vb.). Sisteminiz idempotent olmalıdır:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import aioredis
from datetime import timedelta
import hashlib
class IdempotencyManager:
"""
Webhook idempotency yönetimi
Aynı event_id'nin birden fazla işlenmesini önler.
"""
def __init__(self, redis_url: str):
self.redis = aioredis.from_url(redis_url)
self.ttl = timedelta(days=7) # 7 gün cache
def _generate_key(self, event_id: str) -> str:
"""Idempotency key oluştur"""
return f"webhook:idempotency:{event_id}"
async def is_processed(self, event_id: str) -> bool:
"""Event daha önce işlendi mi?"""
key = self._generate_key(event_id)
exists = await self.redis.exists(key)
return bool(exists)
async def mark_processed(
self,
event_id: str,
result: dict = None
):
"""Event'i işlenmiş olarak işaretle"""
key = self._generate_key(event_id)
value = {
"processed_at": datetime.utcnow().isoformat(),
"result": result or {}
}
# TTL ile sakla (7 gün sonra otomatik silinir)
await self.redis.setex(
key,
self.ttl,
json.dumps(value)
)
async def get_result(self, event_id: str) -> dict:
"""Daha önce işlenen event'in sonucunu getir"""
key = self._generate_key(event_id)
data = await self.redis.get(key)
if data:
return json.loads(data)
return None
async def cleanup_old_entries(self):
"""
TTL expired olmayan eski entry'leri temizle
Background task olarak periyodik çalıştırılır.
"""
# Redis'te TTL otomatik çalışıyor,
# bu method ek temizlik için kullanılabilir
cursor = 0
pattern = "webhook:idempotency:*"
while True:
cursor, keys = await self.redis.scan(
cursor,
match=pattern,
count=100
)
for key in keys:
ttl = await self.redis.ttl(key)
if ttl < 0: # TTL yoksa
await self.redis.delete(key)
if cursor == 0:
break
# Global idempotency manager
idempotency = IdempotencyManager("redis://localhost:6379")
@app.post("/webhooks/receive")
async def receive_webhook_idempotent(request: Request):
"""
Idempotent webhook receiver
Aynı event_id'yi birden fazla işlemez.
"""
# Validation...
body = await request.body()
payload = WebhookPayload(**json.loads(body))
# Idempotency check
if await idempotency.is_processed(payload.event_id):
# Daha önce işlenmiş, cached result dön
cached_result = await idempotency.get_result(payload.event_id)
return {
"status": "already_processed",
"event_id": payload.event_id,
"processed_at": cached_result["processed_at"],
"result": cached_result["result"]
}
# İlk defa işleniyor
try:
result = await process_webhook_with_retry(payload)
# Başarıyla işlendi, mark et
await idempotency.mark_processed(payload.event_id, result)
return {
"status": "processed",
"event_id": payload.event_id,
"result": result
}
except Exception as e:
# Hata durumunda mark etme!
# Retry'da tekrar deneyebilsin
raise
# Content-based idempotency (event_id yoksa)
def generate_content_hash(payload: dict) -> str:
"""
Payload içeriğinden hash üret
event_id olmayan durumlarda kullanılır.
"""
# Timestamp hariç tüm field'ları hashle
payload_copy = payload.copy()
payload_copy.pop('timestamp', None)
content = json.dumps(payload_copy, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()
@app.post("/webhooks/receive-content-hash")
async def receive_webhook_content_hash(request: Request):
"""
Content-based idempotency
event_id olmayan webhook'lar için.
"""
body = await request.body()
payload_dict = json.loads(body)
# Content hash'i idempotency key olarak kullan
content_hash = generate_content_hash(payload_dict)
if await idempotency.is_processed(content_hash):
cached_result = await idempotency.get_result(content_hash)
return {
"status": "duplicate_detected",
"content_hash": content_hash,
"processed_at": cached_result["processed_at"]
}
# Process...
payload = WebhookPayload(**payload_dict)
result = await process_webhook_with_retry(payload)
await idempotency.mark_processed(content_hash, result)
return {"status": "processed", "content_hash": content_hash}
Rate Limiting ve Backpressure
Webhook endpoint’lerinizi DDoS ve abuse’den korumak için rate limiting şarttır:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from starlette.requests import Request
# Rate limiter
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/webhooks/receive")
@limiter.limit("100/minute") # Dakikada 100 webhook
async def receive_webhook_rate_limited(request: Request):
"""Rate-limited webhook endpoint"""
# ... webhook processing ...
pass
# IP-based rate limiting
class IPRateLimiter:
"""
Redis-based IP rate limiter
Distributed rate limiting için.
"""
def __init__(self, redis_url: str):
self.redis = aioredis.from_url(redis_url)
async def is_allowed(
self,
ip: str,
limit: int = 100,
window: int = 60
) -> bool:
"""
IP'nin rate limit'e takılıp takılmadığını kontrol et
Args:
ip: Client IP
limit: Max request count
window: Time window (seconds)
"""
key = f"rate_limit:{ip}"
# Current count
current = await self.redis.get(key)
if current is None:
# İlk request
await self.redis.setex(key, window, 1)
return True
current = int(current)
if current >= limit:
# Limit aşıldı
return False
# Counter'ı artır
await self.redis.incr(key)
return True
async def get_remaining(self, ip: str, limit: int = 100) -> int:
"""Kalan request quota'sını getir"""
key = f"rate_limit:{ip}"
current = await self.redis.get(key)
if current is None:
return limit
return max(0, limit - int(current))
rate_limiter = IPRateLimiter("redis://localhost:6379")
@app.post("/webhooks/receive")
async def receive_webhook_ip_limited(request: Request):
"""IP-based rate limiting"""
client_ip = request.client.host
# Rate limit check
if not await rate_limiter.is_allowed(client_ip, limit=100, window=60):
remaining = await rate_limiter.get_remaining(client_ip, limit=100)
raise HTTPException(
status_code=429,
detail="Rate limit exceeded",
headers={
"X-RateLimit-Limit": "100",
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str(60),
"Retry-After": "60"
}
)
# Process webhook...
return {"status": "accepted"}
# Backpressure management
@app.post("/webhooks/receive")
async def receive_webhook_with_backpressure(request: Request):
"""
Backpressure management
System overload durumunda 503 döner.
"""
# Queue capacity check
stats = await processor.get_stats()
queue_usage = stats["queue_size"] / stats["queue_maxsize"]
if queue_usage > 0.95: # Queue %95 dolu
raise HTTPException(
status_code=503,
detail="Service temporarily overloaded",
headers={"Retry-After": "120"}
)
# Process...
return {"status": "accepted"}
Webhook Gönderme (Outbound Webhooks)
Sisteminiz başka servislere webhook gönderecekse:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import aiohttp
from typing import List, Optional
from enum import Enum
class WebhookSubscription(BaseModel):
"""Webhook subscription modeli"""
id: str
url: str
secret: str
events: List[str] # Subscribe edilen event'ler
active: bool = True
retry_config: Optional[dict] = None
class WebhookDeliveryStatus(str, Enum):
"""Delivery status enum"""
PENDING = "pending"
DELIVERED = "delivered"
FAILED = "failed"
RETRYING = "retrying"
class OutboundWebhookSender:
"""
Webhook gönderme servisi
Retry, rate limiting ve delivery tracking ile.
"""
def __init__(self):
self.session: Optional[aiohttp.ClientSession] = None
async def start(self):
"""HTTP session başlat"""
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=10),
connector=aiohttp.TCPConnector(limit=100)
)
async def stop(self):
"""HTTP session kapat"""
if self.session:
await self.session.close()
def _generate_signature(self, body: bytes, secret: str) -> str:
"""Webhook signature oluştur"""
return hmac.new(
secret.encode(),
body,
hashlib.sha256
).hexdigest()
async def send_webhook(
self,
subscription: WebhookSubscription,
event_type: str,
payload: dict,
max_retries: int = 3
) -> dict:
"""
Tek bir subscription'a webhook gönder
Returns:
Delivery result
"""
if not subscription.active:
return {
"status": WebhookDeliveryStatus.FAILED,
"reason": "Subscription inactive"
}
if event_type not in subscription.events:
return {
"status": WebhookDeliveryStatus.FAILED,
"reason": f"Not subscribed to {event_type}"
}
# Payload hazırla
webhook_payload = {
"event_type": event_type,
"event_id": str(uuid.uuid4()),
"timestamp": datetime.utcnow().isoformat(),
"data": payload
}
body = json.dumps(webhook_payload).encode()
signature = self._generate_signature(body, subscription.secret)
# Headers
headers = {
"Content-Type": "application/json",
"X-Webhook-Signature": signature,
"X-Webhook-Event": event_type,
"User-Agent": "YourApp-Webhook/1.0"
}
# Retry loop
for attempt in range(max_retries):
try:
async with self.session.post(
subscription.url,
data=body,
headers=headers
) as response:
# 2xx = success
if 200 <= response.status < 300:
return {
"status": WebhookDeliveryStatus.DELIVERED,
"status_code": response.status,
"attempt": attempt + 1
}
# 4xx = client error, don't retry
if 400 <= response.status < 500:
return {
"status": WebhookDeliveryStatus.FAILED,
"status_code": response.status,
"reason": f"Client error: {response.status}",
"attempt": attempt + 1
}
# 5xx = server error, retry
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
continue
return {
"status": WebhookDeliveryStatus.FAILED,
"status_code": response.status,
"reason": f"Server error after {max_retries} attempts",
"attempt": attempt + 1
}
except asyncio.TimeoutError:
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
continue
return {
"status": WebhookDeliveryStatus.FAILED,
"reason": "Timeout",
"attempt": attempt + 1
}
except Exception as e:
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
continue
return {
"status": WebhookDeliveryStatus.FAILED,
"reason": str(e),
"attempt": attempt + 1
}
return {
"status": WebhookDeliveryStatus.FAILED,
"reason": "Max retries exceeded"
}
async def broadcast_webhook(
self,
event_type: str,
payload: dict,
subscriptions: List[WebhookSubscription]
) -> List[dict]:
"""
Tüm subscriber'lara webhook broadcast et
Parallel gönderim yapar.
"""
tasks = [
self.send_webhook(sub, event_type, payload)
for sub in subscriptions
if sub.active and event_type in sub.events
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
{
"subscription_id": sub.id,
"result": result if not isinstance(result, Exception) else {
"status": WebhookDeliveryStatus.FAILED,
"reason": str(result)
}
}
for sub, result in zip(subscriptions, results)
]
# Webhook sender instance
webhook_sender = OutboundWebhookSender()
@app.on_event("startup")
async def startup_webhook_sender():
await webhook_sender.start()
@app.on_event("shutdown")
async def shutdown_webhook_sender():
await webhook_sender.stop()
# Event trigger endpoint
@app.post("/events/trigger")
async def trigger_event(
event_type: str,
payload: dict,
background_tasks: BackgroundTasks
):
"""
Event trigger eder ve webhook'ları gönderir
Örnek: Sipariş tamamlandığında webhook gönder
"""
# Subscriptions'ları getir (DB'den)
subscriptions = await get_active_subscriptions(event_type)
# Background'da webhook'ları gönder
background_tasks.add_task(
webhook_sender.broadcast_webhook,
event_type,
payload,
subscriptions
)
return {
"status": "event_triggered",
"event_type": event_type,
"subscriber_count": len(subscriptions)
}
# Subscription management endpoints
@app.post("/webhooks/subscriptions")
async def create_subscription(subscription: WebhookSubscription):
"""Yeni webhook subscription oluştur"""
# DB'ye kaydet
await save_subscription(subscription)
return {"status": "created", "subscription_id": subscription.id}
@app.delete("/webhooks/subscriptions/{subscription_id}")
async def delete_subscription(subscription_id: str):
"""Webhook subscription'ı sil"""
await remove_subscription(subscription_id)
return {"status": "deleted"}
@app.post("/webhooks/subscriptions/{subscription_id}/test")
async def test_subscription(subscription_id: str):
"""
Subscription'ı test et
Test webhook'u gönderir.
"""
subscription = await get_subscription(subscription_id)
result = await webhook_sender.send_webhook(
subscription,
"test.event",
{"message": "This is a test webhook"}
)
return result
Monitoring ve Observability
Production webhook sistemlerinde monitoring kritiktir:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from prometheus_client import Counter, Histogram, Gauge
import time
# Prometheus metrics
webhook_received_total = Counter(
'webhook_received_total',
'Total webhooks received',
['event_type', 'status']
)
webhook_processing_duration = Histogram(
'webhook_processing_duration_seconds',
'Webhook processing duration',
['event_type']
)
webhook_queue_size = Gauge(
'webhook_queue_size',
'Current webhook queue size'
)
webhook_dlq_size = Gauge(
'webhook_dlq_size',
'Dead letter queue size'
)
@app.post("/webhooks/receive")
async def receive_webhook_monitored(request: Request):
"""Monitoring entegrasyonlu webhook handler"""
start_time = time.time()
try:
# Parse payload
body = await request.body()
payload = WebhookPayload(**json.loads(body))
# Process
await processor.enqueue(payload)
# Metrics
webhook_received_total.labels(
event_type=payload.event_type,
status='success'
).inc()
processing_time = time.time() - start_time
webhook_processing_duration.labels(
event_type=payload.event_type
).observe(processing_time)
# Queue metrics
stats = await processor.get_stats()
webhook_queue_size.set(stats["queue_size"])
return {"status": "accepted"}
except Exception as e:
webhook_received_total.labels(
event_type='unknown',
status='error'
).inc()
raise
# Metrics endpoint
from prometheus_client import generate_latest
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint"""
return Response(
generate_latest(),
media_type="text/plain"
)
# Health check
@app.get("/health")
async def health_check():
"""Comprehensive health check"""
try:
# Redis check
await idempotency.redis.ping()
redis_healthy = True
except:
redis_healthy = False
# Queue check
stats = await processor.get_stats()
queue_healthy = stats["queue_size"] < stats["queue_maxsize"] * 0.9
# DLQ check
dlq_items = await dlq.get_failed_webhooks(limit=1)
dlq_count = len(dlq_items)
overall_healthy = redis_healthy and queue_healthy
return {
"status": "healthy" if overall_healthy else "unhealthy",
"components": {
"redis": "ok" if redis_healthy else "error",
"queue": "ok" if queue_healthy else "warning",
"dlq_count": dlq_count
},
"stats": stats
}
# Structured logging
import structlog
logger = structlog.get_logger()
@app.post("/webhooks/receive")
async def receive_webhook_logged(request: Request):
"""Structured logging ile webhook handler"""
body = await request.body()
payload = WebhookPayload(**json.loads(body))
logger.info(
"webhook_received",
event_type=payload.event_type,
event_id=payload.event_id,
client_ip=request.client.host
)
try:
await processor.enqueue(payload)
logger.info(
"webhook_queued",
event_id=payload.event_id,
queue_size=processor.queue.qsize()
)
except Exception as e:
logger.error(
"webhook_error",
event_id=payload.event_id,
error=str(e),
exc_info=True
)
raise
return {"status": "accepted"}
Best Practices ve Güvenlik
Güvenlik Checklist
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# 1. HTTPS Zorunlu
@app.middleware("http")
async def enforce_https(request: Request, call_next):
"""Production'da HTTPS zorunlu tut"""
if not request.url.scheme == "https" and not request.url.hostname == "localhost":
return Response("HTTPS required", status_code=403)
return await call_next(request)
# 2. Signature Verification (Zorunlu!)
# Her webhook'ta signature doğrula
# 3. IP Whitelist
ALLOWED_IPS = ["192.168.1.100", "10.0.0.50"] # Partner IP'leri
@app.middleware("http")
async def ip_whitelist(request: Request, call_next):
"""Sadece bilinen IP'lerden webhook kabul et"""
client_ip = request.client.host
if request.url.path.startswith("/webhooks/"):
if client_ip not in ALLOWED_IPS:
logger.warning(
"webhook_blocked",
ip=client_ip,
path=request.url.path
)
return Response("Forbidden", status_code=403)
return await call_next(request)
# 4. Request Size Limit
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
class RequestSizeLimitMiddleware(BaseHTTPMiddleware):
"""Request body size'ı sınırla"""
def __init__(self, app, max_size: int = 1_000_000): # 1MB
super().__init__(app)
self.max_size = max_size
async def dispatch(self, request: Request, call_next):
content_length = request.headers.get("content-length")
if content_length and int(content_length) > self.max_size:
return Response(
"Request body too large",
status_code=413
)
return await call_next(request)
app.add_middleware(RequestSizeLimitMiddleware, max_size=1_000_000)
# 5. Timeout Protection
# Her webhook işlemi için timeout belirle (30s)
# 6. Secrets Management
import os
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET")
if not WEBHOOK_SECRET:
raise RuntimeError("WEBHOOK_SECRET environment variable not set")
# 7. Audit Logging
async def audit_log(event_id: str, action: str, details: dict):
"""Webhook işlemlerini audit log'a kaydet"""
logger.info(
"webhook_audit",
event_id=event_id,
action=action,
details=details,
timestamp=datetime.utcnow().isoformat()
)
Örnek: Tam Production Setup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# main.py
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from contextlib import asynccontextmanager
import signal
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan management"""
# Startup
print("Starting webhook service...")
await processor.start()
await webhook_sender.start()
# Setup graceful shutdown
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
lambda: asyncio.create_task(graceful_shutdown())
)
yield
# Shutdown
print("Shutting down webhook service...")
await processor.stop()
await webhook_sender.stop()
app = FastAPI(
title="Production Webhook Service",
version="1.0.0",
lifespan=lifespan
)
# Middleware
app.add_middleware(RequestSizeLimitMiddleware, max_size=1_000_000)
@app.post("/webhooks/{provider}")
async def receive_webhook_multi_provider(
provider: str,
request: Request,
background_tasks: BackgroundTasks
):
"""
Multi-provider webhook endpoint
/webhooks/stripe
/webhooks/shopify
/webhooks/github
"""
# Provider-specific validation
if provider == "stripe":
signature = request.headers.get("stripe-signature")
# Stripe-specific validation
elif provider == "shopify":
signature = request.headers.get("x-shopify-hmac-sha256")
# Shopify-specific validation
else:
raise HTTPException(status_code=404, detail="Unknown provider")
# Common processing
body = await request.body()
payload = WebhookPayload(**json.loads(body))
# Idempotency check
if await idempotency.is_processed(payload.event_id):
return {"status": "duplicate"}
# Queue for processing
await processor.enqueue(payload)
await idempotency.mark_processed(payload.event_id)
return {"status": "accepted", "event_id": payload.event_id}
async def graceful_shutdown():
"""Graceful shutdown handler"""
print("Graceful shutdown initiated...")
# Yeni webhook'ları reddet
app.state.accepting_webhooks = False
# Queue'daki işleri bitir
await processor.stop()
# Bağlantıları kapat
await webhook_sender.stop()
print("Graceful shutdown complete")
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
workers=4,
log_level="info",
access_log=True
)
Sonuç
Production-ready bir webhook sistemi geliştirmek için:
- Signature Verification: Her webhook’u kriptografik olarak doğrula
- Idempotency: Aynı eventi birden fazla işleme
- Retry Logic: Exponential backoff ile akıllı retry
- Queue Management: Async queue ile yüksek throughput
- DLQ: Başarısız webhook’ları sakla ve yönet
- Rate Limiting: Abuse’den korun
- Monitoring: Prometheus metrics ve structured logging
- Security: HTTPS, IP whitelist, size limit
- Graceful Shutdown: Hiçbir webhook kaybetme
Webhook’lar event-driven mimaride kritik rol oynar. Doğru implement edildiğinde, sistemler arası gerçek zamanlı, güvenilir ve ölçeklenebilir iletişim sağlar.
Kaynaklar:
