Home > Blog > tech

Apache Kafka เจาะลึก สอน Kafka Streams, Connect, Schema Registry สำหรับ Data Pipeline 2026

apache kafka deep dive guide
Apache Kafka Deep Dive Guide 2026
2026-04-10 | tech | 3500 words

Apache Kafka เป็นแพลตฟอร์ม Event Streaming แบบ Distributed ที่ได้รับความนิยมมากที่สุดในโลก ถูกพัฒนาโดย LinkedIn และเปิดเป็น Open Source ภายใต้ Apache Software Foundation ในปี 2011 ปัจจุบันองค์กรขนาดใหญ่กว่า 80% ของ Fortune 100 ใช้ Kafka เป็นแกนหลักของ Data Pipeline

บทความนี้จะพาคุณเจาะลึกทุกแง่มุมของ Kafka ตั้งแต่สถาปัตยกรรมภายใน ไปจนถึง Kafka Streams, Kafka Connect, Schema Registry, ksqlDB และ KRaft mode ที่มาแทน ZooKeeper เหมาะสำหรับนักพัฒนาและ Data Engineer ที่ต้องการสร้าง Data Pipeline ระดับ Production ในปี 2026

สถาปัตยกรรม Kafka ทบทวนแบบเจาะลึก

ก่อนจะเข้าสู่เครื่องมือรอบข้าง Kafka Ecosystem มาทบทวนสถาปัตยกรรมหลักของ Kafka ให้เข้าใจอย่างถ่องแท้ Kafka ไม่ใช่แค่ Message Queue ธรรมดา แต่เป็น Distributed Commit Log ที่ออกแบบมาเพื่อรองรับ Throughput สูงมากและเก็บข้อมูลได้นาน

Broker, Topic และ Partition

Broker คือ Server แต่ละตัวใน Kafka Cluster โดยปกติจะมีอย่างน้อย 3 Broker เพื่อความ High Availability ข้อมูลจะถูกกระจายไปเก็บใน Broker ต่างๆ โดยอัตโนมัติ แต่ละ Broker สามารถจัดการ Partition ได้หลายพัน Partition และรองรับ Throughput หลายร้อย MB ต่อวินาที

Topic คือหมวดหมู่ของข้อมูล เปรียบเหมือน Table ในฐานข้อมูล แต่ละ Topic จะถูกแบ่งออกเป็น Partition หลายๆ Partition ซึ่งเป็นหน่วยพื้นฐานของ Parallelism ใน Kafka ข้อมูลใน Partition จะเรียงตาม Offset ซึ่งเป็นตัวเลขที่เพิ่มขึ้นเรื่อยๆ ไม่มีวันซ้ำกัน

# สร้าง Topic ใหม่
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders \
  --partitions 12 \
  --replication-factor 3

# ดูรายละเอียด Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders

# ผลลัพธ์:
# Topic: orders  Partitions: 12  ReplicationFactor: 3
# Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
# Partition: 1  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1

Consumer Groups

Consumer Group คือกลุ่มของ Consumer ที่ทำงานร่วมกันเพื่ออ่านข้อมูลจาก Topic เดียวกัน Kafka จะ Assign แต่ละ Partition ให้กับ Consumer เพียงตัวเดียวในกลุ่ม ทำให้สามารถ Scale การประมวลผลได้โดยเพิ่มจำนวน Consumer เท่ากับจำนวน Partition

ข้อสำคัญคือ จำนวน Consumer ไม่ควรมากกว่าจำนวน Partition เพราะ Consumer ส่วนเกินจะไม่ได้รับ Partition ใดเลยและจะ Idle อยู่เฉยๆ หากต้องการ Scale เพิ่มต้องเพิ่ม Partition ด้วย

# ดู Consumer Group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-processor

# ผลลัพธ์:
# GROUP           TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order-processor orders  0          15234           15240           6
# order-processor orders  1          14890           14890           0

Replication และ ISR

