python并发从0到1 | 宜武汇-ag真人国际厅网站

[toc]

python并发编程分三个方面:多线程(threading)、多进程(multiprocessing)、多协程(asynico)

cpu密集型计算:压缩/解压缩、加密解密、正则表达式搜索

io密集型计算:文件处理、网络爬虫、读写数据库

对比

  • 进程:

    • 优点:可以实现并行,且只有多进程可以实现并行
    • 缺点:占用资源多,可启动数目最少
  • 线程:

    • 占用资源少,轻量级
    • python的线程是无法并行的(占用多个cpu),只能进行并发
    • 切换线程也是有开销的。
    • 适合io密集型运算、同时运行任务不多(线程可启动数量也是有限制的)
  • 协程:

    • 优点:内存开销最小,可启动数量最多

    • 缺点:支持的库比较少,代码复杂,例如爬虫不支持,所以想用多协程爬取的话,可以用aiohttp,不能用requests

    • 适用于:io密集型、超多任务运行

关系

  • 一个进程中可以启动很多线程
  • 一个线程中可以启动很多协程

怎样选择

  • io密集型运算优先选择多进程
  • 若满足三点:1、需要超多任务量 2、有现成协程库支持 3、代码复杂度可以接受,则选择协程,否则选择线程

python慢的原因

两个原因

  • 是解释型语言,边解释边执行
  • gil,无法利用多核cpu

gil是什么,为什么有gil

全局解释器锁,是计算机程序设计语言解释器用于==同步==线程的一种机制,它使得任何时刻只有一个线程在运行

python设计初期为了解决线程并发的问题引入了gil,但是现在很难去除,本质是一种锁,它的好处在于简化了python对共享资源的管理

怎样规避gil带来的限制

  • io期间线程会释放gil,实现cpu和io的并行,因此gil的存在对于io密集型计算是友好的,但是对cpu密集型则会拖慢速度
  • 利用multiprocessing,可以利用多核cpu的优势

threading库

  • threading.active_count():返回当前存活的threading.thread线程对象数量,等同于len(threading.enumerate())。
  • threading.current_thread():返回此函数的调用者控制的threading.thread线程对象。如果当前调用者控制的线程不是通过threading.thread创建的,则返回一个功能受限的虚拟线程对象。
  • threading.get_ident():返回当前线程的线程标识符。注意当一个线程退出时,它的线程标识符可能会被之后新创建的线程复用。
  • threading.enumerate():返回当前存活的threading.thread线程对象列表。
  • threading.main_thread():返回主线程对象,通常情况下,就是程序启动时python解释器创建的threading._mainthread线程对象。
  • threading.stack_size([size]):返回创建线程时使用的堆栈大小。也可以使用可选参数size指定之后创建线程时的堆栈大小,size可以是0或者一个不小于32kib的正整数。如果参数没有指定,则默认为0。如果系统或者其他原因不支持改变堆栈大小,则会报runtimeerror错误;如果指定的堆栈大小不合法,则会报valueerror,但并不会修改这个堆栈的大小。32kib是保证能解释器运行的最小堆栈大小,当然这个值会因为系统或者其他原因有限制,比如它要求的值是大于32kib的某个值,只需根据要求修改即可。

线程对象:thread类

守护线程:只有所有守护线程都结束,整个python程序才会退出,但并不是说python程序会等待守护线程运行完毕,相反,当程序退出时,如果还有守护线程在运行,程序会去强制终结所有守护线程,当守所有护线程都终结后,程序才会真正退出。可以通过修改daemon属性或者初始化线程时指定daemon参数来指定某个线程为守护线程。

非守护线程:一般创建的线程默认就是非守护线程,包括主线程也是,即在python程序退出时,如果还有非守护线程在运行,程序会等待直到所有非守护线程都结束后才会退出。

注:守护线程会在程序关闭时突然关闭(如果守护线程在程序关闭时还在运行),它们占用的资源可能没有被正确释放,比如正在修改文档内容等,需要谨慎使用。

构造方法:

