Python多线程、多进程

知识点

多线程

  • 备注:

    1. 多线程有两种应用场景,第一种是直接调用Thread()的方法,适用于实现过程不复杂的情景。第二种通过继承Thread()类,然后重写run()方法来实现,适用于实现过程复杂的情景
    2. 多线程是共享全局变量的
  • 第一种方法:适用于单一方法实现功能的情况

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import threading
    import time


    def say(message):
    for i in range(5):
    print("第{0}次说话,说话内容:{1}".format(i, message))
    time.sleep(1)


    def dance():
    for i in range(5):
    print("第{0}次跳舞".format(i))
    time.sleep(1)


    if __name__ == '__main__':
    # 线程的执行是没有顺序的,由系统决定
    t1 = threading.Thread(target=say, args = (message,)) # args传递的是函数运行所需要的变量值
    t2 = threading.Thread(target=dance) # 此处只是创建一个子线程对象,仍然属于主线程中
    t1.start() # 运行了start以后才会创建子线程,并让这个子线程运行起来
    t2.start()

    while True:
    print(threading.enumerate()) # 利用enumerate方法可以直接打印出列表、元组等元素的元组格式数据
    if len(threading.enumerate()) <= 1:
    break
  • 第二种方法:适用于多方法实现功能的情况

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import threading
    import time


    class Thread_Test(threading.Thread):
    # 通过继承类,则一定要重写run方法才可以
    def run(self):
    for i in range(5):
    self.talk() # 该处的talk和speak方法会同步执行
    self.speak()
    time.sleep(1)

    def talk(self):
    print("i talk le ")

    def speak(self):
    print("i speak le ")


    if __name__ == '__main__':
    test = Thread_Test()
    # 多线程本质上是调用Thread类中的run方法,所以此处使用start方法
    test.start()
  • 多线程问题1:造成资源竞争(对同一个数据进行写入,导致数据不准确),

    解决方案:当某个线程需要更改共享数据时,先将共享的资源进行锁定,此时其它线程不能更改,直到该线程释放资源,将资源的状态变为非锁定,其它线程才可以再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的安全性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import threading
    import time

    NUM = 100
    mutex = threading.Lock() # 创建互斥锁,初始未锁定状态

    def say(message):
    globe NUM # 不可变类型的全局变量使用前需要先声明
    for i in range(message):
    # 锁定数据的要求:锁定的代码越少越好,
    mutex.acquire() # 开始锁定数据
    NUM += i
    mutex.release() # 释放互斥锁

    def dance(message)
    globe NUM
    for i in range(message):
    mutex.acquire() # 开始锁定数据
    NUM += i
    mutex.release() # 释放互斥锁

    if __name__ == '__main__':
    # 线程的执行是没有顺序的,由系统决定
    t1 = threading.Thread(target=say, args = (message,)) # args传递的是函数运行所需要的变量值
    t2 = threading.Thread(target=dance) # 此处只是创建一个子线程对象,仍然属于主线程中
    t1.start() # 运行了start以后才会创建子线程,并让这个子线程运行起来
    t2.start()
  • 多线程问题2:死锁(线程之间共享多个资源,若两个线程之间分别占有一部分资源并且同时等待对方的资源,就会造成死锁)

    解决方案1:优化设计算法(银行家算法)

    解决方案2:添加超时时间

多进程

  • 备注

    1. 可执行文件未运行前叫程序,运行起来以后叫进程。程序是没有资源的(内存、硬件调用),进程是有资源的(内存、软硬件等),进程是程序运行以后代码与所占用资源的综合
  1. 创建子进程以后,主进程和子进程的代码、资源都是独立且完全一样的
  • 通过Process()类创建子进程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import multiprocessing
    from time import sleep


    def test1(num):
    for i in range(100):
    print("test1: {0}, talk: {1}".format(i, num))
    sleep(1)


    def test2():
    for i in range(100):
    print("test2: {0}".format(i))
    sleep(1)


    def main():
    # 多进程的实现原理与多线程类似,只是多进程之间的资源都是独立的
    p1 = multiprocessing.Process(target=test1, args=(9,))
    p2 = multiprocessing.Process(target=test2)
    p1.start()
    p2.start()


    if __name__ == '__main__':
    main()
  • 多进程问题:资源不共享导致多进程之间无法进行通信

    解决方案1:通过本地存储的文件进行交互

    解决方案2:通过multiprocess中Queue()类(队列)实现数据的交互。queue实现原理是在内存中开辟一部分空间,将需要交互的数据存储在内存中,当其它进程需要交互数据时就从内存中queue开辟的空间中读取,谁先存入,在读取时就优先读取出来。queue的存在使得多进程中耦合度降低(解耦,耦合度越高,程序越不好,反之更好)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import multiprocessing
    from time import sleep


    def put_data(queue_list):
    list_data = [1, 2, 3] * 50
    for num in list_data:
    queue_list.put(num) # 向队列中追加元素,当队列元素满载时,此时会堵塞在此,直到队列中可以给put
    print("put_data: 上传元素:{0}".format(num))


    def get_data(queue_list):
    result = 0
    while True:
    # empty(): 判断队列中是否是空队列,空队列返回True
    # full():判断队列是否已经满了,满队列返回True
    if not queue_list.empty():
    sleep(1)
    num = queue_list.get() # 从队列提取元素,当队列元素为空时,此时会堵塞在此,直到队列中可以提取到元素
    # num = queue_list.get_nowait() # 即时提取队列元素,不等待,若提取不到元素则报错
    print("get-data: 上传元素:{0}".format(num))
    result += num
    else:
    break
    print("所有元素的和为{0}".format(result))


    def main():
    # 主线程中创建队列,然后将队列作为参数传入到子进程中,若不填队列数,则系统会根据内存自动分配
    queue_list = multiprocessing.Queue(3) # 3表示队列中元素最多为3个,满载时无法put,队列为空时无法get
    p1 = multiprocessing.Process(target=put_data, args=(queue_list,))
    p2 = multiprocessing.Process(target=get_data, args=(queue_list,))
    p1.start()
    p2.start()


    if __name__ == '__main__':
    main()

