Gönderi

gRPC ile Yüksek Performanslı API'ler

gRPC ve Protocol Buffers ile yüksek performanslı API geliştirme. Python implementasyonu, streaming, interceptors, load balancing ve microservice best practices.

gRPC ile Yüksek Performanslı API'ler

Modern mikroservis mimarilerinde servisler arası iletişim kritik bir rol oynar. REST API’ler uzun yıllardır standart olsa da, gRPC (Google Remote Procedure Call) yüksek performans, düşük latency ve güçlü type-safety özellikleriyle öne çıkıyor. Bu yazıda, gRPC’nin temellerini, Protocol Buffers’ı, Python ile implementasyonunu ve best practice’leri ele alacağız.

gRPC Nedir ve Neden Önemli?

gRPC, Google tarafından geliştirilen ve HTTP/2 üzerinde çalışan modern bir RPC (Remote Procedure Call) framework’üdür. REST API’lere alternatif olarak, özellikle mikroservis mimarilerinde tercih edilir.

gRPC’nin Temel Avantajları

  • Yüksek Performans: Binary serialization (Protocol Buffers) sayesinde JSON’dan 5-10x daha hızlı
  • HTTP/2: Multiplexing, server push, header compression gibi özellikler
  • Streaming: Unary, server streaming, client streaming, bidirectional streaming desteği
  • Type-Safety: Protocol Buffers ile güçlü tip kontrolü
  • Code Generation: Otomatik client ve server kodu üretimi
  • Çoklu Dil Desteği: 10+ programlama dili için resmi destek

gRPC vs REST Performans Karşılaştırması

gRPC vs REST: Karşılaştırma

ÖzellikgRPCREST
ProtocolHTTP/2HTTP/1.1
Payload FormatProtocol Buffers (binary)JSON (text)
StreamingBidirectionalSınırlı
PerformanceYüksekOrta
Browser SupportSınırlı (gRPC-Web gerekli)Tam
Human ReadableHayırEvet
Code GenerationOtomatikManuel/3rd party

Protocol Buffers: gRPC’nin Kalbindeki Teknoloji

Protocol Buffers (protobuf), Google’ın geliştirdiği language-agnostic binary serialization formatıdır.

Protocol Buffers ve gRPC İlişkisi

.proto Dosyası Tanımlama

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// user.proto
syntax = "proto3";

package user;

// User message tanımı
message User {
  int32 id = 1;
  string username = 2;
  string email = 3;
  bool is_active = 4;
  repeated string roles = 5;  // repeated = list/array
  google.protobuf.Timestamp created_at = 6;
}

// Request/Response mesajları
message GetUserRequest {
  int32 user_id = 1;
}

message GetUserResponse {
  User user = 1;
  string message = 2;
}

message CreateUserRequest {
  string username = 1;
  string email = 2;
  string password = 3;
}

message CreateUserResponse {
  User user = 1;
  bool success = 2;
}

message ListUsersRequest {
  int32 page = 1;
  int32 page_size = 2;
  string filter = 3;
}

message ListUsersResponse {
  repeated User users = 1;
  int32 total_count = 2;
  int32 page = 3;
}

// Service tanımı
service UserService {
  // Unary RPC - tek request, tek response
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  
  // Unary RPC - kullanıcı oluşturma
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
  
  // Server streaming - sunucu birden fazla response gönderir
  rpc ListUsers(ListUsersRequest) returns (stream User);
  
  // Client streaming - client birden fazla request gönderir
  rpc BulkCreateUsers(stream CreateUserRequest) returns (CreateUserResponse);
  
  // Bidirectional streaming - hem client hem server stream
  rpc ChatWithSupport(stream ChatMessage) returns (stream ChatMessage);
}

// Chat mesajı için ek message
message ChatMessage {
  int32 user_id = 1;
  string message = 2;
  google.protobuf.Timestamp timestamp = 3;
}