ข้อมูลใน Kafka จะถูก Replicate ไปยัง Broker หลายตัวตาม replication-factor ที่กำหนด แต่ละ Partition จะมี Leader หนึ่งตัวที่รับ Read/Write ทั้งหมด และ Follower ที่คอย Sync ข้อมูลตาม กลุ่ม Follower ที่ Sync ทันเรียกว่า ISR (In-Sync Replicas)

หาก Leader ล่ม Kafka จะเลือก Leader ใหม่จาก ISR โดยอัตโนมัติ ถ้าตั้ง min.insync.replicas=2 Kafka จะไม่ยอมรับข้อมูลใหม่หาก ISR เหลือน้อยกว่า 2 ตัว เป็นการป้องกันการสูญเสียข้อมูล

Kafka Connect — เชื่อมต่อกับทุกระบบ

Kafka Connect เป็น Framework สำหรับย้ายข้อมูลเข้า-ออก Kafka โดยไม่ต้องเขียน Code เอง ใช้ Connector ที่มีให้เลือกมากกว่า 200 ตัวจาก Confluent Hub และชุมชน Open Source

Source Connector vs Sink Connector

Source Connector ดึงข้อมูลจากระบบภายนอก (เช่น Database, File, API) เข้ามาใน Kafka Topic ส่วน Sink Connector ส่งข้อมูลจาก Kafka Topic ออกไปยังระบบปลายทาง (เช่น Elasticsearch, S3, Database) ทั้งสองทำงานแบบ Distributed และ Fault-tolerant โดยอัตโนมัติ

# ตัวอย่าง JDBC Source Connector — ดึงข้อมูลจาก MySQL
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "mysql-source-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/shop",
    "connection.user": "kafka_user",
    "connection.password": "secret",
    "table.whitelist": "orders,order_items",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "mysql-",
    "poll.interval.ms": 5000,
    "tasks.max": 3
  }
}'

# ดูสถานะ Connector
curl http://localhost:8083/connectors/mysql-source-orders/status

Debezium CDC — จับทุกการเปลี่ยนแปลงในฐานข้อมูล

Debezium เป็น Change Data Capture (CDC) Platform ที่ทำงานเป็น Kafka Source Connector จับทุกการเปลี่ยนแปลง (INSERT, UPDATE, DELETE) จากฐานข้อมูลแบบ Real-time โดยอ่านจาก Transaction Log ของฐานข้อมูลโดยตรง ไม่ต้อง Poll ซ้ำๆ

# Debezium PostgreSQL Connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "pg-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "shop",
    "database.server.name": "shop-db",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}'

Sink Connector ยอดนิยม

# Elasticsearch Sink Connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "mysql-orders",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": false,
    "schema.ignore": true,
    "tasks.max": 3
  }
}'

# S3 Sink Connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "mysql-orders",
    "s3.bucket.name": "data-lake-raw",
    "s3.region": "ap-southeast-1",
    "flush.size": 10000,
    "rotate.interval.ms": 3600000,
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "locale": "en-US",
    "timezone": "Asia/Bangkok",
    "tasks.max": 3
  }
}'
Production Tip: ใน Production ควรตั้ง errors.tolerance=all และ errors.deadletterqueue.topic.name เพื่อให้ข้อมูลที่ Error ไม่หยุดทั้ง Pipeline แต่ถูกส่งไป Dead Letter Queue แทน

Kafka Streams — Stream Processing ใน Java/Kotlin

Kafka Streams เป็น Library สำหรับ Stream Processing ที่ทำงานเป็นส่วนหนึ่งของ Application ไม่ต้องมี Cluster แยก (ต่างจาก Apache Flink หรือ Spark Streaming) ทำให้ Deploy ง่ายและ Scale ได้ด้วยการเพิ่ม Instance ของ Application

Stateless Operations

Operation ที่ไม่ต้องจำ State ได้แก่ filter, map, flatMap, branch และ merge

// Java — Kafka Streams Stateless Example
StreamsBuilder builder = new StreamsBuilder();

KStream<String, Order> orders = builder.stream("orders",
    Consumed.with(Serdes.String(), orderSerde));

