Home > Blog > tech

Event-Driven Architecture คืออะไร? สอนออกแบบระบบ Event Sourcing, CQRS สำหรับ Backend 2026

event driven architecture guide
Event-Driven Architecture Guide 2026
2026-04-08 | tech | 3500 words

ในโลกของ Backend Development ปี 2026 ระบบที่ต้องรองรับผู้ใช้หลายล้านคนพร้อมกันไม่สามารถพึ่งพาสถาปัตยกรรมแบบ Request-Response แบบดั้งเดิมได้อีกต่อไป Event-Driven Architecture (EDA) กลายเป็นรูปแบบสถาปัตยกรรมที่สำคัญที่สุดสำหรับระบบ Distributed Systems ขนาดใหญ่ ตั้งแต่ระบบ E-commerce ไปจนถึงระบบ Financial Trading

บทความนี้จะพาคุณเข้าใจ Event-Driven Architecture อย่างลึกซึ้ง ตั้งแต่แนวคิดพื้นฐานไปจนถึง Pattern ขั้นสูงอย่าง Event Sourcing, CQRS, Saga Pattern และ Outbox Pattern พร้อมตัวอย่างการใช้งานจริงกับ Apache Kafka และ RabbitMQ

Event-Driven Architecture (EDA) คืออะไร?

Event-Driven Architecture คือรูปแบบสถาปัตยกรรมซอฟต์แวร์ที่ใช้ "เหตุการณ์" (Events) เป็นตัวขับเคลื่อนการทำงานของระบบ แทนที่จะเรียกใช้งานบริการอื่นโดยตรงแบบ Synchronous เหมือนระบบ Request-Response ทั่วไป ระบบจะ "ปล่อย" (Emit/Publish) เหตุการณ์ออกมา แล้วบริการอื่นที่สนใจจะ "รับฟัง" (Subscribe/Consume) เหตุการณ์เหล่านั้นไปทำงานต่อ

ลองนึกภาพง่ายๆ: ในร้านอาหาร ลูกค้าสั่งอาหาร (Event) → พนักงานเสิร์ฟเขียน Order → ส่งไปห้องครัว → เชฟรับ Order ไปทำ → อาหารเสร็จ (Event) → พนักงานเสิร์ฟรับไปส่ง ทุกขั้นตอนขับเคลื่อนด้วย "เหตุการณ์" ไม่ใช่การสั่งงานโดยตรง

ความแตกต่างระหว่าง Events, Commands และ Queries

การเข้าใจความแตกต่างระหว่างสามแนวคิดนี้เป็นพื้นฐานสำคัญ:

ประเภทความหมายตัวอย่างลักษณะ
Eventสิ่งที่เกิดขึ้นแล้ว (Past tense)OrderPlaced, UserRegisteredImmutable, ย้อนไม่ได้
Commandคำสั่งให้ทำบางอย่าง (Imperative)PlaceOrder, RegisterUserอาจสำเร็จหรือล้มเหลว
Queryคำถามเพื่อดึงข้อมูลGetOrderById, ListUsersไม่เปลี่ยนแปลง State

Event จะใช้ชื่อเป็น Past Tense เสมอ เช่น OrderPlaced ไม่ใช่ PlaceOrder (นั่นคือ Command) เพราะ Event คือสิ่งที่ "เกิดขึ้นแล้ว" เป็นข้อเท็จจริง (fact) ที่ไม่สามารถเปลี่ยนแปลงได้

ประเภทของ Events

Domain Events

เหตุการณ์ที่เกิดขึ้นภายใน Bounded Context เดียวกัน เป็นเรื่องของ Business Logic ภายใน เช่น PaymentReceived, InventoryReserved ใช้ภายใน Service เดียวกันเพื่อแยก Logic ให้ชัดเจน

# ตัวอย่าง Domain Event
class OrderPlaced:
    order_id: str
    customer_id: str
    items: list
    total_amount: float
    placed_at: datetime

    # Domain Event อยู่ภายใน Order Service
    # ใช้ trigger การคำนวณ loyalty points,
    # อัปเดต order history ภายใน service เดียวกัน

Integration Events

เหตุการณ์ที่ส่งข้าม Bounded Context หรือข้ามระหว่าง Microservices ใช้สำหรับการสื่อสารระหว่างบริการที่แตกต่างกัน เช่น Order Service ส่ง OrderPlaced event ไปยัง Payment Service และ Inventory Service

# ตัวอย่าง Integration Event
class OrderPlacedIntegrationEvent:
    event_id: str           # Unique ID สำหรับ idempotency
    event_type: str         # "order.placed"
    timestamp: datetime
    version: int            # Schema version
    payload:
        order_id: str
        customer_id: str
        total_amount: float
    metadata:
        correlation_id: str
        source_service: str
