python多线程多进程脚本

Posted by kevin on March 20, 2020

多线程

基本用法

python 中一般用 threading 模块来实现多线程,一种实现多线程的脚本如下,最终的运行时间为 1s 多一点点,join 表示将子线程加入主线程,等待子线程都运行完才会继续往下执行。

import time
import threading

def func(n):
    print("current task:", n)
    time.sleep(1)

if __name__ == "__main__":
    t = time.time()
    t1 = threading.Thread(target=func, args=("thread 1",))
    t2 = threading.Thread(target=func, args=("thread 2",))
    t1.start()
    t2.start()
    # 加入主线程
    t1.join()
    t2.join()
    print('done')
    print(time.time() - t)

还有一种写法是通过继承 threading.Thread 定义自己的线程类,重构一下父类的 run 方法,注意这个 run 不能写成其他的函数

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

import time
import threading

class MyThread(threading.Thread):
    def __init__(self, n):
        # 重构run函数必须要写
        super(MyThread, self).__init__()  
        self.n = n

    def run(self):
        print("current task:", self.n)
        time.sleep(1)

if __name__ == "__main__":
    t = time.time()
    t1 = MyThread("thread 1")
    t2 = MyThread("thread 2")
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('done')
    print(time.time() - t)

想开多个线程可以用 for 循环先让线程 start 再用一个 for 循环让所有线程都 join

for i in range(200):
    t = threading.Thread(target=func,args=('thread {}'.format(i),))
    t.start()
for i in range(200):
    t.join()

互斥锁

线程之间数据是共享的,当多个线程对某一个共享数据进行操作时,就需要考虑到线程安全问题。threading 模块中定义了 Lock 类,提供了互斥锁的功能来保证多线程情况下数据的正确性。

#创建锁
mutex = threading.Lock()
#锁定
mutex.acquire()
#释放
mutex.release()

除了 Lock 类还有一个 RLock 递归锁,RLock 允许在同一线程中被多次 acquire, 而 ``Lock 却不允许这种情况,否则会产生死锁。 如果使用 RLock`,那么 acquire 和 release 必须成对出现,即调用了 n 次 acquire,必须调用 n 次的 release 才能真正释放所占用的琐。

rLock = threading.RLock()  #RLock对象
rLock.acquire()
rLock.acquire()    #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()

看下面这个例子,没有加锁,两个线程共享对全局变量 balance 并且对其执行相关函数,由于这个函数里面的操作并不是原子操作,CPU 对线程的调度是随机的,就有可能两个线程同时在操作这个变量,第一个线程还没有操作完毕第二个线程就对当前没有操作完的变量再次操作,所以就会造成结果被改乱

import time, threading

# 假定这是你的银行存款:
balance = 0

def change_it(n):
    # 先存后取,结果应该为0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

上面这个代码的输出有很多种情况,非常随机,如果对操作变量的地方加上锁的话,当某个线程执行到该函数我们就说它获得了锁,因此其他线程不能同时操作这个变量,只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突

import time, threading

# 假定这是你的银行存款:
balance = 0
# 定义一个锁
lock = threading.Lock()

def change_it(n):
    # 先存后取,结果应该为0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        # 先获得锁
        lock.acquire()
        change_it(n)
        # 改完了要记得释放锁
        lock.release()

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

不过加上了锁感觉跟没有用多线程是一样的,甚至有时候速度还比单线程更慢,u1s1,python 速度慢不是吹的,也就爬虫用用多线程吧

Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。多线程的并发在Python中就是一个美丽的梦。

多进程

基本用法

Python 要进行多进程操作,需要用到 muiltprocessing 库,其中的 Process 类跟 threading 模块的 Thread 类很相似,第一种方法直接用 Process 来声明一个对象

from multiprocessing import Process  

def show(name):
    print("Process name is " + name)

if __name__ == "__main__": 
    proc = Process(target=show, args=('subprocess',))  
    proc.start()  
    proc.join()

第二种方法和上面的一样,也是重写 run 方法,所以和多线程的表示还是挺像的

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self, name):
        super(MyProcess, self).__init__()
        self.name = name

    def run(self):
        print('process name :' + self.name)
        time.sleep(1)

if __name__ == '__main__':
    for i in range(3):
        p = MyProcess(str(i))
        p.start()
    for i in range(3):
        p.join()

进程间通信

操作系统提供了很多机制来实现进程间的通信,Python的 multiprocessing 模块包装了底层的机制,提供了 QueuePipes 等多种方式来交换数据。

Queue 是多进程安全的队列,可以实现多进程之间的数据传递。它主要有两个函数 putget,put() 用以插入数据到队列中,get() 可以从队列读取并且删除一个元素。

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

Pipe 的本质是进程之间的用管道数据传递,而不是数据共享,这和 socket 有点像。pipe() 返回两个连接对象分别表示管道的两端,每端都有 send() 和 recv() 函数,如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据。

from multiprocessing import Process, Pipe
 
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
 
if __name__ == '__main__':
    parent_conn, child_conn = Pipe() 
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   #prints "[42, None, 'hello']"
    p.join()

进程池

如果要启动大量的子进程,可以用进程池 Pool 的方式批量创建子进程,Pool 常用的方法如下

方法 含义
apply() 同步执行(串行)
apply_async() 异步执行(并行)
terminate() 立刻关闭进程池
join() 主进程等待所有子进程执行完毕。必须在 close() 或 terminate() 之后使用
close() 等待所有进程结束后,才关闭进程池

Pool 对象调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close() ,调用 close() 之后就不能继续添加新的 Process 了。

输出的结果 task 0,1,2,3 是立刻执行的,而 task 4 要等待前面某个 task 完成后才执行,这是因为 Pool 的默认大小在我的电脑上是4,因此,最多同时执行4个进程,当一个进程执行完后会添加新的进程进去

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    # 调用 join() 之前必须先调用 close()
    p.join()
    print('All subprocesses done.')

如何选择

计算密集型任务选择多进程,IO 密集型任务(如爬虫)选择多线程

我们可以把任务分为计算密集型和IO密集型。

计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

reference

https://www.jianshu.com/p/a69dec87e646

https://wulidecade.cn/2019/08/11/python%E7%BC%96%E7%A8%8B/

https://www.liaoxuefeng.com/wiki/1016959663602400/1017628290184064