Veri Tutarlılığı : Outbox Pattern ve Debezium
Modern yazılım mimarilerinde, özellikle microservis mimarisine geçiş yapan projelerde veri tutarlılığı ve mesajlaşma yönetimi kritik bir önem taşır.
Bu yazıda Outbox Pattern’i detaylıca ele alacak, Kafka’ya doğrudan yazan bir servisin dezavantajlarını inceleyecek, aynı servise
Outbox Pattern ekleyerek karşılaştırma yapacak ve Debezium kullanımıyla nasıl bir alternatif senaryo oluşturulabileceğini göstereceğiz.
1. Kafka’ya Doğrudan Yazma Veri Akışı
Kafka’ya doğrudan yazan bir servis tasarladığımızı varsayalım. Bu senaryoda servis, gelen her istekte veritabanına yazma işlemini tamamladıktan sonra Kafka’ya mesaj göndermekle yükümlüdür.
Controller:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@RestController public class OrderController { private final OrderService orderService; @PostMapping("/orders") public ResponseEntity<String> createOrder(@RequestBody OrderRequest orderRequest) { orderService.createOrder(orderRequest); return ResponseEntity.ok("Order created and sent to Kafka successfully"); } } |
Service:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
@Service public class OrderService { private final OrderRepository orderRepository; private final KafkaTemplate<String, String> kafkaTemplate; public void createOrder(OrderRequest orderRequest) { Order order = new Order(orderRequest); orderRepository.save(order); // Kafka'ya doğrudan mesaj gönderimi kafkaTemplate.send("order-topic", order.toString()); } } |
Dezavantajları:
- Çift Yazma Problemi: Veritabanı işlemi başarılı olur ancak Kafka’ya mesaj gönderimi başarısız olursa, veri tutarsızlığı oluşur.
- Hata Yönetimi Zorluğu: Tek tek işlem yapıldığından geri alma (rollback) işlemleri karmaşıklaşır.
- Düşük Güvenilirlik: Messsage broker’ın geçici hataları, işlemlerin tamamlanamamasına neden olabilir.
2. Outbox Pattern Uygulanması
Outbox Pattern kullanarak, Kafka’ya mesaj gönderimini doğrudan değil, bir “outbox” tablosu üzerinden gerçekleştiriyoruz.
Bu yaklaşımda mesajlar önce veritabanına yazılır, ardından bir mesaj işleyicisi tarafından Kafka’ya gönderilir.
Outbox Tablosu
1 2 3 4 5 6 7 8 9 10 11 12 |
CREATE TABLE order_outbox ( id SERIAL PRIMARY KEY, aggregate_id UUID NOT NULL, aggregate_type VARCHAR(255) NOT NULL, payload JSONB NOT NULL, status VARCHAR(50) DEFAULT 'PENDING', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); |
Not: Controller class’ında değişen bir kod parçası yok o yüzden eklemedim.
Service
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
@Service public class OrderService { private final OrderRepository orderRepository; private final OutboxRepository outboxRepository; @Transactional public void createOrder(OrderRequest orderRequest) { Order order = new Order(orderRequest); orderRepository.save(order); Outbox outbox = new Outbox(order.getId(), "Order", orderRequest); outboxRepository.save(outbox); } } |
Producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
@Component public class OrderOutboxProducer { private final OrderOutboxRepository orderOutboxRepository; private final KafkaTemplate<String, String> kafkaTemplate; @Scheduled(fixedRate = 5000) public void produce() { List<OrderOutbox> pendingMessages = orderOutboxRepository.findByStatus(OutboxStatusType.PENDING); for (OrderOutbox message : pendingMessages) { kafkaTemplate.send("order-topic", message.getPayload().toString()); message.setStatus(OutboxStatusType.SENT); orderOutboxRepository.save(message); } } } |
3. Debeizum ile Outbox Pattern
Debezium, veritabanı değişikliklerini izleyen bir değişiklik veri yakalama (CDC) aracıdır. Outbox Pattern ile birlikte kullanıldığında, outbox tablosundaki değişiklikler Debezium tarafından otomatik olarak algılanır ve Kafka’ya gönderilir.
Debezium ile Outbox Veri Akışı
- Servis, veritabanına bir kayıt ekler.
- Debezium, outbox tablosundaki yeni eklenen veya güncellenen kayıtları algılar.
- Debezium, bu değişiklikleri Kafka’ya gönderir.
Docker Compose Konfigürasyonu
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
version: '3.8' services: postgres: image: postgres:15 container_name: postgres environment: POSTGRES_USER: user POSTGRES_PASSWORD: password POSTGRES_DB: orders_db ports: - "5432:5432" debezium: image: debezium/connect:latest container_name: debezium environment: BOOTSTRAP_SERVERS: "kafka:9092" GROUP_ID: "debezium" CONFIG_STORAGE_TOPIC: "connect-configs" OFFSET_STORAGE_TOPIC: "connect-offsets" STATUS_STORAGE_TOPIC: "connect-status" CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECTOR_CLASS: "io.debezium.connector.postgresql.PostgresConnector" depends_on: - postgres - kafka ports: - "8083:8083" kafka: image: bitnami/kafka:3.5 container_name: kafka environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 ALLOW_PLAINTEXT_LISTENER: "yes" KAFKA_LISTENERS: PLAINTEXT://:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 ports: - "9092:9092" depends_on: - zookeeper zookeeper: image: bitnami/zookeeper:3.8 container_name: zookeeper ports: - "2181:2181" |
3.1 Docker Compose ile Servisleri Çalıştırın
1 2 3 4 5 |
docker-compose up -d |
Bu işlemden sonra:
- PostgreSQL veritabanı çalışır durumda olacak.
- Kafka ve Zookeeper iletişim için hazır olacak.
- Debezium Connect, Kafka ve PostgreSQL ile entegre edilecek.
3.2 Debezium Connector Yapılandırması
Debezium’u çalıştırmak için bir connector konfigürasyonu oluşturmanız gerekiyor. Aşağıdaki adımları izleyin:
Debezium için bir connector.json
dosyası oluşturun. Örneğin:
Debezium, PostgreSQL veritabanı üzerinde bir “outbox” tablosunu izlemek için aşağıdaki yapılandırmayı kullanır:
Debezium Connector Ayarları
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
{ "name": "outbox-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "user", "database.password": "password", "database.dbname": "orders_db", "table.include.list": "public.outbox", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" } } |
Bu yapılandırmada:
table.include.list
: Debezium’un yalnızcapublic.outbox
tablosunu izlemesini sağlıyor.transforms.unwrap
: Kafka’ya yalnızcanew state
mesajlarını göndermek için kullanılan bir dönüşüm sağlar.
3.3 Debezium Connector’u Kafka Connect’e Ekleme
1 2 3 4 5 6 7 |
curl -X POST -H "Content-Type: application/json" \ --data @connector.json \ http://localhost:8083/connectors |
Bu komutla Debezium’a yeni bir outbox-connector
eklemiş olursunuz. localhost:8083
Debezium Connect’in varsayılan portudur.
Sonuç
Debezium, Outbox Pattern’i uygulamak için güçlü bir alternatif sunar. Servis katmanında ek bir mesaj işleyiciye ihtiyaç duymadan, veritabanı değişikliklerini doğrudan Kafka’ya iletir. Bu yaklaşım, özellikle büyük ölçekli sistemlerde ek yükleri azaltabilir.
Kaynak;
- https://debezium.io/documentation/reference/stable/operations/monitoring.html
- https://www.baeldung.com/cs/outbox-pattern-microservices
Faydalı olması dileğiyle.