12.11 实现消息发布/订阅模型

问题

You have a program based on communicating threads and want them to implementpublish/subscribe messaging.

解决方案

To implement publish/subscribe messaging, you typically introduce a separate “ex‐change” or “gateway” object that acts as an intermediary for all messages. That is, insteadof directly sending a message from one task to another, a message is sent to the exchangeand it delivers it to one or more attached tasks. Here is one example of a very simpleexchange implementation:

from collections import defaultdict

class Exchange:def init(self):self._subscribers = set()def attach(self, task):self._subscribers.add(task)def detach(self, task):self._subscribers.remove(task)def send(self, msg):for subscriber in self._subscribers:subscriber.send(msg)

Finally, it should be noted that there are numerous possible extensions to the exchangeidea. For example, exchanges could implement an entire collection of message channels

or apply pattern matching rules to exchange names. Exchanges can also be extendedinto distributed computing applications (e.g., routing messages to tasks on differentmachines, etc.).

文章导航