3分钟理解梳理清楚Python多线程与多进程

原创
小哥 3年前 (2022-10-20) 阅读数 115 #PYTHON

在学习Python在这个过程中,我接触到了与多线程编程相关的知识点,这些知识点以前还没有被彻底了解。今天,我将花一些时间尽可能清楚地梳理细节。

线程和进程之间的区别

进程(process)和线程(thread)是操作系统的基本概念,但比较抽象,不容易掌握。关于多进程、多线程,教材中最经典的一句话是 进程是资源分配的最小单位,线程是CPU最小调度单位 线程是程序中的单个顺序控制流。进程中相对独立和可调度的执行单元是系统的独立调度和调度。CPU基本单元是指运行程序的调度单元。在单个程序中同时运行多个线程以完成不同任务称为多线程。

进程和线程的差异

进程是资源配置的基本单位。与进程相关的所有资源都记录在进程控制块中。PCB在……里面。以指示进程拥有或正在使用这些资源。此外,进程也是抢占式处理器的调度单元,抢占式处理器拥有完整的虚拟地址空间。当调度一个进程时,不同的进程具有不同的虚拟地址空间,同一进程中的不同线程共享相同的地址空间。

对应于进程,线程与资源分配无关。它属于某个进程,并与该进程中的其他线程共享该进程的资源。线程仅由相关堆栈(系统堆栈或用户堆栈)寄存器和线程表控制。TCB组成。寄存器可用于存储线程内的局部变量,但不能存储其他线程的相关变量。

通常,一个进程可以包含多个线程,这些线程可以利用该进程拥有的资源。在引入线程的操作系统中,进程通常被用作分配资源的基本单元,而线程被用作独立运行和独立调度的基本单元。

由于线程比进程小,基本上没有系统资源,调度它们的成本会小得多,可以更有效地提高系统中多个程序之间的并发执行程度,从而显著提高系统资源的利用率和吞吐量。

因此,近年来推出的通用操作系统都引入了线程,以进一步提高系统的并发性,并将其作为现代操作系统的重要指标。

线程和进程之间的区别可以归纳为以下4点:

  • 地址空间和其他资源(如打开的文件):进程彼此独立,并在同一进程的线程之间共享。一个进程中的线程对其他进程不可见。

  • 通信:进程间通信IPC,线程可以直接读写进程数据段(如全局变量)进行通信--需要进程同步和互斥来确保数据一致性。

  • 调度和切换:线程上下文切换比进程上下文切换快得多。

  • 在多线程OS,该进程不是可执行实体。

多进程与多线程的比较

对比维度

多进程

多线程

总结

数据共享、同步

数据共享很复杂,同步很简单。

数据共享很简单,同步很复杂。

各有优劣

内存、CPU

它占用更多的内存,而且切换起来很复杂,CPU利用率低

更少的内存使用、简单的切换、CPU利用率高

线程占优

创建、销毁、切换

复杂,缓慢

简单快捷

线程占优

编程、调试

简单的编程和调试

编程复杂,调试复杂。

进程占优

可靠性

进程之间不会有相互影响。

线程挂起会导致整个进程挂起。

进程占优

分布式

适用于多核多机,扩展到多机简单。

适用于多核

进程占优

总之,进程和线程也可以比作火车和车厢:

  • 线程在进程下运行(简单的车厢不能运行)

  • 一个进程可以包含多个线程(一列火车可以有多节车厢)

  • 不同流程之间难以共享数据(一列列车上的乘客很难切换到另一列,如换站)

  • 同一进程下不同线程之间的数据易于共享(A车厢换到B马车很容易)

  • 进程比线程消耗更多的计算机资源(使用多列列车比使用多节车厢消耗更多的资源)

  • 进程间不会相互影响,线程挂起会导致整个进程挂起。(一列火车不会影响到另外一列火车,但是如果一列火车上中间的一节车厢着火了,将影响到该趟火车的所有车厢)

  • 该流程可扩展到多台机器,最多适用于多核(不同列车可以在多个轨道上行驶,同一列车的车厢不能在不同轨道上)

  • 进程使用的内存地址可以被锁定,也就是说,当一个线程使用一些共享内存时,其他线程必须等待它结束才能使用这段内存。(如火车上的卫生间)--“Mutex(mutex)”

  • 进程使用的内存地址可以是有限的(例如,火车上的餐厅,最多允许多少人进入,如果满座,则需要在门口等待,只有当有人出来时才能进入)--“信号量(semaphore)”

Python全局解释程序锁GIL

全局解释程序锁(英语:Global Interpreter Lock,缩写GIL),不是Python它的特点,就是在执行上Python解析器(CPython)引入了一个概念。结果CPython是大多数环境中的默认设置Python执行环境。所以在很多人的观念中CPython就是Python,也是理所当然的GIL归结为Python语言上的缺陷。所以CPython实现中的GIL那是什么?我们来看看官方的解释:

The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.

Python执行代码的执行Python 虚拟机(也称为解释器主循环,CPython版本)来控制,Python 在设计之初,人们认为只有一个线程在解释器的主循环中执行,即在任何时候,只有一个线程在解释器中运行。正确的Python 虚拟机的访问由全局解释程序锁(GIL)为了控制,正是这个锁可以确保同时只有一个线程在运行。

