IoTLabs

Nghiên cứu, Sáng tạo và Thử nghiệm

Series: Lập trình Raspberry Pi – Bài 20: MQTT “chuẩn production” — Last Will, Retained, Session, QoS, Reconnect, Offline queue

Series: Lập trình Raspberry Pi & Ứng dụng thực tế Phần 3 — Python “build app thật” Bài 20: MQTT “chuẩn production” — Last Will, Retained, Session, QoS, Reconnect, Offline queue


1) Mục tiêu bài học

Sau bài này bạn sẽ nâng MQTT client (Bài 19) lên mức chạy 24/7:

  • Last Will để báo offline khi mất điện/mất mạng.
  • Dùng retained cho status (dashboard mở lên là thấy ngay).
  • Chọn QoS đúng cho telemetry/command.
  • Xử lý reconnect ổn định.
  • offline queue (mất mạng vẫn xếp hàng, mạng lên gửi lại).

2) Quy ước topic (giữ như Bài 19)

  • iotlabs/<device_id>/telemetry
  • iotlabs/<device_id>/status
  • iotlabs/<device_id>/cmd

Khuyến nghị thêm 1 topic “meta”:

  • iotlabs/<device_id>/meta (firmware/app version, capabilities)

3) Last Will (LWT): “mất là báo offline”

Ý tưởng: trước khi connect, set will message:

  • Topic: …/status
  • Payload: {“status”:”offline”, …}
  • qos=1, retain=True (khuyến nghị)

Khi Pi mất điện hoặc TCP chết, broker tự publish will → hệ thống biết device offline.

4) Retained: status luôn “hiện trạng mới nhất”

  • Status: dùng retain=True → app/dashboard subscribe sẽ nhận ngay trạng thái hiện tại.
  • Telemetry: thường retain=False → tránh giữ “giá trị cũ” làm hiểu nhầm.

5) QoS chọn thế nào?

  • telemetry: QoS 1 (đủ tin cậy, không quá nặng)
  • status: QoS 1 + retained
  • command: QoS 1 (đảm bảo lệnh tới nơi)
  • Tránh QoS 2 trừ khi thật sự cần (tốn overhead).

6) Session: clean_session vs persistent session

  • clean_session=True (mặc định): đơn giản, mỗi lần reconnect subscribe lại.
  • Persistent session (clean_session=False) có lợi khi:
    • bạn muốn broker giữ subscription và queue message khi device offline
    • nhưng cần quản lý session state chặt hơn

Trong bài này: vẫn dùng clean_session=True + code subscribe lại khi connect (an toàn, dễ).

7) Nâng cấp code MQTT client (Bài 19)

Mở src/mqtt_client.py và chỉnh theo bản “production” dưới đây.

7.1 Thêm offline queue + LWT + retained status

Thay/merge class MqttAgent thành phiên bản sau (đoạn chính, bạn có thể copy đè phần class):

import json
import os
import time
from datetime import datetime
import ssl
from collections import deque

import paho.mqtt.client as mqtt

def now_ts():
    return datetime.now().isoformat(timespec="seconds")