Protocol Buffers Data Types

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
syntax = "proto3";

message DataTypes {
  // Sayılar
  int32 age = 1;           // 32-bit integer
  int64 big_number = 2;    // 64-bit integer
  uint32 count = 3;        // unsigned 32-bit
  float price = 4;         // 32-bit float
  double precise = 5;      // 64-bit double
  
  // String ve bytes
  string name = 6;         // UTF-8 string
  bytes data = 7;          // arbitrary bytes
  
  // Boolean
  bool is_active = 8;
  
  // Enum
  Status status = 9;
  
  // Nested message
  Address address = 10;
  
  // Repeated (list/array)
  repeated string tags = 11;
  
  // Map
  map<string, int32> scores = 12;
  
  // Oneof - birden fazla alan aynı anda kullanılamaz
  oneof payment_method {
    CreditCard credit_card = 13;
    PayPal paypal = 14;
  }
}

enum Status {
  UNKNOWN = 0;  // Default değer 0 olmalı
  ACTIVE = 1;
  INACTIVE = 2;
  SUSPENDED = 3;
}

message Address {
  string street = 1;
  string city = 2;
  string country = 3;
  string zip_code = 4;
}

message CreditCard {
  string number = 1;
  string cvv = 2;
}

message PayPal {
  string email = 1;
}

Python ile gRPC Server Implementasyonu

Kurulum ve Proto Derleme

1
2
3
4
5
6
7
8
9
10
11
12
13
# gRPC ve protobuf kurulumu
pip install grpcio grpcio-tools

# Proto dosyasını derle (Python kodu oluştur)
python -m grpc_tools.protoc \
    -I. \
    --python_out=. \
    --grpc_python_out=. \
    user.proto

# Oluşturulan dosyalar:
# user_pb2.py - message tanımları
# user_pb2_grpc.py - service tanımları

gRPC Server Implementation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# server.py
import grpc
from concurrent import futures
import time
from datetime import datetime
import user_pb2
import user_pb2_grpc
from google.protobuf.timestamp_pb2 import Timestamp

# In-memory veritabanı (örnek için)
users_db = {}
user_id_counter = 1

class UserServiceServicer(user_pb2_grpc.UserServiceServicer):
    """UserService implementasyonu"""
    
    def GetUser(self, request, context):
        """Tek kullanıcı getir - Unary RPC"""
        user_id = request.user_id
        
        if user_id not in users_db:
            # gRPC error döndür
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f'User with ID {user_id} not found')
            return user_pb2.GetUserResponse()
        
        user = users_db[user_id]
        return user_pb2.GetUserResponse(
            user=user,
            message="User retrieved successfully"
        )
    
    def CreateUser(self, request, context):
        """Kullanıcı oluştur - Unary RPC"""
        global user_id_counter
        
        # Validasyon
        if not request.username or not request.email:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details('Username and email are required')
            return user_pb2.CreateUserResponse(success=False)
        
        # Email kontrolü
        for user in users_db.values():
            if user.email == request.email:
                context.set_code(grpc.StatusCode.ALREADY_EXISTS)
                context.set_details('Email already exists')
                return user_pb2.CreateUserResponse(success=False)
        
        # Timestamp oluştur
        timestamp = Timestamp()
        timestamp.GetCurrentTime()
        
        # User oluştur
        user = user_pb2.User(
            id=user_id_counter,
            username=request.username,
            email=request.email,
            is_active=True,
            roles=['user'],
            created_at=timestamp
        )
        
        users_db[user_id_counter] = user
        user_id_counter += 1
        
        return user_pb2.CreateUserResponse(
            user=user,
            success=True
        )
    
    def ListUsers(self, request, context):
        """Kullanıcı listesi - Server Streaming"""
        page = request.page if request.page > 0 else 1
        page_size = request.page_size if request.page_size > 0 else 10
        filter_text = request.filter.lower()
        
        # Filtreleme
        filtered_users = []
        for user in users_db.values():
            if not filter_text or \
               filter_text in user.username.lower() or \
               filter_text in user.email.lower():
                filtered_users.append(user)
        
        # Pagination
        start_idx = (page - 1) * page_size
        end_idx = start_idx + page_size
        page_users = filtered_users[start_idx:end_idx]
        
        # Stream olarak kullanıcıları gönder
        for user in page_users:
            yield user
            time.sleep(0.1)  # Simulated delay
    
    def BulkCreateUsers(self, request_iterator, context):
        """Toplu kullanıcı oluşturma - Client Streaming"""
        global user_id_counter
        created_count = 0
        last_user = None
        
        for create_request in request_iterator:
            # Her gelen request için kullanıcı oluştur
            timestamp = Timestamp()
            timestamp.GetCurrentTime()
            
            user = user_pb2.User(
                id=user_id_counter,
                username=create_request.username,
                email=create_request.email,
                is_active=True,
                roles=['user'],
                created_at=timestamp
            )
            
            users_db[user_id_counter] = user
            user_id_counter += 1
            created_count += 1
            last_user = user
        
        return user_pb2.CreateUserResponse(
            user=last_user,
            success=True
        )
    
    def ChatWithSupport(self, request_iterator, context):
        """Çift yönlü chat - Bidirectional Streaming"""
        for message in request_iterator:
            # Gelen mesajı al
            print(f"Received: {message.message} from user {message.user_id}")
            
            # Otomatik cevap gönder
            timestamp = Timestamp()
            timestamp.GetCurrentTime()
            
            response = user_pb2.ChatMessage(
                user_id=0,  # System user
                message=f"Echo: {message.message}",
                timestamp=timestamp
            )
            
            yield response
            time.sleep(0.5)  # Simulated processing time