GIL 有什么福利待遇?简而言之,它在单线程的情况下速度更快,在和 C 在不考虑线程安全问题的情况下组合库会更方便,这也是一个早期阶段。 Python 最常见的应用场景和优势。此外,GIL简化系统的设计CPython的实现使对象模型(包括关键的内置类型,如字典)可以同时隐式访问。锁定全局解释器使其更容易支持多线程,但它也失去了多处理器主机的并行计算能力。

在多线程环境中,Python 该虚拟机按如下方式执行:

  1. 设置GIL

  2. 切换到要运行的线程

  3. 运行到指定数量的字节码指令,或者线程主动放弃控制(可以调用sleep(0))

  4. 将线程设置为休眠

  5. 解锁GIL

  6. 再次重复上述所有步骤

Python3.2前,GIL当前线程满足释放逻辑。IO操作或者ticks计数达到100(ticks可以看到python一种自己的计数器,专门使用GIL,每次释放后都为零,则可以传递此计数 sys.setcheckinterval 调整),松开。因为计算密集型线程正在释放GIL在那之后,我会立即申请。GIL,并且通常在调度其他线程之前重新获取它。GIL,它将导致一旦获得计算密集型线程。GIL,那么它将占据很长一段时间GIL,甚至直到线程执行结束。

Python 3.2开始使用新的GIL。新的GIL该实现使用固定的超时来指示当前线程放弃全局锁。当当前线程持有此锁并且其他线程请求此锁时,当前线程将5锁被强制在毫秒后释放。改进的是在单核的情况下,对于单线程的长期占用。GIL情况有所改善。

在单核CPU打开时,数百次间隔检查只会导致线程切换。在多核中CPU打开时,有一个严重的线条凹凸(thrashing)。每一次发布GIL锁、线程竞争锁和切换线程,这会消耗资源。单核下多线程,每次发布GIL,唤醒的线程可以获得它。GIL锁,所以它可以无缝执行,但在多核下,CPU0释放GIL后,其他CPU上线将会竞争,GIL可能会立即再次出现CPU0得到它,导致其他几个CPU在被唤醒后,线程会被唤醒并等待,直到切换时间,然后进入等待状态,这会导致线程动荡。(thrashing),导致效率较低。

此外,还可以从上述实现机制中推导出,Python多线程配对IO密集型代码CPU密集的代码更友好。

针对GIL应对措施:

  • 使用更高版本Python(对GIL机构优化)

  • 将多线程替换为多进程(否GIL,但该过程本身会消耗更多资源)

  • 指定cpu运行线程(使用affinity模块)

  • 使用Jython、IronPython等无GIL解释器

  • 全IO多线程仅用于密集型任务。

  • 使用关联(高效单线程模式,也称为微线程;通常用于多个进程)

  • 使用关键组件C/C++编写为Python扩张,ctypes使Python直接程序调用C该语言编译的动态链接库的导出函数。(with nogil调出GIL限制)

Python多进程包multiprocessing

Python的threading该包主要使用多线程开发,但由于GIL的存在,Python多线程并不是真正的多线程,如果你想充分利用多核CPU资源,大多数情况下需要使用多个进程。在……里面Python 2.6介绍了该版本multiprocessing包,它完全复制了一套threading提供的接口方便了迁移。唯一的区别是它使用多个进程而不是多个线程。每个进程都有自己的独立进程GIL,所以不会有进程之间的冲突。GIL争抢。

借助这个multiprocessing,您可以轻松地完成从单进程到并发执行的转换。multiprocessing支持子进程,通信和共享数据,执行不同形式的同步,并提供Process、Queue、Pipe、Lock等组件。

Multiprocessing这一代人的背景

除了应对Python的GIL此外,它还生产multiprocessing另一个原因是Windows操作系统和Linux/Unix系统不一致。

Unix/Linux该操作系统提供了fork()系统调用,它非常特殊。普通函数,调用一次,返回一次,fork()调用一次,返回两次,因为操作系统会自动复制当前进程(父进程)(子进程)的副本,然后分别在父进程和子进程中返回。子进程始终返回0,并且父进程返回子进程的ID。这样做的原因是父进程可以fork有很多子进程,所以父进程应该写下ID,子进程只需调用getpid()您可以获取父进程ID。

Python的os该该模块已封装常见的系统调用,包括fork,可以在Python在程序中轻松创建子流程:

import os

print(Process (%s) start... % os.getpid())

# Only works on Unix/Linux/Mac:

pid = os.fork()

if pid == 0:

    print(I am child process (%s) and my parent is %s. % (os.getpid(), os.getppid()))

else:

    print(I (%s) just created a child process (%s). % (os.getpid(), pid))

上面的代码在Linux、Unix和Mac上的执行结果为:

Process (876) start...

I (876) just created a child process (877).

I am child process (877) and my parent is 876.

有了fork调用时,进程可以在收到新任务时复制子进程以处理新任务。Apache每当有新的http当被要求时,它是fork处理新信息的子流程http请求。

由于Windows没有fork调用,上面的代码在Windows它不能继续运行。结果Python它是跨平台的,自然也应该提供跨平台的多进程支持。multiprocessing模块是多进程模块的跨平台版本。multiprocessing该模块已封装fork()打电话,这样我们就不需要注意了fork()详细情况。结果Windows没有fork因此,呼叫,multiprocessing需要“模拟”出来fork的效果。

