Python เป็นภาษาโปรแกรมที่เหมาะสมที่สุดสำหรับงาน Automation ด้วยไวยากรณ์ที่อ่านง่าย Library มหาศาล และ Community ที่ใหญ่ที่สุดในโลก ไม่ว่าจะเป็นการจัดการไฟล์ ดึงข้อมูลจากเว็บไซต์ ส่งอีเมลอัตโนมัติ หรือสร้าง Bot แจ้งเตือน Python ทำได้ทุกอย่างด้วย Code เพียงไม่กี่บรรทัด
บทความนี้จะสอนคุณเขียน Python Script อัตโนมัติตั้งแต่พื้นฐานจนถึงโปรเจกต์จริง ครอบคลุม File Operations, Web Scraping, API Integration, Email, PDF, Image Processing และ Scheduling ที่ Backend Developer และ DevOps ทุกคนควรรู้
ทำไมต้อง Python สำหรับ Automation?
Python ได้เปรียบภาษาอื่นในเรื่อง Automation หลายประการ ประการแรกคือ Syntax ที่เรียบง่าย อ่านง่าย เขียนเร็ว ไม่ต้องเสียเวลากับ Boilerplate Code ประการที่สอง Standard Library ที่ครบถ้วน มี Module สำหรับจัดการ File, Network, Email, JSON, CSV มาในตัว ประการที่สาม Third-party Libraries ที่มหาศาล เช่น requests, BeautifulSoup, Selenium, pandas, Pillow ประการที่สี่ Cross-platform ใช้ได้ทั้ง Windows, macOS, Linux โดยไม่ต้องแก้ Code และประการสุดท้าย Community ใหญ่ มีตัวอย่างและ Tutorial มากมาย
# ติดตั้ง Python
# Windows: python.org/downloads
# macOS: brew install python
# Linux: sudo apt install python3 python3-pip
# ตรวจสอบ
python --version
pip --version
# สร้าง Virtual Environment (แนะนำ)
python -m venv automation-env
source automation-env/bin/activate # Linux/macOS
automation-env\Scripts\activate # Windows
# ติดตั้ง Libraries ที่ใช้บ่อย
pip install requests beautifulsoup4 selenium playwright
pip install pandas openpyxl python-dotenv
pip install schedule apscheduler pillow
File Operations — จัดการไฟล์อัตโนมัติ
os, shutil — จัดการไฟล์และโฟลเดอร์
import os
import shutil
from datetime import datetime
# สร้างโฟลเดอร์
os.makedirs("output/reports/2026", exist_ok=True)
# รายชื่อไฟล์
files = os.listdir("./data")
print(f"พบ {len(files)} ไฟล์")
# ตรวจสอบไฟล์/โฟลเดอร์
os.path.exists("data/report.csv")
os.path.isfile("data/report.csv")
os.path.isdir("data")
# ขนาดไฟล์
size = os.path.getsize("data/report.csv")
print(f"ขนาด: {size / 1024:.1f} KB")
# คัดลอก / ย้าย / ลบ
shutil.copy("source.txt", "backup/source.txt")
shutil.copy2("source.txt", "backup/") # คัดลอกพร้อม metadata
shutil.move("old/file.txt", "new/file.txt")
os.remove("temp.txt") # ลบไฟล์
shutil.rmtree("temp_folder") # ลบโฟลเดอร์ทั้งหมด
# Zip/Unzip
shutil.make_archive("backup_2026", "zip", "data")
shutil.unpack_archive("backup_2026.zip", "restored_data")
pathlib — วิธีสมัยใหม่ในการจัดการ Path
from pathlib import Path
# สร้าง Path object
p = Path("data/reports/2026")
p.mkdir(parents=True, exist_ok=True)
# อ่าน/เขียนไฟล์
content = Path("config.json").read_text(encoding="utf-8")
Path("output.txt").write_text("Hello World", encoding="utf-8")
# ข้อมูลไฟล์
p = Path("report.csv")
print(p.name) # report.csv
print(p.stem) # report
print(p.suffix) # .csv
print(p.parent) # .
print(p.absolute()) # /home/user/report.csv
# วนลูปไฟล์
for f in Path("data").iterdir():
if f.is_file():
print(f.name, f.stat().st_size)
glob — ค้นหาไฟล์ด้วย Pattern
from pathlib import Path
import glob
# ค้นหาด้วย Pattern
csv_files = list(Path("data").glob("*.csv"))
all_py = list(Path(".").glob("**/*.py")) # recursive
# ใช้ glob module
json_files = glob.glob("data/**/*.json", recursive=True)
# ตัวอย่าง: จัดระเบียบไฟล์ตามนามสกุล
from pathlib import Path
import shutil
source = Path("Downloads")
for f in source.iterdir():
if f.is_file():
ext = f.suffix.lower()
if ext in [".jpg", ".png", ".gif"]:
dest = source / "Images"
elif ext in [".pdf", ".doc", ".docx"]:
dest = source / "Documents"
elif ext in [".mp4", ".avi", ".mkv"]:
dest = source / "Videos"
else:
dest = source / "Others"
dest.mkdir(exist_ok=True)
shutil.move(str(f), str(dest / f.name))
print(f"ย้าย {f.name} -> {dest.name}/")
CSV & Excel Automation
csv module — อ่าน/เขียน CSV
import csv
# อ่าน CSV
with open("data.csv", "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
print(row["name"], row["email"])
# เขียน CSV
data = [
{"name": "สมชาย", "email": "somchai@test.com", "score": 95},
{"name": "สมหญิง", "email": "somying@test.com", "score": 88},
]
with open("output.csv", "w", encoding="utf-8-sig", newline="") as f:
writer = csv.DictWriter(f, fieldnames=["name", "email", "score"])
writer.writeheader()
writer.writerows(data)
openpyxl — จัดการ Excel
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Font, PatternFill, Alignment
# สร้าง Excel ใหม่
wb = Workbook()
ws = wb.active
ws.title = "รายงานยอดขาย"
# Header
headers = ["ลำดับ", "สินค้า", "จำนวน", "ราคา", "รวม"]
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header)
cell.font = Font(bold=True, color="FFFFFF")
cell.fill = PatternFill(start_color="4472C4", fill_type="solid")
cell.alignment = Alignment(horizontal="center")
# ข้อมูล
products = [
("iPhone 16", 50, 35000),
("MacBook Pro", 20, 75000),
("AirPods Pro", 100, 8900),
("iPad Air", 30, 22000),
]
for i, (name, qty, price) in enumerate(products, 2):
ws.cell(row=i, column=1, value=i-1)
ws.cell(row=i, column=2, value=name)
ws.cell(row=i, column=3, value=qty)
ws.cell(row=i, column=4, value=price)
ws.cell(row=i, column=5, value=qty * price)
# ปรับความกว้างคอลัมน์
ws.column_dimensions["B"].width = 20
ws.column_dimensions["E"].width = 15
wb.save("report.xlsx")
print("สร้าง report.xlsx สำเร็จ")
pandas — วิเคราะห์ข้อมูลขั้นสูง
import pandas as pd
# อ่านข้อมูล
df = pd.read_csv("sales.csv")
df_excel = pd.read_excel("report.xlsx", sheet_name="Sheet1")
# ดูข้อมูลเบื้องต้น
print(df.head())
print(df.describe())
print(df.info())
# กรองข้อมูล
high_sales = df[df["amount"] > 10000]
bangkok = df[df["city"] == "กรุงเทพ"]
# จัดกลุ่มและสรุป
summary = df.groupby("category").agg({
"amount": ["sum", "mean", "count"],
"quantity": "sum"
}).round(2)
# Pivot Table
pivot = pd.pivot_table(df, values="amount",
index="month", columns="category",
aggfunc="sum", fill_value=0)
# Export
summary.to_csv("summary.csv", encoding="utf-8-sig")
summary.to_excel("summary.xlsx", sheet_name="สรุป")
pivot.to_html("pivot_table.html")
Web Scraping — ดึงข้อมูลจากเว็บไซต์
requests + BeautifulSoup — สำหรับ Static Pages
import requests
from bs4 import BeautifulSoup
import time
# ดึง HTML
url = "https://example.com/products"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
# Parse HTML
soup = BeautifulSoup(response.text, "html.parser")
# ค้นหา Elements
title = soup.find("h1").text.strip()
links = soup.find_all("a", class_="product-link")
# ดึงข้อมูลสินค้า
products = []
for item in soup.select(".product-card"):
name = item.select_one(".product-name").text.strip()
price = item.select_one(".product-price").text.strip()
link = item.select_one("a")["href"]
products.append({"name": name, "price": price, "link": link})
print(f"พบ {len(products)} สินค้า")
# Scrape หลายหน้า
all_data = []
for page in range(1, 11):
url = f"https://example.com/products?page={page}"
resp = requests.get(url, headers=headers, timeout=30)
soup = BeautifulSoup(resp.text, "html.parser")
items = soup.select(".product-card")
all_data.extend(items)
print(f"หน้า {page}: {len(items)} รายการ")
time.sleep(2) # หน่วงเวลาเพื่อไม่ให้โดน Block
Selenium — สำหรับ Dynamic Pages (JavaScript Rendering)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# ตั้งค่า Chrome Driver
options = webdriver.ChromeOptions()
options.add_argument("--headless") # ไม่เปิดหน้าต่าง Browser
options.add_argument("--no-sandbox")
driver = webdriver.Chrome(options=options)
try:
# เปิดเว็บ
driver.get("https://example.com/search")
# รอให้ Element โหลด
search_box = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, "search-input"))
)
# พิมพ์และค้นหา
search_box.send_keys("Python automation")
search_box.send_keys(Keys.RETURN)
# รอผลลัพธ์
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CLASS_NAME, "result-item"))
)
# ดึงข้อมูล
results = driver.find_elements(By.CLASS_NAME, "result-item")
for r in results:
title = r.find_element(By.TAG_NAME, "h3").text
link = r.find_element(By.TAG_NAME, "a").get_attribute("href")
print(f"{title}: {link}")
# Screenshot
driver.save_screenshot("search_results.png")
finally:
driver.quit()
Playwright — ทางเลือกที่ทันสมัยกว่า Selenium
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
# เปิดเว็บ
page.goto("https://example.com/dashboard")
# Login
page.fill("#username", "myuser")
page.fill("#password", "mypass")
page.click("#login-button")
# รอ Navigation
page.wait_for_url("**/dashboard")
# ดึงข้อมูลจากตาราง
rows = page.query_selector_all("table.data-table tr")
for row in rows[1:]: # ข้าม Header
cells = row.query_selector_all("td")
data = [cell.inner_text() for cell in cells]
print(data)
# Screenshot
page.screenshot(path="dashboard.png", full_page=True)
# ดาวน์โหลดไฟล์
with page.expect_download() as download_info:
page.click("#export-csv")
download = download_info.value
download.save_as("exported_data.csv")
browser.close()
API Automation — เชื่อมต่อ API อัตโนมัติ
import requests
import json
# GET Request
response = requests.get("https://api.example.com/users",
headers={"Authorization": "Bearer YOUR_TOKEN"},
params={"page": 1, "limit": 50},
timeout=30
)
users = response.json()
# POST Request
new_user = {
"name": "สมชาย",
"email": "somchai@example.com",
"role": "developer"
}
response = requests.post("https://api.example.com/users",
json=new_user,
headers={"Authorization": "Bearer YOUR_TOKEN"},
timeout=30
)
print(f"Status: {response.status_code}")
print(f"Created: {response.json()}")
# PUT/PATCH Request
response = requests.patch(f"https://api.example.com/users/123",
json={"role": "admin"},
headers={"Authorization": "Bearer YOUR_TOKEN"}
)
# DELETE Request
response = requests.delete(f"https://api.example.com/users/456",
headers={"Authorization": "Bearer YOUR_TOKEN"}
)
# Session — ใช้ซ้ำ Connection
session = requests.Session()
session.headers.update({"Authorization": "Bearer YOUR_TOKEN"})
session.headers.update({"Content-Type": "application/json"})
# ใช้ Session สำหรับหลาย Request
for page in range(1, 20):
resp = session.get(f"https://api.example.com/data?page={page}")
data = resp.json()
print(f"Page {page}: {len(data['items'])} items")
httpx — Async HTTP Client (เร็วกว่า requests)
import httpx
import asyncio
# Synchronous
with httpx.Client(timeout=30) as client:
resp = client.get("https://api.example.com/data")
print(resp.json())
# Asynchronous — ดึงข้อมูลพร้อมกันหลาย URL
async def fetch_all():
urls = [
"https://api.example.com/users",
"https://api.example.com/products",
"https://api.example.com/orders",
]
async with httpx.AsyncClient(timeout=30) as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
for resp in responses:
print(f"{resp.url}: {resp.status_code}")
asyncio.run(fetch_all())
Email Automation — ส่งอีเมลอัตโนมัติ
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
def send_email(to_email, subject, body, attachment_path=None):
# ตั้งค่า SMTP
smtp_server = "smtp.gmail.com"
smtp_port = 587
sender_email = "your@gmail.com"
sender_password = "your_app_password" # ใช้ App Password
# สร้างข้อความ
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = to_email
msg["Subject"] = subject
# Body (HTML)
html_body = f"""
<html>
<body>
<h2>{subject}</h2>
<p>{body}</p>
<p>ส่งโดยระบบอัตโนมัติ</p>
</body>
</html>
"""
msg.attach(MIMEText(html_body, "html", "utf-8"))
# แนบไฟล์ (ถ้ามี)
if attachment_path:
with open(attachment_path, "rb") as f:
part = MIMEBase("application", "octet-stream")
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header("Content-Disposition",
f"attachment; filename={Path(attachment_path).name}")
msg.attach(part)
# ส่ง
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls()
server.login(sender_email, sender_password)
server.send_message(msg)
print(f"ส่งอีเมลถึง {to_email} สำเร็จ")
# ใช้งาน
send_email("boss@company.com", "รายงานประจำวัน",
"รายงานยอดขายวันนี้ 500,000 บาท", "report.xlsx")
PDF Manipulation — จัดการ PDF
from PyPDF2 import PdfReader, PdfWriter, PdfMerger
# อ่าน PDF
reader = PdfReader("document.pdf")
print(f"จำนวนหน้า: {len(reader.pages)}")
# ดึงข้อความ
for page in reader.pages:
text = page.extract_text()
print(text)
# แยกหน้า
writer = PdfWriter()
writer.add_page(reader.pages[0]) # หน้าแรก
with open("first_page.pdf", "wb") as f:
writer.write(f)
# รวม PDF
merger = PdfMerger()
merger.append("part1.pdf")
merger.append("part2.pdf")
merger.append("part3.pdf")
merger.write("combined.pdf")
merger.close()
# สร้าง PDF ใหม่ด้วย reportlab
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
# ลงทะเบียน Font ภาษาไทย
pdfmetrics.registerFont(TTFont("THSarabun", "THSarabunNew.ttf"))
c = canvas.Canvas("report.pdf", pagesize=A4)
c.setFont("THSarabun", 16)
c.drawString(72, 750, "รายงานประจำเดือน เมษายน 2026")
c.setFont("THSarabun", 12)
c.drawString(72, 700, "ยอดขายรวม: 1,500,000 บาท")
c.drawString(72, 680, "จำนวนลูกค้า: 350 ราย")
c.save()
Image Processing — ประมวลผลรูปภาพ
from PIL import Image, ImageDraw, ImageFont, ImageFilter
from pathlib import Path
# เปิดและดูข้อมูลรูป
img = Image.open("photo.jpg")
print(f"ขนาด: {img.size}")
print(f"Format: {img.format}")
print(f"Mode: {img.mode}")
# Resize
img_resized = img.resize((800, 600))
img_resized.save("photo_800x600.jpg")
# Thumbnail (รักษา Aspect Ratio)
img_thumb = img.copy()
img_thumb.thumbnail((200, 200))
img_thumb.save("thumbnail.jpg")
# Crop
box = (100, 100, 500, 400) # left, upper, right, lower
img_cropped = img.crop(box)
img_cropped.save("cropped.jpg")
# แปลง Format
img.save("photo.png")
img.save("photo.webp", quality=85)
# ใส่ Watermark
watermark = Image.open("watermark.png").convert("RGBA")
img.paste(watermark, (img.width - watermark.width - 10,
img.height - watermark.height - 10),
watermark)
# Batch Processing — ย่อรูปทั้งโฟลเดอร์
input_dir = Path("photos")
output_dir = Path("photos_resized")
output_dir.mkdir(exist_ok=True)
for img_path in input_dir.glob("*.jpg"):
img = Image.open(img_path)
img.thumbnail((1200, 1200))
img.save(output_dir / img_path.name, "JPEG", quality=85, optimize=True)
print(f"ย่อ {img_path.name}: {img.size}")
Scheduling — ตั้งเวลารันอัตโนมัติ
schedule — ง่ายที่สุด
import schedule
import time
def daily_report():
print(f"กำลังสร้างรายงาน... {datetime.now()}")
# ... สร้างรายงาน ...
def check_api():
print(f"ตรวจสอบ API... {datetime.now()}")
# ... เช็คสถานะ API ...
# ตั้งเวลา
schedule.every().day.at("08:00").do(daily_report)
schedule.every(30).minutes.do(check_api)
schedule.every().monday.at("09:00").do(daily_report)
schedule.every().hour.do(check_api)
# รัน Loop
while True:
schedule.run_pending()
time.sleep(60)
APScheduler — ฟีเจอร์ครบ
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
scheduler = BlockingScheduler()
@scheduler.scheduled_job("interval", minutes=30)
def check_system():
print("ตรวจสอบระบบ...")
@scheduler.scheduled_job(CronTrigger(hour=8, minute=0))
def morning_report():
print("สร้างรายงานเช้า...")
@scheduler.scheduled_job(CronTrigger(day_of_week="mon-fri", hour=18))
def daily_backup():
print("สำรองข้อมูลประจำวัน...")
scheduler.start()
cron (Linux) — ตั้งเวลาผ่าน OS
# แก้ไข crontab
crontab -e
# รันทุกวัน 8 โมงเช้า
0 8 * * * /usr/bin/python3 /home/user/scripts/daily_report.py >> /var/log/report.log 2>&1
# รันทุก 30 นาที
*/30 * * * * /usr/bin/python3 /home/user/scripts/check_api.py
# รันวันจันทร์-ศุกร์ เวลา 18:00
0 18 * * 1-5 /usr/bin/python3 /home/user/scripts/backup.py
# Windows Task Scheduler
# schtasks /create /tn "DailyReport" /tr "python C:\scripts\report.py" /sc daily /st 08:00
Database Automation
import sqlite3
import pymysql
# SQLite
conn = sqlite3.connect("app.db")
cursor = conn.cursor()
# สร้างตาราง
cursor.execute("""
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
level TEXT,
message TEXT
)
""")
# Insert ข้อมูล
cursor.execute("INSERT INTO logs (level, message) VALUES (?, ?)",
("INFO", "System started"))
conn.commit()
# Query
cursor.execute("SELECT * FROM logs WHERE level = ?", ("ERROR",))
errors = cursor.fetchall()
print(f"พบ Error {len(errors)} รายการ")
conn.close()
# MySQL/MariaDB
conn = pymysql.connect(
host="localhost", user="root", password="secret",
database="myapp", charset="utf8mb4"
)
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute("SELECT * FROM users WHERE active = %s", (1,))
users = cursor.fetchall()
for user in users:
print(user["name"], user["email"])
conn.close()
Notification Bots — บอทแจ้งเตือน
LINE Notify
import requests
def send_line_notify(token, message, image_path=None):
url = "https://notify-api.line.me/api/notify"
headers = {"Authorization": f"Bearer {token}"}
data = {"message": message}
files = None
if image_path:
files = {"imageFile": open(image_path, "rb")}
resp = requests.post(url, headers=headers, data=data, files=files)
return resp.status_code == 200
# ใช้งาน
LINE_TOKEN = "YOUR_LINE_NOTIFY_TOKEN"
send_line_notify(LINE_TOKEN, "\nรายงานประจำวัน\nยอดขาย: 500,000 บาท\nลูกค้าใหม่: 15 ราย")
Telegram Bot
import requests
def send_telegram(bot_token, chat_id, message):
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = {
"chat_id": chat_id,
"text": message,
"parse_mode": "HTML"
}
resp = requests.post(url, json=data)
return resp.json()
# ใช้งาน
BOT_TOKEN = "YOUR_BOT_TOKEN"
CHAT_ID = "YOUR_CHAT_ID"
send_telegram(BOT_TOKEN, CHAT_ID,
"<b>แจ้งเตือน</b>\n"
"Server CPU Usage: 95%\n"
"กรุณาตรวจสอบ!"
)
Slack Webhook
import requests
import json
def send_slack(webhook_url, message, color="#36a64f"):
payload = {
"attachments": [{
"color": color,
"text": message,
"footer": "Python Automation Bot",
}]
}
resp = requests.post(webhook_url, json=payload)
return resp.status_code == 200
SLACK_WEBHOOK = "https://hooks.slack.com/services/XXX/YYY/ZZZ"
send_slack(SLACK_WEBHOOK, "Deploy สำเร็จ! Version 2.5.0 พร้อมใช้งาน")
CLI Tools — สร้างเครื่องมือ Command Line
argparse — Built-in
import argparse
parser = argparse.ArgumentParser(description="File Organizer Tool")
parser.add_argument("source", help="Source directory")
parser.add_argument("-o", "--output", default="./organized", help="Output directory")
parser.add_argument("-d", "--dry-run", action="store_true", help="Show what would happen")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
args = parser.parse_args()
print(f"Source: {args.source}")
print(f"Output: {args.output}")
# python organizer.py ./Downloads -o ./Organized --dry-run -v
typer — Modern CLI Framework
import typer
app = typer.Typer()
@app.command()
def organize(
source: str = typer.Argument(..., help="Source directory"),
output: str = typer.Option("./organized", help="Output directory"),
dry_run: bool = typer.Option(False, "--dry-run", help="Preview only"),
):
"""จัดระเบียบไฟล์ตามนามสกุล"""
typer.echo(f"จัดระเบียบ: {source} -> {output}")
if dry_run:
typer.echo("(Dry run mode)")
@app.command()
def clean(
path: str = typer.Argument(..., help="Directory to clean"),
days: int = typer.Option(30, help="Delete files older than N days"),
):
"""ลบไฟล์เก่า"""
typer.echo(f"ลบไฟล์เก่ากว่า {days} วัน ใน {path}")
if __name__ == "__main__":
app()
# python tool.py organize ./Downloads --dry-run
# python tool.py clean ./temp --days 7
Packaging Scripts — แพ็ค Script เป็น EXE
# ติดตั้ง PyInstaller
pip install pyinstaller
# สร้าง EXE (ไฟล์เดียว)
pyinstaller --onefile --name "FileOrganizer" organizer.py
# สร้าง EXE พร้อม Icon
pyinstaller --onefile --windowed --icon=app.ico --name "MyTool" main.py
# ผลลัพธ์จะอยู่ใน dist/ folder
# dist/FileOrganizer.exe
Error Handling & Logging
import logging
from pathlib import Path
# ตั้งค่า Logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("automation.log", encoding="utf-8"),
logging.StreamHandler() # แสดงใน Console ด้วย
]
)
logger = logging.getLogger(__name__)
# ใช้ในโปรแกรม
def process_file(filepath):
try:
logger.info(f"กำลังประมวลผล: {filepath}")
# ... ทำงาน ...
logger.info(f"สำเร็จ: {filepath}")
except FileNotFoundError:
logger.error(f"ไม่พบไฟล์: {filepath}")
except PermissionError:
logger.error(f"ไม่มีสิทธิ์เข้าถึง: {filepath}")
except Exception as e:
logger.exception(f"เกิดข้อผิดพลาด: {e}")
# Retry Decorator
import time
from functools import wraps
def retry(max_attempts=3, delay=5):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except Exception as e:
logger.warning(f"Attempt {attempt}/{max_attempts} failed: {e}")
if attempt < max_attempts:
time.sleep(delay)
else:
raise
return wrapper
return decorator
@retry(max_attempts=3, delay=10)
def fetch_data(url):
response = requests.get(url, timeout=30)
response.raise_for_status()
return response.json()
Practical Project: Automated Report Generator
#!/usr/bin/env python3
"""รายงานยอดขายอัตโนมัติ — ส่งทุกเช้า 8 โมง"""
import pandas as pd
import requests
from datetime import datetime, timedelta
from pathlib import Path
import logging
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
def fetch_sales_data():
"""ดึงข้อมูลยอดขายจาก API"""
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
resp = requests.get(f"https://api.myshop.com/sales?date={yesterday}",
headers={"Authorization": "Bearer TOKEN"})
return pd.DataFrame(resp.json()["data"])
def generate_report(df):
"""สร้างรายงาน Excel"""
report_path = Path(f"reports/sales_{datetime.now().strftime('%Y%m%d')}.xlsx")
report_path.parent.mkdir(exist_ok=True)
with pd.ExcelWriter(str(report_path), engine="openpyxl") as writer:
# สรุปรวม
summary = df.groupby("category").agg({
"amount": ["sum", "count", "mean"]
}).round(2)
summary.to_excel(writer, sheet_name="สรุป")
# รายละเอียด
df.to_excel(writer, sheet_name="รายละเอียด", index=False)
logger.info(f"สร้างรายงาน: {report_path}")
return str(report_path)
def send_notification(report_path, total_sales, total_orders):
"""แจ้งเตือนผ่าน LINE"""
message = (
f"\nรายงานยอดขาย {datetime.now().strftime('%d/%m/%Y')}"
f"\nยอดขายรวม: {total_sales:,.0f} บาท"
f"\nจำนวน Orders: {total_orders:,} รายการ"
)
requests.post("https://notify-api.line.me/api/notify",
headers={"Authorization": "Bearer LINE_TOKEN"},
data={"message": message},
files={"imageFile": open(report_path, "rb")} if report_path.endswith(".png") else None
)
def main():
logger.info("เริ่มสร้างรายงาน...")
df = fetch_sales_data()
report_path = generate_report(df)
total_sales = df["amount"].sum()
total_orders = len(df)
send_notification(report_path, total_sales, total_orders)
logger.info("เสร็จสิ้น!")
if __name__ == "__main__":
main()
Practical Project: Monitoring Bot
#!/usr/bin/env python3
"""ตรวจสอบสถานะเว็บไซต์ทุก 5 นาที"""
import requests
import time
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
SITES = [
{"name": "Main Website", "url": "https://example.com", "expected_status": 200},
{"name": "API Server", "url": "https://api.example.com/health", "expected_status": 200},
{"name": "Admin Panel", "url": "https://admin.example.com", "expected_status": 200},
]
def check_site(site):
try:
start = time.time()
resp = requests.get(site["url"], timeout=15)
response_time = (time.time() - start) * 1000 # ms
if resp.status_code != site["expected_status"]:
return {"status": "ERROR", "message": f"Status {resp.status_code}", "time": response_time}
if response_time > 5000:
return {"status": "SLOW", "message": f"{response_time:.0f}ms", "time": response_time}
return {"status": "OK", "message": f"{response_time:.0f}ms", "time": response_time}
except requests.exceptions.Timeout:
return {"status": "TIMEOUT", "message": "Timeout 15s", "time": 0}
except requests.exceptions.ConnectionError:
return {"status": "DOWN", "message": "Connection refused", "time": 0}
def alert(site_name, result):
"""ส่งแจ้งเตือนเมื่อมีปัญหา"""
message = f"[{result['status']}] {site_name}: {result['message']}"
logger.warning(message)
# ส่ง LINE/Telegram/Slack ที่นี่
def monitor():
while True:
for site in SITES:
result = check_site(site)
if result["status"] in ("ERROR", "TIMEOUT", "DOWN"):
alert(site["name"], result)
else:
logger.info(f"[OK] {site['name']}: {result['message']}")
time.sleep(300) # 5 นาที
if __name__ == "__main__":
monitor()
Best Practices — แนวทางปฏิบัติที่ดี
- ใช้ Virtual Environment เสมอ — แยก Dependencies ของแต่ละโปรเจกต์ ป้องกัน Library ชนกัน ใช้ requirements.txt หรือ pyproject.toml จัดการ
- ใช้ Environment Variables สำหรับ Secrets — อย่า Hardcode Password, API Key ในโค้ด ใช้ .env file กับ python-dotenv แทน
- เขียน Logging แทน print — Logging มี Level (DEBUG, INFO, WARNING, ERROR) บันทึกลงไฟล์ได้ ตรวจสอบปัญหาย้อนหลังได้
- จัดการ Error ด้วย try/except — อย่าปล่อยให้ Script พังโดยไม่มีการจัดการ ใช้ Retry สำหรับ Network Operations
- ใช้ pathlib แทน os.path — pathlib เป็น Object-oriented ใช้ง่ายกว่า อ่านง่ายกว่า และเป็นมาตรฐานใน Python 3
- หน่วงเวลาระหว่าง Request — เมื่อ Scrape หรือเรียก API ต้องมี delay เพื่อไม่ให้โดน Rate Limit หรือ Ban
- ทดสอบก่อน Production — ทดสอบ Script กับข้อมูลตัวอย่างก่อนรันจริง ใช้ Dry Run mode สำหรับการลบ/ย้ายไฟล์
- เอกสารประกอบ — เขียน Docstring อธิบายว่า Script ทำอะไร ต้องการ Input อะไร Output อะไร
สรุป
Python Automation เป็นทักษะที่มีค่ามหาศาลสำหรับนักพัฒนาทุกคน ช่วยประหยัดเวลาจากงานซ้ำซาก ลดความผิดพลาดจากการทำด้วยมือ และเพิ่มประสิทธิภาพการทำงานหลายเท่า ไม่ว่าจะเป็นการจัดการไฟล์ ดึงข้อมูลจากเว็บ ส่งรายงานอัตโนมัติ หรือสร้างบอทแจ้งเตือน Python ทำได้ทุกอย่าง
เริ่มต้นจากปัญหาที่คุณเจอทุกวัน ถ้ามีงานที่ทำซ้ำมากกว่า 3 ครั้ง นั่นคือโอกาสที่จะเขียน Script อัตโนมัติ เริ่มจากงานง่ายๆ เช่น จัดระเบียบไฟล์ หรือดึงข้อมูลจาก API แล้วค่อยๆ เพิ่มความซับซ้อน คุณจะพบว่า Python Automation ไม่เพียงช่วยประหยัดเวลา แต่ยังทำให้งานสนุกขึ้นอีกด้วย
