Series: Lập trình Raspberry Pi & Ứng dụng thực tế Phần 3 — Python “build app thật” Bài 20: MQTT “chuẩn production” — Last Will, Retained, Session, QoS, Reconnect, Offline queue
1) Mục tiêu bài học
Sau bài này bạn sẽ nâng MQTT client (Bài 19) lên mức chạy 24/7:
- Có Last Will để báo offline khi mất điện/mất mạng.
- Dùng retained cho status (dashboard mở lên là thấy ngay).
- Chọn QoS đúng cho telemetry/command.
- Xử lý reconnect ổn định.
- Có offline queue (mất mạng vẫn xếp hàng, mạng lên gửi lại).
2) Quy ước topic (giữ như Bài 19)
- iotlabs/<device_id>/telemetry
- iotlabs/<device_id>/status
- iotlabs/<device_id>/cmd
Khuyến nghị thêm 1 topic “meta”:
- iotlabs/<device_id>/meta (firmware/app version, capabilities)
3) Last Will (LWT): “mất là báo offline”
Ý tưởng: trước khi connect, set will message:
- Topic: …/status
- Payload: {“status”:”offline”, …}
- qos=1, retain=True (khuyến nghị)
Khi Pi mất điện hoặc TCP chết, broker tự publish will → hệ thống biết device offline.
4) Retained: status luôn “hiện trạng mới nhất”
- Status: dùng retain=True → app/dashboard subscribe sẽ nhận ngay trạng thái hiện tại.
- Telemetry: thường retain=False → tránh giữ “giá trị cũ” làm hiểu nhầm.
5) QoS chọn thế nào?
- telemetry: QoS 1 (đủ tin cậy, không quá nặng)
- status: QoS 1 + retained
- command: QoS 1 (đảm bảo lệnh tới nơi)
- Tránh QoS 2 trừ khi thật sự cần (tốn overhead).
6) Session: clean_session vs persistent session
- clean_session=True (mặc định): đơn giản, mỗi lần reconnect subscribe lại.
- Persistent session (clean_session=False) có lợi khi:
- bạn muốn broker giữ subscription và queue message khi device offline
- nhưng cần quản lý session state chặt hơn
Trong bài này: vẫn dùng clean_session=True + code subscribe lại khi connect (an toàn, dễ).
7) Nâng cấp code MQTT client (Bài 19)
Mở src/mqtt_client.py và chỉnh theo bản “production” dưới đây.
7.1 Thêm offline queue + LWT + retained status
Thay/merge class MqttAgent thành phiên bản sau (đoạn chính, bạn có thể copy đè phần class):
import json
import os
import time
from datetime import datetime
import ssl
from collections import deque
import paho.mqtt.client as mqtt
def now_ts():
return datetime.now().isoformat(timespec="seconds")
class MqttAgent:
def __init__(self, logger):
self.logger = logger
self.host = os.getenv("MQTT_HOST", "mqtt.iotlabs.vn")
self.port = int(os.getenv("MQTT_PORT", "8883"))
self.tls = os.getenv("MQTT_TLS", "true").lower() == "true"
self.user = os.getenv("MQTT_USERNAME", "")
self.pwd = os.getenv("MQTT_PASSWORD", "")
self.device_id = os.getenv("MQTT_DEVICE_ID", "pi-gw-01")
self.t_telemetry = f"iotlabs/{self.device_id}/telemetry"
self.t_status = f"iotlabs/{self.device_id}/status"
self.t_cmd = f"iotlabs/{self.device_id}/cmd"
self.t_meta = f"iotlabs/{self.device_id}/meta"
self.connected = False
# offline queue: giữ tối đa N message
self.queue = deque(maxlen=int(os.getenv("MQTT_QUEUE_MAX", "500")))
self.client = mqtt.Client(
client_id=f"pi-{self.device_id}",
clean_session=True
)
if self.user:
self.client.username_pw_set(self.user, self.pwd)
if self.tls:
self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
self.client.tls_insecure_set(False)
# LWT: nếu mất kết nối bất thường -> broker publish offline
will_payload = json.dumps({
"ts": now_ts(),
"device_id": self.device_id,
"status": "offline",
"reason": "lwt"
})
self.client.will_set(self.t_status, will_payload, qos=1, retain=True)
# callbacks
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
# reconnect backoff
self.client.reconnect_delay_set(min_delay=1, max_delay=30)
# paho internal queue (giúp không mất publish khi reconnect)
self.client.max_queued_messages_set(0) # 0 = unlimited in paho internal
self.client.max_inflight_messages_set(20)
def on_connect(self, client, userdata, flags, rc):
self.connected = True
self.logger.info("mqtt connected | rc=%s | host=%s:%s", rc, self.host, self.port)
# subscribe cmd
client.subscribe(self.t_cmd, qos=1)
# publish online (retained)
self.publish_status("online", retain=True)
# publish meta (retained)
self.publish_meta(retain=True)
# flush offline queue
self.flush_queue()
def on_disconnect(self, client, userdata, rc):
self.connected = False
self.logger.warning("mqtt disconnected | rc=%s", rc)
def on_message(self, client, userdata, msg):
payload = msg.payload.decode("utf-8", errors="ignore")
self.logger.info("cmd received | topic=%s | payload=%s", msg.topic, payload)
# demo: {"action":"ping"} -> trả pong
try:
data = json.loads(payload) if payload.strip().startswith("{") else {"raw": payload}
if data.get("action") == "ping":
self.publish_status("pong", retain=False)
except Exception as e:
self.logger.warning("cmd parse error | %s", e)
def connect(self):
self.logger.info("mqtt connecting | %s:%s | tls=%s", self.host, self.port, self.tls)
self.client.connect(self.host, self.port, keepalive=60)
self.client.loop_start()
def close(self):
try:
# offline graceful (retained)
self.publish_status("offline", retain=True, reason="graceful")
except Exception:
pass
self.client.loop_stop()
self.client.disconnect()
# ---- publish helpers ----
def publish_status(self, status: str, retain: bool = True, reason: str = ""):
payload = {"ts": now_ts(), "device_id": self.device_id, "status": status}
if reason:
payload["reason"] = reason
self._publish(self.t_status, payload, qos=1, retain=retain)
def publish_meta(self, retain: bool = True):
payload = {
"ts": now_ts(),
"device_id": self.device_id,
"app": os.getenv("APP_NAME", "iotlabs-py-agent"),
"version": os.getenv("APP_VERSION", "1.0.0"),
"capabilities": ["telemetry", "cmd"]
}
self._publish(self.t_meta, payload, qos=1, retain=retain)
def publish_telemetry(self, temperature_c: float, humidity_pct: float):
payload = {
"ts": now_ts(),
"device_id": self.device_id,
"metrics": {
"temperature_c": round(float(temperature_c), 2),
"humidity_pct": round(float(humidity_pct), 2),
}
}
# telemetry thường không retained
self._publish(self.t_telemetry, payload, qos=1, retain=False)
def _publish(self, topic: str, payload_obj: dict, qos: int, retain: bool):
msg = json.dumps(payload_obj, ensure_ascii=False)
if not self.connected:
# queue lại khi offline
self.queue.append((topic, msg, qos, retain))
self.logger.info("mqtt queued | topic=%s | queued=%s", topic, len(self.queue))
return
self.client.publish(topic, msg, qos=qos, retain=retain)
def flush_queue(self):
if not self.queue:
return
self.logger.info("mqtt flush queue | size=%s", len(self.queue))
while self.queue:
topic, msg, qos, retain = self.queue.popleft()
self.client.publish(topic, msg, qos=qos, retain=retain)
8) Nâng runner: publish theo interval từ config
Trong .env thêm:
MQTT_QUEUE_MAX=500
TELEMETRY_INTERVAL_SEC=10
APP_VERSION=1.0.0
Sửa src/mqtt_runner.py:
- lấy interval từ env
- nếu mất mạng: vẫn chạy, data sẽ queue lại
Gợi ý đoạn loop:
interval = int(os.getenv("TELEMETRY_INTERVAL_SEC", "10"))
...
while True:
agent.publish_telemetry(t, h)
time.sleep(interval)
9) Test “mất mạng” & kiểm tra queue flush
- Chạy mqtt_runner
- Tắt mạng (rút LAN / tắt Wi-Fi) 30–60s
- Bật mạng lại
- Subscribe topic telemetry, bạn sẽ thấy message “dồn” được gửi lại (tuỳ queue max).
Nếu bạn muốn “không dồn” mà chỉ gửi latest: queue max nhỏ (ví dụ 20) hoặc ghi DB local rồi gửi batch sau.
10) Bài tập nâng cấp
- Thêm command: {“action”:”set_interval”,”sec”:30} để đổi interval runtime.
- Dùng retained status để dashboard biết online/offline ngay khi mở app.
- Chạy mqtt runner thành systemd service iotlabs-mqtt.service.