multiprocessing常见组件和功能

要创建管理流程模块,请执行以下操作:

  • Process(用于创建进程)

  • Pool(用于创建管理进程池)

  • Queue(用于进程通信、资源共享)

  • Value,Array(用于进程通信、资源共享)

  • Pipe(用于管道通信)

  • Manager(用于资源共享)

同步子流程模块:

  • Condition(条件变量)

  • Event(事件)

  • Lock(互斥锁)

  • RLock(可重入互斥锁(相同的进程可以多次获取它,而不会导致阻塞。)

  • Semaphore(信号量)

接下来,让我们学习如何使用每个组件和函数。

Process(用于创建进程)

multiprocessing该模块提供了一个Process类来表示进程对象。

在multiprocessing,每个进程使用一个Process要表示的类。

施工方法:Process([group [, target [, name [, args [, kwargs]]]]])

  • group:GROUPING,不实际使用,值始终为None

  • target:表示调用对象,即子进程要执行的任务。您可以传入方法名。

  • name:设置子进程的名称

  • args:要传给target函数的位置参数作为元组传入。

  • kwargs:要传给target函数的DICTIONARY参数作为字典传入。

示例方法:

  • start():启动进程并调用p.run()

  • run():调用进程启动时运行的方法。target指定的函数,则必须在自定义类的类中实现此方法。

  • terminate():强制终止进程p,将不会执行任何清洁操作,如果p创建子流程后,该子进程将成为僵尸进程。使用此方法尤其要注意这种情况。如果p锁也被保存,因此它不会被释放,从而导致死锁。

  • is_alive():返回进程是否正在运行。如果p仍在运行,返回True

  • join([timeout]):进程同步,主进程等待子进程完成,然后执行以下代码。线程正在等待p终止(强调:主线程处于相等状态,并且p处于运行状态)。timeout是可选的超时时间(超过该时间,父线程将不再等待子线程并继续执行)。应该强调的是,p.join只能join住start打开流程,但不是join住run开放式流程

物业简介:

  • daemon:缺省值为False,如果已设置True,代表p后台运行的守护进程;当p当的父进程终止时,p它还会终止并设置True后,p您不能创建自己的新流程;必须在p.start()之前设置

  • name:进程的名称

  • pid:进程的pid

  • exitcode:进程正在运行None,如-N,这意味着要发出信号N结束(了解即可)

  • authkey:进程的身份验证密钥。,默认是由os.urandom()随机生成32字符串。此密钥的目的是为涉及网络连接的底层进程间通信提供安全性,只有当它们具有相同的身份验证密钥时才能成功(了解)

使用示例:(注意:在windows中Process()必须放到if name == ‘ main ’:下)

from multiprocessing import Process

import os

def run_proc(name):

    print(Run child process %s (%s)... % (name, os.getpid()))

if __name__==__main__:

    print(Parent process %s. % os.getpid())

    p = Process(target=run_proc, args=(test,))

    print(Child process will start.)

    p.start()

    p.join()

print(Child process end.)

Pool(用于创建管理进程池)

Pool类用于许多需要执行的目标,手动限制进程数太麻烦,如果目标很少且不需要控制进程数,则可以使用Process类。Pool您可以提供指定数量的进程供用户在提交新请求时调用Pool如果池未满,则会创建一个新进程来执行请求,但如果池中的进程数已达到指定的最大值,则请求将一直等待,直到池中的进程结束并重复使用进程池中的进程。

施工方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes :如果省略,则默认使用要创建的进程数cpu_count()号码被退回了。

  • initializer:每个辅助进程启动时要执行的可调用对象,默认为None。如果initializer是None,则将在开始时调用每个工作进程。initializer(*initargs)。

  • initargs:将被传承下去initializer参数组。

  • maxtasksperchild:在工作进程退出之前可以完成的任务数。完成后,使用新的工作进程来替换原始进程,以释放空闲资源。maxtasksperchild默认是None,这意味着只要Pool如果有一个工作流程,它就会存活下来。

  • context: 用于工作过程开始时的上下文中,通常用于Pool() 或者一个context对象的Pool()方法来创建池,这两种方法都进行了适当的设置。context。

示例方法:

  • apply(func[, args[, kwargs]]):在池进程中执行func(args,*kwargs),然后返回结果。需要强调的是,此操作不会在所有池进程中执行。func功能。如果您想通过不同的参数并发执行func函数,该函数必须从另一个线程调用p.apply()功能或用途p.apply_async()。它挡住了。apply很少使用

  • apply_async(func[, arg[, kwds={}[, callback=None]]]):在池进程中执行func(args,*kwargs),然后返回结果。这种方法的结果是AsyncResult类的一个实例,callback是接收输入参数的可调用对象。什么时候func当结果可用时,理解就通过了callback。callback禁止任何阻塞操作,否则会收到其他异步操作的结果。它是非阻塞的。

  • map(func, iterable[, chunksize=None]):Pool类中的map方法,使用内置的map函数的使用行为基本相同,它会阻塞进程,直到返回结果。请注意,尽管第二个参数是迭代器,但在实际使用中,程序不会运行子进程,直到整个队列准备就绪。

  • map_async(func, iterable[, chunksize=None]):map_async与map的关系同apply与apply_async

  • imap():imap 与 map不同的是,map但是当所有进程已经被执行并且结果已经被返回时,imap()是立即返回一个iterable可重复使用的对象。

  • imap_unordered():不能保证返回结果的顺序与进程添加的顺序一致。

  • close():关闭进程池以防止进一步操作。如果所有操作继续挂起,它们将在工作进程终止之前完成。

  • join():等待所有工作进程退出。此方法仅可用于close()或teminate()在呼叫之后,让它不再接受新的Process。

  • terminate():结束工作流程,不再处理未处理的任务。

方法apply_async()和map_async()的返回值为AsyncResul的实例obj。该实例具有以下方法:

  • get():返回结果,必要时等待结果到达。timeout是可选的。如果在指定的时间内没有到达,将抛出例外。如果在远程操作中引发异常,则在调用此方法时将再次引发该异常。

  • ready():如果调用完成,则返回True

  • successful():如果调用完成并且没有抛出异常,则返回True如果在结果准备好之前调用此方法,则会引发异常。

  • wait([timeout]):等待结果出来。

  • terminate():立即终止所有工作进程,而不执行任何清理或结束任何挂起的工作。如果p垃圾回收时,将自动调用此函数

使用示例:

# -*- coding:utf-8 -*-

# Pool+map

from multiprocessing import Pool

def test(i):

    print(i)

if __name__ == "__main__":

    lists = range(100)

    pool = Pool(8)

    pool.map(test, lists)

    pool.close()

    pool.join()

# -*- coding:utf-8 -*-

# 异步进程池(非阻塞)

from multiprocessing import Pool

def test(i):

    print(i)

if __name__ == "__main__":

    pool = Pool(8)

    for i in range(100):

        

        For执行循环中的步骤:

        (1)循环遍历,将100将子进程添加到进程池(相对于父进程的块)

        (2)每次执行8子进程,等待子进程执行,立即启动新的子进程。(相对于父进程不阻塞)

        apply_async为异步进程池写入。异步是指启动进程的进程,以及父进程本身的执行(print)是异步的,而For将子进程添加到循环中的进程池的进程与父进程本身的执行同步。

        

        pool.apply_async(test, args=(i,))  # 维护实施的流程总数为8当一个进程被执行时,一个新的进程被启动。.

    print("test")

    pool.close()

    pool.join()

# -*- coding:utf-8 -*-

# 异步进程池(非阻塞)

from multiprocessing import Pool

def test(i):

    print(i)

if __name__ == "__main__":

    pool = Pool(8)

    for i in range(100):

        

            实际测试发现,for在循环中执行以下步骤:

            (1)遍历100对象,将子进程放入进程池中。

            (2)执行此子进程,子进程完成后,在执行前将子进程放入进程池中。(一次只执行一个子进程)

            for循环完成后,将再次执行。print函数。

        

        pool.apply(test, args=(i,))  # 维护实施的流程总数为8当一个进程被执行时,一个新的进程被启动。.

    print("test")

    pool.close()

    pool.join()

Queue(用于进程通信、资源共享)

在使用多进程的过程中,最好不要使用共享资源。普通全局变量不能由缝制工艺共享,只能通过Multiprocessing该组件构建的数据结构可以共享。

Queue 是用于创建进程间资源共享队列的类,使用Queue可以实现多个进程之间的数据传输功能(缺点:仅适用Process类,不能在Pool在进程池中使用)。

施工方法:Queue([maxsize])

  • maxsize是队列中允许的最大项目数,如果省略,则没有大小限制。

示例方法:

  • put():用于将数据插入队列。put该方法还有两个可选参数:blocked和timeout。如果blocked为True(默认)和timeout为正值,则该方法将timeout队列剩余空间之前的指定时间。如果超时,则将抛出Queue.Full例外。如果blocked为False,但该Queue满的,将立即抛出Queue.Full异常。

  • get():您可以从队列中读取和删除元素。get该方法有两个可选参数:blocked和timeout。如果blocked为True(默认)和timeout如果它是正的,则在等待时间内不获取任何元素并将被抛出。Queue.Empty例外。如果blocked为False,有两种情况,如果Queue如果值可用,则立即返回该值,否则,如果队列为空,则立即引发该值Queue.Empty例外。如果你不想待在empty在引发异常时,请使blocked为True或将所有参数设置为空。

  • get_nowait():同q.get(False)

  • put_nowait():同q.put(False)

  • empty():当调用此方法时q如果为空,则返回True,结果是不可靠的,例如在返回。True如果将项添加到队列中。

  • full():当调用此方法时q满了就退货True,结果是不可靠的,例如在返回。True如果队列中的项被移走。

  • qsize():返回队列中当前项的正确数量,由于同样的原因,结果不可靠。q.empty()和q.full()一样

使用示例:

from multiprocessing import Process, Queue

import os, time, random

def write(q):

    print(Process to write: %s % os.getpid())

    for value in [A, B, C]:

        print(Put %s to queue... % value)

        q.put(value)

        time.sleep(random.random())

def read(q):

    print(Process to read: %s % os.getpid())

    while True:

        value = q.get(True)

        print(Get %s from queue. % value)

if __name__ == "__main__":

    q = Queue()

    pw = Process(target=write, args=(q,))

    pr = Process(target=read, args=(q,))

    pw.start()

    pr.start()

    pw.join()  # 等待pw结束

    pr.terminate()  # pr这个过程是一个死循环,不能等待它的结束,只能强制终止

JoinableQueue 这就像是一个Queue对象,但该队列允许项的用户通知生成器该项已成功处理。通知过程使用共享信号和条件变量来实现。

施工方法:JoinableQueue([maxsize])

  • maxsize:队列中允许的最大项目数,如果省略,则没有大小限制。

实例方法

JoinableQueue的实例p除了与Queue除了对象的相同方法外,它还具有:

  • task_done():用户使用此方法发出信号,指示q.get()的退货项目已处理。如果调用此方法的数量超过从队列中移除的项数,则将引发此方法。ValueError异常

  • join():生产者调用此方法进行阻塞,直到处理完队列中的所有项。阻塞将继续,直到调用队列中的每个项目q.task_done()方法

使用示例:

# -*- coding:utf-8 -*-

from multiprocessing import Process, JoinableQueue

import time, random

def consumer(q):

    while True:

        res = q.get()

        print(消费者得到了它 %s % res)

        q.task_done()

def producer(seq, q):

    for item in seq:

        time.sleep(random.randrange(1,2))

        q.put(item)

        print(制片人做得很好 %s % item)

    q.join()

if __name__ == "__main__":

    q = JoinableQueue()

    seq = (产品%s % i for i in range(5))

    p = Process(target=consumer, args=(q,))

    p.daemon = True  # 在主线程停止时设置为守护进程p也停下来,但别担心,producer内调用q.join保证了consumer队列中的所有元素都已处理

    p.start()

    producer(seq, q)

    print(主线程)

Value,Array(用于进程通信、资源共享)

multiprocessing 中Value和Array实现原理是在共享内存中创建。ctypes()对象实现数据共享的目的,两种实现方法相似,但选择不同ctypes数据类型。

Value

施工方法:Value((typecode_or_type, args[, lock])

  • typecode_or_type:定义ctypes()可以传递的对象的类型。Type code或 C Type具体对比表如下所示。

  • args:传递给typecode_or_type构造函数的参数

  • lock:默认为True,创建互斥锁以限制该对。Value对象访问,如果传递了锁,则Lock或RLock的实例将用于同步。如果有来电False,Value的实例将不受锁保护,它将不是进程安全的。

typecode_or_type支持类型:

| Type code | C Type             | Python Type       | Minimum size in bytes |

| --------- | ------------------ | ----------------- | --------------------- |

| b     | signed char        | int               | 1                     |

| B     | unsigned char      | int               | 1                     |

| u     | Py_UNICODE         | Unicode character | 2                     |

| h     | signed short       | int               | 2                     |

| H     | unsigned short     | int               | 2                     |

| i     | signed int         | int               | 2                     |

| I     | unsigned int       | int               | 2                     |

| l     | signed long        | int               | 4                     |

| L     | unsigned long      | int               | 4                     |

| q     | signed long long   | int               | 8                     |

| Q     | unsigned long long | int               | 8                     |

| f     | float              | float             | 4                     |

| d     | double             | float             | 8                     |

参考地址:https://docs.python.org/3/library/array.html

Array

施工方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])

  • typecode_or_type:同上

  • size_or_initializer:如果是整数,则确定数组的长度,数组将被初始化为零。否则,size_or_initializer是用于初始化数组的序列,它的长度决定了数组的长度。

  • kwds:传递给typecode_or_type构造函数的参数

  • lock:同上