// Filter เฉพาะ order ที่มูลค่าสูงกว่า 1000
KStream<String, Order> highValueOrders = orders
    .filter((key, order) -> order.getAmount() > 1000);

// แยก Stream ตาม Region
Map<String, KStream<String, Order>> branches = orders
    .split(Named.as("region-"))
    .branch((k, v) -> v.getRegion().equals("APAC"), Branched.as("apac"))
    .branch((k, v) -> v.getRegion().equals("EU"), Branched.as("eu"))
    .defaultBranch(Branched.as("other"));

// Map เปลี่ยน Value
KStream<String, EnrichedOrder> enriched = orders
    .mapValues(order -> new EnrichedOrder(
        order, lookupCustomer(order.getCustomerId())
    ));

highValueOrders.to("high-value-orders",
    Produced.with(Serdes.String(), orderSerde));

Stateful Operations กับ KTable

KTable คือ Changelog Stream ที่เก็บค่าล่าสุดของแต่ละ Key เหมือน Materialized View ของฐานข้อมูล ใช้สำหรับ Aggregation, Join และ Windowing

// KTable — นับจำนวน Order ต่อลูกค้า
KTable<String, Long> orderCounts = orders
    .groupBy((key, order) -> order.getCustomerId(),
        Grouped.with(Serdes.String(), orderSerde))
    .count(Materialized.as("order-counts-store"));

// Windowed Aggregation — ยอดขายต่อชั่วโมง
TimeWindowedKStream<String, Order> windowedOrders = orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)));

KTable<Windowed<String>, Double> hourlySales = windowedOrders
    .aggregate(
        () -> 0.0,
        (key, order, total) -> total + order.getAmount(),
        Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("hourly-sales")
            .withValueSerde(Serdes.Double())
    );

// Join KStream กับ KTable
KTable<String, Customer> customers = builder.table("customers",
    Consumed.with(Serdes.String(), customerSerde));

KStream<String, OrderWithCustomer> enrichedOrders = orders
    .selectKey((k, v) -> v.getCustomerId())
    .join(customers,
        (order, customer) -> new OrderWithCustomer(order, customer),
        Joined.with(Serdes.String(), orderSerde, customerSerde));

Windowing ใน Kafka Streams

Kafka Streams รองรับ Window หลายแบบสำหรับ Time-based Aggregation ได้แก่ Tumbling Window (ช่วงเวลาตายตัวไม่ซ้อน), Hopping Window (ช่วงเวลาที่ซ้อนกันได้), Sliding Window (ขยับตาม Event) และ Session Window (Group ตาม Activity Gap)

// Tumbling Window — 5 นาที ไม่ซ้อนกัน
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))

// Hopping Window — 5 นาที ขยับทุก 1 นาที
.windowedBy(TimeWindows.ofSizeAndGrace(
    Duration.ofMinutes(5), Duration.ofMinutes(1))
    .advanceBy(Duration.ofMinutes(1)))

// Session Window — Gap 30 นาที
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))

ksqlDB — SQL บน Kafka Streams

ksqlDB ช่วยให้คุณเขียน Stream Processing ด้วย SQL โดยไม่ต้องเขียน Java/Kotlin เหมาะสำหรับทีมที่คุ้นเคยกับ SQL มากกว่า Programming Language

-- สร้าง Stream จาก Topic
CREATE STREAM orders_stream (
  order_id VARCHAR KEY,
  customer_id VARCHAR,
  product_id VARCHAR,
  amount DOUBLE,
  region VARCHAR,
  order_time TIMESTAMP
) WITH (
  KAFKA_TOPIC = 'orders',
  VALUE_FORMAT = 'AVRO',
  TIMESTAMP = 'order_time'
);

-- สร้าง Table สำหรับ Customer
CREATE TABLE customers_table (
  customer_id VARCHAR PRIMARY KEY,
  name VARCHAR,
  email VARCHAR,
  tier VARCHAR
) WITH (
  KAFKA_TOPIC = 'customers',
  VALUE_FORMAT = 'AVRO'
);

