IoTLabs

Nghiên cứu, Sáng tạo và Thử nghiệm

Series: Lập trình Raspberry Pi – Bài 21: Điều khiển từ xa qua MQTT (Command Handler) — bật/tắt relay, đổi interval, cập nhật OLED

Series: Lập trình Raspberry Pi & Ứng dụng thực tế Phần 3 — Python “build app thật” Bài 21: Điều khiển từ xa qua MQTT (Command Handler) — bật/tắt relay, đổi interval, cập nhật OLED


1) Mục tiêu bài học

Sau bài này bạn sẽ:

  • Nhận lệnh MQTT ở topic iotlabs/<device_id>/cmd
  • Parse command JSON theo chuẩn
  • Thực thi 3 nhóm lệnh “thực chiến”:
    1. Relay: ON/OFF/TOGGLE
    2. Runtime: đổi telemetry_interval_sec
    3. OLED: set text hiển thị (nếu có OLED)
  • Gửi phản hồi về iotlabs/<device_id>/status hoặc …/event

2) Chuẩn command payload đề xuất

Topic: iotlabs/pi-gw-01/cmd

Payload chung:

{
  "id": "cmd-0001",
  "ts": "2026-02-17T22:10:00",
  "action": "relay.set",
  "data": { "state": "on" }
}

Các action mẫu:

  • relay.set { “state”: “on” | “off” }
  • relay.toggle {}
  • runtime.set_interval { “sec”: 30 }
  • oled.set_text { “line1″:”…”, “line2″:”…” }

Phản hồi event (device → server):

  • Topic: iotlabs/<device_id>/event
{
  "ts": "...",
  "device_id": "pi-gw-01",
  "cmd_id": "cmd-0001",
  "ok": true,
  "result": { "state": "on" }
}

3) Chuẩn bị: cấu trúc “Command Bus”

Ta sẽ tạo:

  • src/commands/handlers.py (map action → function)
  • src/runtime_state.py (giữ state runtime như interval, relay state)
  • update src/mqtt_client.py để gọi command handler

Tạo folder:

cd ~/apps/iotlabs-py-agent
mkdir -p src/commands
touch src/commands/__init__.py
nano src/runtime_state.py

4) Runtime state (in-memory)

src/runtime_state.py:

from dataclasses import dataclass

@dataclass
class RuntimeState:
    telemetry_interval_sec: int = 10
    relay_state: str = "off"
    oled_line1: str = "IoTLabs Pi"
    oled_line2: str = "ready"

5) Command handlers

Tạo src/commands/handlers.py:

nano src/commands/handlers.py

Dán:

from typing import Dict, Any
from src.runtime_state import RuntimeState

# --- Relay driver (stub) ---
# Nếu bạn đã làm relay ở Bài 9, bạn có thể implement thật ở đây.
def relay_apply(state: str, logger):
    # TODO: thay bằng gpiozero OutputDevice
    logger.info("relay_apply | state=%s (stub)", state)

def handle_relay_set(cmd: Dict[str, Any], rt: RuntimeState, logger):
    state = cmd.get("data", {}).get("state", "").lower()
    if state not in ("on", "off"):
        raise ValueError("invalid relay state")
    relay_apply(state, logger)
    rt.relay_state = state
    return {"state": rt.relay_state}

def handle_relay_toggle(cmd: Dict[str, Any], rt: RuntimeState, logger):
    new_state = "off" if rt.relay_state == "on" else "on"
    relay_apply(new_state, logger)
    rt.relay_state = new_state
    return {"state": rt.relay_state}

def handle_runtime_set_interval(cmd: Dict[str, Any], rt: RuntimeState, logger):
    sec = int(cmd.get("data", {}).get("sec", 0))
    if sec < 2 or sec > 3600:
        raise ValueError("interval out of range (2..3600)")
    rt.telemetry_interval_sec = sec
    logger.info("runtime interval updated | sec=%s", sec)
    return {"telemetry_interval_sec": rt.telemetry_interval_sec}