使用示例:

import multiprocessing

def f(n, a):

    n.value = 3.14

    a[0] = 5

if __name__ == __main__:

    num = multiprocessing.Value(d, 0.0)

    arr = multiprocessing.Array(i, range(10))

    p = multiprocessing.Process(target=f, args=(num, arr))

    p.start()

    p.join()

    print(num.value)

    print(arr[:])

注意:Value和Array只适用于Process类。

Pipe(用于管道通信)

多进程也有一种称为流水线原理的数据传输方法, Queue相同。Pipe您可以在进程之间创建管道并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须生成Process管道在对象之前生成。

施工方法:Pipe([duplex])

  • dumplex:默认管道为全双工,如果duplex射成False,conn1只能用于接收,conn2只能用于发送。

示例方法:

  • send(obj):通过连接发送对象。obj是否有任何对象与序列化兼容

  • recv():接收conn2.send(obj)对象已发送。如果没有要接收的消息,recv该方法将始终阻塞。如果连接的另一端已关闭,则recv方法引发EOFError。

  • close():关闭连接。如果conn1垃圾回收时,将自动调用此方法

  • fileno():返回连接使用的整型文件描述符

  • poll([timeout]):如果连接上的数据可用,则返回True。timeout指定等待的最长时间限制。如果省略此参数,该方法将立即返回结果。如果timeout射成None,则该操作将无限期地等待数据到达。

  • recv_bytes([maxlength]):接收c.send_bytes()方法发送一条完整的字节消息。maxlength指定要接收的最大字节数。如果传入消息超过此最大值,则会引发该消息。IOError异常,并且无法对该连接进行进一步读取。如果连接的另一端已关闭,并且没有更多数据,则将导致EOFError异常。

  • send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,并且size要发送的字节数。生成的数据以单条消息的形式发送,然后调用c.recv_bytes()要接收的函数

  • recv_bytes_into(buffer [, offset]):接收完整的字节消息并将其保存在buffer对象,该对象支持可写缓冲区接口(即,bytearray对象或类似对象)。offset指定消息在缓冲区中放置的字节移位。返回值是接收到的字节数。如果消息长度大于可用缓冲区空间,则会抛出BufferTooShort异常。

