Home > Blog > tech

Python Automation คืออะไร? สอนเขียน Script อัตโนมัติ Web Scraping, File, API 2026

python automation scripting guide
Python Automation Scripting Guide 2026
2026-04-08 | tech | 3500 words

Python เป็นภาษาโปรแกรมที่เหมาะสมที่สุดสำหรับงาน Automation ด้วยไวยากรณ์ที่อ่านง่าย Library มหาศาล และ Community ที่ใหญ่ที่สุดในโลก ไม่ว่าจะเป็นการจัดการไฟล์ ดึงข้อมูลจากเว็บไซต์ ส่งอีเมลอัตโนมัติ หรือสร้าง Bot แจ้งเตือน Python ทำได้ทุกอย่างด้วย Code เพียงไม่กี่บรรทัด

บทความนี้จะสอนคุณเขียน Python Script อัตโนมัติตั้งแต่พื้นฐานจนถึงโปรเจกต์จริง ครอบคลุม File Operations, Web Scraping, API Integration, Email, PDF, Image Processing และ Scheduling ที่ Backend Developer และ DevOps ทุกคนควรรู้

ทำไมต้อง Python สำหรับ Automation?

Python ได้เปรียบภาษาอื่นในเรื่อง Automation หลายประการ ประการแรกคือ Syntax ที่เรียบง่าย อ่านง่าย เขียนเร็ว ไม่ต้องเสียเวลากับ Boilerplate Code ประการที่สอง Standard Library ที่ครบถ้วน มี Module สำหรับจัดการ File, Network, Email, JSON, CSV มาในตัว ประการที่สาม Third-party Libraries ที่มหาศาล เช่น requests, BeautifulSoup, Selenium, pandas, Pillow ประการที่สี่ Cross-platform ใช้ได้ทั้ง Windows, macOS, Linux โดยไม่ต้องแก้ Code และประการสุดท้าย Community ใหญ่ มีตัวอย่างและ Tutorial มากมาย

# ติดตั้ง Python
# Windows: python.org/downloads
# macOS: brew install python
# Linux: sudo apt install python3 python3-pip

# ตรวจสอบ
python --version
pip --version

# สร้าง Virtual Environment (แนะนำ)
python -m venv automation-env
source automation-env/bin/activate  # Linux/macOS
automation-env\Scripts\activate     # Windows

# ติดตั้ง Libraries ที่ใช้บ่อย
pip install requests beautifulsoup4 selenium playwright
pip install pandas openpyxl python-dotenv
pip install schedule apscheduler pillow

File Operations — จัดการไฟล์อัตโนมัติ

os, shutil — จัดการไฟล์และโฟลเดอร์

import os
import shutil
from datetime import datetime

# สร้างโฟลเดอร์
os.makedirs("output/reports/2026", exist_ok=True)

# รายชื่อไฟล์
files = os.listdir("./data")
print(f"พบ {len(files)} ไฟล์")

# ตรวจสอบไฟล์/โฟลเดอร์
os.path.exists("data/report.csv")
os.path.isfile("data/report.csv")
os.path.isdir("data")

# ขนาดไฟล์
size = os.path.getsize("data/report.csv")
print(f"ขนาด: {size / 1024:.1f} KB")

# คัดลอก / ย้าย / ลบ
shutil.copy("source.txt", "backup/source.txt")
shutil.copy2("source.txt", "backup/")  # คัดลอกพร้อม metadata
shutil.move("old/file.txt", "new/file.txt")
os.remove("temp.txt")  # ลบไฟล์
shutil.rmtree("temp_folder")  # ลบโฟลเดอร์ทั้งหมด

# Zip/Unzip
shutil.make_archive("backup_2026", "zip", "data")
shutil.unpack_archive("backup_2026.zip", "restored_data")

pathlib — วิธีสมัยใหม่ในการจัดการ Path

from pathlib import Path

# สร้าง Path object
p = Path("data/reports/2026")
p.mkdir(parents=True, exist_ok=True)