-- Continuous Query: ยอดขายต่อชั่วโมงต่อ Region
CREATE TABLE hourly_sales AS
  SELECT region,
         WINDOWSTART AS window_start,
         WINDOWEND AS window_end,
         COUNT(*) AS order_count,
         SUM(amount) AS total_amount,
         AVG(amount) AS avg_amount
  FROM orders_stream
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY region
  EMIT CHANGES;

-- Join Stream กับ Table
CREATE STREAM enriched_orders AS
  SELECT o.order_id, o.amount, o.region,
         c.name AS customer_name, c.tier AS customer_tier
  FROM orders_stream o
  LEFT JOIN customers_table c ON o.customer_id = c.customer_id
  EMIT CHANGES;

-- Push Query (Real-time)
SELECT * FROM hourly_sales WHERE region = 'APAC' EMIT CHANGES;

-- Pull Query (Point-in-time)
SELECT * FROM hourly_sales WHERE region = 'APAC';
ksqlDB vs Kafka Streams: ksqlDB เหมาะสำหรับ Prototyping และ Use Case ที่ไม่ซับซ้อนมาก Kafka Streams เหมาะสำหรับ Logic ที่ซับซ้อนและต้องการ Custom State Store หรือ Processor API ในทางปฏิบัติหลายทีมใช้ทั้งสองอย่างร่วมกัน

Schema Registry — จัดการ Schema ของข้อมูล

เมื่อ Producer และ Consumer หลายตัวใช้ Kafka Topic ร่วมกัน ปัญหาใหญ่คือ Schema Compatibility ถ้า Producer เปลี่ยนโครงสร้างข้อมูลโดยไม่บอก Consumer จะพังทันที Schema Registry แก้ปัญหานี้โดยเป็นศูนย์กลางจัดการ Schema ทั้งหมด

รูปแบบ Schema ที่รองรับ

Formatข้อดีข้อเสียเหมาะกับ
AvroCompact binary, Schema evolution ดีต้องมี Schema ก่อนHigh-throughput data pipeline
Protobufเร็วมาก, หลายภาษา, gRPCต้อง Compile proto filesMicroservices, gRPC
JSON Schemaอ่านง่าย, Debug ง่ายขนาดใหญ่, ช้ากว่า binaryDevelopment, RESTful API

Schema Evolution

Schema Registry รองรับ Compatibility Mode หลายแบบ ที่สำคัญคือ Backward Compatible (Consumer ใหม่อ่านข้อมูลเก่าได้), Forward Compatible (Consumer เก่าอ่านข้อมูลใหม่ได้) และ Full Compatible (ทั้ง Backward และ Forward)

# ลงทะเบียน Avro Schema
curl -X POST http://schema-registry:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "AVRO",
    "schema": "{\"type\": \"record\", \"name\": \"Order\", \"fields\": [{\"name\": \"order_id\", \"type\": \"string\"}, {\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"customer_id\", \"type\": \"string\"}, {\"name\": \"region\", \"type\": [\"null\", \"string\"], \"default\": null}]}"
  }'

# ดู Schema ล่าสุด
curl http://schema-registry:8081/subjects/orders-value/versions/latest

# ตรวจสอบ Compatibility
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{...new schema...}'

# ตั้ง Compatibility Mode
curl -X PUT http://schema-registry:8081/config/orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

Kafka Security — รักษาความปลอดภัย

ใน Production Kafka ต้องมีระบบ Security ครบ 3 ด้าน: Authentication (ยืนยันตัวตน), Authorization (สิทธิ์การเข้าถึง) และ Encryption (เข้ารหัสข้อมูล)

SASL Authentication

# server.properties — SASL/SCRAM
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

# สร้าง User
kafka-configs.sh --bootstrap-server localhost:9093 \
  --alter --add-config 'SCRAM-SHA-512=[password=secret123]' \
  --entity-type users --entity-name producer-app

# Client config
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="producer-app" password="secret123";

ACL Authorization

