牛骨文教育服务平台(让学习变的简单)
博文笔记

Python 多线程

创建时间:2018-10-13 投稿人: 浏览次数:284

我们知道,很多其他语言的多线程一般用来做并发和异步处理。异步可以避免主线程被阻塞(block),并发可以充分利用CPU多核达到提速的目的。但是python的多线程并发是不是我们想象的那样呢?python(CPython)的多线程其实是"伪多线程":由于历史遗留问题,python的多个线程不是并行地在多核上执行,而是交替地在单核上执行,即使是多处理器的情况下,也不能提升CPU密集型任务的速度,这对于多线程的并发特性简直是个坑。为什么会是这样呢?那是因为解释器在执行时会设置一个GIL(Global Interpreter Lock,全局解释器锁),每个python线程在执行时都必须先获得这个锁,执行一段时间后(Python 2中是每执行100条字节码指令,Python 3.2之后默认是5毫秒的切换间隔)释放这个锁,再等待重新获取这个锁继续执行。这和多线程在单核CPU上并发的逻辑类似,学过操作系统的同学并不陌生,就是多个线程轮流抢占CPU的机制。这样的逻辑会造成多线程处理数据反而比单线程处理数据更慢。下面是对500张图片做填充(padding)的单线程、多线程及线程池的例子。

# -*- coding: utf-8 -*-
# for Python 3.6
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait

import numpy as np
import skimage.color
import skimage.io
import torch

