侧边栏壁纸
博主头像
Zeeland

全栈算法工程师 | 大模型创业 | 开源项目分享 | Python开发者 | @Promptulate Founder | @SparkLab cofounder | @LangChainAI Top Contributor | @CogitLab core member

  • 累计撰写 61 篇文章
  • 累计创建 47 个标签
  • 累计收到 7 条评论

目 录CONTENT

文章目录

broadcast-service 一个轻量级Python发布订阅者框架

Zeeland
2022-11-22 / 0 评论 / 2 点赞 / 545 阅读 / 1,304 字

Introduction

前两天在Python最佳实践-构建自己的第三方库文章中介绍了自己构建的一个轻量级的Python发布订阅者框架,今天来简单介绍一下。

项目地址:

broadcast-service

broadcast-service是一个轻量级Python发布订阅者框架,事实上,我更愿意称其为广播模式,因为该框架更加的轻量级,你可以通过这个库轻松地构造发布订阅者模式,并且支持同步、异步、多主题订阅等不同场景下的模式建立。通过broadcast-service,只需要引入一个单例类,你就可以十分轻松地构建起一个发布订阅者模式,几乎没有代码侵入性。

主要特性

  1. 可以用十分简单的语法构建起一个发布订阅者模式
  2. 支持异步、同步、多主题订阅等不同的应用场景
  3. 提供lambda、回调函数、装饰器等不同的语法编写模式
  4. 支持同时订阅多个主题回调、同时向多个主题发布信息
  5. 文档完善 document

安装

执行以下语句即可安装该库。

pip install broadcast-service

快速上手

笔者还是推荐大家直接看文档,文档十分详细的介绍了快速上手过程: document

下面是一个简单的发布订阅者实现:

from broadcast_service import broadcast_service

def handle_msg():
    print('handle_msg callback')

if __name__ == '__main__':
    # listen topic
    broadcast_service.listen('Test', handle_msg)

    # publish broadcast
    broadcast_service.broadcast('Test')

你也可以采用装饰器的方式来进行主题的订阅。

from broadcast_service import broadcast_service


@broadcast_service.on_listen('my_topic')
def handle_decorator_msg(params):
    print(f"handle_decorator_msg receive params: {params}")


if __name__ == '__main__':
    info = 'This is very important msg'

    # subscribe topic
    broadcast_service.subscribe('my_topic', handle_msg)

    # publish broadcast
    broadcast_service.publish('my_topic', info)

如果你想要在发布topic的时候传入对应的参数,你只需要将参数传入broadcast(topic_name, params) params中。

from broadcast_service import broadcast_service

def handle_msg(params):
    print(params)

if __name__ == '__main__':
    info = 'This is very important msg'

    # listen topic
    broadcast_service.listen('Test', handle_msg)

    # publish broadcast
    broadcast_service.broadcast('Test', info)

from broadcast_service import broadcast_service

def handle_msg(info, info2):
    print(info)
    print(info2)

if __name__ == '__main__':
    info = 'This is very important msg'
    info2 = 'This is also a very important msg.'

    # listen topic
    broadcast_service.listen('Test', handle_msg)

    # publish broadcast
    broadcast_service.broadcast('Test', info, info2)

此外,broadcast-service默认支持异步回调的方式,避免了现成堵塞的发生,下面是一个形象的demo演示。

    One day, leader Tom arrive the company but find not one staff in company
  because all staff are playing outside. Therefor, Tom send a message
  everyone must must return to company now. Staff Jack, Jasmine, Jane go back
  now when they receive it. Actually, they need different time to go back
  in different places. When they return, they need to declare that they are back.
  ---
  有一天,领导Tom来到公司,却发现公司里没有一个员工,
  因为所有的员工都在外面玩耍。于是,Tom发了一条信息,
  现在每个人都必须回到公司。Jack,Jasmine,Jane必须现在回去
  事实上,当他们收到信息时,他们正处在不同的地方,每个人回
  去需要花费的时间不同,并且回来后他们需要声明他们回来了。
from broadcast_service import broadcast_service
import time
import random

class Application:
    """
    This demo aim to show how to use broadcast-service.
    Scene:
        One day, leader Tom arrive the company but find not one staff in company
        because all staff are playing outside. Therefor, leader Tom send a message
        everyone must must return to company now. Staff Jack, Jasmine, Jane go back
        now when they receive it. Actually, they need different time to go back
        in different places. When they return, they need to declare that they are back.
    Attention:
        broadcast-service is asynchronous by defalut. If you want to close the async
        state. You can use:
            broadcast_service.enable_async = False
        to close the async statement.

    """
    def __init__(self):
        self.leader = Leader('Tom')
        self.staff1 = Staff('Jack')
        self.staff2 = Staff('Jasmine')
        self.staff3 = Staff('Jane')
        self.current_time = None

    def run(self):
        self.current_time = time.time()
        self.leader.notice_go_back()

def print_time():
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

class Leader:
    def __init__(self, name):
        self.name = name

    def notice_go_back(self):
        print('[{1}] {0}(leader) notice meeting now.'.format(self.name, print_time()))
        meeting_info = 'You guys must go back now!'
        broadcast_service.broadcast('meeting', meeting_info)

class Staff:
    def __init__(self, name):
        self.name = name
        self.rec_msg()

    def rec_msg(self):
        broadcast_service.listen('meeting', self.go_back)

    def go_back(self, info):
        print("[{2}] {0}(staff) receive msg '{1}' and go back now.".format(self.name, info, print_time()))
        time.sleep(random.randint(1, 5))
        print('[{1}] {0}(staff) is back now.'.format(self.name, print_time()))

if __name__ == '__main__':
    app = Application()
    app.run()

事实上,从如下结果可以看到,该主体发布时,订阅者可以同时响应订阅的主题,而不会产生线程堵塞的问题。

后续更新计划

  1. 优化装饰器等场景下的语法表达能力
  2. 探索更多复杂的应用场景,为更多复杂场景提供解决方案
  3. 增加模糊订阅功能
  4. 增加单次订阅回调功能
2

评论区