IoTLabs

Nghiên cứu, Sáng tạo và Thử nghiệm

Series: Lập trình Raspberry Pi – Bài 14: Background jobs — Scheduler (APScheduler/Cron) để chạy tác vụ định kỳ

Series: Lập trình Raspberry Pi & Ứng dụng thực tế Phần 3 — Python “build app thật” Bài 14: Background jobs — Scheduler (APScheduler/Cron) để chạy tác vụ định kỳ


1) Mục tiêu bài học

Sau bài này bạn sẽ:

  • Chạy tác vụ nền định kỳ theo phút/giây mà không cần viết while-loop thủ công.
  • Dùng APScheduler (khuyến nghị cho app Python) và hiểu khi nào dùng Cron.
  • Kết hợp scheduler với FastAPI (Bài 13) theo kiểu “API + job” như production.

2) Khi nào dùng APScheduler, khi nào dùng Cron?

APScheduler (trong app)

Dùng khi:

  • Job cần dùng chung config/logger/DB connection của app.
  • Muốn quản lý job bằng code, dễ debug, dễ đóng gói thành 1 service.

Cron (ngoài hệ điều hành)

Dùng khi:

  • Job là script độc lập (backup, dọn rác, rotate file…).
  • Muốn OS tự chạy “đúng giờ” ngay cả khi app không chạy.

Trong series này: ưu tiên APScheduler cho các job liên quan ứng dụng IoT gateway.

3) Cài APScheduler

Trong project ~/apps/iotlabs-py-agent:

cd ~/apps/iotlabs-py-agent
source .venv/bin/activate
pip install apscheduler

4) Tạo job định kỳ (ví dụ: log heartbeat + dọn ảnh cũ)

Tạo file src/jobs.py:

nano src/jobs.py

Dán code:

import os
import time
from datetime import datetime, timedelta

def job_heartbeat(cfg: dict, logger):
    device_id = cfg["device"]["id"]
    logger.info("job heartbeat | device_id=%s | ts=%s",
                device_id, datetime.now().isoformat(timespec="seconds"))

def job_cleanup_old_files(cfg: dict, logger):
    # ví dụ: dọn ảnh camera cũ (nếu bạn có dùng ~/data/camera)
    base_dir = os.path.expanduser(cfg.get("camera", {}).get("base_dir", "~/data/camera"))
    keep_days = int(cfg.get("camera", {}).get("keep_days", 7))

    cutoff = datetime.now() - timedelta(days=keep_days)
    removed = 0

    if not os.path.isdir(base_dir):
        logger.info("cleanup skip | base_dir not found: %s", base_dir)
        return

    for root, _, files in os.walk(base_dir):
        for fn in files:
            if not fn.lower().endswith((".jpg", ".jpeg", ".png")):
                continue
            path = os.path.join(root, fn)
            try:
                mtime = datetime.fromtimestamp(os.path.getmtime(path))
                if mtime < cutoff:
                    os.remove(path)
                    removed += 1
            except Exception as e:
                logger.warning("cleanup error | %s | %s", path, e)

    logger.info("cleanup done | base_dir=%s | keep_days=%s | removed=%s",
                base_dir, keep_days, removed)

5) Tạo scheduler runner

Tạo file src/scheduler_runner.py:

nano src/scheduler_runner.py

Dán code:

import os
import yaml
from dotenv import load_dotenv
from apscheduler.schedulers.background import BackgroundScheduler

from src.utils.logging import setup_logging
from src.jobs import job_heartbeat, job_cleanup_old_files

def load_yaml(path: str) -> dict:
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

def build_scheduler(cfg: dict, logger) -> BackgroundScheduler:
    sched = BackgroundScheduler(timezone="Asia/Bangkok")

    # 1) heartbeat mỗi 30s
    sched.add_job(
        job_heartbeat,
        "interval",
        seconds=30,
        args=[cfg, logger],
        id="heartbeat",
        replace_existing=True,
        max_instances=1,
        coalesce=True
    )

    # 2) cleanup mỗi ngày 02:10
    sched.add_job(
        job_cleanup_old_files,
        "cron",
        hour=2,
        minute=10,
        args=[cfg, logger],
        id="cleanup",
        replace_existing=True,
        max_instances=1,
        coalesce=True
    )

    return sched

def main():
    load_dotenv()

    app_name = os.getenv("APP_NAME", "iotlabs-py-agent")
    log_level = os.getenv("LOG_LEVEL", "INFO")
    log_dir = os.getenv("LOG_DIR", "logs")
    cfg_path = os.getenv("CONFIG_YAML", "config/config.yaml")

    logger = setup_logging(app_name, log_dir, log_level)
    cfg = load_yaml(cfg_path)

    sched = build_scheduler(cfg, logger)
    sched.start()

    logger.info("scheduler started")

    # giữ process sống
    try:
        import time
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        logger.info("scheduler stopping...")
        sched.shutdown()

if __name__ == "__main__":
    main()

6) Thêm config cho camera cleanup (tuỳ chọn)

Mở config/config.yaml thêm:

camera:
  base_dir: "~/data/camera"
  keep_days: 7

7) Chạy thử scheduler

cd ~/apps/iotlabs-py-agent
source .venv/bin/activate
python -m src.scheduler_runner

Xem log:

tail -n 50 logs/iotlabs-py-agent.log

8) Kết hợp FastAPI + Scheduler trong 1 process (gợi ý production)

Bạn có 2 lựa chọn:

A) Tách 2 service (khuyến nghị, rõ ràng)

  • iotlabs-api.service
  • iotlabs-scheduler.service

B) Gộp chung (đơn giản triển khai)

Trong src/api.py, bạn có thể start scheduler ở startup event của FastAPI.

Bài 15 mình sẽ hướng dẫn nâng cấp storage (SQLite → Postgres), nên tách service thường dễ mở rộng hơn.


9) Cron job (tối giản) — ví dụ backup/dọn rác

Mở crontab:

crontab -e

Ví dụ chạy mỗi ngày 02:10:

10 2 * * * /usr/bin/bash /home/developer/apps/iotlabs-py-agent/scripts/backup.sh >> /home/developer/apps/iotlabs-py-agent/logs/cron.log 2>&1

10) Bài tập nâng cấp

  1. Tạo job “publish metrics” (in ra hoặc viết file json mỗi 60s).
  2. Thêm job “ping MQTT broker” (chỉ ping TCP host:port) mỗi 2 phút.
  3. Tách thành 2 systemd services: api + scheduler.