IoTLabs

Research, Creativity and Experimentation

Series: Raspberry Pi Programming – Lesson 19: MQTT client on Raspberry Pi (publish/subscribe) + sending telemetry to a broker

Series: Raspberry Pi Programming & Practical Applications. Part 3: Python for building real apps. Lesson 19 sends telemetry to the broker mqtt.iotlabs.vn


1) Lesson objectives

By the end of this lesson you will be able to:

  • Connect from the Raspberry Pi to an MQTT broker.
  • Publish telemetry (temperature/humidity) at a fixed interval.
  • Subscribe to a command topic to receive remote commands.
  • Use a topic/payload format that is consistent enough to extend later.

Default IoTLabs broker: mqtt.iotlabs.vn

2) Recommended topic and payload conventions (used throughout IoTLabs)

Topic

  • Telemetry: iotlabs/<device_id>/telemetry
  • Status (online/offline/heartbeat): iotlabs/<device_id>/status
  • Command (server → device): iotlabs/<device_id>/cmd

Example with device_id = pi-gw-01:

  • iotlabs/pi-gw-01/telemetry
  • iotlabs/pi-gw-01/status
  • iotlabs/pi-gw-01/cmd
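
A minimal sketch of how these topic names could be built in Python. The helper name build_topic and its validation rule are assumptions made for illustration and are not part of the IoTLabs agent; the check simply rejects characters that have a special meaning in MQTT topics ("/", "+", "#").

```python
# Hypothetical helper: build_topic is an illustrative name, not project code.
TOPIC_KINDS = ("telemetry", "status", "cmd")


def build_topic(device_id: str, kind: str) -> str:
    """Return iotlabs/<device_id>/<kind>, rejecting unsafe device IDs."""
    if kind not in TOPIC_KINDS:
        raise ValueError(f"unknown topic kind: {kind!r}")
    # '/' would add an extra topic level; '+' and '#' are subscription wildcards.
    if not device_id or device_id != device_id.strip():
        raise ValueError(f"invalid device_id: {device_id!r}")
    if any(ch in device_id for ch in "/+#"):
        raise ValueError(f"invalid device_id: {device_id!r}")
    return f"iotlabs/{device_id}/{kind}"


if __name__ == "__main__":
    for kind in TOPIC_KINDS:
        print(build_topic("pi-gw-01", kind))
```

Keeping the topic layout in one function means a later change to the prefix only has to be made in one place.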

JSON payload (suggested)

Telemetry:

{
  "ts": "2026-02-14T20:10:00",
  "device_id": "pi-gw-01",
  "metrics": {
    "temperature_c": 26.5,
    "humidity_pct": 70.2
  }
}
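
As a rough illustration of this payload format, the sketch below serializes one sample and checks an incoming message for the expected fields. The function names make_telemetry and parse_telemetry are hypothetical, and the validation rules (required keys, numeric metrics) are assumptions rather than a schema defined by IoTLabs.

```python
import json
from datetime import datetime


def make_telemetry(device_id: str, temperature_c: float, humidity_pct: float) -> str:
    """Serialize one telemetry sample using the field names shown above."""
    payload = {
        "ts": datetime.now().isoformat(timespec="seconds"),
        "device_id": device_id,
        "metrics": {
            "temperature_c": round(float(temperature_c), 2),
            "humidity_pct": round(float(humidity_pct), 2),
        },
    }
    return json.dumps(payload)


def parse_telemetry(raw: str) -> dict:
    """Parse a telemetry message and check that the expected fields exist."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")
    for key in ("ts", "device_id", "metrics"):
        if key not in data:
            raise ValueError(f"missing field: {key}")
    if not isinstance(data["ts"], str):
        raise ValueError("ts must be an ISO 8601 string")
    datetime.fromisoformat(data["ts"])  # raises ValueError on a malformed timestamp
    metrics = data["metrics"]
    if not isinstance(metrics, dict):
        raise ValueError("metrics must be a JSON object")
    for name, value in metrics.items():
        # bool is a subclass of int in Python, so exclude it explicitly
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError(f"metric {name} must be numeric")
    return data
```

Validating on the receiving side keeps one misbehaving device from breaking whatever consumes the telemetry topic.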

3) Install the MQTT library for Python

Inside the project:

cd ~/apps/iotlabs-py-agent
source .venv/bin/activate
pip install paho-mqtt

4) Add the MQTT settings to .env or config.yaml

Option A: .env

Add to .env:

MQTT_HOST=mqtt.iotlabs.vn
MQTT_PORT=8883
MQTT_TLS=true
MQTT_USERNAME=your_user
MQTT_PASSWORD=your_pass
MQTT_DEVICE_ID=pi-gw-01

If you are testing locally without TLS, set MQTT_PORT=1883 and MQTT_TLS=false.
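
One possible way to read these settings in a single place is sketched below. MqttConfig, parse_bool and load_mqtt_config are hypothetical names. Two behaviours are assumptions that go beyond the lesson: parse_bool accepts a few more spellings than just "true", and the default port follows the TLS flag (8883 with TLS, 1883 without).

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class MqttConfig:
    host: str
    port: int
    tls: bool
    username: str
    password: str
    device_id: str


def parse_bool(value: str) -> bool:
    """Assumption: treat 1/true/yes/on (any case) as True, everything else as False."""
    return value.strip().lower() in ("1", "true", "yes", "on")


def load_mqtt_config(env: Mapping[str, str] = os.environ) -> MqttConfig:
    """Build the MQTT settings from environment-style key/value pairs."""
    tls = parse_bool(env.get("MQTT_TLS", "true"))
    # Assumption: pick the conventional port for the chosen transport when unset.
    default_port = "8883" if tls else "1883"
    port = int(env.get("MQTT_PORT", default_port))
    if not 1 <= port <= 65535:
        raise ValueError(f"MQTT_PORT out of range: {port}")
    return MqttConfig(
        host=env.get("MQTT_HOST", "mqtt.iotlabs.vn"),
        port=port,
        tls=tls,
        username=env.get("MQTT_USERNAME", ""),
        password=env.get("MQTT_PASSWORD", ""),
        device_id=env.get("MQTT_DEVICE_ID", "pi-gw-01"),
    )
```

Passing a plain dict instead of os.environ makes the loader easy to exercise without touching the real environment, for example load_mqtt_config({"MQTT_TLS": "false"}).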

5) Code: MQTT client (pub/sub + reconnect)

Create the file src/mqtt_client.py:

nano src/mqtt_client.py

Paste the code:

import json
import os
import ssl
from datetime import datetime

import paho.mqtt.client as mqtt

def now_ts():
    return datetime.now().isoformat(timespec="seconds")

class MqttAgent:
    def __init__(self, logger):
        self.logger = logger
        self.host = os.getenv("MQTT_HOST", "mqtt.iotlabs.vn")
        self.port = int(os.getenv("MQTT_PORT", "8883"))
        self.tls  = os.getenv("MQTT_TLS", "true").lower() == "true"
        self.user = os.getenv("MQTT_USERNAME", "")
        self.pwd  = os.getenv("MQTT_PASSWORD", "")
        self.device_id = os.getenv("MQTT_DEVICE_ID", "pi-gw-01")

        self.t_telemetry = f"iotlabs/{self.device_id}/telemetry"
        self.t_status    = f"iotlabs/{self.device_id}/status"
        self.t_cmd       = f"iotlabs/{self.device_id}/cmd"

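        # paho-mqtt 2.x needs an explicit callback API version; 1.x has no such argument.
        # VERSION1 keeps the callback signatures used in this class.
        client_kwargs = {"client_id": f"pi-{self.device_id}", "clean_session": True}
        if hasattr(mqtt, "CallbackAPIVersion"):
            self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, **client_kwargs)
        else:
            self.client = mqtt.Client(**client_kwargs)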
        if self.user:
            self.client.username_pw_set(self.user, self.pwd)

        if self.tls:
            self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
            self.client.tls_insecure_set(False)

        # callbacks
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message

        # auto reconnect backoff
        self.client.reconnect_delay_set(min_delay=1, max_delay=30)

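    def on_connect(self, client, userdata, flags, rc):
        # rc == 0 means the broker accepted the connection; any other value is a refusal
        if rc != 0:
            self.logger.error("mqtt connect refused | rc=%s | host=%s:%s", rc, self.host, self.port)
            return
        self.logger.info("mqtt connected | rc=%s | host=%s:%s", rc, self.host, self.port)
        # subscribe here so the subscription is restored after every reconnect
        client.subscribe(self.t_cmd, qos=1)
        # publish online status
        self.publish_status("online")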

    def on_disconnect(self, client, userdata, rc):
        self.logger.warning("mqtt disconnected | rc=%s", rc)

    def on_message(self, client, userdata, msg):
        try:
            payload = msg.payload.decode("utf-8", errors="ignore")
            self.logger.info("cmd received | topic=%s | payload=%s", msg.topic, payload)

            # demo: command JSON {"action":"ping"} / {"action":"set_interval","sec":30}
            data = json.loads(payload) if payload and payload.strip().startswith("{") else {"raw": payload}
            action = data.get("action")

            if action == "ping":
                self.publish_status("pong")
            else:
                # you could map commands to the relay/OLED/... controls from earlier lessons
                self.logger.info("cmd handled | action=%s", action)
        except Exception as e:
            self.logger.warning("cmd parse error | %s", e)

    def connect(self):
        self.logger.info("mqtt connecting | %s:%s | tls=%s", self.host, self.port, self.tls)
        self.client.connect(self.host, self.port, keepalive=60)
        self.client.loop_start()

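    def close(self):
        try:
            # Wait briefly so the "offline" message leaves before the network loop stops
            # (the timeout argument assumes paho-mqtt 1.6 or newer).
            info = self.publish_status("offline")
            info.wait_for_publish(timeout=2)
        except Exception:
            pass
        self.client.disconnect()
        self.client.loop_stop()

    def publish_status(self, status: str):
        payload = {
            "ts": now_ts(),
            "device_id": self.device_id,
            "status": status
        }
        # Return the MQTTMessageInfo so callers can wait for delivery if they need to.
        return self.client.publish(self.t_status, json.dumps(payload), qos=1, retain=False)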

    def publish_telemetry(self, temperature_c: float, humidity_pct: float):
        payload = {
            "ts": now_ts(),
            "device_id": self.device_id,
            "metrics": {
                "temperature_c": round(float(temperature_c), 2),
                "humidity_pct": round(float(humidity_pct), 2),
            }
        }
        self.client.publish(self.t_telemetry, json.dumps(payload), qos=1, retain=False)
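
The comment in on_message mentions two demo commands, {"action":"ping"} and {"action":"set_interval","sec":30}, but only ping is handled. The sketch below shows one way the command handling could be separated from the MQTT callback. CommandRouter is a hypothetical class, and the accepted range for sec (1 to 3600 seconds) is an assumption chosen for illustration.

```python
import json


class CommandRouter:
    """Hypothetical dispatcher: maps the "action" field of a command to a reply."""

    def __init__(self, interval: float = 10):
        self.interval = interval  # publish interval in seconds

    def handle(self, payload: str) -> str:
        """Return a short status string describing how the command was handled."""
        try:
            data = json.loads(payload)
        except json.JSONDecodeError:
            return "error: invalid JSON"
        if not isinstance(data, dict):
            return "error: expected a JSON object"

        action = data.get("action")
        if action == "ping":
            return "pong"
        if action == "set_interval":
            sec = data.get("sec")
            # bool is a subclass of int, so reject it explicitly
            if isinstance(sec, bool) or not isinstance(sec, (int, float)):
                return "error: sec must be a number"
            if not 1 <= sec <= 3600:  # assumed sane bounds
                return "error: sec must be between 1 and 3600"
            self.interval = sec
            return "ok"
        return f"error: unknown action {action!r}"
```

Under these assumptions, on_message could call router.handle(payload) and pass the returned string to publish_status, while the publishing loop reads router.interval on each iteration.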

6) Runner: publish telemetry periodically

Create the file src/mqtt_runner.py:

nano src/mqtt_runner.py

Paste:

import os
import time
from dotenv import load_dotenv

from src.utils.logging import setup_logging
from src.mqtt_client import MqttAgent

def main():
    load_dotenv()

    app_name = os.getenv("APP_NAME", "iotlabs-py-agent")
    log_level = os.getenv("LOG_LEVEL", "INFO")
    log_dir = os.getenv("LOG_DIR", "logs")

    logger = setup_logging(app_name, log_dir, log_level)

    agent = MqttAgent(logger)
    agent.connect()

    try:
        interval = 10
        logger.info("mqtt runner started | interval=%ss", interval)

        # demo values (a later lesson will read them from a real BME280)
        t = 26.5
        h = 70.2

        while True:
            agent.publish_telemetry(t, h)
            logger.info("telemetry published | T=%s | H=%s", t, h)

            time.sleep(interval)
    except KeyboardInterrupt:
        logger.info("stopping...")
    finally:
        agent.close()

if __name__ == "__main__":
    main()

Run:

source .venv/bin/activate
python -m src.mqtt_runner
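
The runner sleeps a fixed 10 seconds after each publish, so the real period is 10 seconds plus however long the publish and logging take. If a steadier cadence matters, one option is to schedule against a monotonic clock, as in the sketch below. The names next_deadline and run_periodic are hypothetical and are not part of the lesson's code.

```python
import time


def next_deadline(previous: float, interval: float, now: float) -> float:
    """Return the next tick on a fixed grid, skipping ticks that were already missed."""
    deadline = previous + interval
    if deadline <= now:
        # The task overran: jump ahead to the first grid point after `now`.
        missed = int((now - deadline) // interval) + 1
        deadline += missed * interval
    return deadline


def run_periodic(task, interval: float, ticks: int) -> None:
    """Call task() `ticks` times, spaced on an `interval`-second grid."""
    deadline = time.monotonic()
    for _ in range(ticks):
        task()
        deadline = next_deadline(deadline, interval, time.monotonic())
        time.sleep(max(0.0, deadline - time.monotonic()))


if __name__ == "__main__":
    run_periodic(lambda: print("publish telemetry here"), interval=1.0, ticks=3)
```

time.monotonic() is used instead of time.time() so that a clock adjustment (for example an NTP sync on the Pi) does not shift the schedule.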

7) Test the subscription with MQTTX (or another tool)

Subscribe:

  • iotlabs/pi-gw-01/telemetry
  • iotlabs/pi-gw-01/status
  • iotlabs/pi-gw-01/cmd

Publish a test command (the agent should answer with a "pong" status on iotlabs/pi-gw-01/status):

  • Topic: iotlabs/pi-gw-01/cmd
  • Payload:
{"action":"ping"}

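8) Quick best practices

  • QoS: QoS 1 is usually sufficient for telemetry.
  • Do not publish too frequently (every 1–10 s, depending on the use case).
  • Include device_id in the topic so the scheme scales to many devices.
  • Status/heartbeat messages let a dashboard know whether a device is online or offline.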
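
Putting device_id in the topic pays off on the subscriber side: a backend can use the MQTT single-level wildcard, for example iotlabs/+/telemetry, to receive telemetry from every device. The sketch below illustrates how "+" and "#" filters match topics. topic_matches and device_id_from_topic are hypothetical names, and the sketch ignores the special handling of topics starting with "$"; paho-mqtt ships its own topic_matches_sub helper for real use.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT filter matching: '+' is one level, '#' is the rest."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the parent and anything below
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


def device_id_from_topic(topic: str) -> str:
    """Extract <device_id> from iotlabs/<device_id>/<kind>."""
    parts = topic.split("/")
    if len(parts) != 3 or parts[0] != "iotlabs":
        raise ValueError(f"unexpected topic: {topic!r}")
    return parts[1]


if __name__ == "__main__":
    topic = "iotlabs/pi-gw-01/telemetry"
    if topic_matches("iotlabs/+/telemetry", topic):
        print("telemetry from", device_id_from_topic(topic))
```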

9) Extension exercises

  1. Read real data from the BME280 (Lesson 8) instead of the demo values.
  2. Add a "last will" so the broker reports the device offline automatically when power is lost.
  3. Run mqtt_runner as a systemd service (as in Lesson 17).
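
As a starting point for exercise 2, the sketch below prepares the arguments for paho-mqtt's Client.will_set(), which has to be called before connect(). build_last_will is a hypothetical helper. The payload deliberately has no "ts" field, because the broker stores the will at connect time and the timestamp would not reflect when the device actually dropped off; retain=False mirrors publish_status in this lesson and is an assumption you may want to revisit.

```python
import json


def build_last_will(device_id: str) -> dict:
    """Return keyword arguments for paho's Client.will_set()."""
    payload = {"device_id": device_id, "status": "offline"}
    return {
        "topic": f"iotlabs/{device_id}/status",
        "payload": json.dumps(payload),
        "qos": 1,
        "retain": False,
    }


if __name__ == "__main__":
    # In MqttAgent.__init__, before connect(), this could be used as:
    #     self.client.will_set(**build_last_will(self.device_id))
    print(build_last_will("pi-gw-01"))
```

The broker publishes the will only when the connection ends without a clean DISCONNECT, so the explicit "offline" message in close() still covers normal shutdowns.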