侧边栏壁纸
博主头像
Zeeland

全栈算法工程师 | 大模型创业 | 开源项目分享 | Python开发者 | @Promptulate Founder | @SparkLab cofounder | @LangChainAI Top Contributor | @CogitLab core member

  • 累计撰写 61 篇文章
  • 累计创建 47 个标签
  • 累计收到 7 条评论

目 录CONTENT

文章目录

flask+mqtt快速上手

Zeeland
2022-12-29 / 0 评论 / 0 点赞 / 536 阅读 / 1,328 字

简介

本文旨在介绍如何快速上手联动flask + mqtt,本文将会给出一个简单的demo,用于演示在如何通过访问flask接口来触发mqtt,并在flask运行的基础的上对mqtt进行订阅。

快速上手

因为有项目需求,所以需要flask + mqtt进行联动,因为需要一直开启监听,所以需要一直挂在一个线程上,一开始想到用多线程做,或者说用异步协程来做,后来发现有一个关于flask的mqtt扩展库,因此为了快速上手,直接用该库进行开发,可以节省很多精力。

link: https://github.com/stlehmann/Flask-MQTT

首先,我们需要安装flask-mqtt库。

pip install flask-mqtt

下面放上一个简单的示例代码,里面包括了MQTT的订阅、发布,flask的访问。

"""  
A small Test application to show how to use Flask-MQTT.  
"""  
  
import eventlet  
from flask import Flask, render_template  
from flask_mqtt import Mqtt  
  
  
eventlet.monkey_patch()  
  
app = Flask(__name__)  
app.config['SECRET'] = 'my secret key'  
app.config['TEMPLATES_AUTO_RELOAD'] = True  
app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'  
app.config['MQTT_BROKER_PORT'] = 1883  
app.config['MQTT_USERNAME'] = ''  
app.config['MQTT_PASSWORD'] = ''  
app.config['MQTT_KEEPALIVE'] = 5  
app.config['MQTT_TLS_ENABLED'] = False  
app.config['MQTT_CLEAN_SESSION'] = True  
  
  
mqtt = Mqtt(app)  
  
  
@app.route('/')  
def index():  
    return "hello mqtt_flask"  
  
@app.route('/hello')  
def hello():  
    mqtt.publish('hello', 'hello, this is flask')  
    print("[mqtt] publish successfully")  
    return "publish successfully"  
  
@mqtt.on_connect()  
def handle_connect(client, userdata, flags, rc):  
    mqtt.subscribe('hello')  
    print("[mqtt] has listen topic hello")  
  
  
@mqtt.on_message()  
def handle_mqtt_message(client, userdata, message):  
    data = dict(  
        topic=message.topic,  
        payload=message.payload.decode()  
    )  
    print(data)  
  
if __name__ == '__main__':  
    # important: Do not use reloader because this will create two Flask instances.  
    # Flask-MQTT only supports running with one instance    # socketio.run(app, host='0.0.0.0', port=5000, use_reloader=False, debug=False)    app.run()

订阅测试

这里用了MQTTX工具来进行测试,现在我已经连接到了一个MQTT服务器上,我将用MQTTX向该服务器发送一个topic为hello,payload为{"msg": "Hello, This is mqtt server"}的消息。

运行程序后,MQTTX发送消息,结果如下,可以看到,接收正常。

发布测试

现在我们访问http://localhost:5000/hello,flask服务器将会向我的MQTT服务器发送一个topic为hello,payload为"hello, this is flask"的消息,我们用MQTTX来监听。

从下图中可以看到,MQTTX成功订阅接收到了该消息,至此,关于FLASK+MQTT的必要流程已经可以走通了。

进一步测试

最初,笔者以为启动mqtt_client.loop_start()函数之后会线程堵塞,但是在使用 Paho MQTT Python 库时,调用 client.loop_start() 方法会在后台启动一个新线程,用于处理 MQTT 客户端的事件循环。这意味着您的代码不会堵塞,并且可以在 MQTT 客户端的事件循环运行时继续执行。

例如,您可以在调用 client.loop_start() 后使用以下代码继续执行其他操作:

import paho.mqtt.client as mqtt

# The callback for when a message is received from the server
def on_message(client, userdata, message):
    print(f"Received message '{message.payload.decode()}' on topic '{message.topic}'")

# Create an MQTT client
client = mqtt.Client()

# Set the on_message callback
client.on_message = on_message

# Connect to the MQTT server
client.connect("mqtt.example.com", 1883, 60)

# Subscribe to a topic
client.subscribe("my/topic")

# Start the MQTT client loop in the background
client.loop_start()

# Do other work here
print("Doing other work")

# Wait for a while
time.sleep(10)

# Stop the MQTT client loop
client.loop_stop()

在上面的代码中,我们在调用 client.loop_start() 后立即执行了其他代码,然后等待了 10 秒钟。在这期间,MQTT 客户端的事件循环仍在后台运行,并等待接收消息。

基于此,那么flask和mqtt联动其实不用上述的扩展库也可以,笔者在paho官方的demo上小改一下,最后结果如下所示:

"""
访问 localhost:5000/hello时,会用mqtt客户端发布主题消息
"""
from flask import Flask, jsonify  
import paho.mqtt.client as mqtt  
import sys  
  
  
app = Flask(__name__)  
print('[app] start work', file=sys.stdout)  
  
  
def connect_mqtt() -> mqtt:  
    def on_connect(client, userdata, flags, rc):  
        if rc == 0:  
            print("Connected to MQTT Broker!", file=sys.stdout)  
        else:  
            print("Failed to connect, return code %d\n", rc, file=sys.stdout)  
  
    client = mqtt.Client()  
    client.username_pw_set("", "")  
    client.on_connect = on_connect  
    client.connect("broker.emqx.io", 1883, 60)  
    return client  
  
  
def subscribe(client: mqtt, topic):  
    def on_message(client, userdata, msg):  
        print(f"[mqtt] Received `{msg.payload.decode()}` from `{msg.topic}` topic", file=sys.stdout)  
  
    client.on_message = on_message  
    client.subscribe(topic)  
    print("[mqtt] subscribe topic")  
  
  
client = connect_mqtt()  
subscribe(client, "hello")  
client.loop_start()
  
@app.route("/hello")  
def alarm():  
    client.publish("hello", "important msg")  
    print("[mqtt] send msg successfully", file=sys.stdout)  
    return "Mqtt message published"  
  
  
if __name__ == '__main__':  
    app.run()

该代码经本人实际测试,也是可以正常食用的。需要说明的是,如果你只是需要发布而不需要监听,那么client.loop_start()不是必要的。

References

0

评论区