class ImageProcessor:
    """Load JPEG images and time three ways of zero-padding them into a tensor.

    The padded result is kept in a private ``(N, height, width, 3)`` float
    tensor that every padding strategy re-allocates before filling it.
    """

    def __init__(self):
        # Empty placeholder; replaced by each padding strategy.
        self.__extended_images = torch.zeros(0, 0, 0, 0)

    def read_image(self, path, copies=5):
        """Read every file in *path* whose name ends with ``"jpg"``.

        Each image is converted to a 3-channel float32 array. Colour images
        are reduced to grayscale first and then replicated to three channels,
        so the output always has shape ``(H, W, 3)``.

        Args:
            path: Directory to scan (not recursive).
            copies: How many times each image is appended to the result; used
                to enlarge the benchmark workload. The same array object is
                appended each time.

        Returns:
            A list of float32 arrays, possibly empty.
        """
        images = []

        # os.listdir returns entries in arbitrary order; sort for reproducibility.
        for file_name in sorted(os.listdir(path)):
            if not file_name.endswith("jpg"):
                continue

            image = skimage.io.imread(os.path.join(path, file_name))

            # An RGBA image is 3-D with 4 channels (H, W, 4); drop the alpha
            # channel before the grayscale conversion.
            if image.ndim == 3 and image.shape[-1] == 4:
                image = skimage.color.rgba2rgb(image)
            if image.ndim == 3:
                image = skimage.color.rgb2gray(image)
            if image.ndim == 2:
                image = skimage.color.gray2rgb(image)
            # NOTE(review): files that are already 2-D grayscale skip rgb2gray,
            # so their value range may differ from converted colour images.
            image = image.astype(np.float32)

            images.extend([image] * copies)

        return images

    def resize_images(self, images):
        """Pad *images* to a common size three times and print the timings.

        Runs the single-thread, ``threading`` and thread-pool strategies in
        that order, each on the full image list, sleeping one second between
        runs.
        """
        height, width = self.__fetch_max_size(images)

        self.__run_timed(
            "single thread start time is: ",
            "single thread end time is: ",
            "single thread time duration: ",
            self.__extend_image_by_single_thread, images, height, width,
        )
        time.sleep(1)  # pause so consecutive measurements stay separated
        self.__run_timed(
            "multi-thread start time is: ",
            "multi-thread end time is: ",
            "multi-thread time duration: ",
            self.__extend_by_threading, images, height, width, 2,
        )
        time.sleep(1)
        self.__run_timed(
            "thread pool start time is ",
            "thread pool end time is ",
            "thread pool time duration: ",
            self.__extend_by_thread_pool, images, height, width, 2,
        )

    def __run_timed(self, start_msg, end_msg, duration_msg, func, *args):
        """Call ``func(*args)`` and print wall-clock stamps plus the duration."""
        print(start_msg, time.time())
        # perf_counter is monotonic, so the duration cannot be skewed by
        # system clock adjustments the way a time.time() difference can.
        started = time.perf_counter()
        func(*args)
        elapsed = time.perf_counter() - started
        print(end_msg, time.time())
        print(duration_msg, elapsed)

    def __fetch_max_size(self, images):
        """Return ``(max_height, max_width)`` over *images*.

        ``shape[0]`` is height and ``shape[1]`` is width. Returns ``(0, 0)``
        for an empty list. Plain Python ints are returned so they can be used
        directly as tensor dimensions.
        """
        height = max((image.shape[0] for image in images), default=0)
        width = max((image.shape[1] for image in images), default=0)
        return height, width

    def __extend_image_by_single_thread(self, images, height, width):
        """Pad all images sequentially in the calling thread."""
        self.__extended_images = torch.zeros(len(images), height, width, 3)
        for index, image in enumerate(images):
            self.__extend_single_image(image, index)

    def __extend_by_threading(self, images, height, width, parts_num):
        """Pad the images with *parts_num* threads, one contiguous slice each."""
        self.__extended_images = torch.zeros(len(images), height, width, 3)
        total = len(images)

        # Integer arithmetic gives exact, gap-free slice boundaries.
        threads = [
            threading.Thread(
                target=self.__extend_part_image,
                args=(
                    images,
                    total * part // parts_num,
                    total * (part + 1) // parts_num,
                ),
                daemon=True,
            )
            for part in range(parts_num)
        ]

        for thread in threads:
            thread.start()

        for thread in threads:
            thread.join()

    def __extend_part_image(self, images, i, j):
        """Pad the images with indices in ``[i, j)``."""
        for k in range(i, j):
            self.__extend_single_image(images[k], k)

    def __extend_by_thread_pool(self, images, height, width, workers):
        """Pad the images with a pool of *workers* threads, one task per image."""
        self.__extended_images = torch.zeros(len(images), height, width, 3)

        # The context manager shuts the pool down so worker threads are not
        # left behind after the call.
        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(self.__extend_single_image, image, index)
                for index, image in enumerate(images)
            ]
            # result() re-raises any exception raised inside a worker.
            for future in futures:
                future.result()

    def __extend_single_image(self, image, i):
        """Copy *image* into the top-left corner of slot *i* of the tensor.

        The rest of the slot keeps the zeros it was allocated with. Each call
        writes only to its own first-axis index *i*.
        """
        rows, cols = image.shape[0], image.shape[1]
        self.__extended_images[i, :rows, :cols, :] = torch.from_numpy(image)

def main(path="/home/chenyu.xu/test/allimages/"):
    """Load the images under *path* and run the padding benchmark.

    Args:
        path: Directory containing the JPEG files to benchmark with.

    Prints a notice and returns early when no image was loaded.
    """
    img_proc = ImageProcessor()
    images = img_proc.read_image(path)
    if not images:
        print("There is no image!")
        return

    img_proc.resize_images(images)
    print("All done")

# Run the benchmark only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

从结果可以看出,单线程是最快的:多线程和线程池由于GIL的存在无法真正并行,还要额外承担线程创建、调度和切换的开销,所以反而更慢。

(py35) root@M7-10-6-0-46-1-200:~# python test.py
single thread start time is:  1539424660.2730281
single thread end time is:  1539424662.7921205
single thread time duration:  2.518900156021118
multi-thread start time is:  1539424663.793299
multi-thread end time is:  1539424667.7644918
multi-thread time duration:  3.971099376678467
thread pool start time is  1539424668.7684498
thread pool end time is  1539424672.3863258
thread pool time duration:  3.6148757934570312
All done
(py35) root@M7-10-6-0-46-1-200:~#

 

声明:该文观点仅代表作者本人,牛骨文系教育信息发布平台,牛骨文仅提供信息存储空间服务。