# อ่าน/เขียนไฟล์
content = Path("config.json").read_text(encoding="utf-8")
Path("output.txt").write_text("Hello World", encoding="utf-8")

# ข้อมูลไฟล์
p = Path("report.csv")
print(p.name)       # report.csv
print(p.stem)       # report
print(p.suffix)     # .csv
print(p.parent)     # .
print(p.absolute()) # /home/user/report.csv

# วนลูปไฟล์
for f in Path("data").iterdir():
    if f.is_file():
        print(f.name, f.stat().st_size)

glob — ค้นหาไฟล์ด้วย Pattern

from pathlib import Path
import glob

# ค้นหาด้วย Pattern
csv_files = list(Path("data").glob("*.csv"))
all_py = list(Path(".").glob("**/*.py"))  # recursive

# ใช้ glob module
json_files = glob.glob("data/**/*.json", recursive=True)

# ตัวอย่าง: จัดระเบียบไฟล์ตามนามสกุล
from pathlib import Path
import shutil

source = Path("Downloads")
for f in source.iterdir():
    if f.is_file():
        ext = f.suffix.lower()
        if ext in [".jpg", ".png", ".gif"]:
            dest = source / "Images"
        elif ext in [".pdf", ".doc", ".docx"]:
            dest = source / "Documents"
        elif ext in [".mp4", ".avi", ".mkv"]:
            dest = source / "Videos"
        else:
            dest = source / "Others"
        dest.mkdir(exist_ok=True)
        shutil.move(str(f), str(dest / f.name))
        print(f"ย้าย {f.name} -> {dest.name}/")

CSV & Excel Automation

csv module — อ่าน/เขียน CSV

import csv