def serve():
    """gRPC sunucusunu başlat"""
    # Thread pool oluştur
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    
    # Servicer'ı ekle
    user_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )
    
    # Port dinle
    port = '50051'
    server.add_insecure_port(f'[::]:{port}')
    
    print(f"gRPC Server starting on port {port}...")
    server.start()
    
    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        print("\nShutting down server...")
        server.stop(0)

if __name__ == '__main__':
    serve()

gRPC Client Implementasyonu

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# client.py
import grpc
import user_pb2
import user_pb2_grpc
from google.protobuf.timestamp_pb2 import Timestamp

def run_unary_example(stub):
    """Unary RPC örneği"""
    print("\n=== Unary RPC Example ===")
    
    # Kullanıcı oluştur
    create_request = user_pb2.CreateUserRequest(
        username="john_doe",
        email="[email protected]",
        password="secure123"
    )
    
    try:
        response = stub.CreateUser(create_request)
        print(f"User created: {response.user.username} (ID: {response.user.id})")
    except grpc.RpcError as e:
        print(f"Error: {e.code()} - {e.details()}")
        return None
    
    # Kullanıcıyı getir
    get_request = user_pb2.GetUserRequest(user_id=response.user.id)
    
    try:
        get_response = stub.GetUser(get_request)
        print(f"Retrieved user: {get_response.user.username}")
        print(f"Message: {get_response.message}")
        return get_response.user.id
    except grpc.RpcError as e:
        print(f"Error: {e.code()} - {e.details()}")
        return None

def run_server_streaming_example(stub):
    """Server Streaming örneği"""
    print("\n=== Server Streaming Example ===")
    
    request = user_pb2.ListUsersRequest(
        page=1,
        page_size=5,
        filter=""
    )
    
    try:
        # Server'dan stream olarak kullanıcıları al
        user_stream = stub.ListUsers(request)
        
        print("Receiving users stream:")
        for user in user_stream:
            print(f"  - {user.username} ({user.email})")
    except grpc.RpcError as e:
        print(f"Error: {e.code()} - {e.details()}")