# อนุญาตให้ producer-app เขียน Topic orders
kafka-acls.sh --bootstrap-server localhost:9093 \
  --add --allow-principal User:producer-app \
  --operation Write --topic orders \
  --command-config admin.properties

# อนุญาตให้ consumer-app อ่าน Topic orders
kafka-acls.sh --bootstrap-server localhost:9093 \
  --add --allow-principal User:consumer-app \
  --operation Read --topic orders \
  --operation Read --group order-processor \
  --command-config admin.properties

# ดู ACL ทั้งหมด
kafka-acls.sh --bootstrap-server localhost:9093 \
  --list --command-config admin.properties

SSL/TLS Encryption

# สร้าง SSL Certificate
keytool -keystore kafka.broker1.keystore.jks -alias broker1 \
  -keyalg RSA -validity 365 -genkey \
  -storepass changeit -keypass changeit \
  -dname "CN=broker1.kafka.local"

# server.properties
ssl.keystore.location=/etc/kafka/kafka.broker1.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/etc/kafka/kafka.truststore.jks
ssl.truststore.password=changeit
ssl.client.auth=required

Exactly-Once Semantics (EOS)

การรับประกันว่าข้อมูลจะถูกประมวลผลเพียงครั้งเดียว (Exactly-Once) เป็นเรื่องยากในระบบ Distributed แต่ Kafka รองรับ EOS ผ่าน 2 กลไกหลัก คือ Idempotent Producer และ Transactions

Idempotent Producer

เปิดใช้งานด้วย enable.idempotence=true (เป็น Default ตั้งแต่ Kafka 3.0) Kafka จะกำหนด Producer ID และ Sequence Number ให้แต่ละ Message ถ้ามี Message ซ้ำ (เช่น จาก Network Retry) Kafka จะ Deduplicate ให้อัตโนมัติ

Transactions

สำหรับ Pattern Read-Process-Write ที่ต้องการ Atomic Operation ข้ามหลาย Topic/Partition Kafka รองรับ Transactions ที่ทำงานคล้ายกับ Database Transaction

// Java — Transactional Producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "order-processor-1");
props.put("enable.idempotence", true);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();

    // อ่านจาก source topic
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        // ประมวลผล
        String result = process(record.value());
        // เขียนไป output topic
        producer.send(new ProducerRecord<>("output-topic", record.key(), result));
    }

    // Commit offset + send ในTransaction เดียวกัน
    producer.sendOffsetsToTransaction(
        offsets, consumer.groupMetadata());
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

// Consumer ที่อ่านเฉพาะ Committed messages
props.put("isolation.level", "read_committed");
EOS Trade-off: Exactly-Once มาพร้อมกับ Latency ที่สูงขึ้น (ประมาณ 20-50%) เนื่องจาก Transaction Overhead ให้พิจารณาว่า Use Case ของคุณต้องการ EOS จริงหรือไม่ หลาย Use Case ใช้ At-Least-Once + Idempotent Processing ก็เพียงพอแล้ว

Kafka Performance Tuning

การ Tune Kafka ให้ได้ประสิทธิภาพสูงสุดต้องปรับหลายพารามิเตอร์ทั้งฝั่ง Producer, Consumer และ Broker

Producer Tuning

พารามิเตอร์ค่าแนะนำคำอธิบาย
batch.size32768-65536ขนาด Batch ที่ส่งครั้งละเท่าไหร่ (bytes)
linger.ms5-20รอรวบรวม Message ก่อนส่ง (ms)
compression.typelz4 หรือ zstdลดขนาดข้อมูล ประหยัด Network
buffer.memory67108864Memory สำหรับ Buffer ก่อนส่ง (64MB)
acksall (-1)รอ Acknowledge จากทุก ISR Replica
max.in.flight.requests5จำนวน Request ที่ส่งพร้อมกัน (ต้อง <= 5 สำหรับ EOS)

Consumer Tuning