# อ่าน CSV
with open("data.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row["name"], row["email"])

# เขียน CSV
data = [
    {"name": "สมชาย", "email": "somchai@test.com", "score": 95},
    {"name": "สมหญิง", "email": "somying@test.com", "score": 88},
]
with open("output.csv", "w", encoding="utf-8-sig", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["name", "email", "score"])
    writer.writeheader()
    writer.writerows(data)

openpyxl — จัดการ Excel

from openpyxl import Workbook, load_workbook
from openpyxl.styles import Font, PatternFill, Alignment

# สร้าง Excel ใหม่
wb = Workbook()
ws = wb.active
ws.title = "รายงานยอดขาย"

# Header
headers = ["ลำดับ", "สินค้า", "จำนวน", "ราคา", "รวม"]
for col, header in enumerate(headers, 1):
    cell = ws.cell(row=1, column=col, value=header)
    cell.font = Font(bold=True, color="FFFFFF")
    cell.fill = PatternFill(start_color="4472C4", fill_type="solid")
    cell.alignment = Alignment(horizontal="center")

# ข้อมูล
products = [
    ("iPhone 16", 50, 35000),
    ("MacBook Pro", 20, 75000),
    ("AirPods Pro", 100, 8900),
    ("iPad Air", 30, 22000),
]
for i, (name, qty, price) in enumerate(products, 2):
    ws.cell(row=i, column=1, value=i-1)
    ws.cell(row=i, column=2, value=name)
    ws.cell(row=i, column=3, value=qty)
    ws.cell(row=i, column=4, value=price)
    ws.cell(row=i, column=5, value=qty * price)

# ปรับความกว้างคอลัมน์
ws.column_dimensions["B"].width = 20
ws.column_dimensions["E"].width = 15

wb.save("report.xlsx")
print("สร้าง report.xlsx สำเร็จ")

pandas — วิเคราะห์ข้อมูลขั้นสูง

import pandas as pd

# อ่านข้อมูล
df = pd.read_csv("sales.csv")
df_excel = pd.read_excel("report.xlsx", sheet_name="Sheet1")

# ดูข้อมูลเบื้องต้น
print(df.head())
print(df.describe())
print(df.info())

# กรองข้อมูล
high_sales = df[df["amount"] > 10000]
bangkok = df[df["city"] == "กรุงเทพ"]

# จัดกลุ่มและสรุป
summary = df.groupby("category").agg({
    "amount": ["sum", "mean", "count"],
    "quantity": "sum"
}).round(2)

# Pivot Table
pivot = pd.pivot_table(df, values="amount",
                       index="month", columns="category",
                       aggfunc="sum", fill_value=0)

# Export
summary.to_csv("summary.csv", encoding="utf-8-sig")
summary.to_excel("summary.xlsx", sheet_name="สรุป")
pivot.to_html("pivot_table.html")

Web Scraping — ดึงข้อมูลจากเว็บไซต์

requests + BeautifulSoup — สำหรับ Static Pages

import requests
from bs4 import BeautifulSoup
import time

# ดึง HTML
url = "https://example.com/products"
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()

# Parse HTML
soup = BeautifulSoup(response.text, "html.parser")

# ค้นหา Elements
title = soup.find("h1").text.strip()
links = soup.find_all("a", class_="product-link")

# ดึงข้อมูลสินค้า
products = []
for item in soup.select(".product-card"):
    name = item.select_one(".product-name").text.strip()
    price = item.select_one(".product-price").text.strip()
    link = item.select_one("a")["href"]
    products.append({"name": name, "price": price, "link": link})

print(f"พบ {len(products)} สินค้า")

# Scrape หลายหน้า
all_data = []
for page in range(1, 11):
    url = f"https://example.com/products?page={page}"
    resp = requests.get(url, headers=headers, timeout=30)
    soup = BeautifulSoup(resp.text, "html.parser")
    items = soup.select(".product-card")
    all_data.extend(items)
    print(f"หน้า {page}: {len(items)} รายการ")
    time.sleep(2)  # หน่วงเวลาเพื่อไม่ให้โดน Block
จรรยาบรรณ Web Scraping: ตรวจสอบ robots.txt ก่อน Scrape เสมอ หน่วงเวลาระหว่าง Request อย่าส่ง Request ถี่เกินไป ใช้ข้อมูลที่ได้อย่างรับผิดชอบ และเคารพ Terms of Service ของเว็บไซต์

Selenium — สำหรับ Dynamic Pages (JavaScript Rendering)

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# ตั้งค่า Chrome Driver
options = webdriver.ChromeOptions()
options.add_argument("--headless")  # ไม่เปิดหน้าต่าง Browser
options.add_argument("--no-sandbox")
driver = webdriver.Chrome(options=options)

try:
    # เปิดเว็บ
    driver.get("https://example.com/search")

    # รอให้ Element โหลด
    search_box = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, "search-input"))
    )

    # พิมพ์และค้นหา
    search_box.send_keys("Python automation")
    search_box.send_keys(Keys.RETURN)

    # รอผลลัพธ์
    WebDriverWait(driver, 10).until(
        EC.presence_of_all_elements_located((By.CLASS_NAME, "result-item"))
    )

    # ดึงข้อมูล
    results = driver.find_elements(By.CLASS_NAME, "result-item")
    for r in results:
        title = r.find_element(By.TAG_NAME, "h3").text
        link = r.find_element(By.TAG_NAME, "a").get_attribute("href")
        print(f"{title}: {link}")

    # Screenshot
    driver.save_screenshot("search_results.png")

finally:
    driver.quit()

Playwright — ทางเลือกที่ทันสมัยกว่า Selenium

from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch(headless=True)
    page = browser.new_page()

    # เปิดเว็บ
    page.goto("https://example.com/dashboard")

    # Login
    page.fill("#username", "myuser")
    page.fill("#password", "mypass")
    page.click("#login-button")

    # รอ Navigation
    page.wait_for_url("**/dashboard")

    # ดึงข้อมูลจากตาราง
    rows = page.query_selector_all("table.data-table tr")
    for row in rows[1:]:  # ข้าม Header
        cells = row.query_selector_all("td")
        data = [cell.inner_text() for cell in cells]
        print(data)

    # Screenshot
    page.screenshot(path="dashboard.png", full_page=True)

    # ดาวน์โหลดไฟล์
    with page.expect_download() as download_info:
        page.click("#export-csv")
    download = download_info.value
    download.save_as("exported_data.csv")

    browser.close()

