Problem
You have a program based on communicating threads and want them to implement publish/subscribe messaging.
Solution
To implement publish/subscribe messaging, you typically introduce a separate “exchange” or “gateway” object that acts as an intermediary for all messages. That is, instead of directly sending a message from one task to another, a message is sent to the exchange, which delivers it to one or more attached tasks. Here is one example of a very simple exchange implementation:
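from collections import defaultdict

class Exchange:
    def __init__(self):
        # Tasks currently attached to this exchange
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        # Raises KeyError if the task was never attached
        self._subscribers.remove(task)

    def send(self, msg):
        # Deliver the message to every attached task
        for subscriber in self._subscribers:
            subscriber.send(msg)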
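As a rough illustration of how tasks might use such an exchange, the sketch below attaches two subscribers and sends a couple of messages. The `Task` class, the `_exchanges` registry, and the `get_exchange()` helper are assumptions made for this example (the registry is one plausible use of the `defaultdict` import); the only real requirement is that a subscriber has a `send()` method.

```python
from collections import defaultdict

class Exchange:
    # Same minimal exchange as in the listing above, repeated so this runs on its own
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Hypothetical registry: one Exchange per name, created on first lookup
_exchanges = defaultdict(Exchange)

def get_exchange(name):
    return _exchanges[name]

class Task:
    # Hypothetical subscriber that simply records what it receives
    def __init__(self, label):
        self.label = label
        self.received = []

    def send(self, msg):
        self.received.append(msg)

task_a = Task('a')
task_b = Task('b')

exc = get_exchange('name')
exc.attach(task_a)
exc.attach(task_b)
exc.send('msg1')      # delivered to both tasks

exc.detach(task_b)
exc.send('msg2')      # delivered to task_a only
```

Because the exchange only calls `send()` on whatever is attached, a subscriber could equally be a thread wrapping a queue, a generator, or a network connection.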
Finally, it should be noted that there are numerous possible extensions to the exchange idea. For example, exchanges could implement an entire collection of message channels or apply pattern matching rules to exchange names. Exchanges can also be extended into distributed computing applications (e.g., routing messages to tasks on different machines).
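One way the pattern-matching extension might look is sketched below. This is only an assumption about a possible design, not part of the recipe: `PatternExchange` and `Collector` are hypothetical names, and shell-style wildcards from the standard `fnmatch` module stand in for whatever matching rules an application actually needs.

```python
from fnmatch import fnmatchcase

class PatternExchange:
    # Hypothetical extension: each subscriber registers a glob pattern, and
    # a message sent on a channel name goes to every matching subscription.
    def __init__(self):
        self._subscriptions = []     # list of (pattern, task) pairs

    def attach(self, pattern, task):
        self._subscriptions.append((pattern, task))

    def detach(self, pattern, task):
        self._subscriptions.remove((pattern, task))

    def send(self, channel, msg):
        for pattern, task in self._subscriptions:
            # Case-sensitive shell-style match, e.g. 'log.*' matches 'log.info.boot'
            if fnmatchcase(channel, pattern):
                task.send(msg)

class Collector:
    # Hypothetical subscriber that records what it receives
    def __init__(self):
        self.received = []

    def send(self, msg):
        self.received.append(msg)

errors = Collector()
everything = Collector()

exc = PatternExchange()
exc.attach('log.error.*', errors)
exc.attach('log.*', everything)

exc.send('log.error.disk', 'disk full')   # matches both patterns
exc.send('log.info.boot', 'started')      # matches 'log.*' only
```

In this sketch a task attached under two matching patterns would receive the same message twice; a real implementation would need to decide whether that is acceptable.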