使用示例:

from multiprocessing import Process, Pipe

import time

# 一种子流程执行方法

def f(Subconn):

    time.sleep(1)

    Subconn.send("吃了吗")

    print("来自父亲的问候:", Subconn.recv())

    Subconn.close()

if __name__ == "__main__":

    parent_conn, child_conn = Pipe()  # 创建管道的两端

    p = Process(target=f, args=(child_conn,))  # 创建子流程

    p.start()

    print("我儿子的问候:", parent_conn.recv())

    parent_conn.send("嗯")

Manager(用于资源共享)

Manager()返回的manager该对象控制一个server进程,它包含python对象可以由其他进程传递proxies去拜访。从而实现多进程数据通信和安全。Manager模块常与Pool模块一起使用。

Manager支持的类型包括list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

管理器是一个独立子进程,其中真实对象作为服务器存在并运行。其他进程通过使用作为客户端运行的代理来访问共享对象。Manager()是BaseManager,返回一个开始的SyncManager()实例,这些实例可用于创建共享对象并返回访问它们的代理。

BaseManager ,创建管理服务器的基类

施工方法:BaseManager([address[, authkey]])

  • address:(hostname,port),指定服务器的URL地址。默认情况下,您只需分配一个空闲端口。

  • authkey:连接到服务器的客户端的身份验证,默认为current_process().authkey的值

