0%

Python简单多线程

线程池使用

1. 基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
import concurrent.futures


def func():
print("thread start!")


# 1. 导入设定线程池大小
thread_num = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as pool:
# 用with控制,可以不用显示调用pool.shutdown(wait=True)函数
# 2. 调用submit函数向线程池提交任务,当线程池有空余时,线程池会自动启动线程,这个过程不需要人工控制
pool.submit(func)

2. 获取线程工作状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import concurrent.futures
import time


def func():
print("thread start!")
time.sleep(5)



# 1. 导入设定线程池大小
thread_num = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as pool:
# 用with控制,可以不用显示调用pool.shutdown(wait=True)函数
# 2. 调用submit函数向线程池提交任务,当线程池有空余时,线程池会自动启动线程,这个过程不需要人工控制
# 2.1. 同时用一个变量接收返回值
future = pool.submit(func)

# 3. 调用done()判断线程是否结束
# 注意done()不会阻塞线程, 如果done()为false则会直接跳过
if future.done():
print("thread done!")
else:
print("thread not done!")

3. 获取线程返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import concurrent.futures
import time


def func():
print("thread start!")
time.sleep(5)
return "func return!"


# 1. 导入设定线程池大小
thread_num = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as pool:
# 用with控制,可以不用显示调用pool.shutdown(wait=True)函数
# 2. 调用submit函数向线程池提交任务,当线程池有空余时,线程池会自动启动线程,这个过程不需要人工控制
# 2.1. 同时用一个变量接收返回值
future = pool.submit(func)

# 3. 调用result()获取线程返回值
# 注意result()会阻塞线程,一直等待直到获得返回值
print(future.result())
'''
thread start!
func return!
'''

4. 向线程传递参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import concurrent.futures
import time


def func(thread_idx, sleep_secs):
print("thread_idx = %d" % thread_idx)
time.sleep(sleep_secs)
return "thread_%d sleep_secs:%d return!" % (thread_idx, sleep_secs)


# 1. 导入设定线程池大小
thread_num = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as pool:
# 用with控制,可以不用显示调用pool.shutdown(wait=True)函数
# 2. 调用submit函数向线程池提交任务,当线程池有空余时,线程池会自动启动线程,这个过程不需要人工控制
# 2.1. 同时用一个变量接收返回值
# 2.2. 目标函数后面可以直接按顺序接多个参数
future1 = pool.submit(func, 1, 4)

# 2.3. 也可以这样
params = (2, 3)
future2 = pool.submit(func, *params)

# 2.4. 也可以用map指定参数名
params_map = {
"sleep_secs" : 2,
"thread_idx" : 3
}
future3 = pool.submit(func, **params_map)

# 3. 调用result()获取返回值
print(future1.result())

print(future2.result())

print(future3.result())
'''
thread_idx = 1
thread_idx = 2
thread_idx = 3
thread_1 sleep_secs:4 return!
thread_2 sleep_secs:3 return!
thread_3 sleep_secs:2 return!
'''

5. 无阻塞获得返回值

在第4节中会发现 ,当调用future1.result()时,如果future1对应的线程还没有结束,那么就会阻塞在这里等待future1结束,那么即使future2先结束了,也不能提前得到主线程的处理。

接下来介绍「只要有线程执行完毕就立即获得响应」的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import concurrent.futures
import time


def func(thread_idx, sleep_secs):
print("thread_idx = %d" % thread_idx)
time.sleep(sleep_secs)
return "func return sleep_secs = %d" % sleep_secs


# 1. 导入设定线程池大小
thread_num = 3
future_list = []
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as pool:
# 用with控制,可以不用显示调用pool.shutdown(wait=True)函数
# 2. 调用submit函数向线程池提交任务,当线程池有空余时,线程池会自动启动线程,这个过程不需要人工控制
# 2.1. 同时用一个变量接收返回值

for sleep_sec in range(5):
# 不同的线程给到不同的睡眠时间
# 2.2. 传入参数
params_map = {
"sleep_secs" : 5 - sleep_sec,
"thread_idx" : sleep_sec
}
future_list.append(pool.submit(func, **params_map))
# 3. 使用concurrent.futures.as_completed()生成器
for future in concurrent.futures.as_completed(future_list):
# as_completed()返回一个generator,当给它的future list中有线程完成时会向其中添加一项,这样不会阻塞,哪个线程先完成就会先被调用,没有特定顺序
print(future.result())
# * 打印结果不一定与下面的一致 *
'''
thread_idx = 0
thread_idx = 1
thread_idx = 2
thread_idx = 3
func return sleep_secs = 3
thread_idx = 4
func return sleep_secs = 4
func return sleep_secs = 5
func return sleep_secs = 1
func return sleep_secs = 2
'''

7. 异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import concurrent.futures
import time


def func():
print("thread start!")
time.sleep(3)
raise(Exception("manual error")) # 手动抛出异常
return "func return!"


# 1. 导入设定线程池大小
thread_num = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as pool:
# 用with控制,可以不用显示调用pool.shutdown(wait=True)函数
# 2. 调用submit函数向线程池提交任务,当线程池有空余时,线程池会自动启动线程,这个过程不需要人工控制
# 2.1. 同时用一个变量接收返回值
future = pool.submit(func)

# 3. 调用exception()获取异常, 注意exception()和result()一样,都会阻塞主线程
print(future.exception())
'''
thread start!
manual error
'''

# 4. 如果调用result()则会直接抛出异常
print(future.result())

多线程 + 生成器获取返回值

1. 生成器Generator

当一个函数中出现yield时,它就会变成一个Generator,一般用for循环遍历,也可以手动调用它的.next()函数。

Generator的特性是:遇到yield,则返回yield表达式中的内容,同时暂停运行,直到下一次调用.next()时再从yield的下一条语句继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def counter(target):
for i in range(target):
yield i

counter_generator = counter(10)
for item in counter_generator:
print(item)
'''
0
1
2
3
4
5
6
7
8
9
'''

2. 使用Generator获取线程返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import concurrent.futures
import time


def func(num, sleep_secs):
# 输入num, 返回num * 2
time.sleep(sleep_secs)
return num * 2

def run_async():
thread_num = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as pool:
future_list = []
for i in range(10):
# 总共10个线程
params_map = {
"sleep_secs" : 1,
"num" : i
}
future_list.append(pool.submit(func, **params_map))
for future in concurrent.futures.as_completed(future_list):
# as_completed()返回一个generator,当给它的future list中有线程完成时会向其中添加一项,这样不会阻塞,哪个线程先完成就会先被调用,没有特定顺序
yield future.result()

results = run_async()
for r in results:
print(r)
'''
2
4
0
10
8
6
12
14
16
18
'''