Apache Kafka เป็นแพลตฟอร์ม Event Streaming แบบ Distributed ที่ได้รับความนิยมมากที่สุดในโลก ถูกพัฒนาโดย LinkedIn และเปิดเป็น Open Source ภายใต้ Apache Software Foundation ในปี 2011 ปัจจุบันองค์กรขนาดใหญ่กว่า 80% ของ Fortune 100 ใช้ Kafka เป็นแกนหลักของ Data Pipeline
บทความนี้จะพาคุณเจาะลึกทุกแง่มุมของ Kafka ตั้งแต่สถาปัตยกรรมภายใน ไปจนถึง Kafka Streams, Kafka Connect, Schema Registry, ksqlDB และ KRaft mode ที่มาแทน ZooKeeper เหมาะสำหรับนักพัฒนาและ Data Engineer ที่ต้องการสร้าง Data Pipeline ระดับ Production ในปี 2026
สถาปัตยกรรม Kafka ทบทวนแบบเจาะลึก
ก่อนจะเข้าสู่เครื่องมือรอบข้าง Kafka Ecosystem มาทบทวนสถาปัตยกรรมหลักของ Kafka ให้เข้าใจอย่างถ่องแท้ Kafka ไม่ใช่แค่ Message Queue ธรรมดา แต่เป็น Distributed Commit Log ที่ออกแบบมาเพื่อรองรับ Throughput สูงมากและเก็บข้อมูลได้นาน
Broker, Topic และ Partition
Broker คือ Server แต่ละตัวใน Kafka Cluster โดยปกติจะมีอย่างน้อย 3 Broker เพื่อความ High Availability ข้อมูลจะถูกกระจายไปเก็บใน Broker ต่างๆ โดยอัตโนมัติ แต่ละ Broker สามารถจัดการ Partition ได้หลายพัน Partition และรองรับ Throughput หลายร้อย MB ต่อวินาที
Topic คือหมวดหมู่ของข้อมูล เปรียบเหมือน Table ในฐานข้อมูล แต่ละ Topic จะถูกแบ่งออกเป็น Partition หลายๆ Partition ซึ่งเป็นหน่วยพื้นฐานของ Parallelism ใน Kafka ข้อมูลใน Partition จะเรียงตาม Offset ซึ่งเป็นตัวเลขที่เพิ่มขึ้นเรื่อยๆ ไม่มีวันซ้ำกัน
# สร้าง Topic ใหม่
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic orders \
--partitions 12 \
--replication-factor 3
# ดูรายละเอียด Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic orders
# ผลลัพธ์:
# Topic: orders Partitions: 12 ReplicationFactor: 3
# Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
# Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Consumer Groups
Consumer Group คือกลุ่มของ Consumer ที่ทำงานร่วมกันเพื่ออ่านข้อมูลจาก Topic เดียวกัน Kafka จะ Assign แต่ละ Partition ให้กับ Consumer เพียงตัวเดียวในกลุ่ม ทำให้สามารถ Scale การประมวลผลได้โดยเพิ่มจำนวน Consumer เท่ากับจำนวน Partition
ข้อสำคัญคือ จำนวน Consumer ไม่ควรมากกว่าจำนวน Partition เพราะ Consumer ส่วนเกินจะไม่ได้รับ Partition ใดเลยและจะ Idle อยู่เฉยๆ หากต้องการ Scale เพิ่มต้องเพิ่ม Partition ด้วย
# ดู Consumer Group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-processor
# ผลลัพธ์:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order-processor orders 0 15234 15240 6
# order-processor orders 1 14890 14890 0
Replication และ ISR
ข้อมูลใน Kafka จะถูก Replicate ไปยัง Broker หลายตัวตาม replication-factor ที่กำหนด แต่ละ Partition จะมี Leader หนึ่งตัวที่รับ Read/Write ทั้งหมด และ Follower ที่คอย Sync ข้อมูลตาม กลุ่ม Follower ที่ Sync ทันเรียกว่า ISR (In-Sync Replicas)
หาก Leader ล่ม Kafka จะเลือก Leader ใหม่จาก ISR โดยอัตโนมัติ ถ้าตั้ง min.insync.replicas=2 Kafka จะไม่ยอมรับข้อมูลใหม่หาก ISR เหลือน้อยกว่า 2 ตัว เป็นการป้องกันการสูญเสียข้อมูล
Kafka Connect — เชื่อมต่อกับทุกระบบ
Kafka Connect เป็น Framework สำหรับย้ายข้อมูลเข้า-ออก Kafka โดยไม่ต้องเขียน Code เอง ใช้ Connector ที่มีให้เลือกมากกว่า 200 ตัวจาก Confluent Hub และชุมชน Open Source
Source Connector vs Sink Connector
Source Connector ดึงข้อมูลจากระบบภายนอก (เช่น Database, File, API) เข้ามาใน Kafka Topic ส่วน Sink Connector ส่งข้อมูลจาก Kafka Topic ออกไปยังระบบปลายทาง (เช่น Elasticsearch, S3, Database) ทั้งสองทำงานแบบ Distributed และ Fault-tolerant โดยอัตโนมัติ
# ตัวอย่าง JDBC Source Connector — ดึงข้อมูลจาก MySQL
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "mysql-source-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/shop",
"connection.user": "kafka_user",
"connection.password": "secret",
"table.whitelist": "orders,order_items",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "mysql-",
"poll.interval.ms": 5000,
"tasks.max": 3
}
}'
# ดูสถานะ Connector
curl http://localhost:8083/connectors/mysql-source-orders/status
Debezium CDC — จับทุกการเปลี่ยนแปลงในฐานข้อมูล
Debezium เป็น Change Data Capture (CDC) Platform ที่ทำงานเป็น Kafka Source Connector จับทุกการเปลี่ยนแปลง (INSERT, UPDATE, DELETE) จากฐานข้อมูลแบบ Real-time โดยอ่านจาก Transaction Log ของฐานข้อมูลโดยตรง ไม่ต้อง Poll ซ้ำๆ
# Debezium PostgreSQL Connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "pg-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secret",
"database.dbname": "shop",
"database.server.name": "shop-db",
"table.include.list": "public.orders,public.customers",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "dbz_publication",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}'
Sink Connector ยอดนิยม
# Elasticsearch Sink Connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "mysql-orders",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": false,
"schema.ignore": true,
"tasks.max": 3
}
}'
# S3 Sink Connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "mysql-orders",
"s3.bucket.name": "data-lake-raw",
"s3.region": "ap-southeast-1",
"flush.size": 10000,
"rotate.interval.ms": 3600000,
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"locale": "en-US",
"timezone": "Asia/Bangkok",
"tasks.max": 3
}
}'
errors.tolerance=all และ errors.deadletterqueue.topic.name เพื่อให้ข้อมูลที่ Error ไม่หยุดทั้ง Pipeline แต่ถูกส่งไป Dead Letter Queue แทน
Kafka Streams — Stream Processing ใน Java/Kotlin
Kafka Streams เป็น Library สำหรับ Stream Processing ที่ทำงานเป็นส่วนหนึ่งของ Application ไม่ต้องมี Cluster แยก (ต่างจาก Apache Flink หรือ Spark Streaming) ทำให้ Deploy ง่ายและ Scale ได้ด้วยการเพิ่ม Instance ของ Application
Stateless Operations
Operation ที่ไม่ต้องจำ State ได้แก่ filter, map, flatMap, branch และ merge
// Java — Kafka Streams Stateless Example
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders",
Consumed.with(Serdes.String(), orderSerde));
// Filter เฉพาะ order ที่มูลค่าสูงกว่า 1000
KStream<String, Order> highValueOrders = orders
.filter((key, order) -> order.getAmount() > 1000);
// แยก Stream ตาม Region
Map<String, KStream<String, Order>> branches = orders
.split(Named.as("region-"))
.branch((k, v) -> v.getRegion().equals("APAC"), Branched.as("apac"))
.branch((k, v) -> v.getRegion().equals("EU"), Branched.as("eu"))
.defaultBranch(Branched.as("other"));
// Map เปลี่ยน Value
KStream<String, EnrichedOrder> enriched = orders
.mapValues(order -> new EnrichedOrder(
order, lookupCustomer(order.getCustomerId())
));
highValueOrders.to("high-value-orders",
Produced.with(Serdes.String(), orderSerde));
Stateful Operations กับ KTable
KTable คือ Changelog Stream ที่เก็บค่าล่าสุดของแต่ละ Key เหมือน Materialized View ของฐานข้อมูล ใช้สำหรับ Aggregation, Join และ Windowing
// KTable — นับจำนวน Order ต่อลูกค้า
KTable<String, Long> orderCounts = orders
.groupBy((key, order) -> order.getCustomerId(),
Grouped.with(Serdes.String(), orderSerde))
.count(Materialized.as("order-counts-store"));
// Windowed Aggregation — ยอดขายต่อชั่วโมง
TimeWindowedKStream<String, Order> windowedOrders = orders
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)));
KTable<Windowed<String>, Double> hourlySales = windowedOrders
.aggregate(
() -> 0.0,
(key, order, total) -> total + order.getAmount(),
Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("hourly-sales")
.withValueSerde(Serdes.Double())
);
// Join KStream กับ KTable
KTable<String, Customer> customers = builder.table("customers",
Consumed.with(Serdes.String(), customerSerde));
KStream<String, OrderWithCustomer> enrichedOrders = orders
.selectKey((k, v) -> v.getCustomerId())
.join(customers,
(order, customer) -> new OrderWithCustomer(order, customer),
Joined.with(Serdes.String(), orderSerde, customerSerde));
Windowing ใน Kafka Streams
Kafka Streams รองรับ Window หลายแบบสำหรับ Time-based Aggregation ได้แก่ Tumbling Window (ช่วงเวลาตายตัวไม่ซ้อน), Hopping Window (ช่วงเวลาที่ซ้อนกันได้), Sliding Window (ขยับตาม Event) และ Session Window (Group ตาม Activity Gap)
// Tumbling Window — 5 นาที ไม่ซ้อนกัน
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
// Hopping Window — 5 นาที ขยับทุก 1 นาที
.windowedBy(TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(5), Duration.ofMinutes(1))
.advanceBy(Duration.ofMinutes(1)))
// Session Window — Gap 30 นาที
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
ksqlDB — SQL บน Kafka Streams
ksqlDB ช่วยให้คุณเขียน Stream Processing ด้วย SQL โดยไม่ต้องเขียน Java/Kotlin เหมาะสำหรับทีมที่คุ้นเคยกับ SQL มากกว่า Programming Language
-- สร้าง Stream จาก Topic
CREATE STREAM orders_stream (
order_id VARCHAR KEY,
customer_id VARCHAR,
product_id VARCHAR,
amount DOUBLE,
region VARCHAR,
order_time TIMESTAMP
) WITH (
KAFKA_TOPIC = 'orders',
VALUE_FORMAT = 'AVRO',
TIMESTAMP = 'order_time'
);
-- สร้าง Table สำหรับ Customer
CREATE TABLE customers_table (
customer_id VARCHAR PRIMARY KEY,
name VARCHAR,
email VARCHAR,
tier VARCHAR
) WITH (
KAFKA_TOPIC = 'customers',
VALUE_FORMAT = 'AVRO'
);
-- Continuous Query: ยอดขายต่อชั่วโมงต่อ Region
CREATE TABLE hourly_sales AS
SELECT region,
WINDOWSTART AS window_start,
WINDOWEND AS window_end,
COUNT(*) AS order_count,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount
FROM orders_stream
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY region
EMIT CHANGES;
-- Join Stream กับ Table
CREATE STREAM enriched_orders AS
SELECT o.order_id, o.amount, o.region,
c.name AS customer_name, c.tier AS customer_tier
FROM orders_stream o
LEFT JOIN customers_table c ON o.customer_id = c.customer_id
EMIT CHANGES;
-- Push Query (Real-time)
SELECT * FROM hourly_sales WHERE region = 'APAC' EMIT CHANGES;
-- Pull Query (Point-in-time)
SELECT * FROM hourly_sales WHERE region = 'APAC';
Schema Registry — จัดการ Schema ของข้อมูล
เมื่อ Producer และ Consumer หลายตัวใช้ Kafka Topic ร่วมกัน ปัญหาใหญ่คือ Schema Compatibility ถ้า Producer เปลี่ยนโครงสร้างข้อมูลโดยไม่บอก Consumer จะพังทันที Schema Registry แก้ปัญหานี้โดยเป็นศูนย์กลางจัดการ Schema ทั้งหมด
รูปแบบ Schema ที่รองรับ
| Format | ข้อดี | ข้อเสีย | เหมาะกับ |
|---|---|---|---|
| Avro | Compact binary, Schema evolution ดี | ต้องมี Schema ก่อน | High-throughput data pipeline |
| Protobuf | เร็วมาก, หลายภาษา, gRPC | ต้อง Compile proto files | Microservices, gRPC |
| JSON Schema | อ่านง่าย, Debug ง่าย | ขนาดใหญ่, ช้ากว่า binary | Development, RESTful API |
Schema Evolution
Schema Registry รองรับ Compatibility Mode หลายแบบ ที่สำคัญคือ Backward Compatible (Consumer ใหม่อ่านข้อมูลเก่าได้), Forward Compatible (Consumer เก่าอ่านข้อมูลใหม่ได้) และ Full Compatible (ทั้ง Backward และ Forward)
# ลงทะเบียน Avro Schema
curl -X POST http://schema-registry:8081/subjects/orders-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schemaType": "AVRO",
"schema": "{\"type\": \"record\", \"name\": \"Order\", \"fields\": [{\"name\": \"order_id\", \"type\": \"string\"}, {\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"customer_id\", \"type\": \"string\"}, {\"name\": \"region\", \"type\": [\"null\", \"string\"], \"default\": null}]}"
}'
# ดู Schema ล่าสุด
curl http://schema-registry:8081/subjects/orders-value/versions/latest
# ตรวจสอบ Compatibility
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{...new schema...}'
# ตั้ง Compatibility Mode
curl -X PUT http://schema-registry:8081/config/orders-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
Kafka Security — รักษาความปลอดภัย
ใน Production Kafka ต้องมีระบบ Security ครบ 3 ด้าน: Authentication (ยืนยันตัวตน), Authorization (สิทธิ์การเข้าถึง) และ Encryption (เข้ารหัสข้อมูล)
SASL Authentication
# server.properties — SASL/SCRAM
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
# สร้าง User
kafka-configs.sh --bootstrap-server localhost:9093 \
--alter --add-config 'SCRAM-SHA-512=[password=secret123]' \
--entity-type users --entity-name producer-app
# Client config
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="producer-app" password="secret123";
ACL Authorization
# อนุญาตให้ producer-app เขียน Topic orders
kafka-acls.sh --bootstrap-server localhost:9093 \
--add --allow-principal User:producer-app \
--operation Write --topic orders \
--command-config admin.properties
# อนุญาตให้ consumer-app อ่าน Topic orders
kafka-acls.sh --bootstrap-server localhost:9093 \
--add --allow-principal User:consumer-app \
--operation Read --topic orders \
--operation Read --group order-processor \
--command-config admin.properties
# ดู ACL ทั้งหมด
kafka-acls.sh --bootstrap-server localhost:9093 \
--list --command-config admin.properties
SSL/TLS Encryption
# สร้าง SSL Certificate
keytool -keystore kafka.broker1.keystore.jks -alias broker1 \
-keyalg RSA -validity 365 -genkey \
-storepass changeit -keypass changeit \
-dname "CN=broker1.kafka.local"
# server.properties
ssl.keystore.location=/etc/kafka/kafka.broker1.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/etc/kafka/kafka.truststore.jks
ssl.truststore.password=changeit
ssl.client.auth=required
Exactly-Once Semantics (EOS)
การรับประกันว่าข้อมูลจะถูกประมวลผลเพียงครั้งเดียว (Exactly-Once) เป็นเรื่องยากในระบบ Distributed แต่ Kafka รองรับ EOS ผ่าน 2 กลไกหลัก คือ Idempotent Producer และ Transactions
Idempotent Producer
เปิดใช้งานด้วย enable.idempotence=true (เป็น Default ตั้งแต่ Kafka 3.0) Kafka จะกำหนด Producer ID และ Sequence Number ให้แต่ละ Message ถ้ามี Message ซ้ำ (เช่น จาก Network Retry) Kafka จะ Deduplicate ให้อัตโนมัติ
Transactions
สำหรับ Pattern Read-Process-Write ที่ต้องการ Atomic Operation ข้ามหลาย Topic/Partition Kafka รองรับ Transactions ที่ทำงานคล้ายกับ Database Transaction
// Java — Transactional Producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "order-processor-1");
props.put("enable.idempotence", true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
// อ่านจาก source topic
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// ประมวลผล
String result = process(record.value());
// เขียนไป output topic
producer.send(new ProducerRecord<>("output-topic", record.key(), result));
}
// Commit offset + send ในTransaction เดียวกัน
producer.sendOffsetsToTransaction(
offsets, consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
// Consumer ที่อ่านเฉพาะ Committed messages
props.put("isolation.level", "read_committed");
Kafka Performance Tuning
การ Tune Kafka ให้ได้ประสิทธิภาพสูงสุดต้องปรับหลายพารามิเตอร์ทั้งฝั่ง Producer, Consumer และ Broker
Producer Tuning
| พารามิเตอร์ | ค่าแนะนำ | คำอธิบาย |
|---|---|---|
batch.size | 32768-65536 | ขนาด Batch ที่ส่งครั้งละเท่าไหร่ (bytes) |
linger.ms | 5-20 | รอรวบรวม Message ก่อนส่ง (ms) |
compression.type | lz4 หรือ zstd | ลดขนาดข้อมูล ประหยัด Network |
buffer.memory | 67108864 | Memory สำหรับ Buffer ก่อนส่ง (64MB) |
acks | all (-1) | รอ Acknowledge จากทุก ISR Replica |
max.in.flight.requests | 5 | จำนวน Request ที่ส่งพร้อมกัน (ต้อง <= 5 สำหรับ EOS) |
Consumer Tuning
# Consumer Performance Config
fetch.min.bytes=1048576 # รอรวบรวมข้อมูลก่อน Fetch (1MB)
fetch.max.wait.ms=500 # รอไม่เกิน 500ms
max.poll.records=1000 # จำนวน Records ต่อครั้ง Poll
max.partition.fetch.bytes=1048576 # ขนาดสูงสุดต่อ Partition ต่อ Fetch
session.timeout.ms=45000 # Timeout สำหรับ Consumer Group
heartbeat.interval.ms=15000 # ส่ง Heartbeat ทุก 15 วินาที
Partition Count Strategy
จำนวน Partition มีผลต่อ Throughput และ Parallelism โดยตรง สูตรคำนวณคร่าวๆ คือ: Partitions = max(Target Throughput / Producer Throughput per Partition, Target Throughput / Consumer Throughput per Partition) โดยทั่วไปเริ่มต้นที่ 6-12 Partition สำหรับ Topic ที่มี Traffic ปานกลาง และ 30-100 สำหรับ High-throughput Topic
Monitoring Kafka
Kafka เปิดเผย Metrics ผ่าน JMX ซึ่งสามารถ Scrape ด้วย Prometheus ผ่าน JMX Exporter แล้วแสดงผลใน Grafana
Metrics สำคัญที่ต้อง Monitor
| Metric | ความหมาย | Threshold |
|---|---|---|
| UnderReplicatedPartitions | Partition ที่ ISR ไม่ครบ | ต้องเป็น 0 เสมอ |
| ActiveControllerCount | จำนวน Controller | ต้องเป็น 1 เสมอ |
| OfflinePartitionsCount | Partition ที่ไม่มี Leader | ต้องเป็น 0 เสมอ |
| Consumer Lag | ข้อมูลที่ Consumer ยังไม่ได้อ่าน | ขึ้นอยู่กับ SLA |
| RequestsPerSec | จำนวน Request ต่อวินาที | Monitor trend |
| BytesInPerSec/BytesOutPerSec | Network throughput | ไม่ควรเกิน 70% ของ NIC |
# docker-compose สำหรับ Kafka monitoring stack
services:
kafka-exporter:
image: danielqsj/kafka-exporter
command:
- --kafka.server=kafka1:9092
- --kafka.server=kafka2:9092
- --kafka.server=kafka3:9092
ports:
- "9308:9308"
prometheus:
image: prom/prometheus
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
grafana:
image: grafana/grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
Cruise Control — Auto Rebalancing
LinkedIn Cruise Control เป็นเครื่องมือสำหรับ Auto-rebalance Partition ใน Kafka Cluster ทำงานโดยวิเคราะห์ Load ของแต่ละ Broker แล้วย้าย Partition เพื่อกระจาย Load ให้สม่ำเสมอ มี REST API สำหรับ Trigger Rebalance และ Monitor สถานะ
Kafka บน Kubernetes — Strimzi
Strimzi เป็น Kubernetes Operator สำหรับ Deploy และจัดการ Kafka Cluster บน Kubernetes ให้ Day-2 Operations (scaling, rolling update, certificate rotation) เป็นเรื่องง่าย
# Strimzi Kafka Cluster CRD
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: production-kafka
namespace: kafka
spec:
kafka:
version: 3.7.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
- name: external
port: 9094
type: loadbalancer
tls: true
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
num.partitions: 6
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 500Gi
class: fast-ssd
deleteClaim: false
resources:
requests:
memory: 4Gi
cpu: "2"
limits:
memory: 8Gi
cpu: "4"
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
name: kafka-metrics
key: kafka-metrics-config.yml
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 50Gi
class: fast-ssd
entityOperator:
topicOperator: {}
userOperator: {}
KRaft Mode — ลาก่อน ZooKeeper
ตั้งแต่ Kafka 3.3 เป็นต้นมา KRaft (Kafka Raft) mode พร้อมใช้งาน Production ทำให้ไม่ต้องใช้ ZooKeeper อีกต่อไป KRaft ใช้ Raft Consensus Protocol ในการจัดการ Metadata ของ Cluster ทำให้ Architecture เรียบง่ายขึ้น Boot เร็วขึ้น และ Scale ได้มากขึ้น
ข้อดีของ KRaft เมื่อเทียบกับ ZooKeeper
- Architecture เรียบง่าย — ไม่ต้อง Maintain ZooKeeper Cluster แยก ลด Operational Overhead
- Partition ได้มากขึ้น — รองรับหลายล้าน Partition ต่อ Cluster (ZooKeeper จำกัดที่หลักแสน)
- Controller Failover เร็วขึ้น — จากหลายสิบวินาทีเหลือไม่กี่วินาที
- Metadata ถูกจัดการภายใน Kafka — ใช้ __cluster_metadata Topic แบบ Internal
# KRaft Mode Configuration
# กำหนด Node Role
process.roles=broker,controller # Combined mode
# หรือ
process.roles=controller # Dedicated controller
process.roles=broker # Dedicated broker
# Controller quorum
controller.quorum.voters=1@controller1:9093,2@controller2:9093,3@controller3:9093
controller.listener.names=CONTROLLER
# Node ID (ต้องไม่ซ้ำกัน)
node.id=1
# Generate Cluster ID
kafka-storage.sh random-uuid
# Format Storage
kafka-storage.sh format -t <cluster-id> -c server.properties
Kafka vs Pulsar vs Redpanda
ในปี 2026 มีทางเลือกอื่นนอกจาก Kafka ให้พิจารณาด้วย
| Feature | Apache Kafka | Apache Pulsar | Redpanda |
|---|---|---|---|
| ภาษา | Java/Scala | Java | C++ |
| ZooKeeper | ไม่ต้อง (KRaft) | ต้อง (BookKeeper) | ไม่ต้อง (Raft built-in) |
| Latency | ต่ำ (ms) | ต่ำมาก (sub-ms) | ต่ำมาก (sub-ms) |
| Multi-tenancy | จำกัด | ดีมาก (Namespace) | ปานกลาง |
| Geo-replication | MirrorMaker 2 | Built-in | ไม่มี native |
| Tiered Storage | มี (ตั้งแต่ 3.6) | มี (built-in) | มี |
| API Compatibility | - | Kafka API adapter | 100% Kafka API |
| Ecosystem | ใหญ่ที่สุด | กำลังโต | ใช้ Kafka Ecosystem |
| เหมาะกับ | ทุก Use Case | Multi-tenant, Queue | Kafka ที่เร็วกว่า |
Data Pipeline Architecture กับ Kafka
Kafka เป็นแกนกลางของ Data Pipeline Architecture หลายรูปแบบ ที่พบบ่อยในปี 2026 ได้แก่:
Lambda Architecture
แบ่ง Pipeline เป็น 2 Layer: Batch Layer ประมวลผลข้อมูลย้อนหลังทั้งหมดด้วย Spark/Hadoop และ Speed Layer ประมวลผล Real-time ด้วย Kafka Streams ผลลัพธ์จากทั้ง 2 Layer รวมกันใน Serving Layer
Kappa Architecture
ใช้ Stream Processing เป็นหลัก ทุกอย่างผ่าน Kafka หมด ไม่มี Batch Layer แยก ง่ายกว่า Lambda และเป็นที่นิยมมากขึ้นเรื่อยๆ เมื่อ Kafka Streams และ ksqlDB มีความสามารถมากพอ
Event Sourcing + CQRS
ใช้ Kafka เป็น Event Store เก็บทุก Event ที่เกิดขึ้น แยก Read Model (Query) จาก Write Model (Command) Kafka Connect ส่ง Event ไปสร้าง Read-optimized View ใน Database ปลายทาง
# ตัวอย่าง Data Pipeline Architecture
# Source Systems -> Debezium CDC -> Kafka Topics
# |
# +---------+---------+
# | | |
# Kafka Streams ksqlDB Kafka Connect
# (Real-time (SQL (Sink)
# processing) analytics) |
# | | +----+----+
# v v v v v
# Kafka Topic Dashboard ES S3 PostgreSQL
# (enriched) (search)(lake)(serving)
Real-world Use Cases
E-commerce: Order Processing Pipeline
เมื่อลูกค้าสั่งซื้อ Event จะถูกส่งเข้า Kafka แล้วกระจายไปยัง Consumer หลายตัว: Inventory Service ลดสต็อก, Payment Service เรียกเก็บเงิน, Notification Service ส่งอีเมล, Analytics Service บันทึกข้อมูลลง Data Warehouse ทั้งหมดนี้ทำงาน Asynchronously และ Independently
Financial Services: Fraud Detection
ใช้ Kafka Streams ประมวลผล Transaction แบบ Real-time ตรวจจับ Pattern ที่ผิดปกติ (เช่น Transaction ถี่ผิดปกติ, จำนวนเงินสูงผิดปกติ, หลาย Location ในเวลาใกล้กัน) ใช้ Windowing เพื่อดู Pattern ในช่วงเวลาที่กำหนด
IoT: Sensor Data Processing
อุปกรณ์ IoT หลายล้านตัวส่งข้อมูล Sensor เข้า Kafka ผ่าน MQTT Bridge จากนั้น Kafka Streams ทำ Aggregation (เช่น ค่าเฉลี่ยอุณหภูมิต่อ 5 นาที) และ Anomaly Detection แล้วส่งไป Time-series Database (เช่น InfluxDB) และ Alerting System
Log Aggregation
Application หลายร้อยตัวส่ง Log เข้า Kafka แทนที่จะส่งตรงไป Elasticsearch ซึ่งอาจรับไม่ไหว Kafka ทำหน้าที่เป็น Buffer ที่รับ Throughput สูงมากได้ แล้วค่อยๆ ส่งไป Elasticsearch ด้วย Kafka Connect ตาม Rate ที่ Elasticsearch รับได้
Best Practices สำหรับ Production
Topic Naming Convention
# รูปแบบ: <domain>.<system>.<entity>.<version>
# ตัวอย่าง:
shop.orders.created.v1
shop.payments.completed.v1
user.profiles.updated.v2
analytics.pageviews.raw.v1
การจัดการ Dead Letter Queue (DLQ)
# Kafka Streams Error Handling
StreamsConfig config = new StreamsConfig(props);
config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
// Custom DLQ Handler
public class DlqProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
Exception exception) {
// ส่งไป DLQ topic
dlqProducer.send(new ProducerRecord<>("dlq.processing-errors",
record.key(), record.value()));
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
Retention Policy
# Time-based retention
log.retention.hours=168 # 7 วัน (default)
log.retention.ms=604800000 # เท่ากัน (ms)
# Size-based retention
log.retention.bytes=107374182400 # 100GB ต่อ Partition
# Compact + Delete (สำหรับ Changelog topics)
log.cleanup.policy=compact,delete
min.compaction.lag.ms=86400000 # Compact หลัง 24 ชั่วโมง
Docker Compose สำหรับ Kafka Development
version: '3.8'
services:
kafka:
image: confluentinc/cp-kafka:7.6.0
hostname: kafka
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093,HOST://0.0.0.0:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_PROCESS_ROLES: broker,controller
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
volumes:
- kafka-data:/var/lib/kafka/data
schema-registry:
image: confluentinc/cp-schema-registry:7.6.0
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
depends_on:
- kafka
kafka-connect:
image: confluentinc/cp-kafka-connect:7.6.0
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:29092
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_GROUP_ID: connect-cluster
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
depends_on:
- kafka
- schema-registry
ksqldb:
image: confluentinc/cp-ksqldb-server:7.6.0
ports:
- "8088:8088"
environment:
KSQL_BOOTSTRAP_SERVERS: kafka:29092
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
depends_on:
- kafka
- schema-registry
kafka-ui:
image: provectuslabs/kafka-ui
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
depends_on:
- kafka
volumes:
kafka-data:
สรุป
Apache Kafka เป็นมากกว่า Message Queue ธรรมดา มันเป็นแพลตฟอร์ม Event Streaming ครบวงจรที่มี Ecosystem กว้างขวาง ตั้งแต่ Kafka Connect สำหรับ Integration, Kafka Streams สำหรับ Stream Processing, ksqlDB สำหรับ SQL Analytics และ Schema Registry สำหรับ Data Governance
ในปี 2026 การเปลี่ยนไปใช้ KRaft mode ทำให้ Kafka ง่ายขึ้นในการ Deploy และ Operate ไม่ต้องดูแล ZooKeeper อีกต่อไป รองรับ Partition ได้มากขึ้น และ Controller Failover เร็วขึ้น ถ้าคุณกำลังสร้าง Data Pipeline หรือ Event-Driven Architecture Kafka ยังคงเป็นตัวเลือกอันดับหนึ่งที่ควรพิจารณา
สิ่งสำคัญคือเลือก Tools ให้เหมาะกับ Use Case ไม่จำเป็นต้องใช้ทุกอย่างในคราวเดียว เริ่มจาก Kafka Core + Connect สำหรับ Data Integration แล้วค่อยเพิ่ม Streams หรือ ksqlDB เมื่อต้องการ Real-time Processing