示例方法:

  • start([initializer[, initargs]]):启动一个单独的子进程,并在该子进程中启动管理器服务器

  • get_server():获取服务器对象

  • connect():连接管理器对象

  • shutdown():关闭管理器对象,只能调用start()在方法之后调用

实例属性:

  • address:只读属性,即管理服务器正在使用的地址。

SyncManager 以下类型不是进程安全类型,需要锁定..

示例方法:

  • Array(self,*args,**kwds)

  • BoundedSemaphore(self,*args,**kwds)

  • Condition(self,*args,**kwds)

  • Event(self,*args,**kwds)

  • JoinableQueue(self,*args,**kwds)

  • Lock(self,*args,**kwds)

  • Namespace(self,*args,**kwds)

  • Pool(self,*args,**kwds)

  • Queue(self,*args,**kwds)

  • RLock(self,*args,**kwds)

  • Semaphore(self,*args,**kwds)

  • Value(self,*args,**kwds)

  • dict(self,*args,**kwds)

  • list(self,*args,**kwds)

使用示例:

import multiprocessing

def f(x, arr, l, d, n):

    x.value = 3.14

    arr[0] = 5

    l.append(Hello)

    d[1] = 2

    n.a = 10

if __name__ == __main__:

    server = multiprocessing.Manager()

    x = server.Value(d, 0.0)

    arr = server.Array(i, range(10))

    l = server.list()

    d = server.dict()

    n = server.Namespace()

    proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))

    proc.start()

    proc.join()

    print(x.value)

    print(arr)

    print(l)

    print(d)

    print(n)

同步子流程模块

Lock(互斥锁)

Lock锁的作用是在多个进程需要访问共享资源时避免访问冲突。锁定确保了当多个进程修改同一条数据时,同一时间只能有一次修改,即串行修改,这牺牲了速度,但确保了数据安全。Lock包含两种状态-锁定和解锁,以及两种基本方法。

施工方法:Lock()

示例方法:

  • acquire([timeout]): 使线程进入同步阻塞状态,并尝试获取锁。

  • release(): 解开锁。线程在使用前必须已锁定,否则将引发异常。

使用示例:

from multiprocessing import Process, Lock

def l(lock, num):

    lock.acquire()

    print("Hello Num: %s" % (num))

    lock.release()

if __name__ == __main__:

    lock = Lock()  # 必须将其定义为全局

    for num in range(20):

        Process(target=l, args=(lock, num)).start()

