WHCSRL 技术网

Python 线程threading

Python多线程适用于I/O密集型

GIL的全称是Global Interpreter Lock(全局解释器锁),为了数据安全,GIL保证同一时间只能有一个线程拿到数据。所以,在python中,同时只能执行一个线程。而IO密集型,多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率)。所以python多线程对IO密集型代码比较友好。

而CPU密集型(各种循环处理、计算等等),由于计算工作多,计时器很快就会达到阈值,然后触发GIL的释放与再竞争(多个线程来回切换当然是需要消耗资源的),所以python多线程对CPU密集型代码并不友好。

Python多线程的工作过程

Python在使用多线程的时候,调用的是c语言的原生线程。

  1. 拿到公共数据
  2. 申请gil
  3. python解释器调用os原生线程
  4. os操作cpu执行运算
  5. 当该线程执行时间到后,无论运算是否已经执行完,gil都被要求释放
  6. 由其他进程重复上面的过程
  7. 等其他进程执行完后,又会切换到之前的线程(从他记录的上下文继续执行),整个过程是每个线程执行自己的运算,当执行时间到就进行切换(context switch)。

构造函数

threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)


调用这个构造函数时,必需带有关键字参数。参数如下:

  1. group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。
  2. target 是用于 run() 方法调用的可调用对象。默认是 None,表示不需要调用任何方法。
  3. name 是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中 N 是小的十进制数。多个线程可以赋予相同的名称。
  4. args 是用于调用目标函数的参数元组。默认是 ()。
  5. kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。
  6. daemon 参数如果不是 None,将显式地设置该线程是否为守护模式。 如果是 None (默认值),线程将继承当前线程的守护模式属性。3.3 版及以上才具有该属性。

    一定要在调用 start() 前设置好,不然会抛出 RuntimeError 。初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是 daemon = False。

    当没有存活的非守护线程时,整个Python程序才会退出

  7. 如果子类型重载了构造函数,它一定要确保在做任何事前,先发起调用基类构造器(Thread.__init__())。

线程函数

start()
开始线程活动。

它在一个线程里最多只能被调用一次。它安排对象的 run() 方法在一个独立的控制进程中调用。

如果同一个线程对象中调用这个方法的次数大于一次,会抛出 RuntimeError 。

join(timeout=None)
        等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的。

当 timeout 参数存在而且不是 None 时,它应该是一个用于指定操作超时的以秒为单位的浮点数(或者分数)。因为 join() 总是返回 None ,所以你一定要在 join() 后调用 is_alive() 才能判断是否发生超时 -- 如果线程仍然存活,则 join() 超时。

当 timeout 参数不存在或者是 None ,这个操作会阻塞直到线程终结。

一个线程可以被 join() 很多次。

如果尝试加入当前线程会导致死锁, join() 会引起 RuntimeError 异常。如果尝试 join() 一个尚未开始的线程,也会抛出相同的异常。

创建线程

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)
    print('2s')
    time.sleep(1)
    print('3s')

if __name__ == '__main__':
    th = threading.Thread(target=run,name="thread_1" args=("thread 1",), daemon=True)

    # 把子进程设置为守护线程,必须在start()之前设置
    th.setDaemon(True)
    th.start()

   # 设置主线程等待子线程结束
    th.join()
    print("end")

线程加锁

    由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,
    所以出现了线程锁,即同一时刻允许一个线程执行操作。线程锁用于锁定资源,可以定义多个锁,像下面的代码,当需要独占某一个资源时,任何一个锁都可以锁定这个资源,就好比你用不同的锁都可以把这个相同的门锁住一样。
        由于线程之间是进行随机调度的,如果有多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期,我们因此也称为“线程不安全”。
        为了防止上面情况的发生,就出现了锁。

锁函数

1.acquire(blocking=True, timeout=-1)

        可以阻塞或非阻塞地获得锁。

        当调用时参数 blocking 设置为 True (缺省值),阻塞直到锁被释放,然后将锁锁定并返回 True 。

        在参数 blocking 被设置为 False 的情况下调用,将不会发生阻塞。如果调用时 blocking 设为 True 会阻塞,并立即返回 False ;否则,将锁锁定并返回 True。

        当浮点型 timeout 参数被设置为正值调用时,只要无法获得锁,将最多阻塞 timeout 设定的秒数。timeout 参数被设置为 -1 时将无限等待。当 blocking 为 false 时,timeout 指定的值将被忽略。

        如果成功获得锁,则返回 True,否则返回 False (例如发生超时的时候)。

2.release()

        释放一个锁。这个方法可以在任何线程中调用,不单指获得锁的线程。

        当锁被锁定,将它重置为未锁定,并返回。

        如果其他线程正在等待这个锁解锁而被阻塞,只允许其中一个允许。

        在未锁定的锁调用时,会引发 RuntimeError 异常。

        没有返回值。

互斥锁 threading.Lock

原始锁对象的类。一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。

from threading import Thread,Lock
import os,time


def work():

    # 共享全局变量n
    global n

    # 申请锁
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1

    # 释放锁
    lock.release()


if __name__ == '__main__':

    # 实例化锁的对象
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

 递归锁 threading.RLock

支持嵌套,在多个锁没有释放的时候一般会使用RLcok类。

acquire()/release() 对可以嵌套;只有最终 release() (最外面一对的 release() ) 将锁解开,才能让其他线程继续处理 acquire() 阻塞。

import threading
import time

def Func(lock):
    global gl_num

    # 申请锁
    lock.acquire()
    gl_num += 1
    time.sleep(1)
    print(gl_num)

    # 释放锁
    lock.release()

if __name__ == '__main__':
    gl_num = 0

   # 实例化对象
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=Func, args=(lock,))
        t.start()