thread(group=none,target=none,name=none,arg=(),kwargs=none,*,daemon=none)

  • group: 线程组,目前还没有实现,库引用中必须是none

  • target: 要执行的方法

  • name: 线程名

  • args/kwargs: 要传入方法的参数

  • daemon:

    ​ 默认为false(不适用于idle的交互模式或脚本运行模式,因为交互模式下的主线程只有退出python时才终止)

    • 当子线程的daemon属性为false时,主线程结束时会检测子线程是否结束,如果子线程尚未完成,则主线程会等待子线程完成后再退出
    • 当子线程的daemon属性为true时,主线程运行结束时不对子线程进行检查而直接退出,同时子线程随主线程一起结束而不论是否运行完成

实例方法:

  • isalive(): 返回线程是否在运行,正在运行指启动后、终止前

  • getname(): 获取线程名

  • isdaemon(): 获取是否为后台线程

  • join(timeout=none): ==阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout,说人话就是会把程序卡在这里,直到这个用了join的线程执行结束才可以执行其他线程,像join(5)就是等待这个线程运行5秒,不加参数就是一直等他运行结束。==

    这里也可以看出join也有一定的==同步==的作用

  • setdaemon(): 设置为后台线程

  • setname(name): 设置线程名

  • start(): 启动线程

锁对象:lock & 递归锁对象:rlock类

由于线程的随机调度:某线程可能在执行n条后,cpu接着执行其他线程。为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用锁。

实例方法:

  • acquire([timeout]): 尝试获得锁定,使线程进入同步阻塞状态。
  • release(): 释放锁,使用前线程必须已获得锁定,否则抛出异常。

lock属于全局,重复锁定会产生死锁;rlock属于线程,可重复施加锁,需要执行相同次数的锁释放。

import threading,time gl_num=0 lock=threading.rlock() def action(): lock.acquire() global gl_num gl_num =1 time.sleep(1) print('gl_num') lock.release() for i in range(10): t=threading.thread(target=action) t.start() 

条件变量对象:condition类

condition通常与一个锁关联,需要在多个contidion中共享一个锁时,可以传递一个lock/rlock实例给构造方法,否则它将自动生产一个rlock实例。
可以认为,除了lock带有的锁定池外,condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyall()通知;得到通知后线程进入锁定池等待锁定。

实例方法:

  • acquire([timeout])/release():调用关联的锁的相应方法
  • wait([timeout]):调用这个方法将使线程进入condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常
  • notify():调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定,其他线程仍然在等待池中
    -notifyall():调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定

事件对象:event类

event是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。event内置了一个为false的标志,当调用set()时设为true,调用clear()时重置为false。wait()将阻塞线程至等待阻塞状态。

实例方法:

  • isset(): 当内置标志为true时返回true
  • set(): 将标志设为true,并通知所有处于等待阻塞状态的线程恢复运行状态
  • clear(): 将标志设为false
  • wait([timeout]): 如果标志true将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()

定时器对象:timer类

timer(定时器)是thread的派生类,用于在指定时间后调用一个方法。

构造方法:
timer(interval,function,args=[],kwarg={})

  • interval:指定的时间
  • function:要执行的方法
  • args/kwargs:方法的参数

local类

local是一个小写字母开头的类,用于管理thread-local(线程局部)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。

创建多线程

1、准备一个函数

def my_func(a,b): do_craw(a,b) 

2、创建线程

import threading t=threading.thread(target=myfunc,args=(100,200,)) 

3、启动线程

t.start() 

4、等待结束

t.join() 

多线程爬虫

基础

main.py

import requests urls = [ f"https://test.com/#p{page}"for page in range(1, 51) ] print(urls) def craw(url): r = requests.get(url) print(len(r.text)) craw(urls[0]) 

test.py

import main import threading def single_thread(): for url in main.urls: main.craw(url) def multi_thread(): threads = [] for url in main.urls: threads.append( threading.thread(target=main.craw, args=(url,)) ) for thread in threads: thread.start() for thread in threads: thread.join() '''for thread in threads: thread.join() 的作用是等待每个线程执行结束。在多线程程序中,线程是并行执行的,如果不等待线程执行完成,程序可能会在某个线程还没有完成运行的情况下就结束了。''' single_thread() multi_thread() 

生产者-消费者-队列

进程间通信:队列(queue) 管道(pipe)