RLock(可重入互斥锁(相同的进程可以多次获取它,而不会导致阻塞。)

RLock(可重入锁)是可由同一线程多次请求的同步指令。RLock使用“拥有线程”和“递归级别”的概念,当处于锁定状态时,RLock由一根线拥有。有RLock的线程可以再次调用。acquire(),您需要在释放锁时进行呼叫。release()相同的次数。可以认为,RLock包含锁定的池和初始值0每个成功调用的计数器。 acquire()/release(),柜台将+1/-1,为0锁被解锁了。

施工方法:RLock()

示例方法:

  • acquire([timeout]):同Lock

  • release(): 同Lock

Semaphore(信号量)

信号量是一种更高级的锁定机制。与Lock对象内部的锁标识不同,信号量内部有一个计数器,只有当占用信号量的线程数超过信号量时,线程才会被阻塞。这允许多个线程同时访问相同的代码区。例如,厕所有3一个坑,最多只允许3当一个人上厕所时,后面的人只能等里面的人出来才能进去。如果指定了信号量3,然后来一个人拿锁,数一数1,当计数相等时3届时,所有落在后面的人都需要等待。一旦释放,某人就可以获得一把锁。

施工方法:Semaphore([value])

  • value:设置信号量,默认为1

示例方法:

  • acquire([timeout]):同Lock

  • release(): 同Lock

使用示例:

from multiprocessing import Process, Semaphore

import time, random

def go_wc(sem, user):

    sem.acquire()

    print(%s 占据一个坑 % user)

    time.sleep(random.randint(0, 3))

    sem.release()

    print(user, OK)

if __name__ == __main__:

    sem = Semaphore(2)

    p_l = []

    for i in range(5):

        p = Process(target=go_wc, args=(sem, user%s % i,))

        p.start()

        p_l.append(p)

    for i in p_l:

        i.join()

Condition(条件变量)

可以把Condition被理解为高级锁,它提供了更好的Lock, RLock更高级的功能允许我们控制复杂的线程同步问题。Condition在内部维护锁对象(缺省值为RLock),可以在其中创建Condigtion当对象作为参数传入时。Condition也提供了acquire, release方法,其含义与锁的acquire, release该方法是一致的,实际上它只是对内部锁对象的对应方法的简单调用。Condition还提供了其他方法。

施工方法:Condition([lock/rlock])

  • 可以传递一个Lock/RLock实例是给构造方法的,否则它会自己生成一个。RLock实例。

示例方法:

  • acquire([timeout]):先进行acquire,然后判断一些条件。如果不满足条件wait

  • release():释放 Lock

  • wait([timeout]): 调用此方法将导致线程进入Condition的等待池等待通知,并解开锁。线程在使用前必须已锁定,否则将引发异常。处于wait状态线程在收到通知后将重新判断条件。

  • notify(): 调用此方法将从等待池中挑选一个线程并通知它,接收通知的线程将自动调用它。acquire()尝试获取锁(进入锁池);其他线程仍在池中等待。调用此方法不会释放锁。线程在使用前必须已锁定,否则将引发异常。

  • notifyAll(): 调用此方法将通知等待池中将进入锁池以尝试获取锁的所有线程。调用此方法不会释放锁。线程在使用前必须已锁定,否则将引发异常。

使用示例:

import multiprocessing

import time

def stage_1(cond):

    """perform first stage of work,

    then notify stage_2 to continue

    """

    name = multiprocessing.current_process().name

    print(Starting, name)

    with cond:

        print({} done and ready for stage 2.format(name))

        cond.notify_all()

def stage_2(cond):

    """wait for the condition telling us stage_1 is done"""

    name = multiprocessing.current_process().name

    print(Starting, name)

    with cond:

        cond.wait()

        print({} running.format(name))

if __name__ == __main__:

    condition = multiprocessing.Condition()

    s1 = multiprocessing.Process(name=s1,

                                 target=stage_1,

                                 args=(condition,))

    s2_clients = [

        multiprocessing.Process(

            name=stage_2[{}].format(i),

            target=stage_2,

            args=(condition,),

        )

        for i in range(1, 3)

    ]

    for c in s2_clients:

        c.start()

        time.sleep(1)

    s1.start()

    s1.join()

    for c in s2_clients:

        c.join()

Event(事件)

Event在内部有一个标志位,最初false。可以使用set()设置它的步骤true;或使用clear()从新设置false;可以使用is_set()检查标志位的状态;另一个最重要的功能是wait(timeout=None),用于阻止当前线程event设置的内部标志位true或者timeout暂停。如果内部标志位为true则wait()函数理解返回。

使用示例:

import multiprocessing

import time

def wait_for_event(e):

    """Wait for the event to be set before doing anything"""

    print(wait_for_event: starting)

    e.wait()

    print(wait_for_event: e.is_set()->, e.is_set())

def wait_for_event_timeout(e, t):

    """Wait t seconds and then timeout"""

    print(wait_for_event_timeout: starting)

    e.wait(t)

    print(wait_for_event_timeout: e.is_set()->, e.is_set())

if __name__ == __main__:

    e = multiprocessing.Event()

    w1 = multiprocessing.Process(

        name=block,

        target=wait_for_event,

        args=(e,),

    )

    w1.start()

    w2 = multiprocessing.Process(

        name=nonblock,

        target=wait_for_event_timeout,

        args=(e, 2),

    )

    w2.start()

    print(main: waiting before calling Event.set())

    time.sleep(3)

    e.set()

    print(main: event is set)

其他内容

multiprocessing.dummy 模块与 multiprocessing 模块之间的差异:dummy 模块是多线程的,而 multiprocessing 是多进程的, api 这一切都是普遍存在的。它们都可以很容易地在多线程和多进程之间切换代码。multiprocessing.dummy通常在IO可以使用场景,例如通过以下方式引入线程池。

from multiprocessing.dummy import Pool as ThreadPool

multiprocessing.dummy与早期的threading,不同的点似乎在多核CPU接下来,只绑定一个核(细节尚未验证)。

参考文档:

Python并发之concurrent.futures

Python标准库为我们提供了threading和multiprocessing该模块编写相应的多线程。/多进程代码。从…Python3.2一开始,标准库为我们提供了concurrent.futures模块,该模块提供ThreadPoolExecutor和ProcessPoolExecutor两个类,实现threading和multiprocessing编写线程池的更高级抽象。/进程池提供直接支持。concurrent.futures基本模块是executor和future。

Executor

Executor是不能直接使用的抽象类。它为特定的异步执行定义了一些基本方法。ThreadPoolExecutor和ProcessPoolExecutor继承了Executor分别用于创建线程池和进程池的代码。

ThreadPoolExecutor对象

ThreadPoolExecutor类是Executor使用线程池执行异步调用的子类。

class concurrent.futures.ThreadPoolExecutor(max_workers)

使用max_workers执行异步调用的线程池数量。

ProcessPoolExecutor对象

ThreadPoolExecutor类是Executor使用进程池执行异步调用的子类。

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

使用max_workers在以下情况下执行异步调用的进程池数量max_workers为None使用机器的处理器数量(例如,4核机器max_worker配置为None,然后使用4(进程的异步并发)。

submit()方法

Executor中定义了submit()方法时,此方法的作用是提交可执行回调。task,并返回一个future实例。future该对象表示给定的调用。

Executor.submit(fn, *args, **kwargs)

  • fn:需要异步执行的函数

  • *args, **kwargs:fn参数

使用示例:

from concurrent import futures

def test(num):

    import time

    return time.ctime(), num

with futures.ThreadPoolExecutor(max_workers=1) as executor:

    future = executor.submit(test, 1)

    print(future.result())

map()方法

除了submit,Exectuor它还为我们提供了map方法,该方法返回一个map(func, *iterables)迭代器,则迭代器中的回调执行返回的结果是有序的。

Executor.map(func, *iterables, timeout=None)

  • func:需要异步执行的函数

  • *iterables:可重复对象,如列表等。每次func执行,所有这些都来自iterables将参数传入。

  • timeout:设置每个异步操作的超时时间。timeout的值可以是int或float,如果操作超时,它将返回raisesTimeoutError;如果未指定timeout参数,不设置加班。

使用示例:

from concurrent import futures

def test(num):

    import time

    return time.ctime(), num

data = [1, 2, 3]

with futures.ThreadPoolExecutor(max_workers=1) as executor:

    for future in executor.map(test, data):

        print(future)

shutdown()方法

释放系统资源,在Executor.submit()或 Executor.map()在异步操作后调用。使用with语句可以避免显式调用此方法。

Executor.shutdown(wait=True)

Future

Future它可以理解为在未来完成的操作,这是异步编程的基础。通常,我们会表演io运营、访问url(下图)将在等待结果返回之前阻止,cpu不能做其他的事情,而且Future的引入帮助我们在等待期间完成其他操作。

Future类封装了可调用的异步执行。Future 实例通过 Executor.submit()方法创建。

  • cancel():尝试取消呼叫。如果调用当前正在执行且无法取消,则该方法将返回False否则,调用将被取消,方法将返回True。

  • cancelled():如果呼叫成功取消,则返回True。

  • running():如果调用当前正在执行且无法取消,则返回True。

  • done():如果呼叫成功取消或结束,则返回True。

  • result(timeout=None):返回调用返回的值。如果调用尚未完成,则此方法将等待超时秒。如果呼叫未在超时秒数内完成,则会有一个Futures.TimeoutError将报出。timeout它可以是整形或浮点值,如果timeout未指定或None,等待的时间是无限的。如果futures还没完工就被取消了,然后 CancelledError 这会被报道的。

  • exception(timeout=None):返回调用引发的异常。如果调用未完成,则该方法将等待timeout在指定的持续时间内,如果呼叫在该持续时间后仍未完成,则会报告超时错误。futures.TimeoutError。timeout它可以是整形或浮点值,如果timeout未指定或None,等待的时间是无限的。如果futures还没完工就被取消了,然后 CancelledError 这会被报道的。如果调用完成并且无异常报出,返回None.

  • add_done_callback(fn):将被调用fn捆绑到future上,当Future取消或终止,fn作为future将调用的唯一参数。如果future已完全运行或已取消,fn将立即被呼叫。

  • wait(fs, timeout=None, return_when=ALL_COMPLETED)

    • 等待fs提供的 Future 实例(possibly created by different Executor instances) 操作结束。返回一个命名的2元集合,子表表示已完成和未完成

    • return_when 指示函数应在何时返回。其值必须为下列值之一:

      • FIRST_COMPLETED :函数在任何future结束或取消时返回。

      • FIRST_EXCEPTION :在任何future因为如果不是,则在结束时返回异常future上报错误,效果相同

      • ALL_COMPLETED :功能齐全future它不会回来,直到它结束。

  • as_completed(fs, timeout=None):该参数为 Future 实例列表中,返回值是迭代器,运行结束后输出。 Future实例 。

使用示例:

from concurrent.futures import ThreadPoolExecutor, wait, as_completed

from time import sleep

from random import randint

def return_after_5_secs(num):

    sleep(randint(1, 5))

    return "Return of {}".format(num)

pool = ThreadPoolExecutor(5)

futures = []

for x in range(5):

    futures.append(pool.submit(return_after_5_secs, x))

print(1)

for x in as_completed(futures):

    print(x.result())

print(2)

参考链接:

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除

热门