API Automation — เชื่อมต่อ API อัตโนมัติ

import requests
import json

# GET Request
response = requests.get("https://api.example.com/users",
    headers={"Authorization": "Bearer YOUR_TOKEN"},
    params={"page": 1, "limit": 50},
    timeout=30
)
users = response.json()

# POST Request
new_user = {
    "name": "สมชาย",
    "email": "somchai@example.com",
    "role": "developer"
}
response = requests.post("https://api.example.com/users",
    json=new_user,
    headers={"Authorization": "Bearer YOUR_TOKEN"},
    timeout=30
)
print(f"Status: {response.status_code}")
print(f"Created: {response.json()}")

# PUT/PATCH Request
response = requests.patch(f"https://api.example.com/users/123",
    json={"role": "admin"},
    headers={"Authorization": "Bearer YOUR_TOKEN"}
)

# DELETE Request
response = requests.delete(f"https://api.example.com/users/456",
    headers={"Authorization": "Bearer YOUR_TOKEN"}
)

# Session — ใช้ซ้ำ Connection
session = requests.Session()
session.headers.update({"Authorization": "Bearer YOUR_TOKEN"})
session.headers.update({"Content-Type": "application/json"})

# ใช้ Session สำหรับหลาย Request
for page in range(1, 20):
    resp = session.get(f"https://api.example.com/data?page={page}")
    data = resp.json()
    print(f"Page {page}: {len(data['items'])} items")

httpx — Async HTTP Client (เร็วกว่า requests)

import httpx
import asyncio

# Synchronous
with httpx.Client(timeout=30) as client:
    resp = client.get("https://api.example.com/data")
    print(resp.json())

# Asynchronous — ดึงข้อมูลพร้อมกันหลาย URL
async def fetch_all():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/products",
        "https://api.example.com/orders",
    ]
    async with httpx.AsyncClient(timeout=30) as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        for resp in responses:
            print(f"{resp.url}: {resp.status_code}")

asyncio.run(fetch_all())

Email Automation — ส่งอีเมลอัตโนมัติ

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders

def send_email(to_email, subject, body, attachment_path=None):
    # ตั้งค่า SMTP
    smtp_server = "smtp.gmail.com"
    smtp_port = 587
    sender_email = "your@gmail.com"
    sender_password = "your_app_password"  # ใช้ App Password

    # สร้างข้อความ
    msg = MIMEMultipart()
    msg["From"] = sender_email
    msg["To"] = to_email
    msg["Subject"] = subject

    # Body (HTML)
    html_body = f"""
    <html>
    <body>
    <h2>{subject}</h2>
    <p>{body}</p>
    <p>ส่งโดยระบบอัตโนมัติ</p>
    </body>
    </html>
    """
    msg.attach(MIMEText(html_body, "html", "utf-8"))

    # แนบไฟล์ (ถ้ามี)
    if attachment_path:
        with open(attachment_path, "rb") as f:
            part = MIMEBase("application", "octet-stream")
            part.set_payload(f.read())
            encoders.encode_base64(part)
            part.add_header("Content-Disposition",
                          f"attachment; filename={Path(attachment_path).name}")
            msg.attach(part)

    # ส่ง
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(sender_email, sender_password)
        server.send_message(msg)
        print(f"ส่งอีเมลถึง {to_email} สำเร็จ")

# ใช้งาน
send_email("boss@company.com", "รายงานประจำวัน",
           "รายงานยอดขายวันนี้ 500,000 บาท", "report.xlsx")

PDF Manipulation — จัดการ PDF

from PyPDF2 import PdfReader, PdfWriter, PdfMerger

# อ่าน PDF
reader = PdfReader("document.pdf")
print(f"จำนวนหน้า: {len(reader.pages)}")