def handle_oled_set_text(cmd: Dict[str, Any], rt: RuntimeState, logger):
    line1 = str(cmd.get("data", {}).get("line1", ""))[:20]
    line2 = str(cmd.get("data", {}).get("line2", ""))[:20]
    rt.oled_line1 = line1 or rt.oled_line1
    rt.oled_line2 = line2 or rt.oled_line2
    logger.info("oled text updated | %s | %s", rt.oled_line1, rt.oled_line2)
    # TODO: nếu bạn có OLED thật, gọi hàm render OLED ở đây
    return {"line1": rt.oled_line1, "line2": rt.oled_line2}

HANDLERS = {
    "relay.set": handle_relay_set,
    "relay.toggle": handle_relay_toggle,
    "runtime.set_interval": handle_runtime_set_interval,
    "oled.set_text": handle_oled_set_text,
}

def dispatch(cmd: Dict[str, Any], rt: RuntimeState, logger):
    action = cmd.get("action")
    if action not in HANDLERS:
        raise ValueError(f"unknown action: {action}")
    return HANDLERS[action](cmd, rt, logger)

6) Nâng MQTT client để publish event + gọi dispatch

Mở src/mqtt_client.py (Bài 20) và bổ sung:

  • topic event: iotlabs/<device_id>/event
  • nhận cmd → dispatch → publish event ok/error

Thêm import:

from src.commands.handlers import dispatch
from src.runtime_state import RuntimeState

Trong __init__ thêm:

self.t_event = f"iotlabs/{self.device_id}/event"
self.rt = RuntimeState(telemetry_interval_sec=int(os.getenv("TELEMETRY_INTERVAL_SEC", "10")))

Sửa on_message:

def on_message(self, client, userdata, msg):
    payload = msg.payload.decode("utf-8", errors="ignore")
    self.logger.info("cmd received | topic=%s | payload=%s", msg.topic, payload)

    cmd_id = ""
    try:
        cmd = json.loads(payload)
        cmd_id = cmd.get("id", "")
        result = dispatch(cmd, self.rt, self.logger)

        event = {
            "ts": now_ts(),
            "device_id": self.device_id,
            "cmd_id": cmd_id,
            "ok": True,
            "result": result
        }
        self.client.publish(self.t_event, json.dumps(event, ensure_ascii=False), qos=1, retain=False)

    except Exception as e:
        event = {
            "ts": now_ts(),
            "device_id": self.device_id,
            "cmd_id": cmd_id,
            "ok": False,
            "error": str(e)
        }
        self.client.publish(self.t_event, json.dumps(event, ensure_ascii=False), qos=1, retain=False)

7) Dùng interval runtime trong mqtt_runner

Sửa src/mqtt_runner.py để interval lấy từ agent.rt.telemetry_interval_sec:

while True:
    agent.publish_telemetry(t, h)
    time.sleep(agent.rt.telemetry_interval_sec)

8) Test nhanh bằng MQTTX

Subscribe:

  • iotlabs/pi-gw-01/event
  • iotlabs/pi-gw-01/status

Publish command:

Topic: iotlabs/pi-gw-01/cmd

8.1 Toggle relay

{"id":"cmd-1001","ts":"2026-02-17T22:10:00","action":"relay.toggle","data":{}}

8.2 Set interval 30s

{"id":"cmd-1002","ts":"2026-02-17T22:10:05","action":"runtime.set_interval","data":{"sec":30}}

8.3 OLED set text

{"id":"cmd-1003","ts":"2026-02-17T22:10:10","action":"oled.set_text","data":{"line1":"IoTLabs","line2":"Online"}}

Bạn sẽ thấy event trả về ok:true/false.

9) Nâng cấp “relay_apply” thành điều khiển thật (gắn với Bài 9)

Nếu relay module của bạn là LOW trigger:

from gpiozero import OutputDevice
relay = OutputDevice(17, active_high=False, initial_value=False)

def relay_apply(state: str, logger):
    if state == "on":
        relay.on()
    else:
        relay.off()
    logger.info("relay applied | state=%s", state)