# Consumer Performance Config
fetch.min.bytes=1048576           # รอรวบรวมข้อมูลก่อน Fetch (1MB)
fetch.max.wait.ms=500             # รอไม่เกิน 500ms
max.poll.records=1000             # จำนวน Records ต่อครั้ง Poll
max.partition.fetch.bytes=1048576 # ขนาดสูงสุดต่อ Partition ต่อ Fetch
session.timeout.ms=45000          # Timeout สำหรับ Consumer Group
heartbeat.interval.ms=15000       # ส่ง Heartbeat ทุก 15 วินาที

Partition Count Strategy

จำนวน Partition มีผลต่อ Throughput และ Parallelism โดยตรง สูตรคำนวณคร่าวๆ คือ: Partitions = max(Target Throughput / Producer Throughput per Partition, Target Throughput / Consumer Throughput per Partition) โดยทั่วไปเริ่มต้นที่ 6-12 Partition สำหรับ Topic ที่มี Traffic ปานกลาง และ 30-100 สำหรับ High-throughput Topic

Monitoring Kafka

Kafka เปิดเผย Metrics ผ่าน JMX ซึ่งสามารถ Scrape ด้วย Prometheus ผ่าน JMX Exporter แล้วแสดงผลใน Grafana

Metrics สำคัญที่ต้อง Monitor

MetricความหมายThreshold
UnderReplicatedPartitionsPartition ที่ ISR ไม่ครบต้องเป็น 0 เสมอ
ActiveControllerCountจำนวน Controllerต้องเป็น 1 เสมอ
OfflinePartitionsCountPartition ที่ไม่มี Leaderต้องเป็น 0 เสมอ
Consumer Lagข้อมูลที่ Consumer ยังไม่ได้อ่านขึ้นอยู่กับ SLA
RequestsPerSecจำนวน Request ต่อวินาทีMonitor trend
BytesInPerSec/BytesOutPerSecNetwork throughputไม่ควรเกิน 70% ของ NIC
# docker-compose สำหรับ Kafka monitoring stack
services:
  kafka-exporter:
    image: danielqsj/kafka-exporter
    command:
      - --kafka.server=kafka1:9092
      - --kafka.server=kafka2:9092
      - --kafka.server=kafka3:9092
    ports:
      - "9308:9308"

  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

Cruise Control — Auto Rebalancing

LinkedIn Cruise Control เป็นเครื่องมือสำหรับ Auto-rebalance Partition ใน Kafka Cluster ทำงานโดยวิเคราะห์ Load ของแต่ละ Broker แล้วย้าย Partition เพื่อกระจาย Load ให้สม่ำเสมอ มี REST API สำหรับ Trigger Rebalance และ Monitor สถานะ

Kafka บน Kubernetes — Strimzi

Strimzi เป็น Kubernetes Operator สำหรับ Deploy และจัดการ Kafka Cluster บน Kubernetes ให้ Day-2 Operations (scaling, rolling update, certificate rotation) เป็นเรื่องง่าย

# Strimzi Kafka Cluster CRD
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: production-kafka
  namespace: kafka
spec:
  kafka:
    version: 3.7.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: loadbalancer
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      num.partitions: 6
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 500Gi
          class: fast-ssd
          deleteClaim: false
    resources:
      requests:
        memory: 4Gi
        cpu: "2"
      limits:
        memory: 8Gi
        cpu: "4"
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 50Gi
      class: fast-ssd
  entityOperator:
    topicOperator: {}
    userOperator: {}

KRaft Mode — ลาก่อน ZooKeeper

ตั้งแต่ Kafka 3.3 เป็นต้นมา KRaft (Kafka Raft) mode พร้อมใช้งาน Production ทำให้ไม่ต้องใช้ ZooKeeper อีกต่อไป KRaft ใช้ Raft Consensus Protocol ในการจัดการ Metadata ของ Cluster ทำให้ Architecture เรียบง่ายขึ้น Boot เร็วขึ้น และ Scale ได้มากขึ้น

ข้อดีของ KRaft เมื่อเทียบกับ ZooKeeper