def run_client_streaming_example(stub):
    """Client Streaming örneği"""
    print("\n=== Client Streaming Example ===")
    
    def generate_users():
        """Kullanıcı stream'i oluştur"""
        users_to_create = [
            {"username": "alice", "email": "[email protected]", "password": "pass1"},
            {"username": "bob", "email": "[email protected]", "password": "pass2"},
            {"username": "charlie", "email": "[email protected]", "password": "pass3"},
        ]
        
        for user_data in users_to_create:
            yield user_pb2.CreateUserRequest(**user_data)
            print(f"Sending: {user_data['username']}")
    
    try:
        response = stub.BulkCreateUsers(generate_users())
        print(f"Bulk creation completed. Last user: {response.user.username}")
    except grpc.RpcError as e:
        print(f"Error: {e.code()} - {e.details()}")

def run_bidirectional_streaming_example(stub):
    """Bidirectional Streaming örneği"""
    print("\n=== Bidirectional Streaming Example ===")
    
    def generate_messages():
        """Chat mesajları oluştur"""
        messages = ["Hello", "How are you?", "Need help with gRPC"]
        
        for msg in messages:
            timestamp = Timestamp()
            timestamp.GetCurrentTime()
            
            yield user_pb2.ChatMessage(
                user_id=1,
                message=msg,
                timestamp=timestamp
            )
            print(f"Sent: {msg}")
    
    try:
        # Bidirectional stream başlat
        responses = stub.ChatWithSupport(generate_messages())
        
        print("\nReceiving responses:")
        for response in responses:
            print(f"  Support: {response.message}")
    except grpc.RpcError as e:
        print(f"Error: {e.code()} - {e.details()}")

def main():
    """Ana client fonksiyonu"""
    # Channel oluştur (bağlantı)
    channel = grpc.insecure_channel('localhost:50051')
    
    # Stub (client) oluştur
    stub = user_pb2_grpc.UserServiceStub(channel)
    
    try:
        # Unary RPC
        run_unary_example(stub)
        
        # Server Streaming
        run_server_streaming_example(stub)
        
        # Client Streaming
        run_client_streaming_example(stub)
        
        # Bidirectional Streaming
        run_bidirectional_streaming_example(stub)
        
    finally:
        channel.close()
        print("\nConnection closed.")

if __name__ == '__main__':
    main()

Async gRPC ile Yüksek Performans

AsyncIO ile non-blocking gRPC implementasyonu:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# async_server.py
import asyncio
import grpc
from grpc import aio
import user_pb2
import user_pb2_grpc

class AsyncUserServiceServicer(user_pb2_grpc.UserServiceServicer):
    """Async UserService implementasyonu"""
    
    async def GetUser(self, request, context):
        """Async kullanıcı getirme"""
        # Async veritabanı sorgusu (örnek)
        await asyncio.sleep(0.1)  # Simulated DB query
        
        user = user_pb2.User(
            id=request.user_id,
            username=f"user_{request.user_id}",
            email=f"user{request.user_id}@example.com",
            is_active=True,
            roles=['user']
        )
        
        return user_pb2.GetUserResponse(
            user=user,
            message="User retrieved"
        )
    
    async def ListUsers(self, request, context):
        """Async server streaming"""
        for i in range(10):
            # Async işlem
            await asyncio.sleep(0.1)
            
            user = user_pb2.User(
                id=i,
                username=f"user_{i}",
                email=f"user{i}@example.com",
                is_active=True,
                roles=['user']
            )
            
            yield user

async def serve():
    """Async gRPC server"""
    server = aio.server()
    
    user_pb2_grpc.add_UserServiceServicer_to_server(
        AsyncUserServiceServicer(), server
    )
    
    listen_addr = '[::]:50051'
    server.add_insecure_port(listen_addr)
    
    print(f"Async gRPC Server starting on {listen_addr}")
    await server.start()
    
    try:
        await server.wait_for_termination()
    except KeyboardInterrupt:
        print("\nShutting down...")
        await server.stop(0)

