Series: Raspberry Pi Programming & Real-World Applications
Part 3: Python "build a real app"
Lesson 21: Remote control over MQTT (Command Handler): relay on/off, interval change, OLED update
1) Lesson goals
After this lesson you will be able to:
- Receive MQTT commands on the topic iotlabs/<device_id>/cmd
- Parse JSON commands that follow a standard format
- Execute three groups of practical commands:
  - Relay: ON/OFF/TOGGLE
  - Runtime: change telemetry_interval_sec
  - OLED: set the displayed text (if an OLED is attached)
- Send a response to iotlabs/<device_id>/status or …/event
2) Proposed command payload format
Topic: iotlabs/pi-gw-01/cmd
Common payload:
{
  "id": "cmd-0001",
  "ts": "2026-02-17T22:10:00",
  "action": "relay.set",
  "data": { "state": "on" }
}
Sample actions:
- relay.set { "state": "on" | "off" }
- relay.toggle {}
- runtime.set_interval { "sec": 30 }
- oled.set_text { "line1": "…", "line2": "…" }
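The envelope above can be checked before any handler runs. The following is a minimal sketch, not part of the lesson's code: the names validate_command and KNOWN_ACTIONS are hypothetical, and the rule that a missing "data" field becomes an empty object is an assumption.

```python
from typing import Any, Dict

# The four sample actions listed above.
KNOWN_ACTIONS = {
    "relay.set",
    "relay.toggle",
    "runtime.set_interval",
    "oled.set_text",
}


def validate_command(cmd: Any) -> Dict[str, Any]:
    """Return a normalized command dict, or raise ValueError (hypothetical helper)."""
    if not isinstance(cmd, dict):
        raise ValueError("command must be a JSON object")
    action = cmd.get("action")
    if not isinstance(action, str) or action not in KNOWN_ACTIONS:
        raise ValueError(f"unknown action: {action}")
    data = cmd.get("data")
    if data is None:
        data = {}  # assumption: a missing "data" field means "no arguments"
    if not isinstance(data, dict):
        raise ValueError("data must be a JSON object")
    return {
        "id": str(cmd.get("id", "")),
        "ts": cmd.get("ts"),
        "action": action,
        "data": data,
    }
```

Rejecting malformed envelopes up front keeps each handler free to assume that "data" is a dict.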
Response event (device → server):
- Topic: iotlabs/<device_id>/event
{
  "ts": "...",
  "device_id": "pi-gw-01",
  "cmd_id": "cmd-0001",
  "ok": true,
  "result": { "state": "on" }
}
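One way to keep success and error events in the same shape is a small builder function. This is a sketch under assumptions: make_event is a hypothetical name, the timestamp is local time in ISO format to match the sample "ts" value, and the "error" field for failures mirrors the error event published in section 6.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def make_event(device_id: str, cmd_id: str,
               result: Optional[Dict[str, Any]] = None,
               error: Optional[str] = None) -> Dict[str, Any]:
    """Build a response event; pass `error` to mark the command as failed."""
    event: Dict[str, Any] = {
        # Assumption: local time, second precision, e.g. 2026-02-17T22:10:00
        "ts": datetime.now().isoformat(timespec="seconds"),
        "device_id": device_id,
        "cmd_id": cmd_id,
        "ok": error is None,
    }
    if error is None:
        event["result"] = result if result is not None else {}
    else:
        event["error"] = error
    return event
```

For example, make_event("pi-gw-01", "cmd-0001", result={"state": "on"}) yields the success event shown above, with the current time in "ts".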
3) Preparation: the "Command Bus" structure
We will create:
- src/commands/handlers.py (maps action → function)
- src/runtime_state.py (holds runtime state such as the interval and the relay state)
- an update to src/mqtt_client.py so that it calls the command handler
Create the folder:
cd ~/apps/iotlabs-py-agent
mkdir -p src/commands
touch src/commands/__init__.py
nano src/runtime_state.py
4) Runtime state (in-memory)
src/runtime_state.py:
from dataclasses import dataclass


@dataclass
class RuntimeState:
    telemetry_interval_sec: int = 10
    relay_state: str = "off"
    oled_line1: str = "IoTLabs Pi"
    oled_line2: str = "ready"
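A short usage sketch of this dataclass, assuming the same field names as above: defaults apply unless overridden, each instance has its own values, and because the state lives only in memory it returns to these defaults when the process restarts. The dataclasses.asdict call is just one convenient way to snapshot the state for logging.

```python
import os
from dataclasses import asdict, dataclass


@dataclass
class RuntimeState:
    telemetry_interval_sec: int = 10
    relay_state: str = "off"
    oled_line1: str = "IoTLabs Pi"
    oled_line2: str = "ready"


# Override one default from the environment, keep the rest.
rt = RuntimeState(
    telemetry_interval_sec=int(os.getenv("TELEMETRY_INTERVAL_SEC", "10"))
)

# Handlers mutate the instance in place.
rt.relay_state = "on"

# A plain dict copy, handy for logging or for a status payload.
snapshot = asdict(rt)
```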
5) Command handlers
Create src/commands/handlers.py:
nano src/commands/handlers.py
Paste:
from typing import Dict, Any
from src.runtime_state import RuntimeState

# --- Relay driver (stub) ---
# If you already built the relay in Lesson 9, you can put the real implementation here.
def relay_apply(state: str, logger):
    # TODO: replace with gpiozero OutputDevice
    logger.info("relay_apply | state=%s (stub)", state)

def handle_relay_set(cmd: Dict[str, Any], rt: RuntimeState, logger):
    state = cmd.get("data", {}).get("state", "").lower()
    if state not in ("on", "off"):
        raise ValueError("invalid relay state")
    relay_apply(state, logger)
    rt.relay_state = state
    return {"state": rt.relay_state}

def handle_relay_toggle(cmd: Dict[str, Any], rt: RuntimeState, logger):
    new_state = "off" if rt.relay_state == "on" else "on"
    relay_apply(new_state, logger)
    rt.relay_state = new_state
    return {"state": rt.relay_state}

def handle_runtime_set_interval(cmd: Dict[str, Any], rt: RuntimeState, logger):
    sec = int(cmd.get("data", {}).get("sec", 0))
    if sec < 2 or sec > 3600:
        raise ValueError("interval out of range (2..3600)")
    rt.telemetry_interval_sec = sec
    logger.info("runtime interval updated | sec=%s", sec)
    return {"telemetry_interval_sec": rt.telemetry_interval_sec}

def handle_oled_set_text(cmd: Dict[str, Any], rt: RuntimeState, logger):
    # Truncate each line to 20 characters; an empty value keeps the current text.
    line1 = str(cmd.get("data", {}).get("line1", ""))[:20]
    line2 = str(cmd.get("data", {}).get("line2", ""))[:20]
    rt.oled_line1 = line1 or rt.oled_line1
    rt.oled_line2 = line2 or rt.oled_line2
    logger.info("oled text updated | %s | %s", rt.oled_line1, rt.oled_line2)
    # TODO: if you have a real OLED, call the OLED render function here
    return {"line1": rt.oled_line1, "line2": rt.oled_line2}

# Map each action name to its handler function.
HANDLERS = {
    "relay.set": handle_relay_set,
    "relay.toggle": handle_relay_toggle,
    "runtime.set_interval": handle_runtime_set_interval,
    "oled.set_text": handle_oled_set_text,
}

def dispatch(cmd: Dict[str, Any], rt: RuntimeState, logger):
    action = cmd.get("action")
    if action not in HANDLERS:
        raise ValueError(f"unknown action: {action}")
    return HANDLERS[action](cmd, rt, logger)
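The dispatch pattern can be exercised without a broker or any hardware. The sketch below uses condensed stand-ins for RuntimeState and the handler table (only relay.toggle is registered) so that it runs on its own; in the project you would import the real ones from src instead. The action name relay.explode is made up to show the error path.

```python
import logging
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class RuntimeState:  # condensed stand-in: only the field this demo needs
    relay_state: str = "off"


def handle_relay_toggle(cmd: Dict[str, Any], rt: RuntimeState, logger):
    rt.relay_state = "off" if rt.relay_state == "on" else "on"
    logger.info("relay toggled | state=%s", rt.relay_state)
    return {"state": rt.relay_state}


HANDLERS = {"relay.toggle": handle_relay_toggle}


def dispatch(cmd: Dict[str, Any], rt: RuntimeState, logger):
    action = cmd.get("action")
    if action not in HANDLERS:
        raise ValueError(f"unknown action: {action}")
    return HANDLERS[action](cmd, rt, logger)


logger = logging.getLogger("dispatch-demo")
rt = RuntimeState()

first = dispatch({"action": "relay.toggle", "data": {}}, rt, logger)   # off -> on
second = dispatch({"action": "relay.toggle", "data": {}}, rt, logger)  # on -> off

try:
    dispatch({"action": "relay.explode"}, rt, logger)  # not registered
    unknown_error = ""
except ValueError as exc:
    unknown_error = str(exc)
```

Adding a new command then means writing one handler function and adding one entry to the table; dispatch itself stays unchanged.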
6) Upgrade the MQTT client to publish events and call dispatch
Open src/mqtt_client.py (Lesson 20) and add:
- the event topic: iotlabs/<device_id>/event
- the flow: receive cmd → dispatch → publish an ok/error event
Add the imports:
from src.commands.handlers import dispatch
from src.runtime_state import RuntimeState
In __init__, add the following (it uses os.getenv, so make sure os is imported at the top of the file):
        self.t_event = f"iotlabs/{self.device_id}/event"
        self.rt = RuntimeState(telemetry_interval_sec=int(os.getenv("TELEMETRY_INTERVAL_SEC", "10")))
Update on_message:
    def on_message(self, client, userdata, msg):
        payload = msg.payload.decode("utf-8", errors="ignore")
        self.logger.info("cmd received | topic=%s | payload=%s", msg.topic, payload)
        cmd_id = ""
        try:
            cmd = json.loads(payload)
            cmd_id = cmd.get("id", "")
            result = dispatch(cmd, self.rt, self.logger)
            event = {
                "ts": now_ts(),
                "device_id": self.device_id,
                "cmd_id": cmd_id,
                "ok": True,
                "result": result
            }
            self.client.publish(self.t_event, json.dumps(event, ensure_ascii=False), qos=1, retain=False)
        except Exception as e:
            event = {
                "ts": now_ts(),
                "device_id": self.device_id,
                "cmd_id": cmd_id,
                "ok": False,
                "error": str(e)
            }
            self.client.publish(self.t_event, json.dumps(event, ensure_ascii=False), qos=1, retain=False)
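The try/except flow in on_message can also be factored into a plain function that never raises, which makes the error paths easy to check without a broker. This is a sketch, not the lesson's code: handle_payload and its dispatch_fn parameter are hypothetical, the "ts" field is left out for brevity, and the extra check for non-object JSON is an addition.

```python
import json
from typing import Any, Callable, Dict


def handle_payload(raw: bytes,
                   dispatch_fn: Callable[[Dict[str, Any]], Any],
                   device_id: str) -> Dict[str, Any]:
    """Turn a raw MQTT payload into an ok/error event dict; never raises."""
    cmd_id = ""
    try:
        cmd = json.loads(raw.decode("utf-8", errors="ignore"))
        if not isinstance(cmd, dict):
            raise ValueError("command must be a JSON object")
        cmd_id = str(cmd.get("id", ""))
        result = dispatch_fn(cmd)
        return {"device_id": device_id, "cmd_id": cmd_id,
                "ok": True, "result": result}
    except Exception as exc:
        # cmd_id stays "" when the payload could not be parsed at all.
        return {"device_id": device_id, "cmd_id": cmd_id,
                "ok": False, "error": str(exc)}
```

Inside on_message, dispatch_fn could be something like lambda cmd: dispatch(cmd, self.rt, self.logger), with the returned dict published to the event topic.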
7) Use the runtime interval in mqtt_runner
Update src/mqtt_runner.py so that the interval is read from agent.rt.telemetry_interval_sec:
while True:
    agent.publish_telemetry(t, h)
    time.sleep(agent.rt.telemetry_interval_sec)
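With a single time.sleep call, a new interval only takes effect after the current sleep finishes, so going from 3600 s to 10 s could take up to an hour to show. One possible refinement, sketched here with a hypothetical wait_for_interval helper, sleeps in short steps and re-reads the interval each time. The sleep_fn and clock parameters exist only so the function can be tested with a fake clock.

```python
import time
from typing import Callable


def wait_for_interval(get_interval: Callable[[], float],
                      step: float = 0.5,
                      sleep_fn: Callable[[float], None] = time.sleep,
                      clock: Callable[[], float] = time.monotonic) -> float:
    """Wait until the *current* interval has elapsed; return the time waited."""
    start = clock()
    while True:
        elapsed = clock() - start
        remaining = get_interval() - elapsed  # re-read on every step
        if remaining <= 0:
            return elapsed
        sleep_fn(min(step, remaining))
```

In the runner loop, the sleep line would become wait_for_interval(lambda: agent.rt.telemetry_interval_sec), so a runtime.set_interval command is picked up within about one step.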
8) Quick test with MQTTX
Subscribe to:
- iotlabs/pi-gw-01/event
- iotlabs/pi-gw-01/status
Publish a command:
Topic: iotlabs/pi-gw-01/cmd
8.1 Toggle the relay
{"id":"cmd-1001","ts":"2026-02-17T22:10:00","action":"relay.toggle","data":{}}
8.2 Set the interval to 30 s
{"id":"cmd-1002","ts":"2026-02-17T22:10:05","action":"runtime.set_interval","data":{"sec":30}}
8.3 Set the OLED text
{"id":"cmd-1003","ts":"2026-02-17T22:10:10","action":"oled.set_text","data":{"line1":"IoTLabs","line2":"Online"}}
You should see an event come back with ok: true or ok: false.
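If you would rather generate the test payloads than type them, a small helper can build them with a fresh id and the current timestamp. This is a sketch: make_cmd is a hypothetical name, and the counter starting at 1001 simply mirrors the sample ids above. The resulting strings can be pasted into MQTTX.

```python
import itertools
import json
from datetime import datetime
from typing import Any, Dict, Optional

_ids = itertools.count(1001)  # cmd-1001, cmd-1002, ...


def make_cmd(action: str, data: Optional[Dict[str, Any]] = None) -> str:
    """Return a compact JSON command string in the format from section 2."""
    cmd = {
        "id": f"cmd-{next(_ids)}",
        "ts": datetime.now().isoformat(timespec="seconds"),
        "action": action,
        "data": data or {},
    }
    return json.dumps(cmd, separators=(",", ":"), ensure_ascii=False)


samples = [
    make_cmd("relay.toggle"),
    make_cmd("runtime.set_interval", {"sec": 30}),
    make_cmd("oled.set_text", {"line1": "IoTLabs", "line2": "Online"}),
]
```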
9) Upgrade "relay_apply" to real control (ties in with Lesson 9)
If your relay module is LOW-triggered (active low):
from gpiozero import OutputDevice

# active_high=False: on() drives the pin LOW, which energizes a LOW-trigger relay.
relay = OutputDevice(17, active_high=False, initial_value=False)

def relay_apply(state: str, logger):
    if state == "on":
        relay.on()
    else:
        relay.off()
    logger.info("relay applied | state=%s", state)
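When developing on a laptop, importing gpiozero or claiming a pin may fail. One possible variation, sketched below, falls back to the stub behavior in that case. The fallback, the boolean return value, and the extra state check are additions for illustration and not part of the lesson's code; GPIO 17 and active_high=False are taken from the snippet above.

```python
import logging

try:
    from gpiozero import OutputDevice
    # Same wiring as above: GPIO 17, LOW-trigger relay module.
    relay = OutputDevice(17, active_high=False, initial_value=False)
except Exception as exc:  # gpiozero missing, or no usable pin backend
    relay = None
    logging.getLogger(__name__).warning("GPIO unavailable, using stub: %s", exc)


def relay_apply(state: str, logger) -> bool:
    """Apply the relay state; return True only if real hardware was driven."""
    if state not in ("on", "off"):
        raise ValueError("invalid relay state")
    if relay is None:
        logger.info("relay_apply | state=%s (stub, no GPIO)", state)
        return False
    if state == "on":
        relay.on()
    else:
        relay.off()
    logger.info("relay applied | state=%s", state)
    return True
```

The handlers from section 5 ignore the return value, so this version can replace the stub without other changes.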


