Python 多线程
我们知道很多其他语言的多线程一般来做并发和异步处理。异步可以避免主线程被block,并发可以充分利用CPU多核达到提速的目的。但是python的多线程并发是不是我们想象的那样呢?python的多线程其实是伪多线程,由于历史遗留问题,python的多线程不是并行的在多核上执行,而是交替的在单核上支持,即使是多处理器的情况下,也没有能够提升速度,这对于多线程的并发特性简直是个坑。为什么会是这样呢?那是因为解释器会在执行时设置一个GIL(Global Interpreter Lock)锁,每个python线程在执行时都必须先获得这个锁,然后执行100行代码后释放这个锁,再等待获取这个这个锁进行执行。和我们多线程在CPU单核上并发逻辑类似,学过操作系统的同学并不陌生,就是程序强制CPU机制。这样逻辑会造成多线处理数据比单线处理数据会更慢。下面是做的一个500张图片填充的单线、多线、及线程池的例子。
# -*- coding: utf-8 -*-
# for Python 3.6
from concurrent.futures import ThreadPoolExecutor, wait
import numpy as np
import os
import skimage.io
import time
import threading
import torch
class ImageProcessor:
def __init__(self):
self.__extended_images = torch.zeros(0, 0, 0, 0)
# read all files in given path
def read_image(self, path):
file_names = os.listdir(path) # list all file name in current path
images = []
for i in range(len(file_names)):
if file_names[i][-3:] != "jpg":
continue
if i < 0:
continue
fname = os.path.join(path, file_names[i]) # file full path
image = skimage.io.imread(fname)
if len(image.shape)==4:
image = skimage.color.rgba2rgb(image)
if len(image.shape)==3:
image = skimage.color.rgb2gray(image)
if len(image.shape)==2:
image = skimage.color.gray2rgb(image)
image = image.astype(np.float32)
# to expand image array
for j in range(5):
images.append(image)
return images
# resize all image size
def resize_images(self, images):
height, width = self.__fetch_max_size(images)
print("single thread start time is: ", time.time())
start = time.time()
self.__extend_image_by_single_thread(images, height, width) # single thread
end = time.time()
print("single thread end time is: ", time.time())
print("single thread time duration: ", end - start)
time.sleep(1)
print("multi-thread start time is: ", time.time())
start = time.time()
self.__extend_by_threading(images, height, width, 2) # threading
end = time.time()
print("multi-thread end time is: ", time.time())
print("multi-thread time duration: ", end - start)
time.sleep(1)
print("thread pool start time is ", time.time())
start = time.time()
self.__extend_by_thread_pool(images, height, width, 2) # thread pool
end = time.time()
print("thread pool end time is ", time.time())
print("thread pool time duration: ", end - start)
# [0] is height, [1] is width, [2] is channel
def __fetch_max_size(self, images):
"""
# using numpy
heights = [int (image.shape[0]) for image in images]
widths = [int (image.shape[1]) for image in images]
height = np.array(heights).max()
width = np.array(widths).max()
"""
# using for loop
height = 0
width = 0
for image in images:
height = np.maximum(height, image.shape[0])
width = np.maximum(width, image.shape[1])
return height, width
# using single thread
def __extend_image_by_single_thread(self, images, height, width):
self.__extended_images = torch.zeros(len(images), height, width, 3)
for i in range(len(images)):
self.__extended_images[i, :int(images[i].shape[0]), :int(images[i].shape[1]), :] = torch.from_numpy(images[i])
# using threading
def __extend_by_threading(self, images, height, width, parts_num):
threads = []
self.__extended_images = torch.zeros(len(images), height, width, 3)
for i in range(parts_num):
t = threading.Thread(target = self.__extend_part_image, args = (images, int(len(images) * i / parts_num), int(len(images) * (i + 1) / parts_num)))
threads.append(t)
for t in threads:
t.setDaemon(True)
t.start()
for t in threads:
t.join()
def __extend_part_image(self, images, i, j):
for k in range(i, j):
self.__extended_images[k, :int(images[k].shape[0]), :int(images[k].shape[1]), :] = torch.from_numpy(images[k])
# using thread pool
def __extend_by_thread_pool(self, images, height, width, workers):
pool_tasks = []
pool_executor = ThreadPoolExecutor(max_workers = workers)
self.__extended_images = torch.zeros(len(images), height, width, 3)
# using thread pool
for i in range(len(images)):
pool_tasks.append(pool_executor.submit(self.__extend_single_image, images[i], i))
wait(pool_tasks)
def __extend_single_image(self, image, i):
self.__extended_images[i, :int(image.shape[0]), :int(image.shape[1]), :] = torch.from_numpy(image)
def main():
img_proc = ImageProcessor()
path = "/home/chenyu.xu/test/allimages/"
images = img_proc.read_image(path)
if len(images) == 0:
print("There is no image!")
return
img_proc.resize_images(images)
print("All done")
if __name__ == "__main__":
main()
从结果可以看出,单线是最快的。
(py35) root@M7-10-6-0-46-1-200:~# python test.py
single thread start time is: 1539424660.2730281
single thread end time is: 1539424662.7921205
single thread time duration: 2.518900156021118
multi-thread start time is: 1539424663.793299
multi-thread end time is: 1539424667.7644918
multi-thread time duration: 3.971099376678467
thread pool start time is 1539424668.7684498
thread pool end time is 1539424672.3863258
thread pool time duration: 3.6148757934570312
All done
(py35) root@M7-10-6-0-46-1-200:~#
声明:该文观点仅代表作者本人,牛骨文系教育信息发布平台,牛骨文仅提供信息存储空间服务。