if __name__ == '__main__':
    asyncio.run(serve())

Async client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# async_client.py
import asyncio
import grpc
from grpc import aio
import user_pb2
import user_pb2_grpc

async def run_async_calls():
    """Async gRPC çağrıları"""
    async with aio.insecure_channel('localhost:50051') as channel:
        stub = user_pb2_grpc.UserServiceStub(channel)
        
        # Paralel unary çağrılar
        tasks = [
            stub.GetUser(user_pb2.GetUserRequest(user_id=i))
            for i in range(5)
        ]
        
        responses = await asyncio.gather(*tasks)
        
        print("Received users:")
        for response in responses:
            print(f"  - {response.user.username}")
        
        # Server streaming
        print("\nStreaming users:")
        request = user_pb2.ListUsersRequest(page=1, page_size=5)
        
        async for user in stub.ListUsers(request):
            print(f"  - {user.username}")

if __name__ == '__main__':
    asyncio.run(run_async_calls())

Interceptors: Middleware Benzeri İşlemler

gRPC interceptor’ları ile logging, authentication, rate limiting gibi cross-cutting concerns implement edebilirsiniz:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# interceptors.py
import grpc
import time
import logging
from grpc_interceptor import ServerInterceptor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LoggingInterceptor(ServerInterceptor):
    """Request/response loglama"""
    
    def intercept(self, method, request, context, method_name):
        """Her RPC çağrısında çalışır"""
        start_time = time.time()
        
        logger.info(f"RPC Started: {method_name}")
        logger.info(f"Request: {request}")
        
        try:
            # Gerçek RPC çağrısı
            response = method(request, context)
            
            elapsed = time.time() - start_time
            logger.info(f"RPC Completed: {method_name} ({elapsed:.3f}s)")
            logger.info(f"Response: {response}")
            
            return response
        except Exception as e:
            elapsed = time.time() - start_time
            logger.error(f"RPC Failed: {method_name} ({elapsed:.3f}s) - {str(e)}")
            raise

class AuthInterceptor(ServerInterceptor):
    """Authentication kontrolü"""
    
    def intercept(self, method, request, context, method_name):
        """Token kontrolü yap"""
        # Metadata'dan token al
        metadata = dict(context.invocation_metadata())
        token = metadata.get('authorization', '')
        
        # Token validation (örnek)
        if not self.validate_token(token):
            context.abort(
                grpc.StatusCode.UNAUTHENTICATED,
                'Invalid or missing token'
            )
        
        # Token geçerli, devam et
        return method(request, context)
    
    def validate_token(self, token):
        """Token doğrulama"""
        # Gerçek implementasyonda JWT validation yapılır
        return token.startswith('Bearer ')

class RateLimitInterceptor(ServerInterceptor):
    """Rate limiting"""
    
    def __init__(self, max_requests_per_minute=60):
        self.max_requests = max_requests_per_minute
        self.requests = {}  # IP -> (count, window_start)
    
    def intercept(self, method, request, context, method_name):
        """Rate limit kontrolü"""
        # Client IP al
        peer = context.peer()
        
        current_time = time.time()
        
        if peer not in self.requests:
            self.requests[peer] = (1, current_time)
        else:
            count, window_start = self.requests[peer]
            
            # 1 dakikalık pencere kontrolü
            if current_time - window_start > 60:
                # Yeni pencere başlat
                self.requests[peer] = (1, current_time)
            else:
                # Aynı penceredeyiz
                if count >= self.max_requests:
                    context.abort(
                        grpc.StatusCode.RESOURCE_EXHAUSTED,
                        'Rate limit exceeded'
                    )
                
                self.requests[peer] = (count + 1, window_start)
        
        return method(request, context)