进程池

  • 背景

    1. 当需要创建的子进程数量不多时,可以直接利用multiprocessing中Process动态创建,但如果是上百个甚至是上千个目标(进程创建和销毁需要较多的工作量和资源),此时就需要用multiprocessing模块中的Pool方法
  • 实现方式

    初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool时,如果池还没满,那么就会创建一个新的进程用于执行该请求;但如果进程池的进程数已经达到指定的最大数,那么该请求就会等待,直到进程池有进程结束,才会用之前的进程来执行新的任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import multiprocessing
    import time
    import os
    import random

    def run_back(msg):
    start_time = time.time()
    # os.getpid() 可以获取当前程序的进程号
    print("执行进程为{0}, 进程号为{1}".format(msg, os.getpid()))
    time.sleep(random.randint(1, 3))
    end_time = time.time()
    print("执行完毕,耗时:{0}".format(start_time - end_time))

    def main():
    # 此处定义一个进程池,和定义队列类似,数字3表示允许的进程池的最大数量为3
    po = multiprocessing.Pool(3)
    for i in range(100):
    # Pool().apply_async(调用的目标, (传递给目标的参数元组,))
    # 每次循环将会用空闲出来的进程(不会终止)去调用目标
    po.apply_async(run_back, (i,)) # 此处开始创建进程池中的进程,并自动调度进程池的进程
    print("----------start---------")
    po.close() # 关闭进程池,关闭后po不再接受新的请求
    po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
    print("----------end-----------")

    if __name__ == '__main__':
    main()
  • 注意点

    1. 通过进程池创建的子进程和主进程彼此独立开,当创建子进程的语句全部执行完毕后,主进程依旧继续从上到下执行,不会等待子进程执行完·毕。如果主进程比子进程先执行完毕,那么程序照样结束,导致逻辑不对,所以必须加入Pool().join(),用于堵塞主线程,等待子线程全部执行完毕
    2. 进程池中的子进程异常的时候,是不会将报错信息打印出来的

创建文件复制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import os
import multiprocessing
from time import sleep
from random import randint


def copy_file(q, filename, old_floder, new_floder):
with open(old_floder + '/' + filename, 'rb') as f:
file_content = f.read()
with open(new_floder + '/' + filename, 'wb') as f:
f.write(file_content)
q.put(filename)
sleep(randint(5))


def main():
# 希望能够让各个子线程能够与主线程进行交互,以便于了解复制的进度,可以利用队列
# 利用的manager中queue方法
q = multiprocessing.Manager().Queue(3)
# 创建进程池
po = multiprocessing.Pool(5)
# 输入要复制的文件夹
old_floder = input("请输入需要复制的文件夹名称: ")
# 输入需要新建的文件夹
new_floder = input("请输入需要将文件复制到的文件夹名称:")
os.mkdir(new_floder)
# 获取指定文件夹下所有的内容 listdir
file_names = os.listdir(old_floder)
# 遍历所有的文件 遍历在主进程中执行
for file_name in file_names:
# 读取文件的内容,然后将内容复制到新的文件夹对应的文件内容中 在子进程中进行读写
po.apply_async(copy_file, (q, file_name, old_floder, new_floder))
# 停止向线程组中添加子线程
po.close()
# 等待子线程结束, 由于需要与子线程进行交互,故可用循环替代join()方法, 当循环次数达到要求时则退出循环
# po.join()
all_file_num = len(file_names)
copy_result = list() # 创建空列表时采用该方法
while len(copy_result) < all_file_num:
file_name = q.get()
copy_result.append(file_name)
# 转移字符中\r表示回到本行的行首, 同时将print默认的换行符置空,则可以达到当前行输出进度的效果
print("\r完成进度:%.2f%%" % (len(copy_result) * 100 / all_file_num), end="")


if __name__ == '__main__':
main()

作业

  1. 打印出当前线程中所有同时在线的线程信息

  2. 利用继承多线程类的方式,实现以下两个方法的多线程同步

    1
    2
    3
    4
    5
    def talk(self):
    print("i talk le ")

    def speak(self):
    print("i speak le ")
  3. (简答)线程是共享全局资源的,那么如何利用互斥锁保障多线程中的资源竞争现象

  4. 利用多进程和queue队列实现进程之间的数据交互,需求如下:

    • 向一个列表中追加数据的同时,也自动的从列表中读取数据,并打印出来
  5. (问答)请简述利用进程池实现多进程的步骤

  6. (问答)使用进程池时的注意事项

  7. (问答)当多进程出现异常时,是否会打印出错误信息