# ดึงข้อความ
for page in reader.pages:
    text = page.extract_text()
    print(text)

# แยกหน้า
writer = PdfWriter()
writer.add_page(reader.pages[0])  # หน้าแรก
with open("first_page.pdf", "wb") as f:
    writer.write(f)

# รวม PDF
merger = PdfMerger()
merger.append("part1.pdf")
merger.append("part2.pdf")
merger.append("part3.pdf")
merger.write("combined.pdf")
merger.close()

# สร้าง PDF ใหม่ด้วย reportlab
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont

# ลงทะเบียน Font ภาษาไทย
pdfmetrics.registerFont(TTFont("THSarabun", "THSarabunNew.ttf"))

c = canvas.Canvas("report.pdf", pagesize=A4)
c.setFont("THSarabun", 16)
c.drawString(72, 750, "รายงานประจำเดือน เมษายน 2026")
c.setFont("THSarabun", 12)
c.drawString(72, 700, "ยอดขายรวม: 1,500,000 บาท")
c.drawString(72, 680, "จำนวนลูกค้า: 350 ราย")
c.save()

Image Processing — ประมวลผลรูปภาพ

from PIL import Image, ImageDraw, ImageFont, ImageFilter
from pathlib import Path

# เปิดและดูข้อมูลรูป
img = Image.open("photo.jpg")
print(f"ขนาด: {img.size}")
print(f"Format: {img.format}")
print(f"Mode: {img.mode}")

# Resize
img_resized = img.resize((800, 600))
img_resized.save("photo_800x600.jpg")

# Thumbnail (รักษา Aspect Ratio)
img_thumb = img.copy()
img_thumb.thumbnail((200, 200))
img_thumb.save("thumbnail.jpg")

# Crop
box = (100, 100, 500, 400)  # left, upper, right, lower
img_cropped = img.crop(box)
img_cropped.save("cropped.jpg")

# แปลง Format
img.save("photo.png")
img.save("photo.webp", quality=85)

# ใส่ Watermark
watermark = Image.open("watermark.png").convert("RGBA")
img.paste(watermark, (img.width - watermark.width - 10,
                      img.height - watermark.height - 10),
          watermark)

# Batch Processing — ย่อรูปทั้งโฟลเดอร์
input_dir = Path("photos")
output_dir = Path("photos_resized")
output_dir.mkdir(exist_ok=True)

for img_path in input_dir.glob("*.jpg"):
    img = Image.open(img_path)
    img.thumbnail((1200, 1200))
    img.save(output_dir / img_path.name, "JPEG", quality=85, optimize=True)
    print(f"ย่อ {img_path.name}: {img.size}")

Scheduling — ตั้งเวลารันอัตโนมัติ

schedule — ง่ายที่สุด

import schedule
import time

def daily_report():
    print(f"กำลังสร้างรายงาน... {datetime.now()}")
    # ... สร้างรายงาน ...

def check_api():
    print(f"ตรวจสอบ API... {datetime.now()}")
    # ... เช็คสถานะ API ...

# ตั้งเวลา
schedule.every().day.at("08:00").do(daily_report)
schedule.every(30).minutes.do(check_api)
schedule.every().monday.at("09:00").do(daily_report)
schedule.every().hour.do(check_api)

# รัน Loop
while True:
    schedule.run_pending()
    time.sleep(60)

APScheduler — ฟีเจอร์ครบ

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = BlockingScheduler()

@scheduler.scheduled_job("interval", minutes=30)
def check_system():
    print("ตรวจสอบระบบ...")

@scheduler.scheduled_job(CronTrigger(hour=8, minute=0))
def morning_report():
    print("สร้างรายงานเช้า...")

@scheduler.scheduled_job(CronTrigger(day_of_week="mon-fri", hour=18))
def daily_backup():
    print("สำรองข้อมูลประจำวัน...")

scheduler.start()

cron (Linux) — ตั้งเวลาผ่าน OS