# Server'a interceptor ekleme
def serve_with_interceptors():
    """Interceptor'lı server"""
    from grpc_interceptor.server import ServerInterceptor as GRPCInterceptor
    
    interceptors = [
        LoggingInterceptor(),
        AuthInterceptor(),
        RateLimitInterceptor(max_requests_per_minute=100)
    ]
    
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=interceptors
    )
    
    user_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )
    
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

Error Handling ve Best Practices

Proper Error Handling

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def GetUser(self, request, context):
    """Doğru error handling"""
    try:
        user_id = request.user_id
        
        # Validasyon
        if user_id <= 0:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details('User ID must be positive')
            return user_pb2.GetUserResponse()
        
        # Kullanıcıyı bul
        user = database.get_user(user_id)
        
        if not user:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f'User {user_id} not found')
            return user_pb2.GetUserResponse()
        
        # Permission kontrolü
        if not has_permission(context, user):
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
            context.set_details('Insufficient permissions')
            return user_pb2.GetUserResponse()
        
        return user_pb2.GetUserResponse(user=user, message="Success")
        
    except DatabaseError as e:
        context.set_code(grpc.StatusCode.INTERNAL)
        context.set_details(f'Database error: {str(e)}')
        return user_pb2.GetUserResponse()
    
    except Exception as e:
        logger.exception("Unexpected error")
        context.set_code(grpc.StatusCode.UNKNOWN)
        context.set_details('Internal server error')
        return user_pb2.GetUserResponse()

gRPC Status Codes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Yaygın kullanılan status code'lar
COMMON_STATUS_CODES = {
    grpc.StatusCode.OK: "Success",
    grpc.StatusCode.CANCELLED: "Request cancelled",
    grpc.StatusCode.INVALID_ARGUMENT: "Invalid argument provided",
    grpc.StatusCode.DEADLINE_EXCEEDED: "Deadline exceeded",
    grpc.StatusCode.NOT_FOUND: "Resource not found",
    grpc.StatusCode.ALREADY_EXISTS: "Resource already exists",
    grpc.StatusCode.PERMISSION_DENIED: "Permission denied",
    grpc.StatusCode.RESOURCE_EXHAUSTED: "Resource exhausted (rate limit)",
    grpc.StatusCode.FAILED_PRECONDITION: "Failed precondition",
    grpc.StatusCode.UNAUTHENTICATED: "Authentication required",
    grpc.StatusCode.INTERNAL: "Internal server error",
    grpc.StatusCode.UNAVAILABLE: "Service unavailable",
}

Load Balancing ve Service Discovery

Client-Side Load Balancing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# load_balanced_client.py
import grpc

def create_load_balanced_channel():
    """Load balanced gRPC channel"""
    # DNS resolution ile load balancing
    # kubernetes gibi ortamlarda DNS round-robin
    options = [
        ('grpc.lb_policy_name', 'round_robin'),
        ('grpc.enable_retries', 1),
        ('grpc.keepalive_time_ms', 10000),
    ]
    
    # Service discovery DNS
    channel = grpc.insecure_channel(
        'user-service:50051',
        options=options
    )
    
    return channel

# Retry configuration
def create_channel_with_retry():
    """Retry politikası ile channel"""
    service_config = {
        'methodConfig': [{
            'name': [{'service': 'user.UserService'}],
            'retryPolicy': {
                'maxAttempts': 3,
                'initialBackoff': '0.1s',
                'maxBackoff': '1s',
                'backoffMultiplier': 2,
                'retryableStatusCodes': ['UNAVAILABLE', 'DEADLINE_EXCEEDED']
            }
        }]
    }
    
    channel = grpc.insecure_channel(
        'localhost:50051',
        options=[('grpc.service_config', json.dumps(service_config))]
    )
    
    return channel

Testing gRPC Services

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# test_user_service.py
import pytest
import grpc
from grpc_testing import server_from_dictionary, strict_real_time
import user_pb2
import user_pb2_grpc
from server import UserServiceServicer