Best Practice: Integration Events ควรมีข้อมูลน้อยที่สุดที่จำเป็น อย่าส่งข้อมูลทั้ง Entity ไป เพราะจะสร้าง Coupling ระหว่าง Services

Event-Driven Patterns หลักที่ต้องรู้

1. Event Notification

Pattern ที่ง่ายที่สุด Service A ส่ง Event แจ้งว่ามีอะไรเกิดขึ้น โดย Event มีข้อมูลน้อยมาก (แค่ ID) Service ที่รับต้องกลับไปถาม Service A ถ้าต้องการข้อมูลเพิ่มเติม

# Event Notification - ข้อมูลน้อย
{
    "event_type": "order.placed",
    "order_id": "ORD-12345",
    "timestamp": "2026-04-08T10:00:00Z"
}
# Consumer ต้องเรียก GET /orders/ORD-12345 เพื่อดึงรายละเอียด

ข้อดีคือ Coupling ต่ำมาก แต่ข้อเสียคือต้องเรียก API กลับไปหา Producer ซึ่งสร้าง Load เพิ่มเติม

2. Event-Carried State Transfer

Event พกข้อมูลทั้งหมดที่ Consumer ต้องการมาด้วย ไม่ต้องเรียก API กลับไป ทำให้ Consumer สามารถเก็บ Local Copy ของข้อมูลไว้ได้

# Event-Carried State Transfer - ข้อมูลครบ
{
    "event_type": "order.placed",
    "order_id": "ORD-12345",
    "customer_id": "CUST-789",
    "customer_name": "สมชาย ใจดี",
    "customer_email": "somchai@example.com",
    "items": [
        {"product_id": "P001", "name": "Laptop", "qty": 1, "price": 35000},
        {"product_id": "P002", "name": "Mouse", "qty": 2, "price": 500}
    ],
    "total_amount": 36000,
    "shipping_address": "123 กรุงเทพฯ",
    "timestamp": "2026-04-08T10:00:00Z"
}

ข้อดีคือ Consumer ไม่ต้องเรียก API กลับ ลด latency และ coupling แต่ข้อเสียคือ Event มีขนาดใหญ่ และอาจมี Stale Data ได้

3. Event Sourcing

แทนที่จะเก็บสถานะปัจจุบัน (Current State) ในฐานข้อมูล Event Sourcing เก็บ "ทุกเหตุการณ์" ที่เกิดขึ้นตามลำดับ แล้ว Replay events เหล่านั้นเพื่อสร้าง Current State ขึ้นมาใหม่ได้ทุกเมื่อ

4. CQRS (Command Query Responsibility Segregation)

แยก Model สำหรับการเขียน (Command) และการอ่าน (Query) ออกจากกัน ทำให้สามารถ Optimize แต่ละด้านได้อย่างอิสระ

Event Sourcing Deep Dive

Event Sourcing เป็น Pattern ที่ทรงพลังที่สุดใน EDA แนวคิดคือ "อย่าเก็บแค่ผลลัพธ์ จงเก็บทุกอย่างที่เกิดขึ้น" เหมือนบัญชีธนาคารที่ไม่ได้เก็บแค่ยอดเงินคงเหลือ แต่เก็บทุก Transaction ที่เคยเกิดขึ้น

Event Store

Event Store คือฐานข้อมูลสำหรับเก็บ Events ทั้งหมด โดยมีคุณสมบัติสำคัญคือ Append-Only (เพิ่มได้อย่างเดียว ลบไม่ได้ แก้ไขไม่ได้) และเรียงตามลำดับเวลา

# โครงสร้าง Event Store Table
CREATE TABLE event_store (
    event_id        UUID PRIMARY KEY,
    aggregate_id    UUID NOT NULL,          -- เช่น order_id
    aggregate_type  VARCHAR(100) NOT NULL,  -- เช่น 'Order'
    event_type      VARCHAR(100) NOT NULL,  -- เช่น 'OrderPlaced'
    event_data      JSONB NOT NULL,         -- ข้อมูล Event
    metadata        JSONB,                  -- correlation_id, user_id
    version         INT NOT NULL,           -- ลำดับของ Event ใน Aggregate
    created_at      TIMESTAMP DEFAULT NOW(),

    UNIQUE(aggregate_id, version)           -- ป้องกัน Concurrent writes
);

-- Index สำหรับ query ตาม aggregate
CREATE INDEX idx_event_store_aggregate
    ON event_store(aggregate_id, version);