queue模块实现了多生产者多消费者队列, 尤其适合多线程编程.

列表也可以用作队列,但是它第一个元素移出以后后面的数据都需要向前移动,导致效率很低

queue类中实现了所有需要的锁原语(这句话非常重要), queue模块实现了三种类型队列:

  • fifo(先进先出)队列, 第一加入队列的任务, 被第一个取出
  • lifo(后进先出)队列,最后加入队列的任务, 被第一个取出(操作类似与栈, 总是从栈顶取出, 这个队列还不清楚内部的实现)
  • priorityqueue(优先级)队列, 保持队列数据有序, 最小值被先取出(在c 中我记得优先级队列是可以自己重写排序规则的, python不知道可以吗)

==三个模块==

import queue #类 queue.queue(maxsize = 0) #构造一个fifo队列,maxsize设置队列大小的上界, 如果插入数据时, 达到上界会发生阻塞, 直到队列可以放入数据. 当maxsize小于或者等于0, 表示不限制队列的大小(默认) queue.lifoqueue(maxsize = 0) #构造一lifo队列,maxsize设置队列大小的上界, 如果插入数据时, 达到上界会发生阻塞, 直到队列可以放入数据. 当maxsize小于或者等于0, 表示不限制队列的大小(默认) queue.priorityqueue(maxsize = 0) #构造一个优先级队列,,maxsize设置队列大小的上界, 如果插入数据时, 达到上界会发生阻塞, 直到队列可以放入数据. 当maxsize小于或者等于0, 表示不限制队列的大小(默认). 优先级队列中, 最小值被最先取出 #异常 queue.empty #当调用非阻塞的get()获取空队列的元素时, 引发异常 queue.full #当调用非阻塞的put()向满队列中添加元素时, 引发异常 

queue

import queue q=queue.queue()#创建queue queue.empty() #如果队列为空, 返回true(注意队列为空时, 并不能保证调用put()不会阻塞); 队列不空返回false(不空时, 不能保证调用get()不会阻塞) queue.full() #如果队列为满, 返回true(不能保证调用get()不会阻塞), 如果队列不满, 返回false(并不能保证调用put()不会阻塞) queue.put(item[, block[, timeout]]) #向队列中放入元素, 如果可选参数block为true并且timeout参数为none(默认), 为阻塞型put(). 如果timeout是正数, 会阻塞timeout时间并引发queue.full异常. 如果block为false为非阻塞put queue.put_nowait(item) #等价于put(itme, false) queue.get([block[, timeout]]) #移除列队元素并将元素返回, block = true为阻塞函数, block = false为非阻塞函数. 可能返回queue.empty异常 queue.get_nowait() #等价于get(false) queue.task_done() #在完成一项工作之后,queue.task_done()函数向任务已经完成的队列发送一个信号 queue.join() #实际上意味着等到队列为空,再执行别的操作 

线程安全 lock

线程安全是指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成

由于线程的执行会随时发生切换,造成不可预料的结果,出现线程不安全

用法1:try-finally模式

import threading lock = threading.lock() lock.acquire() try: do something finally: lock.release() 

用法2:with模式

import threading lock = threading.lock() with lock: do something 

线程不安全的代码:

多线程并发的时候可能出现余额扣到负数的情况

import threading class account: def __init__(self, balance): self.balance = balance def draw(acc, amount): if acc.balance >= amount: print(threading.current_thread().name, "取钱成功") acc.balance -= amount print(threading.current_thread().name, "余额", acc.balance) else: print(threading.current_thread().name, "取钱失败,余额不足") if __name__ == "__main__": account = account(1000) ta = threading.thread(name="ta", target=draw, args=(account, 800)) tb = threading.thread(name="tb", target=draw, args=(account, 800)) tc = threading.thread(name="tc", target=draw, args=(account, 800)) td = threading.thread(name="td", target=draw, args=(account, 800)) te = threading.thread(name="te", target=draw, args=(account, 800)) ta.start() tb.start() tc.start() td.start() te.start() 

而在draw函数加一个sleep,则100%触发,因为sleep会阻塞线程且发生线程切换