# แก้ไข crontab
crontab -e

# รันทุกวัน 8 โมงเช้า
0 8 * * * /usr/bin/python3 /home/user/scripts/daily_report.py >> /var/log/report.log 2>&1

# รันทุก 30 นาที
*/30 * * * * /usr/bin/python3 /home/user/scripts/check_api.py

# รันวันจันทร์-ศุกร์ เวลา 18:00
0 18 * * 1-5 /usr/bin/python3 /home/user/scripts/backup.py

# Windows Task Scheduler
# schtasks /create /tn "DailyReport" /tr "python C:\scripts\report.py" /sc daily /st 08:00

Database Automation

import sqlite3
import pymysql

# SQLite
conn = sqlite3.connect("app.db")
cursor = conn.cursor()

# สร้างตาราง
cursor.execute("""
    CREATE TABLE IF NOT EXISTS logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
        level TEXT,
        message TEXT
    )
""")

# Insert ข้อมูล
cursor.execute("INSERT INTO logs (level, message) VALUES (?, ?)",
               ("INFO", "System started"))
conn.commit()

# Query
cursor.execute("SELECT * FROM logs WHERE level = ?", ("ERROR",))
errors = cursor.fetchall()
print(f"พบ Error {len(errors)} รายการ")

conn.close()

# MySQL/MariaDB
conn = pymysql.connect(
    host="localhost", user="root", password="secret",
    database="myapp", charset="utf8mb4"
)
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
    cursor.execute("SELECT * FROM users WHERE active = %s", (1,))
    users = cursor.fetchall()
    for user in users:
        print(user["name"], user["email"])
conn.close()

Notification Bots — บอทแจ้งเตือน

LINE Notify

import requests

def send_line_notify(token, message, image_path=None):
    url = "https://notify-api.line.me/api/notify"
    headers = {"Authorization": f"Bearer {token}"}
    data = {"message": message}
    files = None
    if image_path:
        files = {"imageFile": open(image_path, "rb")}
    resp = requests.post(url, headers=headers, data=data, files=files)
    return resp.status_code == 200

# ใช้งาน
LINE_TOKEN = "YOUR_LINE_NOTIFY_TOKEN"
send_line_notify(LINE_TOKEN, "\nรายงานประจำวัน\nยอดขาย: 500,000 บาท\nลูกค้าใหม่: 15 ราย")

Telegram Bot

import requests

def send_telegram(bot_token, chat_id, message):
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    data = {
        "chat_id": chat_id,
        "text": message,
        "parse_mode": "HTML"
    }
    resp = requests.post(url, json=data)
    return resp.json()

# ใช้งาน
BOT_TOKEN = "YOUR_BOT_TOKEN"
CHAT_ID = "YOUR_CHAT_ID"
send_telegram(BOT_TOKEN, CHAT_ID,
    "<b>แจ้งเตือน</b>\n"
    "Server CPU Usage: 95%\n"
    "กรุณาตรวจสอบ!"
)

Slack Webhook

import requests
import json

def send_slack(webhook_url, message, color="#36a64f"):
    payload = {
        "attachments": [{
            "color": color,
            "text": message,
            "footer": "Python Automation Bot",
        }]
    }
    resp = requests.post(webhook_url, json=payload)
    return resp.status_code == 200

SLACK_WEBHOOK = "https://hooks.slack.com/services/XXX/YYY/ZZZ"
send_slack(SLACK_WEBHOOK, "Deploy สำเร็จ! Version 2.5.0 พร้อมใช้งาน")

CLI Tools — สร้างเครื่องมือ Command Line

argparse — Built-in

import argparse

parser = argparse.ArgumentParser(description="File Organizer Tool")
parser.add_argument("source", help="Source directory")
parser.add_argument("-o", "--output", default="./organized", help="Output directory")
parser.add_argument("-d", "--dry-run", action="store_true", help="Show what would happen")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")

args = parser.parse_args()
print(f"Source: {args.source}")
print(f"Output: {args.output}")

