12.3 Communicating Between Threads

Problem

You have multiple threads in your program and you want to safely communicate or exchange data between them.

Solution

Perhaps the safest way to send data from one thread to another is to use a Queue from the queue library. To do this, you create a Queue instance that is shared by the threads. Threads then use put() or get() operations to add or remove items from the queue. For example:

    from queue import Queue
    from threading import Thread

    # A thread that consumes data
    def consumer(in_q):
        while True:
            # Get some data
            data = in_q.get()
            # Process the data
            ...
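
To see the pieces working together, here is a minimal, self-contained sketch. The producer function, the `_DONE` sentinel used to stop the consumer, the `results` list, and the `run_demo()` helper are all assumptions added for illustration; they are not part of the fragment above.

```python
from queue import Queue
from threading import Thread

_DONE = object()   # hypothetical sentinel telling the consumer to stop

def producer(out_q, n):
    # Put n items on the queue, then signal that no more are coming
    for i in range(n):
        out_q.put(i)
    out_q.put(_DONE)

def consumer(in_q, results):
    while True:
        # Get some data (blocks until an item is available)
        data = in_q.get()
        if data is _DONE:
            break
        # Process the data (here: just square it)
        results.append(data * data)

def run_demo(n=5):
    # Create the shared queue and launch both threads
    q = Queue()
    results = []
    t1 = Thread(target=consumer, args=(q, results))
    t2 = Thread(target=producer, args=(q, n))
    t1.start()
    t2.start()
    t2.join()
    t1.join()
    return results

if __name__ == '__main__':
    print(run_demo())
```

With a single consumer, items come off the queue in the order they were put on, so the results preserve the producer's ordering.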

Queue objects provide a few additional features that may prove useful in certain contexts. If you create a Queue with an optional size, such as Queue(N), it places a limit on the number of items that can be enqueued before put() blocks the producer. Adding an upper bound to a queue might make sense if there is a mismatch in speed between a producer and a consumer, for instance, if a producer is generating items at a much faster rate than they can be consumed. On the other hand, making a queue block when it’s full can also have an unintended cascading effect throughout your program, possibly causing it to deadlock or run poorly. In general, the problem of “flow control” between communicating threads is much harder than it seems. If you ever find yourself trying to fix a problem by fiddling with queue sizes, it could be an indicator of a fragile design or some other inherent scaling problem.

Both the get() and put() methods support nonblocking operation and timeouts. For example:

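    import queue
    q = queue.Queue()

    # Nonblocking get: raises queue.Empty if nothing is available
    try:
        data = q.get(block=False)
    except queue.Empty:
        ...

    # Nonblocking put: raises queue.Full if the queue is at capacity
    try:
        q.put(item, block=False)
    except queue.Full:
        ...

    # Blocking get that gives up after 5 seconds
    try:
        data = q.get(timeout=5.0)
    except queue.Empty:
        ...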

Both of these options can be used to avoid the problem of blocking indefinitely on a particular queuing operation. For example, a nonblocking put() could be used with a fixed-sized queue to implement different kinds of handling code for when the queue is full, such as issuing a log message and discarding the item:

    def producer(q):
        ...
        try:
            q.put(item, block=False)
        except queue.Full:
            log.warning('queued item %r discarded!', item)
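
The handler above leaves `item` and `log` undefined. The sketch below is one way to make the same idea runnable, assuming a standard `logging` logger stands in for `log`; the `offer()` and `demo()` helpers and the queue size of 2 are hypothetical choices made only for illustration.

```python
import logging
import queue

log = logging.getLogger(__name__)   # assumed stand-in for the recipe's log

def offer(q, item):
    # Try to enqueue without blocking; report whether the item was accepted
    try:
        q.put(item, block=False)
        return True
    except queue.Full:
        log.warning('queued item %r discarded!', item)
        return False

def demo():
    q = queue.Queue(2)   # bounded queue holding at most two items
    # Nothing consumes from q, so only the first two offers succeed
    return [offer(q, n) for n in range(4)]

if __name__ == '__main__':
    print(demo())
```

Returning a boolean lets the caller decide what to do with a rejected item (retry later, count it, or drop it) instead of hiding that decision inside the producer.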

A timeout is useful if you want consumer threads to periodically give up on operations such as q.get() so that they can check things such as a termination flag, as described in Recipe 12.1.

    _running = True

    def consumer(q):
        while _running:
            try:
                item = q.get(timeout=5.0)
                # Process item
                ...
            except queue.Empty:
                pass
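
The following is a minimal runnable sketch of the same polling pattern. It swaps the module-level `_running` flag for a `threading.Event` (named `stop_requested` here), shortens the timeout to 0.1 seconds so the demo finishes quickly, and uses `task_done()` and `join()` so the main thread knows when every item has been handled. All of these are assumptions for illustration rather than part of the recipe.

```python
import queue
import threading

def consumer(q, stop_requested, processed):
    # Loop until asked to stop, waking at least every 0.1 s to check
    while not stop_requested.is_set():
        try:
            item = q.get(timeout=0.1)
        except queue.Empty:
            continue
        # Process item (here: just record it)
        processed.append(item)
        q.task_done()

def run_demo():
    q = queue.Queue()
    stop_requested = threading.Event()
    processed = []
    t = threading.Thread(target=consumer, args=(q, stop_requested, processed))
    t.start()
    for n in range(3):
        q.put(n)
    q.join()               # wait until every queued item is marked done
    stop_requested.set()   # ask the consumer to exit at its next check
    t.join(timeout=2.0)
    return processed, t.is_alive()

if __name__ == '__main__':
    print(run_demo())
```

The timeout bounds how long the consumer can take to notice the stop request: a shorter value shuts down faster, at the cost of more frequent wakeups on an idle queue.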

Lastly, there are utility methods q.qsize(), q.full(), and q.empty() that can tell you the current size and status of the queue. However, be aware that all of these are unreliable in a multithreaded environment. For example, a call to q.empty() might tell you that the queue is empty, but in the time that has elapsed since making the call, another thread could have added an item to the queue. Frankly, it’s best to write your code not to rely on such functions.
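
One way to avoid these status checks is to attempt the operation and handle the exception, rather than asking first and acting second. The sketch below assumes a hypothetical `drain()` helper that collects whatever is currently available without ever calling q.empty().

```python
import queue

def drain(q):
    # Collect whatever is available right now without blocking.
    # No q.empty() check: the get itself tells us when the queue is empty.
    items = []
    while True:
        try:
            items.append(q.get(block=False))
        except queue.Empty:
            return items

if __name__ == '__main__':
    q = queue.Queue()
    for n in range(3):
        q.put(n)
    print(drain(q))
```

Because the check and the removal happen in a single get() call, there is no window in which another thread can change the queue between the two steps.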