def draw(acc, amount): if acc.balance >= amount: time.sleep(0.1) print(threading.current_thread().name, "取钱成功") acc.balance -= amount print(threading.current_thread().name, "余额", acc.balance) else: print(threading.current_thread().name, "取钱失败,余额不足") 

引进lock就安全了:

import threading import time lock = threading.lock() class account: def __init__(self, balance): self.balance = balance def draw(acc, amount): with lock: if acc.balance >= amount: time.sleep(0.1) print(threading.current_thread().name, "取钱成功") acc.balance -= amount print(threading.current_thread().name, "余额", acc.balance) else: print(threading.current_thread().name, "取钱失败,余额不足") if __name__ == "__main__": account = account(1000) ta = threading.thread(name="ta", target=draw, args=(account, 800)) tb = threading.thread(name="tb", target=draw, args=(account, 800)) tc = threading.thread(name="tc", target=draw, args=(account, 800)) td = threading.thread(name="td", target=draw, args=(account, 800)) te = threading.thread(name="te", target=draw, args=(account, 800)) ta.start() tb.start() tc.start() td.start() te.start() 

线程池 threadpoolexecutor

线程池的基类是 concurrent.futures 模块中的 executor,executor 提供了两个子类,即 threadpoolexecutor 和 processpoolexecutor,其中 threadpoolexecutor 用于创建线程池,而 processpoolexecutor 用于创建进程池。

线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。

如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

用法

==exectuor 提供了如下常用方法==:

  • submit(fn, args, **kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, iterables, timeout=none, chunksize=1):该函数类似于全局函数 map(func, iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=true):关闭线程池。

==future 提供了如下方法==:(也就是submit打头,threadpoolexecutor.submit()方法将返回一个future对象)

  • cancel():取消该 future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 false;否则,程序会取消该任务,并返回 true。

  • cancelled():返回 future 代表的线程任务是否被成功取消。

  • running():如果该 future 代表的线程任务正在执行、不可被取消,该方法返回 true。

  • done():如果该 funture 代表的线程任务被成功取消或执行完成,则该方法返回 true。

  • ==result==(timeout=none):获取该 future 代表的线程任务最后返回的结果。如果 future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。

  • exception(timeout=none):获取该 future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 none。

  • add_done_callback(fn):为该 future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。

    as_completed()方法用于将线程池返回的future对象按照线程完成的顺序排列,不加也可以,不加则返回的顺序为按线程创建顺序返回

    with语句将自动关闭线程池,也就是自动执行shutdown方法。

在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。

步骤

  1. 调用 threadpoolexecutor 类的构造器创建一个线程池。
  2. 定义一个普通函数作为线程任务。
  3. 调用 threadpoolexecutor 对象的 submit() 方法来提交线程任务。
  4. 当不想提交任何任务时,调用 threadpoolexecutor 对象的 shutdown() 方法来关闭线程池。

map 和 submit对比

  • 易用性

map()方法比较容易使用,它的参数是一个==可迭代对象==(如列表)和一个函数,函数将被应用于可迭代对象中的每个元素,然后返回一个生成器对象,生成器对象可以逐个访问结果。submit()方法的使用要稍微复杂一点,需要单独执行每个线程执行任务,并使用future对象来管理和访问结果。

  • 控制方法

使用map()方法时,线程的数量是由线程池中的工作线程数量决定的。如果你想要更细粒度的控制,可以使用submit()方法,使用max_workers参数来指定线程池的大小,使用shutdown()方法来关闭线程池。

  • 访问方法

使用map()方法时,无法访问单独的线程和它们的结果,只能访问生成器对象中的一个接一个的结果。使用submit()方法时,可以访问每个线程的状态,可以使用future对象的方法来检查和访问线程结果。例如,可以使用done()方法来检查线程是否完成,使用result()方法来访问线程的返回值,使用exception()方法来访问线程的异常。

总的来说,map()方法更简单易用,并且适用于处理一组数据集。submit()方法更加灵活,允许你更好地控制线程池,并且可以访问单个线程状态和结果。

线程池原理 好处

新建线程需要分配资源、终止线程需要回收资源,如果可以重用线程,则可以减去新建/终止的开销

  • 提升性能:减去了大量新建、终止线程的开销,重用了线程资源
  • 适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
  • 防御功能:能有效避免系统因为创建线程过多,从而导致系统负荷过大相应变慢等问题
  • 代码优势:使用线程池的语法比自己新建线程执行线程更加简洁