# python organizer.py ./Downloads -o ./Organized --dry-run -v

typer — Modern CLI Framework

import typer

app = typer.Typer()

@app.command()
def organize(
    source: str = typer.Argument(..., help="Source directory"),
    output: str = typer.Option("./organized", help="Output directory"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Preview only"),
):
    """จัดระเบียบไฟล์ตามนามสกุล"""
    typer.echo(f"จัดระเบียบ: {source} -> {output}")
    if dry_run:
        typer.echo("(Dry run mode)")

@app.command()
def clean(
    path: str = typer.Argument(..., help="Directory to clean"),
    days: int = typer.Option(30, help="Delete files older than N days"),
):
    """ลบไฟล์เก่า"""
    typer.echo(f"ลบไฟล์เก่ากว่า {days} วัน ใน {path}")

if __name__ == "__main__":
    app()

# python tool.py organize ./Downloads --dry-run
# python tool.py clean ./temp --days 7

Packaging Scripts — แพ็ค Script เป็น EXE

# ติดตั้ง PyInstaller
pip install pyinstaller

# สร้าง EXE (ไฟล์เดียว)
pyinstaller --onefile --name "FileOrganizer" organizer.py

# สร้าง EXE พร้อม Icon
pyinstaller --onefile --windowed --icon=app.ico --name "MyTool" main.py

# ผลลัพธ์จะอยู่ใน dist/ folder
# dist/FileOrganizer.exe

Error Handling & Logging

import logging
from pathlib import Path

# ตั้งค่า Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("automation.log", encoding="utf-8"),
        logging.StreamHandler()  # แสดงใน Console ด้วย
    ]
)
logger = logging.getLogger(__name__)

# ใช้ในโปรแกรม
def process_file(filepath):
    try:
        logger.info(f"กำลังประมวลผล: {filepath}")
        # ... ทำงาน ...
        logger.info(f"สำเร็จ: {filepath}")
    except FileNotFoundError:
        logger.error(f"ไม่พบไฟล์: {filepath}")
    except PermissionError:
        logger.error(f"ไม่มีสิทธิ์เข้าถึง: {filepath}")
    except Exception as e:
        logger.exception(f"เกิดข้อผิดพลาด: {e}")

# Retry Decorator
import time
from functools import wraps

def retry(max_attempts=3, delay=5):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    logger.warning(f"Attempt {attempt}/{max_attempts} failed: {e}")
                    if attempt < max_attempts:
                        time.sleep(delay)
                    else:
                        raise
        return wrapper
    return decorator

@retry(max_attempts=3, delay=10)
def fetch_data(url):
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.json()

Practical Project: Automated Report Generator

#!/usr/bin/env python3
"""รายงานยอดขายอัตโนมัติ — ส่งทุกเช้า 8 โมง"""
import pandas as pd
import requests
from datetime import datetime, timedelta
from pathlib import Path
import logging

