Series: Lập trình Raspberry Pi & Ứng dụng thực tế | Phần 3 — Python “build app thật” | Bài 19: MQTT client trên Raspberry Pi (publish/subscribe) + gửi telemetry lên broker mqtt.iotlabs.vn
1) Mục tiêu bài học
Sau bài này bạn sẽ:
- Kết nối MQTT từ Raspberry Pi tới broker.
- Publish telemetry định kỳ (nhiệt độ/độ ẩm).
- Subscribe topic command để nhận lệnh từ xa.
- Có format topic/payload “đủ chuẩn để mở rộng”.
Broker mặc định IoTLabs: mqtt.iotlabs.vn ✅
2) Chuẩn topic & payload đề xuất (dùng luôn cho IoTLabs)
Topic
- Telemetry: iotlabs/<device_id>/telemetry
- Status (online/offline/heartbeat): iotlabs/<device_id>/status
- Command (server → device): iotlabs/<device_id>/cmd
Ví dụ device_id = pi-gw-01:
- iotlabs/pi-gw-01/telemetry
- iotlabs/pi-gw-01/status
- iotlabs/pi-gw-01/cmd
Payload JSON (gợi ý)
Telemetry:
{
"ts": "2026-02-14T20:10:00",
"device_id": "pi-gw-01",
"metrics": {
"temperature_c": 26.5,
"humidity_pct": 70.2
}
}
3) Cài thư viện MQTT cho Python
Trong project:
cd ~/apps/iotlabs-py-agent
source .venv/bin/activate
pip install paho-mqtt
4) Thêm config MQTT vào .env hoặc config.yaml
Option A: .env (bài này dùng cách này — MqttAgent đọc cấu hình qua biến môi trường bằng os.getenv)
Thêm vào .env:
MQTT_HOST=mqtt.iotlabs.vn
MQTT_PORT=8883
MQTT_TLS=true
MQTT_USERNAME=your_user
MQTT_PASSWORD=your_pass
MQTT_DEVICE_ID=pi-gw-01
Nếu bạn đang test local không TLS: set MQTT_PORT=1883 và MQTT_TLS=false.
5) Code: MQTT client (pub/sub + reconnect)
Tạo file src/mqtt_client.py:
nano src/mqtt_client.py
Dán code:
import json
import os
import time
from datetime import datetime
import ssl
import paho.mqtt.client as mqtt
def now_ts():
    """Return the current local time as an ISO-8601 string with second precision.

    The value is naive (no UTC offset), e.g. ``2026-02-14T20:10:00``.
    """
    # Dropping the microseconds makes isoformat() emit exactly HH:MM:SS.
    current = datetime.now().replace(microsecond=0)
    return current.isoformat()