# ตัวอย่าง Events ของ Order Aggregate
# Version 1: สร้าง Order
{
    "event_type": "OrderCreated",
    "aggregate_id": "ORD-001",
    "version": 1,
    "event_data": {
        "customer_id": "CUST-789",
        "items": [{"product": "Laptop", "qty": 1, "price": 35000}]
    }
}

# Version 2: เพิ่มสินค้า
{
    "event_type": "ItemAdded",
    "aggregate_id": "ORD-001",
    "version": 2,
    "event_data": {
        "product": "Mouse",
        "qty": 2,
        "price": 500
    }
}

# Version 3: ยืนยัน Order
{
    "event_type": "OrderConfirmed",
    "aggregate_id": "ORD-001",
    "version": 3,
    "event_data": {
        "confirmed_by": "CUST-789",
        "total_amount": 36000
    }
}

# Version 4: ชำระเงิน
{
    "event_type": "PaymentReceived",
    "aggregate_id": "ORD-001",
    "version": 4,
    "event_data": {
        "payment_method": "credit_card",
        "amount": 36000,
        "transaction_id": "TXN-456"
    }
}

Projections (Read Models)

เนื่องจาก Event Store เก็บ Events แต่ไม่สะดวกสำหรับการ Query ข้อมูลจึงต้องมี Projections คือกระบวนการอ่าน Events แล้วสร้าง Read Model (View) ขึ้นมาในรูปแบบที่เหมาะสำหรับการ Query

# Python - Event Projection สำหรับ Order
class OrderProjection:
    def __init__(self):
        self.orders = {}  # In-memory store (ใช้ DB จริงใน Production)

    def handle(self, event):
        handler = getattr(self, f"on_{event['event_type']}", None)
        if handler:
            handler(event)

    def on_OrderCreated(self, event):
        data = event['event_data']
        self.orders[event['aggregate_id']] = {
            'order_id': event['aggregate_id'],
            'customer_id': data['customer_id'],
            'items': data['items'],
            'status': 'created',
            'total': sum(i['price'] * i['qty'] for i in data['items']),
            'created_at': event['created_at']
        }

    def on_ItemAdded(self, event):
        order = self.orders[event['aggregate_id']]
        data = event['event_data']
        order['items'].append(data)
        order['total'] += data['price'] * data['qty']

    def on_OrderConfirmed(self, event):
        self.orders[event['aggregate_id']]['status'] = 'confirmed'

    def on_PaymentReceived(self, event):
        order = self.orders[event['aggregate_id']]
        order['status'] = 'paid'
        order['payment_txn'] = event['event_data']['transaction_id']

    def on_OrderShipped(self, event):
        order = self.orders[event['aggregate_id']]
        order['status'] = 'shipped'
        order['tracking_number'] = event['event_data']['tracking_number']

Snapshots

เมื่อ Aggregate มี Events จำนวนมาก (เช่น บัญชีธนาคารที่มีหลายพัน Transactions) การ Replay ทุก Event จะช้ามาก Snapshots คือการเก็บสถานะ ณ จุดใดจุดหนึ่ง เพื่อไม่ต้อง Replay ตั้งแต่ต้น

# Snapshot Strategy
class SnapshotRepository:
    SNAPSHOT_INTERVAL = 100  # สร้าง Snapshot ทุก 100 events

    def save_snapshot(self, aggregate_id, state, version):
        """เก็บ Snapshot ลง DB"""
        db.execute("""
            INSERT INTO snapshots (aggregate_id, state, version, created_at)
            VALUES (%s, %s, %s, NOW())
            ON CONFLICT (aggregate_id) DO UPDATE
            SET state = %s, version = %s, created_at = NOW()
        """, (aggregate_id, json.dumps(state), version,
              json.dumps(state), version))

    def load_aggregate(self, aggregate_id):
        """โหลด Aggregate จาก Snapshot + Events ที่เหลือ"""
        # 1. หา Snapshot ล่าสุด
        snapshot = db.query(
            "SELECT state, version FROM snapshots WHERE aggregate_id = %s",
            (aggregate_id,)
        )

        if snapshot:
            state = json.loads(snapshot['state'])
            from_version = snapshot['version'] + 1
        else:
            state = {}
            from_version = 1

        # 2. โหลด Events หลังจาก Snapshot
        events = db.query("""
            SELECT * FROM event_store
            WHERE aggregate_id = %s AND version >= %s
            ORDER BY version
        """, (aggregate_id, from_version))

        # 3. Replay events ที่เหลือ
        for event in events:
            state = apply_event(state, event)

        return state
เมื่อไหร่ควรใช้ Snapshots: เมื่อ Aggregate มีมากกว่า 50-100 Events ถ้าน้อยกว่านี้ Replay ทั้งหมดก็เร็วพอ อย่า Optimize ก่อนเวลาอันควร