使用方法示例

threadpoolexecutor()构造参数:

  • max_workers 设置线程池中最多能同时运行的线程数目。
  • thread_name_prefix 线程名字前缀
  • initializer 在每个工作线程启动之前,执行初始化函数,如果没有指定,默认为none。
  • initargs 传递给初始化函数的参数元组,如果没有指定,默认为空元组()。
from concurrent.futures import threadpoolexecutor,as_completed # 用法1:map函数,注意map的结果和入参顺序是对应的 with threadpoolexecutor() as pool: results = pool.map(func_name,args) for result in results: print(result) # 用法2:future模式,更强大,注意如果用as_completed顺序是线程执行完成的顺序 with threadpoolexecutor() as pool: futures = [pool.submit(func_name,args) for args in urls] # 2.1用法 for future in futures: print(future.result()) # 2.2用法 for future in as_completed(futures): print(future.result()) 

代码示例

from concurrent.futures import threadpoolexecutor import time # 参数times用来模拟网络请求的时间 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = threadpoolexecutor(max_workers=2) # 通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞 task1 = executor.submit(get_html, (3)) task2 = executor.submit(get_html, (2)) # done方法用于判定某个任务是否完成 print(task1.done()) # cancel方法用于取消某个任务,该任务没有放入线程池中才能取消成功 print(task2.cancel()) time.sleep(4) print(task1.done()) # result方法可以获取task的执行结果 print(task1.result()) # 执行结果 # false # 表明task1未执行完成 # false # 表明task2取消失败,因为已经放入了线程池中 # get page 2s finished # get page 3s finished # true # 由于在get page 3s finished之后才打印,所以此时task1必然完成了 # 3 # 得到task1的任务返回值 

web服务使用线程池加速

1)threaded : 多线程支持,默认为false,即不开启多线程;

app.run(threaded=true) 

2)processes:进程数量,默认为1.

app.run(processes=true) 

ps:多进程或多线程只能选择一个,不能同时开启

使用示例:

app.run(host=myaddr,port=myport,debug=false,threaded=true) ### threaded开启以后 不需要等队列 threaded=true #或者 #app.run(host=myaddr,port=myport,debug=false,processes=3) ### processes=n 进程数量,默认为1个 

flask加速

from flask import flask from time import sleep from concurrent.futures import threadpoolexecutor # docs https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.threadpoolexecutor executor = threadpoolexecutor(2) app = flask(__name__) @app.route('/jobs') def run_jobs(): # 通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞 executor.submit(long_task, 'hello', 123) return 'long task running.' def long_task(arg1, arg2): print("args: %s%s!" % (arg1, arg2)) sleep(5) print("task is done!") if __name__ == '__main__': app.run() 

有了多线程为什么还要有多进程?

如果是cpu密集型计算,多线程反而会降低速度(多线程只占用一个处理机,它能实现cpu运算和io同时运行,也就是一个线程进入io后能直接转入下一个线程的执行,但是也因为它只占用一个处理机,同一时刻只能有一个线程进行cpu运算)

多进程知识梳理

对于cpu密集型计算的运行时间对比

web服务使用进程池

和线程池类似,注意的地方就是flask使用进程池要傲娇一些

if __name__ == "__main__": process_pool = processpoolexecutor() app.run() 

单线程爬虫的执行路径

协程:在单线程内实现并发

异步io库:asyncio

import asyncio #获取事件循环 loop = asyncio.get_event_loop() #定义协程 async def myfunc(url): await get_url(url) #创建task列表 tasks = [loop.create_task(myfunc(url)) for url in urls] #执行爬虫事件列表 loop.run_until_complete(asyncio.wait(tasks)) 

==注意:==

要用在异步io编程中,依赖的库必须支持异步io特性

爬虫引用中:requests 不支持异步需要用aiohttp

原文链接:https://xz.aliyun.com/t/12766

网络摘文,本文作者:15h,如若转载,请注明出处:https://www.15cov.cn/2023/08/27/python并发从0到1/

发表评论

邮箱地址不会被公开。 必填项已用*标注

网站地图