logging.basicConfig(level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

def fetch_sales_data():
    """ดึงข้อมูลยอดขายจาก API"""
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    resp = requests.get(f"https://api.myshop.com/sales?date={yesterday}",
                       headers={"Authorization": "Bearer TOKEN"})
    return pd.DataFrame(resp.json()["data"])

def generate_report(df):
    """สร้างรายงาน Excel"""
    report_path = Path(f"reports/sales_{datetime.now().strftime('%Y%m%d')}.xlsx")
    report_path.parent.mkdir(exist_ok=True)

    with pd.ExcelWriter(str(report_path), engine="openpyxl") as writer:
        # สรุปรวม
        summary = df.groupby("category").agg({
            "amount": ["sum", "count", "mean"]
        }).round(2)
        summary.to_excel(writer, sheet_name="สรุป")

        # รายละเอียด
        df.to_excel(writer, sheet_name="รายละเอียด", index=False)

    logger.info(f"สร้างรายงาน: {report_path}")
    return str(report_path)

def send_notification(report_path, total_sales, total_orders):
    """แจ้งเตือนผ่าน LINE"""
    message = (
        f"\nรายงานยอดขาย {datetime.now().strftime('%d/%m/%Y')}"
        f"\nยอดขายรวม: {total_sales:,.0f} บาท"
        f"\nจำนวน Orders: {total_orders:,} รายการ"
    )
    requests.post("https://notify-api.line.me/api/notify",
        headers={"Authorization": "Bearer LINE_TOKEN"},
        data={"message": message},
        files={"imageFile": open(report_path, "rb")} if report_path.endswith(".png") else None
    )

def main():
    logger.info("เริ่มสร้างรายงาน...")
    df = fetch_sales_data()
    report_path = generate_report(df)
    total_sales = df["amount"].sum()
    total_orders = len(df)
    send_notification(report_path, total_sales, total_orders)
    logger.info("เสร็จสิ้น!")

if __name__ == "__main__":
    main()

Practical Project: Monitoring Bot

#!/usr/bin/env python3
"""ตรวจสอบสถานะเว็บไซต์ทุก 5 นาที"""
import requests
import time
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

SITES = [
    {"name": "Main Website", "url": "https://example.com", "expected_status": 200},
    {"name": "API Server", "url": "https://api.example.com/health", "expected_status": 200},
    {"name": "Admin Panel", "url": "https://admin.example.com", "expected_status": 200},
]

def check_site(site):
    try:
        start = time.time()
        resp = requests.get(site["url"], timeout=15)
        response_time = (time.time() - start) * 1000  # ms

        if resp.status_code != site["expected_status"]:
            return {"status": "ERROR", "message": f"Status {resp.status_code}", "time": response_time}
        if response_time > 5000:
            return {"status": "SLOW", "message": f"{response_time:.0f}ms", "time": response_time}
        return {"status": "OK", "message": f"{response_time:.0f}ms", "time": response_time}
    except requests.exceptions.Timeout:
        return {"status": "TIMEOUT", "message": "Timeout 15s", "time": 0}
    except requests.exceptions.ConnectionError:
        return {"status": "DOWN", "message": "Connection refused", "time": 0}

def alert(site_name, result):
    """ส่งแจ้งเตือนเมื่อมีปัญหา"""
    message = f"[{result['status']}] {site_name}: {result['message']}"
    logger.warning(message)
    # ส่ง LINE/Telegram/Slack ที่นี่

def monitor():
    while True:
        for site in SITES:
            result = check_site(site)
            if result["status"] in ("ERROR", "TIMEOUT", "DOWN"):
                alert(site["name"], result)
            else:
                logger.info(f"[OK] {site['name']}: {result['message']}")
        time.sleep(300)  # 5 นาที

if __name__ == "__main__":
    monitor()

Best Practices — แนวทางปฏิบัติที่ดี

สรุป

Python Automation เป็นทักษะที่มีค่ามหาศาลสำหรับนักพัฒนาทุกคน ช่วยประหยัดเวลาจากงานซ้ำซาก ลดความผิดพลาดจากการทำด้วยมือ และเพิ่มประสิทธิภาพการทำงานหลายเท่า ไม่ว่าจะเป็นการจัดการไฟล์ ดึงข้อมูลจากเว็บ ส่งรายงานอัตโนมัติ หรือสร้างบอทแจ้งเตือน Python ทำได้ทุกอย่าง

เริ่มต้นจากปัญหาที่คุณเจอทุกวัน ถ้ามีงานที่ทำซ้ำมากกว่า 3 ครั้ง นั่นคือโอกาสที่จะเขียน Script อัตโนมัติ เริ่มจากงานง่ายๆ เช่น จัดระเบียบไฟล์ หรือดึงข้อมูลจาก API แล้วค่อยๆ เพิ่มความซับซ้อน คุณจะพบว่า Python Automation ไม่เพียงช่วยประหยัดเวลา แต่ยังทำให้งานสนุกขึ้นอีกด้วย


Back to Blog | iCafe Forex | SiamLanCard | Siam2R