CQRS Pattern (Command Query Responsibility Segregation)

CQRS คือการแยก Model สำหรับเขียนและอ่านออกจากกัน ในระบบทั่วไปเราใช้ Model เดียวกันทั้งอ่านและเขียน แต่ใน CQRS เราแยกเป็น Command Model (Write) และ Query Model (Read) โดยแต่ละด้านสามารถ Optimize ได้อย่างอิสระ

ทำไมต้อง CQRS?

# CQRS Architecture Example

# ===== COMMAND SIDE (Write) =====
class OrderCommandHandler:
    def __init__(self, event_store, event_bus):
        self.event_store = event_store
        self.event_bus = event_bus

    def handle_place_order(self, command):
        """รับ Command → Validate → สร้าง Event → บันทึก"""
        # 1. Validate business rules
        if not command.items:
            raise ValueError("Order must have at least one item")
        if command.total_amount <= 0:
            raise ValueError("Invalid total amount")

        # 2. Check inventory (call Inventory Service)
        for item in command.items:
            if not inventory_client.check_stock(item.product_id, item.qty):
                raise InsufficientStockError(item.product_id)

        # 3. Create Domain Event
        event = OrderPlacedEvent(
            order_id=generate_id(),
            customer_id=command.customer_id,
            items=command.items,
            total_amount=command.total_amount,
            placed_at=datetime.utcnow()
        )

        # 4. Save to Event Store
        self.event_store.append(event)

        # 5. Publish to Event Bus
        self.event_bus.publish(event)

        return event.order_id

# ===== QUERY SIDE (Read) =====
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db  # Optimized read database

    def get_order(self, order_id):
        """ดึง Order จาก Read Model (denormalized, fast)"""
        return self.read_db.query("""
            SELECT o.*, c.name as customer_name, c.email
            FROM order_views o
            JOIN customer_views c ON o.customer_id = c.id
            WHERE o.order_id = %s
        """, (order_id,))

    def list_orders_by_customer(self, customer_id, page=1, size=20):
        """ดึงรายการ Orders - Read Model ถูก optimize สำหรับ query นี้"""
        return self.read_db.query("""
            SELECT * FROM order_views
            WHERE customer_id = %s
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """, (customer_id, size, (page-1)*size))

# ===== EVENT HANDLER (Sync Read Model) =====
class OrderProjectionHandler:
    """รับ Events จาก Event Bus แล้วอัปเดต Read Model"""

    def on_order_placed(self, event):
        self.read_db.execute("""
            INSERT INTO order_views
            (order_id, customer_id, items, total_amount, status, created_at)
            VALUES (%s, %s, %s, %s, 'placed', %s)
        """, (event.order_id, event.customer_id,
              json.dumps(event.items), event.total_amount, event.placed_at))

    def on_order_shipped(self, event):
        self.read_db.execute("""
            UPDATE order_views SET status='shipped',
            tracking_number=%s, shipped_at=%s
            WHERE order_id = %s
        """, (event.tracking_number, event.shipped_at, event.order_id))

การ Implement EDA ด้วย Kafka และ RabbitMQ

Apache Kafka

Kafka เหมาะสำหรับ High-throughput, Event Streaming ที่ต้องการเก็บ Events ระยะยาว รองรับหลายล้าน Events ต่อวินาที

# Python - Kafka Producer
from confluent_kafka import Producer
import json

producer = Producer({
    'bootstrap.servers': 'kafka-broker:9092',
    'client.id': 'order-service',
    'acks': 'all',                    # รอ Broker ทุกตัว ACK
    'enable.idempotence': True,       # ป้องกัน Duplicate
    'max.in.flight.requests.per.connection': 5,
    'retries': 10
})

def publish_order_event(order_event):
    """Publish Event ไป Kafka Topic"""
    event_data = {
        'event_id': str(uuid4()),
        'event_type': 'order.placed',
        'timestamp': datetime.utcnow().isoformat(),
        'version': 1,
        'payload': {
            'order_id': order_event.order_id,
            'customer_id': order_event.customer_id,
            'total_amount': order_event.total_amount
        }
    }

    producer.produce(
        topic='orders.events',
        key=order_event.order_id,       # ใช้ order_id เป็น key
        value=json.dumps(event_data),   # เพื่อให้ Events ของ Order เดียวกัน
        callback=delivery_callback      # ไปอยู่ Partition เดียวกัน (ordering)
    )
    producer.flush()

# Kafka Consumer
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'payment-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False        # Manual commit เพื่อ at-least-once
})