class MqttAgent:
    def __init__(self, logger):
        self.logger = logger
        self.host = os.getenv("MQTT_HOST", "mqtt.iotlabs.vn")
        self.port = int(os.getenv("MQTT_PORT", "8883"))
        self.tls  = os.getenv("MQTT_TLS", "true").lower() == "true"
        self.user = os.getenv("MQTT_USERNAME", "")
        self.pwd  = os.getenv("MQTT_PASSWORD", "")
        self.device_id = os.getenv("MQTT_DEVICE_ID", "pi-gw-01")

        self.t_telemetry = f"iotlabs/{self.device_id}/telemetry"
        self.t_status    = f"iotlabs/{self.device_id}/status"
        self.t_cmd       = f"iotlabs/{self.device_id}/cmd"
        self.t_meta      = f"iotlabs/{self.device_id}/meta"

        self.connected = False

        # offline queue: giữ tối đa N message
        self.queue = deque(maxlen=int(os.getenv("MQTT_QUEUE_MAX", "500")))

        self.client = mqtt.Client(
            client_id=f"pi-{self.device_id}",
            clean_session=True
        )

        if self.user:
            self.client.username_pw_set(self.user, self.pwd)

        if self.tls:
            self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
            self.client.tls_insecure_set(False)

        # LWT: nếu mất kết nối bất thường -> broker publish offline
        will_payload = json.dumps({
            "ts": now_ts(),
            "device_id": self.device_id,
            "status": "offline",
            "reason": "lwt"
        })
        self.client.will_set(self.t_status, will_payload, qos=1, retain=True)

        # callbacks
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message

        # reconnect backoff
        self.client.reconnect_delay_set(min_delay=1, max_delay=30)

        # paho internal queue (giúp không mất publish khi reconnect)
        self.client.max_queued_messages_set(0)  # 0 = unlimited in paho internal
        self.client.max_inflight_messages_set(20)

    def on_connect(self, client, userdata, flags, rc):
        self.connected = True
        self.logger.info("mqtt connected | rc=%s | host=%s:%s", rc, self.host, self.port)

        # subscribe cmd
        client.subscribe(self.t_cmd, qos=1)

        # publish online (retained)
        self.publish_status("online", retain=True)

        # publish meta (retained)
        self.publish_meta(retain=True)

        # flush offline queue
        self.flush_queue()

    def on_disconnect(self, client, userdata, rc):
        self.connected = False
        self.logger.warning("mqtt disconnected | rc=%s", rc)

    def on_message(self, client, userdata, msg):
        payload = msg.payload.decode("utf-8", errors="ignore")
        self.logger.info("cmd received | topic=%s | payload=%s", msg.topic, payload)

        # demo: {"action":"ping"} -> trả pong
        try:
            data = json.loads(payload) if payload.strip().startswith("{") else {"raw": payload}
            if data.get("action") == "ping":
                self.publish_status("pong", retain=False)
        except Exception as e:
            self.logger.warning("cmd parse error | %s", e)

    def connect(self):
        self.logger.info("mqtt connecting | %s:%s | tls=%s", self.host, self.port, self.tls)
        self.client.connect(self.host, self.port, keepalive=60)
        self.client.loop_start()

    def close(self):
        try:
            # offline graceful (retained)
            self.publish_status("offline", retain=True, reason="graceful")
        except Exception:
            pass
        self.client.loop_stop()
        self.client.disconnect()

    # ---- publish helpers ----
    def publish_status(self, status: str, retain: bool = True, reason: str = ""):
        payload = {"ts": now_ts(), "device_id": self.device_id, "status": status}
        if reason:
            payload["reason"] = reason
        self._publish(self.t_status, payload, qos=1, retain=retain)

    def publish_meta(self, retain: bool = True):
        payload = {
            "ts": now_ts(),
            "device_id": self.device_id,
            "app": os.getenv("APP_NAME", "iotlabs-py-agent"),
            "version": os.getenv("APP_VERSION", "1.0.0"),
            "capabilities": ["telemetry", "cmd"]
        }
        self._publish(self.t_meta, payload, qos=1, retain=retain)

    def publish_telemetry(self, temperature_c: float, humidity_pct: float):
        payload = {
            "ts": now_ts(),
            "device_id": self.device_id,
            "metrics": {
                "temperature_c": round(float(temperature_c), 2),
                "humidity_pct": round(float(humidity_pct), 2),
            }
        }
        # telemetry thường không retained
        self._publish(self.t_telemetry, payload, qos=1, retain=False)

    def _publish(self, topic: str, payload_obj: dict, qos: int, retain: bool):
        msg = json.dumps(payload_obj, ensure_ascii=False)

        if not self.connected:
            # queue lại khi offline
            self.queue.append((topic, msg, qos, retain))
            self.logger.info("mqtt queued | topic=%s | queued=%s", topic, len(self.queue))
            return

        self.client.publish(topic, msg, qos=qos, retain=retain)

    def flush_queue(self):
        if not self.queue:
            return
        self.logger.info("mqtt flush queue | size=%s", len(self.queue))
        while self.queue:
            topic, msg, qos, retain = self.queue.popleft()
            self.client.publish(topic, msg, qos=qos, retain=retain)

8) Nâng runner: publish theo interval từ config

Trong .env thêm:

MQTT_QUEUE_MAX=500
TELEMETRY_INTERVAL_SEC=10
APP_VERSION=1.0.0

Sửa src/mqtt_runner.py:

  • lấy interval từ env
  • nếu mất mạng: vẫn chạy, data sẽ queue lại

Gợi ý đoạn loop:

interval = int(os.getenv("TELEMETRY_INTERVAL_SEC", "10"))
...
while True:
    agent.publish_telemetry(t, h)
    time.sleep(interval)

9) Test “mất mạng” & kiểm tra queue flush

  1. Chạy mqtt_runner
  2. Tắt mạng (rút LAN / tắt Wi-Fi) 30–60s
  3. Bật mạng lại
  4. Subscribe topic telemetry, bạn sẽ thấy message “dồn” được gửi lại (tuỳ queue max).

Nếu bạn muốn “không dồn” mà chỉ gửi latest: queue max nhỏ (ví dụ 20) hoặc ghi DB local rồi gửi batch sau.


10) Bài tập nâng cấp

  1. Thêm command: {“action”:”set_interval”,”sec”:30} để đổi interval runtime.
  2. Dùng retained status để dashboard biết online/offline ngay khi mở app.
  3. Chạy mqtt runner thành systemd service iotlabs-mqtt.service.