信号量 threading.Semaphore(value=1)

互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

import threading
import time

def run(n, semaphore):

    # 加锁
    semaphore.acquire()
    time.sleep(1)
    print("run the thread:%%s " %% n)

    # 释放
    semaphore.release()  

if __name__ == '__main__':
    num = 0

    #  最多允许5个线程同时运行
    semaphore = threading.BoundedSemaphore(5)
    for i in range(22):
        t = threading.Thread(target=run, args=("t-%%s" %% i, semaphore))
        t.start()
    while threading.active_count() != 1:
        pass  # print threading.active_count()
    else:
        print('-----all threads done-----')

条件对象 threading.Condition(lock=None)

实现条件变量对象的类。一个条件变量对象允许一个或多个线程在被其它线程所通知之前进行等待。

如果给出了非 None 的 lock 参数,则它必须为 Lock 或者 RLock 对象,并且它将被用作底层锁。否则,将会创建新的 RLock 对象,并将其用作底层锁。

acquire(*args)
请求底层锁。此方法调用底层锁的相应方法,返回值是底层锁相应方法的返回值。

release()
释放底层锁。此方法调用底层锁的相应方法。没有返回值。

wait(timeout=None)
等待直到被通知或发生超时。如果线程在调用此方法时没有获得锁,将会引发 RuntimeError 异常。

这个方法释放底层锁,然后阻塞,直到在另外一个线程中调用同一个条件变量的 notify() 或 notify_all() 唤醒它,或者直到可选的超时发生。一旦被唤醒或者超时,它重新获得锁并返回。

当提供了 timeout 参数且不是 None 时,它应该是一个浮点数,代表操作的超时时间,以秒为单位(可以为小数)。

当底层锁是个 RLock ,不会使用它的 release() 方法释放锁,因为当它被递归多次获取时,实际上可能无法解锁。相反,使用了 RLock 类的内部接口,即使多次递归获取它也能解锁它。 然后,在重新获取锁时,使用另一个内部接口来恢复递归级别。

返回 True ,除非提供的 timeout 过期,这种情况下返回 False。

wait_for(predicate, timeout=None)
等待,直到条件计算为真。 predicate 应该是一个可调用对象而且它的返回值可被解释为一个布尔值。可以提供 timeout 参数给出最大等待时间。

这个实用方法会重复地调用 wait() 直到满足判断式或者发生超时。返回值是判断式最后一个返回值,而且如果方法发生超时会返回 False 。

忽略超时功能,调用此方法大致相当于编写:

while not predicate():
    cv.wait()
因此,规则同样适用于 wait() :锁必须在被调用时保持获取,并在返回时重新获取。 随着锁定执行判断式。

notify(n=1)
默认唤醒一个等待这个条件的线程。如果调用线程在没有获得锁的情况下调用这个方法,会引发 RuntimeError 异常。

这个方法唤醒最多 n 个正在等待这个条件变量的线程;如果没有线程在等待,这是一个空操作。

当前实现中,如果至少有 n 个线程正在等待,准确唤醒 n 个线程。但是依赖这个行为并不安全。未来,优化的实现有时会唤醒超过 n 个线程。

注意:被唤醒的线程实际上不会返回它调用的 wait() ,直到它可以重新获得锁。因为 notify() 不会释放锁,只有它的调用者应该这样做。

notify_all()
唤醒所有正在等待这个条件的线程。这个方法行为与 notify() 相似,但并不只唤醒单一线程,而是唤醒所有等待线程。如果调用线程在调用这个方法时没有获得锁,会引发 RuntimeError 异常。

事件对象 threading.Event


实现事件对象的类。事件对象管理一个内部标志,调用 set() 方法可将其设置为true。调用 clear() 方法可将其设置为false。调用 wait() 方法将进入阻塞直到标志为true。这个标志初始时为false。

说明

is_set()
当且仅当内部旗标为时返回 True。

set()
将内部标志设置为true。所有正在等待这个事件的线程将被唤醒。当标志为true时,调用 wait() 方法的线程不会被被阻塞。

clear()
将内部标志设置为false。之后调用 wait() 方法的线程将会被阻塞,直到调用 set() 方法将内部标志再次设置为true。

wait(timeout=None)
阻塞线程直到内部变量为true。如果调用时内部标志为true,将立即返回。否则将阻塞线程,直到调用 set() 方法将标志设置为true或者发生可选的超时。

当提供了timeout参数且不是 None 时,它应该是一个浮点数,代表操作的超时时间,以秒为单位(可以为小数)。

当且仅当内部旗标在等待调用之前或者等待开始之后被设为真值时此方法将返回 True,也就是说,它将总是返回 True 除非设定了超时且操作发生了超时。

​示例

#利用Event类模拟红绿灯
import threading
import time

event = threading.Event()


def lighter():
    count = 0

    # 初始值为绿灯
    event.set()
    while True:
        if 5 < count <=10 :

             # 红灯,清除标志位
            event.clear()
            print("33[41;1mred light is on...33[0m")
        elif count > 10:

            # 绿灯,设置标志位
            event.set()
            count = 0
        else:
            print("33[42;1mgreen light is on...33[0m")

        time.sleep(1)
        count += 1

def car(name):
    while True:

        # 判断是否设置了标志位
        if event.is_set():
            print("[%%s] running..."%%name)
            time.sleep(1)
        else:
            print("[%%s] sees red light,waiting..."%%name)
            event.wait()
            print("[%%s] green light is on,start going..."%%name)

light = threading.Thread(target=lighter,)
light.start()

car = threading.Thread(target=car,args=("MINI",))
car.start()

 

 

推荐阅读