@pytest.fixture
def grpc_server():
    """Test gRPC server fixture"""
    servicer = UserServiceServicer()
    descriptors_to_services = {
        user_pb2.DESCRIPTOR.services_by_name['UserService']: servicer
    }
    
    return server_from_dictionary(
        descriptors_to_services,
        strict_real_time()
    )

def test_create_user(grpc_server):
    """Kullanıcı oluşturma testi"""
    method = grpc_server.invoke_unary_unary(
        user_pb2.DESCRIPTOR.services_by_name['UserService'].methods_by_name['CreateUser'],
        invocation_metadata={},
        request=user_pb2.CreateUserRequest(
            username="testuser",
            email="[email protected]",
            password="pass123"
        ),
        timeout=1
    )
    
    response, metadata, code, details = method.termination()
    
    assert code == grpc.StatusCode.OK
    assert response.success is True
    assert response.user.username == "testuser"

def test_get_user_not_found(grpc_server):
    """Kullanıcı bulunamadı testi"""
    method = grpc_server.invoke_unary_unary(
        user_pb2.DESCRIPTOR.services_by_name['UserService'].methods_by_name['GetUser'],
        invocation_metadata={},
        request=user_pb2.GetUserRequest(user_id=999),
        timeout=1
    )
    
    response, metadata, code, details = method.termination()
    
    assert code == grpc.StatusCode.NOT_FOUND

Production Best Practices

1. TLS/SSL Güvenliği

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# secure_server.py
def serve_secure():
    """TLS ile güvenli server"""
    # SSL sertifikaları oku
    with open('server.key', 'rb') as f:
        private_key = f.read()
    
    with open('server.crt', 'rb') as f:
        certificate_chain = f.read()
    
    # Server credentials oluştur
    server_credentials = grpc.ssl_server_credentials(
        ((private_key, certificate_chain),)
    )
    
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    user_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )
    
    # Güvenli port ekle
    server.add_secure_port('[::]:50051', server_credentials)
    server.start()
    server.wait_for_termination()

2. Health Check

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// health.proto
syntax = "proto3";

service Health {
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
}

message HealthCheckRequest {
  string service = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
  }
  ServingStatus status = 1;
}

3. Graceful Shutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def serve_with_graceful_shutdown():
    """Graceful shutdown destekli server"""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    
    user_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )
    
    server.add_insecure_port('[::]:50051')
    server.start()
    
    print("Server started. Press Ctrl+C to stop.")
    
    def handle_sigterm(*_):
        print("Received shutdown signal")
        all_rpcs_done_event = server.stop(30)  # 30 saniye grace period
        all_rpcs_done_event.wait(30)
        print("Server stopped gracefully")
    
    import signal
    signal.signal(signal.SIGTERM, handle_sigterm)
    signal.signal(signal.SIGINT, handle_sigterm)
    
    server.wait_for_termination()

Sonuç

gRPC, modern mikroservis mimarilerinde yüksek performans ve güçlü type-safety gerektiren senaryolar için mükemmel bir seçimdir. Protocol Buffers ile tanımlanan contract’lar sayesinde client ve server arasında güçlü bir tip güvenliği sağlanır, otomatik kod üretimi geliştirme sürecini hızlandırır.

Bu yazıda ele aldığımız konular:

  • gRPC’nin temel kavramları ve REST ile karşılaştırması
  • Protocol Buffers ile service tanımlama
  • Python ile gRPC server ve client implementasyonu
  • Unary, streaming (server/client/bidirectional) RPC tipleri
  • Async gRPC ile yüksek performanslı uygulamalar
  • Interceptor’lar ile middleware benzeri işlemler
  • Error handling ve status code’lar
  • Load balancing ve service discovery
  • Testing stratejileri
  • Production best practices (TLS, health check, graceful shutdown)

gRPC özellikle mikroservisler arası iletişim, IoT, mobil backend ve real-time uygulamalar için ideal bir seçimdir.

Kaynaklar:

Bu gönderi CC BY 4.0 lisansı altındadır.