# KRaft Mode Configuration
# กำหนด Node Role
process.roles=broker,controller    # Combined mode
# หรือ
process.roles=controller           # Dedicated controller
process.roles=broker               # Dedicated broker

# Controller quorum
controller.quorum.voters=1@controller1:9093,2@controller2:9093,3@controller3:9093
controller.listener.names=CONTROLLER

# Node ID (ต้องไม่ซ้ำกัน)
node.id=1

# Generate Cluster ID
kafka-storage.sh random-uuid
# Format Storage
kafka-storage.sh format -t <cluster-id> -c server.properties

Kafka vs Pulsar vs Redpanda

ในปี 2026 มีทางเลือกอื่นนอกจาก Kafka ให้พิจารณาด้วย

FeatureApache KafkaApache PulsarRedpanda
ภาษาJava/ScalaJavaC++
ZooKeeperไม่ต้อง (KRaft)ต้อง (BookKeeper)ไม่ต้อง (Raft built-in)
Latencyต่ำ (ms)ต่ำมาก (sub-ms)ต่ำมาก (sub-ms)
Multi-tenancyจำกัดดีมาก (Namespace)ปานกลาง
Geo-replicationMirrorMaker 2Built-inไม่มี native
Tiered Storageมี (ตั้งแต่ 3.6)มี (built-in)มี
API Compatibility-Kafka API adapter100% Kafka API
Ecosystemใหญ่ที่สุดกำลังโตใช้ Kafka Ecosystem
เหมาะกับทุก Use CaseMulti-tenant, QueueKafka ที่เร็วกว่า

Data Pipeline Architecture กับ Kafka

Kafka เป็นแกนกลางของ Data Pipeline Architecture หลายรูปแบบ ที่พบบ่อยในปี 2026 ได้แก่:

Lambda Architecture

แบ่ง Pipeline เป็น 2 Layer: Batch Layer ประมวลผลข้อมูลย้อนหลังทั้งหมดด้วย Spark/Hadoop และ Speed Layer ประมวลผล Real-time ด้วย Kafka Streams ผลลัพธ์จากทั้ง 2 Layer รวมกันใน Serving Layer

Kappa Architecture

ใช้ Stream Processing เป็นหลัก ทุกอย่างผ่าน Kafka หมด ไม่มี Batch Layer แยก ง่ายกว่า Lambda และเป็นที่นิยมมากขึ้นเรื่อยๆ เมื่อ Kafka Streams และ ksqlDB มีความสามารถมากพอ

Event Sourcing + CQRS

ใช้ Kafka เป็น Event Store เก็บทุก Event ที่เกิดขึ้น แยก Read Model (Query) จาก Write Model (Command) Kafka Connect ส่ง Event ไปสร้าง Read-optimized View ใน Database ปลายทาง

# ตัวอย่าง Data Pipeline Architecture
# Source Systems -> Debezium CDC -> Kafka Topics
#                                    |
#                          +---------+---------+
#                          |         |         |
#                    Kafka Streams  ksqlDB    Kafka Connect
#                    (Real-time    (SQL       (Sink)
#                     processing)  analytics)  |
#                          |         |    +----+----+
#                          v         v    v    v    v
#                      Kafka Topic  Dashboard ES  S3  PostgreSQL
#                      (enriched)            (search)(lake)(serving)

Real-world Use Cases

E-commerce: Order Processing Pipeline

เมื่อลูกค้าสั่งซื้อ Event จะถูกส่งเข้า Kafka แล้วกระจายไปยัง Consumer หลายตัว: Inventory Service ลดสต็อก, Payment Service เรียกเก็บเงิน, Notification Service ส่งอีเมล, Analytics Service บันทึกข้อมูลลง Data Warehouse ทั้งหมดนี้ทำงาน Asynchronously และ Independently

Financial Services: Fraud Detection

ใช้ Kafka Streams ประมวลผล Transaction แบบ Real-time ตรวจจับ Pattern ที่ผิดปกติ (เช่น Transaction ถี่ผิดปกติ, จำนวนเงินสูงผิดปกติ, หลาย Location ในเวลาใกล้กัน) ใช้ Windowing เพื่อดู Pattern ในช่วงเวลาที่กำหนด