consumer.subscribe(['orders.events'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        handle_error(msg.error())
        continue

    event = json.loads(msg.value())
    try:
        process_event(event)           # ประมวลผล Event
        consumer.commit(msg)           # Commit offset หลังประมวลผลสำเร็จ
    except Exception as e:
        handle_processing_error(e, event)

RabbitMQ

RabbitMQ เหมาะสำหรับ Task Queue และ Routing ที่ซับซ้อน รองรับหลาย Exchange types (Direct, Fanout, Topic, Headers)

# Python - RabbitMQ Publisher
import pika, json

connection = pika.BlockingConnection(
    pika.ConnectionParameters('rabbitmq-host', 5672,
        credentials=pika.PlainCredentials('user', 'pass'))
)
channel = connection.channel()

# ประกาศ Exchange แบบ Topic
channel.exchange_declare(
    exchange='order_events',
    exchange_type='topic',
    durable=True
)

def publish_event(event_type, payload):
    """Publish event ผ่าน Topic Exchange"""
    message = {
        'event_id': str(uuid4()),
        'event_type': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }

    channel.basic_publish(
        exchange='order_events',
        routing_key=f'order.{event_type}',    # เช่น order.placed
        body=json.dumps(message),
        properties=pika.BasicProperties(
            delivery_mode=2,       # Persistent message
            content_type='application/json',
            message_id=message['event_id']
        )
    )

# RabbitMQ Consumer
channel.queue_declare(queue='payment_processor', durable=True)
channel.queue_bind(
    queue='payment_processor',
    exchange='order_events',
    routing_key='order.placed'        # รับเฉพาะ order.placed events
)

def callback(ch, method, properties, body):
    event = json.loads(body)
    try:
        process_payment(event)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_qos(prefetch_count=10)
channel.basic_consume(queue='payment_processor', on_message_callback=callback)

Kafka vs RabbitMQ เลือกอันไหน?

เกณฑ์Apache KafkaRabbitMQ
รูปแบบDistributed Log (Pull)Message Queue (Push)
Throughputสูงมาก (ล้าน msg/sec)ปานกลาง (หมื่น msg/sec)
Message Retentionเก็บถาวรได้ลบหลัง Consume
Orderingภายใน Partitionภายใน Queue
Replayได้ (offset reset)ไม่ได้
RoutingTopic + PartitionExchange + Routing Key (ยืดหยุ่นกว่า)
เหมาะกับEvent Streaming, Log AggregationTask Queue, Complex Routing
กฎง่ายๆ: ถ้าต้อง Replay Events หรือรองรับ Throughput สูงมาก เลือก Kafka ถ้าต้องการ Routing ที่ซับซ้อนและ Priority Queue เลือก RabbitMQ ในปี 2026 หลายทีมใช้ทั้งสองร่วมกัน

Saga Pattern สำหรับ Distributed Transactions

ในระบบ Microservices ไม่สามารถใช้ Database Transaction แบบเดิมข้ามหลาย Services ได้ Saga Pattern เป็นวิธีจัดการ Distributed Transaction โดยแบ่งเป็นหลายขั้นตอน (Steps) และมี Compensating Transaction สำหรับ Rollback แต่ละขั้นตอน

Choreography Saga

แต่ละ Service ตัดสินใจเองว่าจะทำอะไรต่อ โดย Listen Events จาก Services อื่น ไม่มีตัวกลางควบคุม

# Choreography Saga Flow - สั่งซื้อสินค้า
#
# Order Service  →  OrderPlaced event
#                         ↓
# Payment Service → PaymentProcessed event  (หรือ PaymentFailed)
#                         ↓
# Inventory Service → InventoryReserved event (หรือ InventoryFailed)
#                         ↓
# Shipping Service → ShipmentCreated event
#
# ถ้า Payment ล้มเหลว:
# Payment Service → PaymentFailed event
#                         ↓
# Order Service → ยกเลิก Order (Compensating Transaction)
#
# ถ้า Inventory ล้มเหลว:
# Inventory Service → InventoryFailed event
#                         ↓
# Payment Service → Refund (Compensating Transaction)
#                         ↓
# Order Service → ยกเลิก Order (Compensating Transaction)

Orchestration Saga

มี Saga Orchestrator เป็นตัวกลางควบคุมทุกขั้นตอน รู้ว่าต้องทำอะไรต่อ และ Rollback อย่างไรเมื่อเกิดความผิดพลาด

# Orchestration Saga - Order Saga Orchestrator
class OrderSagaOrchestrator:
    def __init__(self, payment_svc, inventory_svc, shipping_svc):
        self.payment = payment_svc
        self.inventory = inventory_svc
        self.shipping = shipping_svc

    def execute(self, order):
        """ควบคุมทุกขั้นตอนของ Saga"""
        saga_log = SagaLog(order.id)

        try:
            # Step 1: Reserve Inventory
            saga_log.start_step("reserve_inventory")
            inventory_result = self.inventory.reserve(order.items)
            saga_log.complete_step("reserve_inventory", inventory_result)

            # Step 2: Process Payment
            saga_log.start_step("process_payment")
            payment_result = self.payment.charge(
                order.customer_id, order.total_amount
            )
            saga_log.complete_step("process_payment", payment_result)

            # Step 3: Create Shipment
            saga_log.start_step("create_shipment")
            shipment = self.shipping.create(order.id, order.shipping_address)
            saga_log.complete_step("create_shipment", shipment)

            saga_log.mark_completed()
            return SagaResult.SUCCESS

        except PaymentFailedError:
            # Compensate: Release inventory
            self.inventory.release(order.items)
            saga_log.mark_compensated()
            return SagaResult.PAYMENT_FAILED

        except InventoryFailedError:
            saga_log.mark_compensated()
            return SagaResult.OUT_OF_STOCK

        except ShippingFailedError:
            # Compensate: Refund payment + Release inventory
            self.payment.refund(payment_result.transaction_id)
            self.inventory.release(order.items)
            saga_log.mark_compensated()
            return SagaResult.SHIPPING_FAILED

Choreography vs Orchestration

เกณฑ์ChoreographyOrchestration
Couplingต่ำ (Decentralized)สูงกว่า (Centralized)
Complexityซับซ้อนเมื่อ Services เยอะจัดการง่ายกว่า
Visibilityยากที่จะเห็นภาพรวม Flowเห็น Flow ชัดเจนใน Orchestrator
Single Point of Failureไม่มีOrchestrator เป็น SPOF
Testingยากกว่า (ต้อง Test Integration)ง่ายกว่า (Test Orchestrator)
เหมาะกับ2-4 Services, Flow ง่าย4+ Services, Flow ซับซ้อน

Outbox Pattern

ปัญหาสำคัญของ EDA คือ Dual Write Problem: เมื่อ Service ต้องทั้งบันทึกลง Database และ Publish Event ไป Message Broker พร้อมกัน ถ้าอย่างใดอย่างหนึ่งล้มเหลว ข้อมูลจะไม่สอดคล้องกัน

# ปัญหา Dual Write
def place_order(order):
    db.save(order)              # 1. บันทึกลง DB - สำเร็จ
    kafka.publish(order_event)  # 2. Publish Event - ล้มเหลว!
    # ผลลัพธ์: Order อยู่ใน DB แต่ไม่มี Event ส่งออกไป
    # Services อื่นไม่รู้ว่ามี Order ใหม่!

Outbox Pattern แก้ปัญหานี้โดยเขียน Event ลง Outbox Table ใน Database เดียวกันกับ Business Data ภายใน Transaction เดียวกัน จากนั้นมี Background Process (Relay/Poller) อ่าน Outbox แล้ว Publish ไป Message Broker

# Outbox Pattern Implementation
def place_order(order):
    """ใช้ Transaction เดียวกันสำหรับทั้ง Business Data และ Event"""
    with db.transaction():
        # 1. บันทึก Order
        db.execute("INSERT INTO orders (...) VALUES (...)", order)

        # 2. บันทึก Event ลง Outbox (Transaction เดียวกัน)
        db.execute("""
            INSERT INTO outbox (id, aggregate_type, aggregate_id,
                               event_type, payload, created_at)
            VALUES (%s, 'Order', %s, 'OrderPlaced', %s, NOW())
        """, (uuid4(), order.id, json.dumps(order.to_dict())))
    # ทั้งสองจะ Commit หรือ Rollback พร้อมกัน

# Background Relay Process (แยก Thread/Process)
class OutboxRelay:
    """อ่าน Outbox แล้ว Publish ไป Kafka"""
    def run(self):
        while True:
            events = db.query("""
                SELECT * FROM outbox
                WHERE published = FALSE
                ORDER BY created_at
                LIMIT 100
                FOR UPDATE SKIP LOCKED
            """)

            for event in events:
                try:
                    kafka.publish(event['event_type'], event['payload'])
                    db.execute(
                        "UPDATE outbox SET published = TRUE WHERE id = %s",
                        (event['id'],)
                    )
                except Exception as e:
                    log.error(f"Failed to publish {event['id']}: {e}")

            time.sleep(1)  # Poll interval
Alternative: Change Data Capture (CDC) ใช้ Debezium อ่าน Database WAL/Binlog โดยตรง แล้ว Publish Events ไป Kafka โดยไม่ต้องเขียน Outbox Relay เอง เป็นวิธีที่นิยมมากในปี 2026

Idempotency — ประมวลผลซ้ำได้อย่างปลอดภัย

ในระบบ Event-Driven ที่ใช้ at-least-once delivery อาจได้รับ Event เดียวกันมากกว่าหนึ่งครั้ง ทุก Consumer ต้องเป็น Idempotent คือประมวลผลซ้ำกี่ครั้งก็ได้ผลลัพธ์เหมือนเดิม

# Idempotency Implementation
class IdempotentConsumer:
    def __init__(self, db):
        self.db = db

    def process(self, event):
        event_id = event['event_id']

        # 1. ตรวจสอบว่าเคยประมวลผลแล้วหรือยัง
        existing = self.db.query(
            "SELECT id FROM processed_events WHERE event_id = %s",
            (event_id,)
        )

        if existing:
            log.info(f"Event {event_id} already processed, skipping")
            return  # ข้ามไป ไม่ทำซ้ำ

        # 2. ประมวลผล + บันทึกว่าทำแล้ว ใน Transaction เดียวกัน
        with self.db.transaction():
            self.handle_event(event)
            self.db.execute(
                "INSERT INTO processed_events (event_id, processed_at) VALUES (%s, NOW())",
                (event_id,)
            )

    def handle_event(self, event):
        """Business logic ที่ต้อง implement"""
        if event['event_type'] == 'order.placed':
            self.process_payment(event['payload'])
        elif event['event_type'] == 'payment.completed':
            self.reserve_inventory(event['payload'])

Eventual Consistency

ระบบ Event-Driven ไม่มี Strong Consistency แบบ Database Transaction เดี่ยว แต่ใช้ Eventual Consistency คือข้อมูลจะ "สอดคล้องกันในที่สุด" แม้ว่าในช่วงเวลาสั้นๆ อาจมีความไม่สอดคล้อง

ตัวอย่าง: เมื่อลูกค้าสั่งซื้อสินค้า Order Service บันทึก Order แล้ว แต่ Inventory Service อาจยังไม่ได้อัปเดตสต็อก ในช่วงเวลาสั้นๆ นี้ ข้อมูลไม่สอดคล้อง แต่เมื่อ Event ถูกประมวลผลเสร็จ ข้อมูลจะสอดคล้องกัน

วิธีจัดการ Eventual Consistency

Event Schema Evolution

เมื่อระบบพัฒนาไป Event Schema จะต้องเปลี่ยนแปลง ต้องจัดการ Versioning ให้ดีเพื่อไม่ให้ Consumer เก่าพัง

# Schema Evolution Strategies

# 1. Backward Compatible Changes (ปลอดภัย)
# - เพิ่ม Field ใหม่ (Optional)
# - เพิ่ม Default value

# Version 1
{"event_type": "order.placed", "order_id": "ORD-001", "amount": 1000}

# Version 2 (เพิ่ม currency field - backward compatible)
{"event_type": "order.placed", "order_id": "ORD-001", "amount": 1000, "currency": "THB"}
# Consumer เก่าที่ไม่รู้จัก "currency" จะข้ามไป ไม่พัง

# 2. Breaking Changes (ต้องจัดการ)
# - ลบ Field
# - เปลี่ยน Type
# - เปลี่ยนความหมาย

# ใช้ Event Type Versioning
{"event_type": "order.placed.v2", "version": 2, ...}

# หรือใช้ Schema Registry (Avro/Protobuf)
# Confluent Schema Registry จัดการ compatibility check อัตโนมัติ

Event-Driven Microservices Architecture

ในระบบ Microservices ขนาดใหญ่ EDA เป็นหัวใจสำคัญ แต่ละ Service สื่อสารกันผ่าน Events ทำให้ Loosely Coupled และ Scale ได้อย่างอิสระ

# ตัวอย่าง E-commerce Event-Driven Architecture
#
# [API Gateway]
#       |
#  [Order Service] --publish--> [Kafka: orders.events]
#       |                              |
#       |                    +---------+---------+----------+
#       |                    |         |         |          |
#                   [Payment Svc] [Inventory] [Notification] [Analytics]
#                        |            |           |             |
#                   payment.events  inventory.  notification.  (consume only)
#                        |          events      events
#                        |
#                  [Shipping Svc]

เครื่องมือสำหรับ Event-Driven Architecture

เครื่องมือประเภทจุดเด่น
Apache KafkaEvent StreamingHigh throughput, durability, replay
RabbitMQMessage BrokerFlexible routing, priority queue
EventStoreDBEvent StorePurpose-built สำหรับ Event Sourcing
Axon FrameworkCQRS/ES FrameworkJava/Kotlin, built-in Saga support
DebeziumCDCCapture DB changes เป็น Events
Apache PulsarEvent StreamingMulti-tenancy, geo-replication
AWS EventBridgeServerless Event BusManaged, schema registry
NATSMessage SystemLightweight, low latency

Testing Event-Driven Systems

# Unit Test - Event Handler
def test_order_placed_creates_payment():
    handler = PaymentEventHandler(mock_db, mock_payment_gateway)
    event = {
        'event_id': 'evt-001',
        'event_type': 'order.placed',
        'payload': {'order_id': 'ORD-001', 'amount': 1000}
    }

    handler.process(event)

    # Verify payment was created
    mock_payment_gateway.charge.assert_called_once_with(
        order_id='ORD-001', amount=1000
    )

# Integration Test - Saga
def test_order_saga_compensates_on_payment_failure():
    inventory_svc = MockInventoryService(should_succeed=True)
    payment_svc = MockPaymentService(should_succeed=False)

    saga = OrderSagaOrchestrator(payment_svc, inventory_svc, None)
    result = saga.execute(test_order)

    assert result == SagaResult.PAYMENT_FAILED
    # Verify inventory was released (compensation)
    assert inventory_svc.release_called
    assert inventory_svc.released_items == test_order.items

# Contract Test - Event Schema
def test_order_placed_event_schema():
    """ตรวจสอบว่า Event ตรง Schema ที่ตกลงกับ Consumer"""
    event = create_order_placed_event(test_order)

    schema = {
        'type': 'object',
        'required': ['event_id', 'event_type', 'timestamp', 'payload'],
        'properties': {
            'payload': {
                'type': 'object',
                'required': ['order_id', 'customer_id', 'total_amount']
            }
        }
    }

    jsonschema.validate(event, schema)  # ถ้า Schema ไม่ตรงจะ Fail

Debugging และ Observability

ระบบ Event-Driven ยากต่อการ Debug เพราะ Flow กระจายไปหลาย Services สิ่งที่จำเป็น:

# Correlation ID propagation
import uuid

class EventContext:
    def __init__(self, correlation_id=None):
        self.correlation_id = correlation_id or str(uuid.uuid4())
        self.causation_id = None    # ID ของ Event ที่ trigger Event นี้

    def create_child_event(self, event_type, payload):
        """สร้าง Event ลูกที่มี Correlation ID เดียวกัน"""
        return {
            'event_id': str(uuid.uuid4()),
            'event_type': event_type,
            'correlation_id': self.correlation_id,  # เหมือนกันตลอด Flow
            'causation_id': self.event_id,           # ชี้ไปที่ Event แม่
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }

เมื่อไหร่ควรใช้ EDA vs Request-Response?

ใช้ EDA เมื่อใช้ Request-Response เมื่อ
ต้องการ Loose Coupling ระหว่าง Servicesต้องการ Response ทันที
Throughput สูง ต้องรับ Load มากๆFlow ง่ายๆ 2-3 Services
ต้องการ Audit Trail / Event Historyต้องการ Strong Consistency
หลาย Services ต้องรับรู้เหตุการณ์เดียวกันทีมเล็ก ระบบไม่ซับซ้อน
ต้อง Scale แต่ละ Service แยกกันต้องการ Simple debugging
รองรับ Eventual Consistency ได้ต้องการ Transactional guarantee
คำแนะนำสำหรับผู้เริ่มต้น: อย่าใช้ EDA ทุกที่ เริ่มจาก Request-Response ก่อน แล้วค่อยแยกส่วนที่เหมาะสมออกมาเป็น Event-Driven เมื่อระบบเติบโต Over-engineering ด้วย EDA ตั้งแต่ต้นจะสร้างความซับซ้อนที่ไม่จำเป็น

สรุป

Event-Driven Architecture เป็นสถาปัตยกรรมที่ทรงพลังสำหรับระบบ Distributed Systems ในปี 2026 การเข้าใจ Patterns ต่างๆ อย่าง Event Sourcing, CQRS, Saga Pattern และ Outbox Pattern จะช่วยให้คุณออกแบบระบบที่ Scale ได้ มีความยืดหยุ่น และจัดการ Failure ได้ดี

สิ่งสำคัญคือต้องเลือกใช้ Pattern ที่เหมาะสมกับปัญหา ไม่ใช่ใช้ทุก Pattern ทุกที่ เริ่มจากสิ่งง่ายๆ แล้วค่อยเพิ่มความซับซ้อนเมื่อจำเป็น Event-Driven Architecture ไม่ได้เหมาะกับทุกระบบ แต่เมื่อเหมาะสมแล้ว มันจะเป็นหัวใจสำคัญที่ทำให้ระบบของคุณรองรับการเติบโตได้อย่างยั่งยืน


Back to Blog | iCafe Forex | SiamLanCard | Siam2R