3分钟理解梳理清楚Python多线程与多进程
原创在学习Python在这个过程中,我接触到了与多线程编程相关的知识点,这些知识点以前还没有被彻底了解。今天,我将花一些时间尽可能清楚地梳理细节。
线程和进程之间的区别
进程(process)和线程(thread)是操作系统的基本概念,但比较抽象,不容易掌握。关于多进程、多线程,教材中最经典的一句话是 进程是资源分配的最小单位,线程是CPU最小调度单位 线程是程序中的单个顺序控制流。进程中相对独立和可调度的执行单元是系统的独立调度和调度。CPU基本单元是指运行程序的调度单元。在单个程序中同时运行多个线程以完成不同任务称为多线程。
进程和线程的差异
进程是资源配置的基本单位。与进程相关的所有资源都记录在进程控制块中。PCB在……里面。以指示进程拥有或正在使用这些资源。此外,进程也是抢占式处理器的调度单元,抢占式处理器拥有完整的虚拟地址空间。当调度一个进程时,不同的进程具有不同的虚拟地址空间,同一进程中的不同线程共享相同的地址空间。
对应于进程,线程与资源分配无关。它属于某个进程,并与该进程中的其他线程共享该进程的资源。线程仅由相关堆栈(系统堆栈或用户堆栈)寄存器和线程表控制。TCB组成。寄存器可用于存储线程内的局部变量,但不能存储其他线程的相关变量。
通常,一个进程可以包含多个线程,这些线程可以利用该进程拥有的资源。在引入线程的操作系统中,进程通常被用作分配资源的基本单元,而线程被用作独立运行和独立调度的基本单元。
由于线程比进程小,基本上没有系统资源,调度它们的成本会小得多,可以更有效地提高系统中多个程序之间的并发执行程度,从而显著提高系统资源的利用率和吞吐量。
因此,近年来推出的通用操作系统都引入了线程,以进一步提高系统的并发性,并将其作为现代操作系统的重要指标。
线程和进程之间的区别可以归纳为以下4点:
-
地址空间和其他资源(如打开的文件):进程彼此独立,并在同一进程的线程之间共享。一个进程中的线程对其他进程不可见。
-
通信:进程间通信IPC,线程可以直接读写进程数据段(如全局变量)进行通信--需要进程同步和互斥来确保数据一致性。
-
调度和切换:线程上下文切换比进程上下文切换快得多。
-
在多线程OS,该进程不是可执行实体。
多进程与多线程的比较
对比维度
多进程
多线程
总结
数据共享、同步
数据共享很复杂,同步很简单。
数据共享很简单,同步很复杂。
各有优劣
内存、CPU
它占用更多的内存,而且切换起来很复杂,CPU利用率低
更少的内存使用、简单的切换、CPU利用率高
线程占优
创建、销毁、切换
复杂,缓慢
简单快捷
线程占优
编程、调试
简单的编程和调试
编程复杂,调试复杂。
进程占优
可靠性
进程之间不会有相互影响。
线程挂起会导致整个进程挂起。
进程占优
分布式
适用于多核多机,扩展到多机简单。
适用于多核
进程占优
总之,进程和线程也可以比作火车和车厢:
-
线程在进程下运行(简单的车厢不能运行)
-
一个进程可以包含多个线程(一列火车可以有多节车厢)
-
不同流程之间难以共享数据(一列列车上的乘客很难切换到另一列,如换站)
-
同一进程下不同线程之间的数据易于共享(A车厢换到B马车很容易)
-
进程比线程消耗更多的计算机资源(使用多列列车比使用多节车厢消耗更多的资源)
-
进程间不会相互影响,线程挂起会导致整个进程挂起。(一列火车不会影响到另外一列火车,但是如果一列火车上中间的一节车厢着火了,将影响到该趟火车的所有车厢)
-
该流程可扩展到多台机器,最多适用于多核(不同列车可以在多个轨道上行驶,同一列车的车厢不能在不同轨道上)
-
进程使用的内存地址可以被锁定,也就是说,当一个线程使用一些共享内存时,其他线程必须等待它结束才能使用这段内存。(如火车上的卫生间)--“Mutex(mutex)”
-
进程使用的内存地址可以是有限的(例如,火车上的餐厅,最多允许多少人进入,如果满座,则需要在门口等待,只有当有人出来时才能进入)--“信号量(semaphore)”
Python全局解释程序锁GIL
全局解释程序锁(英语:Global Interpreter Lock,缩写GIL),不是Python它的特点,就是在执行上Python解析器(CPython)引入了一个概念。结果CPython是大多数环境中的默认设置Python执行环境。所以在很多人的观念中CPython就是Python,也是理所当然的GIL归结为Python语言上的缺陷。所以CPython实现中的GIL那是什么?我们来看看官方的解释:
The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.
Python执行代码的执行Python 虚拟机(也称为解释器主循环,CPython版本)来控制,Python 在设计之初,人们认为只有一个线程在解释器的主循环中执行,即在任何时候,只有一个线程在解释器中运行。正确的Python 虚拟机的访问由全局解释程序锁(GIL)为了控制,正是这个锁可以确保同时只有一个线程在运行。
GIL 有什么福利待遇?简而言之,它在单线程的情况下速度更快,在和 C 在不考虑线程安全问题的情况下组合库会更方便,这也是一个早期阶段。 Python 最常见的应用场景和优势。此外,GIL简化系统的设计CPython的实现使对象模型(包括关键的内置类型,如字典)可以同时隐式访问。锁定全局解释器使其更容易支持多线程,但它也失去了多处理器主机的并行计算能力。
在多线程环境中,Python 该虚拟机按如下方式执行:
-
设置GIL
-
切换到要运行的线程
-
运行到指定数量的字节码指令,或者线程主动放弃控制(可以调用sleep(0))
-
将线程设置为休眠
-
解锁GIL
-
再次重复上述所有步骤
Python3.2前,GIL当前线程满足释放逻辑。IO操作或者ticks计数达到100(ticks可以看到python一种自己的计数器,专门使用GIL,每次释放后都为零,则可以传递此计数 sys.setcheckinterval 调整),松开。因为计算密集型线程正在释放GIL在那之后,我会立即申请。GIL,并且通常在调度其他线程之前重新获取它。GIL,它将导致一旦获得计算密集型线程。GIL,那么它将占据很长一段时间GIL,甚至直到线程执行结束。
Python 3.2开始使用新的GIL。新的GIL该实现使用固定的超时来指示当前线程放弃全局锁。当当前线程持有此锁并且其他线程请求此锁时,当前线程将5锁被强制在毫秒后释放。改进的是在单核的情况下,对于单线程的长期占用。GIL情况有所改善。
在单核CPU打开时,数百次间隔检查只会导致线程切换。在多核中CPU打开时,有一个严重的线条凹凸(thrashing)。每一次发布GIL锁、线程竞争锁和切换线程,这会消耗资源。单核下多线程,每次发布GIL,唤醒的线程可以获得它。GIL锁,所以它可以无缝执行,但在多核下,CPU0释放GIL后,其他CPU上线将会竞争,GIL可能会立即再次出现CPU0得到它,导致其他几个CPU在被唤醒后,线程会被唤醒并等待,直到切换时间,然后进入等待状态,这会导致线程动荡。(thrashing),导致效率较低。
此外,还可以从上述实现机制中推导出,Python多线程配对IO密集型代码CPU密集的代码更友好。
针对GIL应对措施:
-
使用更高版本Python(对GIL机构优化)
-
将多线程替换为多进程(否GIL,但该过程本身会消耗更多资源)
-
指定cpu运行线程(使用affinity模块)
-
使用Jython、IronPython等无GIL解释器
-
全IO多线程仅用于密集型任务。
-
使用关联(高效单线程模式,也称为微线程;通常用于多个进程)
-
使用关键组件C/C++编写为Python扩张,ctypes使Python直接程序调用C该语言编译的动态链接库的导出函数。(with nogil调出GIL限制)
Python多进程包multiprocessing
Python的threading该包主要使用多线程开发,但由于GIL的存在,Python多线程并不是真正的多线程,如果你想充分利用多核CPU资源,大多数情况下需要使用多个进程。在……里面Python 2.6介绍了该版本multiprocessing包,它完全复制了一套threading提供的接口方便了迁移。唯一的区别是它使用多个进程而不是多个线程。每个进程都有自己的独立进程GIL,所以不会有进程之间的冲突。GIL争抢。
借助这个multiprocessing,您可以轻松地完成从单进程到并发执行的转换。multiprocessing支持子进程,通信和共享数据,执行不同形式的同步,并提供Process、Queue、Pipe、Lock等组件。
Multiprocessing这一代人的背景
除了应对Python的GIL此外,它还生产multiprocessing另一个原因是Windows操作系统和Linux/Unix系统不一致。
Unix/Linux该操作系统提供了fork()系统调用,它非常特殊。普通函数,调用一次,返回一次,fork()调用一次,返回两次,因为操作系统会自动复制当前进程(父进程)(子进程)的副本,然后分别在父进程和子进程中返回。子进程始终返回0,并且父进程返回子进程的ID。这样做的原因是父进程可以fork有很多子进程,所以父进程应该写下ID,子进程只需调用getpid()您可以获取父进程ID。
Python的os该该模块已封装常见的系统调用,包括fork,可以在Python在程序中轻松创建子流程:
import os
print(Process (%s) start... % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print(I am child process (%s) and my parent is %s. % (os.getpid(), os.getppid()))
else:
print(I (%s) just created a child process (%s). % (os.getpid(), pid))
上面的代码在Linux、Unix和Mac上的执行结果为:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
有了fork调用时,进程可以在收到新任务时复制子进程以处理新任务。Apache每当有新的http当被要求时,它是fork处理新信息的子流程http请求。
由于Windows没有fork调用,上面的代码在Windows它不能继续运行。结果Python它是跨平台的,自然也应该提供跨平台的多进程支持。multiprocessing模块是多进程模块的跨平台版本。multiprocessing该模块已封装fork()打电话,这样我们就不需要注意了fork()详细情况。结果Windows没有fork因此,呼叫,multiprocessing需要“模拟”出来fork的效果。
multiprocessing常见组件和功能
要创建管理流程模块,请执行以下操作:
-
Process(用于创建进程)
-
Pool(用于创建管理进程池)
-
Queue(用于进程通信、资源共享)
-
Value,Array(用于进程通信、资源共享)
-
Pipe(用于管道通信)
-
Manager(用于资源共享)
同步子流程模块:
-
Condition(条件变量)
-
Event(事件)
-
Lock(互斥锁)
-
RLock(可重入互斥锁(相同的进程可以多次获取它,而不会导致阻塞。)
-
Semaphore(信号量)
接下来,让我们学习如何使用每个组件和函数。
Process(用于创建进程)
multiprocessing该模块提供了一个Process类来表示进程对象。
在multiprocessing,每个进程使用一个Process要表示的类。
施工方法:Process([group [, target [, name [, args [, kwargs]]]]])
-
group:GROUPING,不实际使用,值始终为None
-
target:表示调用对象,即子进程要执行的任务。您可以传入方法名。
-
name:设置子进程的名称
-
args:要传给target函数的位置参数作为元组传入。
-
kwargs:要传给target函数的DICTIONARY参数作为字典传入。
示例方法:
-
start():启动进程并调用p.run()
-
run():调用进程启动时运行的方法。target指定的函数,则必须在自定义类的类中实现此方法。
-
terminate():强制终止进程p,将不会执行任何清洁操作,如果p创建子流程后,该子进程将成为僵尸进程。使用此方法尤其要注意这种情况。如果p锁也被保存,因此它不会被释放,从而导致死锁。
-
is_alive():返回进程是否正在运行。如果p仍在运行,返回True
-
join([timeout]):进程同步,主进程等待子进程完成,然后执行以下代码。线程正在等待p终止(强调:主线程处于相等状态,并且p处于运行状态)。timeout是可选的超时时间(超过该时间,父线程将不再等待子线程并继续执行)。应该强调的是,p.join只能join住start打开流程,但不是join住run开放式流程
物业简介:
-
daemon:缺省值为False,如果已设置True,代表p后台运行的守护进程;当p当的父进程终止时,p它还会终止并设置True后,p您不能创建自己的新流程;必须在p.start()之前设置
-
name:进程的名称
-
pid:进程的pid
-
exitcode:进程正在运行None,如-N,这意味着要发出信号N结束(了解即可)
-
authkey:进程的身份验证密钥。,默认是由os.urandom()随机生成32字符串。此密钥的目的是为涉及网络连接的底层进程间通信提供安全性,只有当它们具有相同的身份验证密钥时才能成功(了解)
使用示例:(注意:在windows中Process()必须放到if name == ‘ main ’:下)
from multiprocessing import Process
import os
def run_proc(name):
print(Run child process %s (%s)... % (name, os.getpid()))
if __name__==__main__:
print(Parent process %s. % os.getpid())
p = Process(target=run_proc, args=(test,))
print(Child process will start.)
p.start()
p.join()
print(Child process end.)
Pool(用于创建管理进程池)
Pool类用于许多需要执行的目标,手动限制进程数太麻烦,如果目标很少且不需要控制进程数,则可以使用Process类。Pool您可以提供指定数量的进程供用户在提交新请求时调用Pool如果池未满,则会创建一个新进程来执行请求,但如果池中的进程数已达到指定的最大值,则请求将一直等待,直到池中的进程结束并重复使用进程池中的进程。
施工方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
-
processes :如果省略,则默认使用要创建的进程数cpu_count()号码被退回了。
-
initializer:每个辅助进程启动时要执行的可调用对象,默认为None。如果initializer是None,则将在开始时调用每个工作进程。initializer(*initargs)。
-
initargs:将被传承下去initializer参数组。
-
maxtasksperchild:在工作进程退出之前可以完成的任务数。完成后,使用新的工作进程来替换原始进程,以释放空闲资源。maxtasksperchild默认是None,这意味着只要Pool如果有一个工作流程,它就会存活下来。
-
context: 用于工作过程开始时的上下文中,通常用于Pool() 或者一个context对象的Pool()方法来创建池,这两种方法都进行了适当的设置。context。
示例方法:
-
apply(func[, args[, kwargs]]):在池进程中执行func(args,*kwargs),然后返回结果。需要强调的是,此操作不会在所有池进程中执行。func功能。如果您想通过不同的参数并发执行func函数,该函数必须从另一个线程调用p.apply()功能或用途p.apply_async()。它挡住了。apply很少使用
-
apply_async(func[, arg[, kwds={}[, callback=None]]]):在池进程中执行func(args,*kwargs),然后返回结果。这种方法的结果是AsyncResult类的一个实例,callback是接收输入参数的可调用对象。什么时候func当结果可用时,理解就通过了callback。callback禁止任何阻塞操作,否则会收到其他异步操作的结果。它是非阻塞的。
-
map(func, iterable[, chunksize=None]):Pool类中的map方法,使用内置的map函数的使用行为基本相同,它会阻塞进程,直到返回结果。请注意,尽管第二个参数是迭代器,但在实际使用中,程序不会运行子进程,直到整个队列准备就绪。
-
map_async(func, iterable[, chunksize=None]):map_async与map的关系同apply与apply_async
-
imap():imap 与 map不同的是,map但是当所有进程已经被执行并且结果已经被返回时,imap()是立即返回一个iterable可重复使用的对象。
-
imap_unordered():不能保证返回结果的顺序与进程添加的顺序一致。
-
close():关闭进程池以防止进一步操作。如果所有操作继续挂起,它们将在工作进程终止之前完成。
-
join():等待所有工作进程退出。此方法仅可用于close()或teminate()在呼叫之后,让它不再接受新的Process。
-
terminate():结束工作流程,不再处理未处理的任务。
方法apply_async()和map_async()的返回值为AsyncResul的实例obj。该实例具有以下方法:
-
get():返回结果,必要时等待结果到达。timeout是可选的。如果在指定的时间内没有到达,将抛出例外。如果在远程操作中引发异常,则在调用此方法时将再次引发该异常。
-
ready():如果调用完成,则返回True
-
successful():如果调用完成并且没有抛出异常,则返回True如果在结果准备好之前调用此方法,则会引发异常。
-
wait([timeout]):等待结果出来。
-
terminate():立即终止所有工作进程,而不执行任何清理或结束任何挂起的工作。如果p垃圾回收时,将自动调用此函数
使用示例:
# -*- coding:utf-8 -*-
# Pool+map
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
lists = range(100)
pool = Pool(8)
pool.map(test, lists)
pool.close()
pool.join()
# -*- coding:utf-8 -*-
# 异步进程池(非阻塞)
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
pool = Pool(8)
for i in range(100):
For执行循环中的步骤:
(1)循环遍历,将100将子进程添加到进程池(相对于父进程的块)
(2)每次执行8子进程,等待子进程执行,立即启动新的子进程。(相对于父进程不阻塞)
apply_async为异步进程池写入。异步是指启动进程的进程,以及父进程本身的执行(print)是异步的,而For将子进程添加到循环中的进程池的进程与父进程本身的执行同步。
pool.apply_async(test, args=(i,)) # 维护实施的流程总数为8当一个进程被执行时,一个新的进程被启动。.
print("test")
pool.close()
pool.join()
# -*- coding:utf-8 -*-
# 异步进程池(非阻塞)
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
pool = Pool(8)
for i in range(100):
实际测试发现,for在循环中执行以下步骤:
(1)遍历100对象,将子进程放入进程池中。
(2)执行此子进程,子进程完成后,在执行前将子进程放入进程池中。(一次只执行一个子进程)
for循环完成后,将再次执行。print函数。
pool.apply(test, args=(i,)) # 维护实施的流程总数为8当一个进程被执行时,一个新的进程被启动。.
print("test")
pool.close()
pool.join()
Queue(用于进程通信、资源共享)
在使用多进程的过程中,最好不要使用共享资源。普通全局变量不能由缝制工艺共享,只能通过Multiprocessing该组件构建的数据结构可以共享。
Queue 是用于创建进程间资源共享队列的类,使用Queue可以实现多个进程之间的数据传输功能(缺点:仅适用Process类,不能在Pool在进程池中使用)。
施工方法:Queue([maxsize])
- maxsize是队列中允许的最大项目数,如果省略,则没有大小限制。
示例方法:
-
put():用于将数据插入队列。put该方法还有两个可选参数:blocked和timeout。如果blocked为True(默认)和timeout为正值,则该方法将timeout队列剩余空间之前的指定时间。如果超时,则将抛出Queue.Full例外。如果blocked为False,但该Queue满的,将立即抛出Queue.Full异常。
-
get():您可以从队列中读取和删除元素。get该方法有两个可选参数:blocked和timeout。如果blocked为True(默认)和timeout如果它是正的,则在等待时间内不获取任何元素并将被抛出。Queue.Empty例外。如果blocked为False,有两种情况,如果Queue如果值可用,则立即返回该值,否则,如果队列为空,则立即引发该值Queue.Empty例外。如果你不想待在empty在引发异常时,请使blocked为True或将所有参数设置为空。
-
get_nowait():同q.get(False)
-
put_nowait():同q.put(False)
-
empty():当调用此方法时q如果为空,则返回True,结果是不可靠的,例如在返回。True如果将项添加到队列中。
-
full():当调用此方法时q满了就退货True,结果是不可靠的,例如在返回。True如果队列中的项被移走。
-
qsize():返回队列中当前项的正确数量,由于同样的原因,结果不可靠。q.empty()和q.full()一样
使用示例:
from multiprocessing import Process, Queue
import os, time, random
def write(q):
print(Process to write: %s % os.getpid())
for value in [A, B, C]:
print(Put %s to queue... % value)
q.put(value)
time.sleep(random.random())
def read(q):
print(Process to read: %s % os.getpid())
while True:
value = q.get(True)
print(Get %s from queue. % value)
if __name__ == "__main__":
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join() # 等待pw结束
pr.terminate() # pr这个过程是一个死循环,不能等待它的结束,只能强制终止
JoinableQueue 这就像是一个Queue对象,但该队列允许项的用户通知生成器该项已成功处理。通知过程使用共享信号和条件变量来实现。
施工方法:JoinableQueue([maxsize])
- maxsize:队列中允许的最大项目数,如果省略,则没有大小限制。
实例方法
JoinableQueue的实例p除了与Queue除了对象的相同方法外,它还具有:
-
task_done():用户使用此方法发出信号,指示q.get()的退货项目已处理。如果调用此方法的数量超过从队列中移除的项数,则将引发此方法。ValueError异常
-
join():生产者调用此方法进行阻塞,直到处理完队列中的所有项。阻塞将继续,直到调用队列中的每个项目q.task_done()方法
使用示例:
# -*- coding:utf-8 -*-
from multiprocessing import Process, JoinableQueue
import time, random
def consumer(q):
while True:
res = q.get()
print(消费者得到了它 %s % res)
q.task_done()
def producer(seq, q):
for item in seq:
time.sleep(random.randrange(1,2))
q.put(item)
print(制片人做得很好 %s % item)
q.join()
if __name__ == "__main__":
q = JoinableQueue()
seq = (产品%s % i for i in range(5))
p = Process(target=consumer, args=(q,))
p.daemon = True # 在主线程停止时设置为守护进程p也停下来,但别担心,producer内调用q.join保证了consumer队列中的所有元素都已处理
p.start()
producer(seq, q)
print(主线程)
Value,Array(用于进程通信、资源共享)
multiprocessing 中Value和Array实现原理是在共享内存中创建。ctypes()对象实现数据共享的目的,两种实现方法相似,但选择不同ctypes数据类型。
Value
施工方法:Value((typecode_or_type, args[, lock])
-
typecode_or_type:定义ctypes()可以传递的对象的类型。Type code或 C Type具体对比表如下所示。
-
args:传递给typecode_or_type构造函数的参数
-
lock:默认为True,创建互斥锁以限制该对。Value对象访问,如果传递了锁,则Lock或RLock的实例将用于同步。如果有来电False,Value的实例将不受锁保护,它将不是进程安全的。
typecode_or_type支持类型:
| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| b
| signed char | int | 1 |
| B
| unsigned char | int | 1 |
| u
| Py_UNICODE | Unicode character | 2 |
| h
| signed short | int | 2 |
| H
| unsigned short | int | 2 |
| i
| signed int | int | 2 |
| I
| unsigned int | int | 2 |
| l
| signed long | int | 4 |
| L
| unsigned long | int | 4 |
| q
| signed long long | int | 8 |
| Q
| unsigned long long | int | 8 |
| f
| float | float | 4 |
| d
| double | float | 8 |
参考地址:https://docs.python.org/3/library/array.html
Array
施工方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])
-
typecode_or_type:同上
-
size_or_initializer:如果是整数,则确定数组的长度,数组将被初始化为零。否则,size_or_initializer是用于初始化数组的序列,它的长度决定了数组的长度。
-
kwds:传递给typecode_or_type构造函数的参数
-
lock:同上
使用示例:
import multiprocessing
def f(n, a):
n.value = 3.14
a[0] = 5
if __name__ == __main__:
num = multiprocessing.Value(d, 0.0)
arr = multiprocessing.Array(i, range(10))
p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
注意:Value和Array只适用于Process类。
Pipe(用于管道通信)
多进程也有一种称为流水线原理的数据传输方法, Queue相同。Pipe您可以在进程之间创建管道并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须生成Process管道在对象之前生成。
施工方法:Pipe([duplex])
- dumplex:默认管道为全双工,如果duplex射成False,conn1只能用于接收,conn2只能用于发送。
示例方法:
-
send(obj):通过连接发送对象。obj是否有任何对象与序列化兼容
-
recv():接收conn2.send(obj)对象已发送。如果没有要接收的消息,recv该方法将始终阻塞。如果连接的另一端已关闭,则recv方法引发EOFError。
-
close():关闭连接。如果conn1垃圾回收时,将自动调用此方法
-
fileno():返回连接使用的整型文件描述符
-
poll([timeout]):如果连接上的数据可用,则返回True。timeout指定等待的最长时间限制。如果省略此参数,该方法将立即返回结果。如果timeout射成None,则该操作将无限期地等待数据到达。
-
recv_bytes([maxlength]):接收c.send_bytes()方法发送一条完整的字节消息。maxlength指定要接收的最大字节数。如果传入消息超过此最大值,则会引发该消息。IOError异常,并且无法对该连接进行进一步读取。如果连接的另一端已关闭,并且没有更多数据,则将导致EOFError异常。
-
send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,并且size要发送的字节数。生成的数据以单条消息的形式发送,然后调用c.recv_bytes()要接收的函数
-
recv_bytes_into(buffer [, offset]):接收完整的字节消息并将其保存在buffer对象,该对象支持可写缓冲区接口(即,bytearray对象或类似对象)。offset指定消息在缓冲区中放置的字节移位。返回值是接收到的字节数。如果消息长度大于可用缓冲区空间,则会抛出BufferTooShort异常。
使用示例:
from multiprocessing import Process, Pipe
import time
# 一种子流程执行方法
def f(Subconn):
time.sleep(1)
Subconn.send("吃了吗")
print("来自父亲的问候:", Subconn.recv())
Subconn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe() # 创建管道的两端
p = Process(target=f, args=(child_conn,)) # 创建子流程
p.start()
print("我儿子的问候:", parent_conn.recv())
parent_conn.send("嗯")
Manager(用于资源共享)
Manager()返回的manager该对象控制一个server进程,它包含python对象可以由其他进程传递proxies去拜访。从而实现多进程数据通信和安全。Manager模块常与Pool模块一起使用。
Manager支持的类型包括list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
管理器是一个独立子进程,其中真实对象作为服务器存在并运行。其他进程通过使用作为客户端运行的代理来访问共享对象。Manager()是BaseManager,返回一个开始的SyncManager()实例,这些实例可用于创建共享对象并返回访问它们的代理。
BaseManager ,创建管理服务器的基类
施工方法:BaseManager([address[, authkey]])
-
address:(hostname,port),指定服务器的URL地址。默认情况下,您只需分配一个空闲端口。
-
authkey:连接到服务器的客户端的身份验证,默认为current_process().authkey的值
示例方法:
-
start([initializer[, initargs]]):启动一个单独的子进程,并在该子进程中启动管理器服务器
-
get_server():获取服务器对象
-
connect():连接管理器对象
-
shutdown():关闭管理器对象,只能调用start()在方法之后调用
实例属性:
- address:只读属性,即管理服务器正在使用的地址。
SyncManager , 以下类型不是进程安全类型,需要锁定..
示例方法:
-
Array(self,*args,**kwds)
-
BoundedSemaphore(self,*args,**kwds)
-
Condition(self,*args,**kwds)
-
Event(self,*args,**kwds)
-
JoinableQueue(self,*args,**kwds)
-
Lock(self,*args,**kwds)
-
Namespace(self,*args,**kwds)
-
Pool(self,*args,**kwds)
-
Queue(self,*args,**kwds)
-
RLock(self,*args,**kwds)
-
Semaphore(self,*args,**kwds)
-
Value(self,*args,**kwds)
-
dict(self,*args,**kwds)
-
list(self,*args,**kwds)
使用示例:
import multiprocessing
def f(x, arr, l, d, n):
x.value = 3.14
arr[0] = 5
l.append(Hello)
d[1] = 2
n.a = 10
if __name__ == __main__:
server = multiprocessing.Manager()
x = server.Value(d, 0.0)
arr = server.Array(i, range(10))
l = server.list()
d = server.dict()
n = server.Namespace()
proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))
proc.start()
proc.join()
print(x.value)
print(arr)
print(l)
print(d)
print(n)
同步子流程模块
Lock(互斥锁)
Lock锁的作用是在多个进程需要访问共享资源时避免访问冲突。锁定确保了当多个进程修改同一条数据时,同一时间只能有一次修改,即串行修改,这牺牲了速度,但确保了数据安全。Lock包含两种状态-锁定和解锁,以及两种基本方法。
施工方法:Lock()
示例方法:
-
acquire([timeout]): 使线程进入同步阻塞状态,并尝试获取锁。
-
release(): 解开锁。线程在使用前必须已锁定,否则将引发异常。
使用示例:
from multiprocessing import Process, Lock
def l(lock, num):
lock.acquire()
print("Hello Num: %s" % (num))
lock.release()
if __name__ == __main__:
lock = Lock() # 必须将其定义为全局
for num in range(20):
Process(target=l, args=(lock, num)).start()
RLock(可重入互斥锁(相同的进程可以多次获取它,而不会导致阻塞。)
RLock(可重入锁)是可由同一线程多次请求的同步指令。RLock使用“拥有线程”和“递归级别”的概念,当处于锁定状态时,RLock由一根线拥有。有RLock的线程可以再次调用。acquire(),您需要在释放锁时进行呼叫。release()相同的次数。可以认为,RLock包含锁定的池和初始值0每个成功调用的计数器。 acquire()/release(),柜台将+1/-1,为0锁被解锁了。
施工方法:RLock()
示例方法:
-
acquire([timeout]):同Lock
-
release(): 同Lock
Semaphore(信号量)
信号量是一种更高级的锁定机制。与Lock对象内部的锁标识不同,信号量内部有一个计数器,只有当占用信号量的线程数超过信号量时,线程才会被阻塞。这允许多个线程同时访问相同的代码区。例如,厕所有3一个坑,最多只允许3当一个人上厕所时,后面的人只能等里面的人出来才能进去。如果指定了信号量3,然后来一个人拿锁,数一数1,当计数相等时3届时,所有落在后面的人都需要等待。一旦释放,某人就可以获得一把锁。
施工方法:Semaphore([value])
- value:设置信号量,默认为1
示例方法:
-
acquire([timeout]):同Lock
-
release(): 同Lock
使用示例:
from multiprocessing import Process, Semaphore
import time, random
def go_wc(sem, user):
sem.acquire()
print(%s 占据一个坑 % user)
time.sleep(random.randint(0, 3))
sem.release()
print(user, OK)
if __name__ == __main__:
sem = Semaphore(2)
p_l = []
for i in range(5):
p = Process(target=go_wc, args=(sem, user%s % i,))
p.start()
p_l.append(p)
for i in p_l:
i.join()
Condition(条件变量)
可以把Condition被理解为高级锁,它提供了更好的Lock, RLock更高级的功能允许我们控制复杂的线程同步问题。Condition在内部维护锁对象(缺省值为RLock),可以在其中创建Condigtion当对象作为参数传入时。Condition也提供了acquire, release方法,其含义与锁的acquire, release该方法是一致的,实际上它只是对内部锁对象的对应方法的简单调用。Condition还提供了其他方法。
施工方法:Condition([lock/rlock])
- 可以传递一个Lock/RLock实例是给构造方法的,否则它会自己生成一个。RLock实例。
示例方法:
-
acquire([timeout]):先进行acquire,然后判断一些条件。如果不满足条件wait
-
release():释放 Lock
-
wait([timeout]): 调用此方法将导致线程进入Condition的等待池等待通知,并解开锁。线程在使用前必须已锁定,否则将引发异常。处于wait状态线程在收到通知后将重新判断条件。
-
notify(): 调用此方法将从等待池中挑选一个线程并通知它,接收通知的线程将自动调用它。acquire()尝试获取锁(进入锁池);其他线程仍在池中等待。调用此方法不会释放锁。线程在使用前必须已锁定,否则将引发异常。
-
notifyAll(): 调用此方法将通知等待池中将进入锁池以尝试获取锁的所有线程。调用此方法不会释放锁。线程在使用前必须已锁定,否则将引发异常。
使用示例:
import multiprocessing
import time
def stage_1(cond):
"""perform first stage of work,
then notify stage_2 to continue
"""
name = multiprocessing.current_process().name
print(Starting, name)
with cond:
print({} done and ready for stage 2.format(name))
cond.notify_all()
def stage_2(cond):
"""wait for the condition telling us stage_1 is done"""
name = multiprocessing.current_process().name
print(Starting, name)
with cond:
cond.wait()
print({} running.format(name))
if __name__ == __main__:
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name=s1,
target=stage_1,
args=(condition,))
s2_clients = [
multiprocessing.Process(
name=stage_2[{}].format(i),
target=stage_2,
args=(condition,),
)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
Event(事件)
Event在内部有一个标志位,最初false。可以使用set()设置它的步骤true;或使用clear()从新设置false;可以使用is_set()检查标志位的状态;另一个最重要的功能是wait(timeout=None),用于阻止当前线程event设置的内部标志位true或者timeout暂停。如果内部标志位为true则wait()函数理解返回。
使用示例:
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print(wait_for_event: starting)
e.wait()
print(wait_for_event: e.is_set()->, e.is_set())
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print(wait_for_event_timeout: starting)
e.wait(t)
print(wait_for_event_timeout: e.is_set()->, e.is_set())
if __name__ == __main__:
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name=block,
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name=nonblock,
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print(main: waiting before calling Event.set())
time.sleep(3)
e.set()
print(main: event is set)
其他内容
multiprocessing.dummy 模块与 multiprocessing 模块之间的差异:dummy 模块是多线程的,而 multiprocessing 是多进程的, api 这一切都是普遍存在的。它们都可以很容易地在多线程和多进程之间切换代码。multiprocessing.dummy通常在IO可以使用场景,例如通过以下方式引入线程池。
from multiprocessing.dummy import Pool as ThreadPool
multiprocessing.dummy与早期的threading,不同的点似乎在多核CPU接下来,只绑定一个核(细节尚未验证)。
参考文档:
Python并发之concurrent.futures
Python标准库为我们提供了threading和multiprocessing该模块编写相应的多线程。/多进程代码。从…Python3.2一开始,标准库为我们提供了concurrent.futures模块,该模块提供ThreadPoolExecutor和ProcessPoolExecutor两个类,实现threading和multiprocessing编写线程池的更高级抽象。/进程池提供直接支持。concurrent.futures基本模块是executor和future。
Executor
Executor是不能直接使用的抽象类。它为特定的异步执行定义了一些基本方法。ThreadPoolExecutor和ProcessPoolExecutor继承了Executor分别用于创建线程池和进程池的代码。
ThreadPoolExecutor对象
ThreadPoolExecutor类是Executor使用线程池执行异步调用的子类。
class concurrent.futures.ThreadPoolExecutor(max_workers)
使用max_workers执行异步调用的线程池数量。
ProcessPoolExecutor对象
ThreadPoolExecutor类是Executor使用进程池执行异步调用的子类。
class concurrent.futures.ProcessPoolExecutor(max_workers=None)
使用max_workers在以下情况下执行异步调用的进程池数量max_workers为None使用机器的处理器数量(例如,4核机器max_worker配置为None,然后使用4(进程的异步并发)。
submit()方法
Executor中定义了submit()方法时,此方法的作用是提交可执行回调。task,并返回一个future实例。future该对象表示给定的调用。
Executor.submit(fn, *args, **kwargs)
-
fn:需要异步执行的函数
-
*args, **kwargs:fn参数
使用示例:
from concurrent import futures
def test(num):
import time
return time.ctime(), num
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test, 1)
print(future.result())
map()方法
除了submit,Exectuor它还为我们提供了map方法,该方法返回一个map(func, *iterables)迭代器,则迭代器中的回调执行返回的结果是有序的。
Executor.map(func, *iterables, timeout=None)
-
func:需要异步执行的函数
-
*iterables:可重复对象,如列表等。每次func执行,所有这些都来自iterables将参数传入。
-
timeout:设置每个异步操作的超时时间。timeout的值可以是int或float,如果操作超时,它将返回raisesTimeoutError;如果未指定timeout参数,不设置加班。
使用示例:
from concurrent import futures
def test(num):
import time
return time.ctime(), num
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test, data):
print(future)
shutdown()方法
释放系统资源,在Executor.submit()或 Executor.map()在异步操作后调用。使用with语句可以避免显式调用此方法。
Executor.shutdown(wait=True)
Future
Future它可以理解为在未来完成的操作,这是异步编程的基础。通常,我们会表演io运营、访问url(下图)将在等待结果返回之前阻止,cpu不能做其他的事情,而且Future的引入帮助我们在等待期间完成其他操作。
Future类封装了可调用的异步执行。Future 实例通过 Executor.submit()方法创建。
-
cancel():尝试取消呼叫。如果调用当前正在执行且无法取消,则该方法将返回False否则,调用将被取消,方法将返回True。
-
cancelled():如果呼叫成功取消,则返回True。
-
running():如果调用当前正在执行且无法取消,则返回True。
-
done():如果呼叫成功取消或结束,则返回True。
-
result(timeout=None):返回调用返回的值。如果调用尚未完成,则此方法将等待超时秒。如果呼叫未在超时秒数内完成,则会有一个Futures.TimeoutError将报出。timeout它可以是整形或浮点值,如果timeout未指定或None,等待的时间是无限的。如果futures还没完工就被取消了,然后 CancelledError 这会被报道的。
-
exception(timeout=None):返回调用引发的异常。如果调用未完成,则该方法将等待timeout在指定的持续时间内,如果呼叫在该持续时间后仍未完成,则会报告超时错误。futures.TimeoutError。timeout它可以是整形或浮点值,如果timeout未指定或None,等待的时间是无限的。如果futures还没完工就被取消了,然后 CancelledError 这会被报道的。如果调用完成并且无异常报出,返回None.
-
add_done_callback(fn):将被调用fn捆绑到future上,当Future取消或终止,fn作为future将调用的唯一参数。如果future已完全运行或已取消,fn将立即被呼叫。
-
wait(fs, timeout=None, return_when=ALL_COMPLETED)
-
等待fs提供的 Future 实例(possibly created by different Executor instances) 操作结束。返回一个命名的2元集合,子表表示已完成和未完成
-
return_when 指示函数应在何时返回。其值必须为下列值之一:
-
FIRST_COMPLETED :函数在任何future结束或取消时返回。
-
FIRST_EXCEPTION :在任何future因为如果不是,则在结束时返回异常future上报错误,效果相同
-
ALL_COMPLETED :功能齐全future它不会回来,直到它结束。
-
-
-
as_completed(fs, timeout=None):该参数为 Future 实例列表中,返回值是迭代器,运行结束后输出。 Future实例 。
使用示例:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
sleep(randint(1, 5))
return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
print(1)
for x in as_completed(futures):
print(x.result())
print(2)
参考链接:
版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除