IoT: Sensor Data Processing

อุปกรณ์ IoT หลายล้านตัวส่งข้อมูล Sensor เข้า Kafka ผ่าน MQTT Bridge จากนั้น Kafka Streams ทำ Aggregation (เช่น ค่าเฉลี่ยอุณหภูมิต่อ 5 นาที) และ Anomaly Detection แล้วส่งไป Time-series Database (เช่น InfluxDB) และ Alerting System

Log Aggregation

Application หลายร้อยตัวส่ง Log เข้า Kafka แทนที่จะส่งตรงไป Elasticsearch ซึ่งอาจรับไม่ไหว Kafka ทำหน้าที่เป็น Buffer ที่รับ Throughput สูงมากได้ แล้วค่อยๆ ส่งไป Elasticsearch ด้วย Kafka Connect ตาม Rate ที่ Elasticsearch รับได้

Best Practices สำหรับ Production

Topic Naming Convention

# รูปแบบ: <domain>.<system>.<entity>.<version>
# ตัวอย่าง:
shop.orders.created.v1
shop.payments.completed.v1
user.profiles.updated.v2
analytics.pageviews.raw.v1

การจัดการ Dead Letter Queue (DLQ)

# Kafka Streams Error Handling
StreamsConfig config = new StreamsConfig(props);
config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);

// Custom DLQ Handler
public class DlqProductionExceptionHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                      Exception exception) {
        // ส่งไป DLQ topic
        dlqProducer.send(new ProducerRecord<>("dlq.processing-errors",
            record.key(), record.value()));
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}

Retention Policy

# Time-based retention
log.retention.hours=168           # 7 วัน (default)
log.retention.ms=604800000        # เท่ากัน (ms)

# Size-based retention
log.retention.bytes=107374182400  # 100GB ต่อ Partition

# Compact + Delete (สำหรับ Changelog topics)
log.cleanup.policy=compact,delete
min.compaction.lag.ms=86400000    # Compact หลัง 24 ชั่วโมง

Docker Compose สำหรับ Kafka Development

version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093,HOST://0.0.0.0:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_PROCESS_ROLES: broker,controller
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
    volumes:
      - kafka-data:/var/lib/kafka/data

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
    depends_on:
      - kafka

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.6.0
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on:
      - kafka
      - schema-registry

  ksqldb:
    image: confluentinc/cp-ksqldb-server:7.6.0
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: kafka:29092
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
    depends_on:
      - kafka
      - schema-registry

  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
    depends_on:
      - kafka

volumes:
  kafka-data:

สรุป

Apache Kafka เป็นมากกว่า Message Queue ธรรมดา มันเป็นแพลตฟอร์ม Event Streaming ครบวงจรที่มี Ecosystem กว้างขวาง ตั้งแต่ Kafka Connect สำหรับ Integration, Kafka Streams สำหรับ Stream Processing, ksqlDB สำหรับ SQL Analytics และ Schema Registry สำหรับ Data Governance

ในปี 2026 การเปลี่ยนไปใช้ KRaft mode ทำให้ Kafka ง่ายขึ้นในการ Deploy และ Operate ไม่ต้องดูแล ZooKeeper อีกต่อไป รองรับ Partition ได้มากขึ้น และ Controller Failover เร็วขึ้น ถ้าคุณกำลังสร้าง Data Pipeline หรือ Event-Driven Architecture Kafka ยังคงเป็นตัวเลือกอันดับหนึ่งที่ควรพิจารณา

สิ่งสำคัญคือเลือก Tools ให้เหมาะกับ Use Case ไม่จำเป็นต้องใช้ทุกอย่างในคราวเดียว เริ่มจาก Kafka Core + Connect สำหรับ Data Integration แล้วค่อยเพิ่ม Streams หรือ ksqlDB เมื่อต้องการ Real-time Processing


Back to Blog | iCafe Forex | SiamLanCard | Siam2R