class MqttAgent:
    """MQTT agent for one device: publishes telemetry/status, listens for commands.

    Settings are read from environment variables when the agent is created:
    ``MQTT_HOST``, ``MQTT_PORT``, ``MQTT_TLS``, ``MQTT_USERNAME``,
    ``MQTT_PASSWORD`` and ``MQTT_DEVICE_ID``. Topics follow the layout
    ``iotlabs/<device_id>/telemetry``, ``.../status`` and ``.../cmd``.
    """

    # Values of MQTT_TLS (compared case-insensitively) that switch TLS on.
    _TRUTHY = frozenset({"1", "true", "yes", "on"})

    def __init__(self, logger):
        """Read the MQTT settings from the environment and build the client.

        No network traffic happens here; call ``connect()`` to go online.

        Args:
            logger: Logger-like object; its ``info`` and ``warning`` methods
                are called with %-style arguments.

        Raises:
            ValueError: If ``MQTT_PORT`` is not an integer.
        """
        self.logger = logger
        self.host = os.getenv("MQTT_HOST", "mqtt.iotlabs.vn")
        self.port = int(os.getenv("MQTT_PORT", "8883"))
        # Accept the usual spellings of "enabled", not only the literal "true".
        self.tls = os.getenv("MQTT_TLS", "true").strip().lower() in self._TRUTHY
        self.user = os.getenv("MQTT_USERNAME", "")
        self.pwd = os.getenv("MQTT_PASSWORD", "")
        self.device_id = os.getenv("MQTT_DEVICE_ID", "pi-gw-01")

        self.t_telemetry = f"iotlabs/{self.device_id}/telemetry"
        self.t_status = f"iotlabs/{self.device_id}/status"
        self.t_cmd = f"iotlabs/{self.device_id}/cmd"

        self.client = self._new_client(f"pi-{self.device_id}")
        if self.user:
            self.client.username_pw_set(self.user, self.pwd)
        if self.tls:
            # Verify the broker certificate and its hostname.
            self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
            self.client.tls_insecure_set(False)

        # callbacks
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message

        # auto reconnect backoff
        self.client.reconnect_delay_set(min_delay=1, max_delay=30)

    @staticmethod
    def _new_client(client_id):
        """Create a paho client that uses the v1 callback signatures."""
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is None:
            # paho-mqtt 1.x: there is no callback API version argument.
            return mqtt.Client(client_id=client_id, clean_session=True)
        # paho-mqtt 2.x: the callbacks of this class use the v1 signatures
        # (on_connect(client, userdata, flags, rc)), so request that API
        # explicitly; 2.0 refuses to construct a client without it.
        return mqtt.Client(
            api_versions.VERSION1, client_id=client_id, clean_session=True
        )

    def on_connect(self, client, userdata, flags, rc):
        """Handle the broker's CONNACK: subscribe and announce "online".

        A non-zero ``rc`` means the broker refused the connection; in that
        case nothing is subscribed or published.
        """
        if rc != 0:
            self.logger.warning(
                "mqtt connect refused | rc=%s | host=%s:%s", rc, self.host, self.port
            )
            return
        self.logger.info("mqtt connected | rc=%s | host=%s:%s", rc, self.host, self.port)
        # subscribe command topic (done here so it is repeated on reconnect)
        client.subscribe(self.t_cmd, qos=1)
        # publish online status
        self.publish_status("online")

    def on_disconnect(self, client, userdata, rc):
        """Log that the connection to the broker was closed or lost."""
        self.logger.warning("mqtt disconnected | rc=%s", rc)

    def on_message(self, client, userdata, msg):
        """Handle an incoming message on a subscribed topic.

        Payloads that look like a JSON object are parsed; anything else is
        wrapped as ``{"raw": <text>}``. The ``ping`` action is answered with
        a ``pong`` status; other actions are only logged. Errors are logged,
        never raised, so a bad command cannot break the network loop.
        """
        try:
            payload = msg.payload.decode("utf-8", errors="ignore")
            self.logger.info("cmd received | topic=%s | payload=%s", msg.topic, payload)
            # demo: command JSON {"action":"ping"} / {"action":"set_interval","sec":30}
            if payload and payload.strip().startswith("{"):
                data = json.loads(payload)
            else:
                data = {"raw": payload}
            action = data.get("action")
            if action == "ping":
                self.publish_status("pong")
            else:
                # Hook point: map other commands to relay/OLED/... control
                # from the earlier lessons.
                self.logger.info("cmd handled | action=%s", action)
        except Exception as e:
            self.logger.warning("cmd parse error | %s", e)

    def connect(self):
        """Connect to the broker and start the background network loop.

        Errors raised by the underlying ``connect`` call propagate to the
        caller.
        """
        self.logger.info("mqtt connecting | %s:%s | tls=%s", self.host, self.port, self.tls)
        self.client.connect(self.host, self.port, keepalive=60)
        self.client.loop_start()

    def close(self):
        """Publish an "offline" status (best effort), then stop and disconnect."""
        try:
            self.publish_status("offline")
        except Exception as exc:
            # Best effort: shutdown must continue even if this publish fails,
            # but the failure should be visible in the log.
            self.logger.warning("mqtt offline status not sent | %s", exc)
        self.client.loop_stop()
        self.client.disconnect()

    def publish_status(self, status: str):
        """Publish ``{"ts", "device_id", "status"}`` as JSON to the status topic."""
        payload = {
            "ts": now_ts(),
            "device_id": self.device_id,
            "status": status,
        }
        self.client.publish(self.t_status, json.dumps(payload), qos=1, retain=False)

    def publish_telemetry(self, temperature_c: float, humidity_pct: float):
        """Publish one telemetry sample as JSON to the telemetry topic.

        Both values are converted to ``float`` and rounded to two decimals.

        Raises:
            TypeError, ValueError: If a value cannot be converted to float.
        """
        payload = {
            "ts": now_ts(),
            "device_id": self.device_id,
            "metrics": {
                "temperature_c": round(float(temperature_c), 2),
                "humidity_pct": round(float(humidity_pct), 2),
            },
        }
        self.client.publish(self.t_telemetry, json.dumps(payload), qos=1, retain=False)
6) Runner: publish telemetry định kỳ
Tạo file src/mqtt_runner.py:
nano src/mqtt_runner.py
Dán:
import os
import time
from dotenv import load_dotenv
from src.utils.logging import setup_logging
from src.mqtt_client import MqttAgent
def main():
    """Run the demo agent: connect to the broker and publish telemetry in a loop.

    Logging is configured from ``APP_NAME``, ``LOG_DIR`` and ``LOG_LEVEL``
    (loaded from ``.env``). Fixed demo readings are published every
    10 seconds until Ctrl+C; the agent is always closed on the way out.
    """
    load_dotenv()

    logger = setup_logging(
        os.getenv("APP_NAME", "iotlabs-py-agent"),
        os.getenv("LOG_DIR", "logs"),
        os.getenv("LOG_LEVEL", "INFO"),
    )

    agent = MqttAgent(logger)
    agent.connect()

    try:
        period_sec = 10
        logger.info("mqtt runner started | interval=%ss", period_sec)

        # demo values (a later lesson reads real data from the BME280)
        temperature_c, humidity_pct = 26.5, 70.2

        while True:
            agent.publish_telemetry(temperature_c, humidity_pct)
            logger.info("telemetry published | T=%s | H=%s", temperature_c, humidity_pct)
            time.sleep(period_sec)
    except KeyboardInterrupt:
        logger.info("stopping...")
    finally:
        agent.close()


if __name__ == "__main__":
    main()
Chạy:
source .venv/bin/activate
python -m src.mqtt_runner
7) Test subscribe bằng MQTTX (hoặc tool khác)
Subscribe:
- iotlabs/pi-gw-01/telemetry
- iotlabs/pi-gw-01/status
- iotlabs/pi-gw-01/cmd
Publish command thử (thiết bị sẽ trả lời bằng status "pong" trên topic iotlabs/pi-gw-01/status):
- Topic: iotlabs/pi-gw-01/cmd
- Payload:
{"action":"ping"}
8) Best practices nhanh
- QoS: telemetry thường QoS 1 đủ tốt
- Không publish quá dày (1–10s tuỳ use-case)
- Topic có device_id để scale
- Status/heartbeat giúp dashboard biết device online/offline
9) Bài tập nâng cấp
- Lấy dữ liệu thật từ BME280 (Bài 8) thay giá trị demo.
- Thêm “last will” để tự báo offline khi mất điện.
- Chạy mqtt_runner bằng systemd service (như Bài 17).


