0%

Windows原理与应用(三)——线程间通信与同步

Windows原理与应用(三)——线程间通信与同步

线程

  • 进程是计算机分配资源的单位,线程是运行调度单位
  • 进程中的线程也具有线程控制块,包含内容有所属进程ID,创建和退出时间,线程启动地址等

线程创建过程

  1. 在进程的地址空间中为线程创建用户态堆栈。
  2. 初始化线程硬件上下文。
  3. 创建线程对象。
  4. 通知内核系统为线程运行准备。
  5. 新创建线程handle和线程ID值返回到调用者。
  6. 线程进入调度准备执行。

线程的生命周期

启动→执行→暂停→终止

工作线程的结束

  • 线程正常结束
    • 自动消亡,OS清理
  • 线程非正常结束,被KILL
    • OS无法控制,造成系统损失或破坏
  • 控制线程正常终止的方法
    • 低级事件对象ManualResetEvent

线程非正常结束的后果

  • 内存无法回收 - 内存泄漏
  • 文件缓冲没写入 - 文件被破坏
  • 文件句柄未回收 - 被占用
  • 共享资源的占用(网络端口,管道,DLL)

线程的创建与启动 - C#

多线程与线程跨域访问

Windows中每个进程都至少有一个线程,称为主线程,对于通常的窗体应用,主线程与窗体界面绑定
如果有耗时很长的处理函数在主线程中运行,会导致界面停滞,失去响应,所以需要把工作交给工作线程执行,执行结束后将结果通知给界面线程

处于安全性考虑,不同线程之间不可以相互调用对方的资源,即不允许直接跨域访问,需要使用委托回调的方式调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Work
{
// 回调委托声明
public delegate void update_listbox(string new_info, string listname);
update_listbox callback; // 一个具体的委托
string list_name;
public Work(update_listbox callback, string list_name)
{
this.callback = callback; // 具体委托绑定另一个线程中的函数
this.list_name = list_name;
}
}

// 主窗体函数
private void update_output(string new_info, string list_name)
{
// 同步方法
Dispatcher.Invoke(() =>
{
// 主窗体用Invoke同步线程更新控件
ListBox lb = FindName(list_name) as ListBox;
ListBoxItem li = new ListBoxItem();
li.Content = new_info;
lb.Items.Add(li);
});

// 异步方法
//Action actiondelegate = () =>
//{
// try
// {
// ListBox lb = FindName(list_name) as ListBox;
// //update_dele test = update_work_listbox;
// update_work_listbox(new_info, lb);
// }
// catch (Exception e)
// {
// MessageBox.Show("发生错误:" + e.Message, "错误", MessageBoxButton.OK, MessageBoxImage.Error);
// }
//};
//Dispatcher.BeginInvoke(actiondelegate);
// // 主窗体用BeginInvoke异步线程更新控件
// }

// 传入函数
Work wk = new Work(update_output, "listname");

异步线程

Thread

  1. 不含参数的构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
using System.Threading;

// 线程执行函数
void workThread(){
// to do
}

// 设定函数名为线程入口
ThreadStart s = new ThreadStart(workThread);

// 线程委托对象
Thread thread1 = new Thread(s);
thread.Name = "设定一个名称"; // 可以设定名称
thread.IsBackground = true; // 可以设定是否为后台
// 启动线程
thread1.Start();
// 启动含有参数的线程
thread1.Start(paraObject);
1
2
3
4
5
6
7
8
9
10
// 也可以使用匿名委托或lambda表达式
//通过Lambda表达式创建
Thread thread1 = new Thread(() => Console.WriteLine("我是通过Lambda表达式创建的委托"));
thread1.Start();

//通过匿名委托创建
Thread thread2 = new Thread(delegate() { Console.WriteLine("我是通过匿名委托创建的线程"); });
thread2.Start();


  1. 含参数的构造
1
2
3
4
5
6
7
8
9
10
11
// 含有参数的Thread
//通过ParameterizedThreadStart创建线程
Thread thread = new Thread(new ParameterizedThreadStart(Thread1));
//给方法传值
thread.Start("这是一个有参数的委托");


// 创建有参的方法,方法里面的参数类型必须是Object类型
void Thread1(object obj){
Console.WriteLine(obj);
}
  • Thread默认只有这两种构造方式,如果要传入多个参数,需要用类来实现

操作Thread的常用函数

方法名称 说明
Abort() 终止本线程
GetDomain() 返回当前线程正在其中运行的当前域
GetDomainId() 返回当前线程正在其中运行的当前域Id
Interrupt() 中断处于Wait Sleep Join线程状态的线程
Join() 阻塞调用线程,直到某个线程终止时为止
Resume() 继续运行已挂起的线程
Start() 执行本线程
Suspend() 挂起当前线程,如果当前线程已属于挂起状态则此不起作用
Sleep() 把正在运行的线程挂起一段时间

Thread的常用属性

属性名 说明
IsAlive 获取一个值,表示当前线程的执行状态
Name 获取或设置Thread的名称
ThreadState 获取一个值,表示当前线程的状态
IsBackground 获取一个值,指示当前线程是否为后台线程

Task

  • 无返回值的Task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
using System.Threading.Tasks;

//通过Lambda表达式创建
Task t = new Task(() =>
{
// to do
});
t.Start();

// 传入函数
void yourTaskFun(){
// to do
}
Task t2 = new Task(yourTaskFun);
t.Start();
  • 含返回值的Task
1
2
3
4
5
6
// 用Task<TResult>
Task<int> t = new Task<int>(() => {
return 114; // 返回int类型
});
t.Start();
// 可以用t.Result获得返回值

前台线程与后台线程

  • 前台线程
    • 所有前台线程都结束,应用程序才可以结束;默认创建的线程都是前台线程
  • 后台线程
    • 只要所有前台线程结束,后台线程自动结束
    • 通过Thread.IsBackground设置后台线程
    • 必须在Start之前设置后台线程

线程同步

互斥量

  • mutex
1
2
3
4
5
6
Mutex mut_operate = new Mutex(); // mutex信号量

// 使用
mut_operate.WaitOne();// 申请
// 互斥访问...
mut_operate.ReleaseMutex();// 释放
  • Semaphore
1
2
3
4
5
6
7
Semaphore semaphore = new Semaphore(usable_num , max_num + 1); // (初始值, 最大并发)

// 使用类似
semaphore.WaitOne();// 申请
// 互斥访问...
semaphore.Release();// 释放
// semaphore.Release(2); // 可以指定释放次数

ManualResetEvent与AutoResetEvent

  • ManualResetEvent
1
2
3
4
5
6
7
stop_signal = new ManualResetEvent(false); // 初始为false

// 等待1毫秒,如果在等待时间内信号触发则立即退出等待
// 这里相当于一直在while(!false),直到stop_signal为true
while (!stop_signal.WaitOne(1)){
// to do
}
1
2
3
4
5
// 设置stop_signal为允许线程运行,这会使得waitone为true
stop_signal.Set();

// 由于是手动信号,所以如果需要重置要手动重置
stop_signal.Reset();
  • AutoResetEvent

与ManualResetEvent效果相同,只不过WaitOne()有效并被读取一次后会立即自动置为False,不需要手动Reset,这意味着它一次只能通知一个线程

线程缺点

  • 线程上下文信息消耗计算机资源

  • 线程上下文切换过程,线程会带来资源特殊要求和潜在冲突。如果线程过多,系统管理线程的负担会加大,则其中大多数线程都不会产生明显的进度

  • 线程控制代码非常复杂,并可能产生许多bug

  • 线程的非正常终结会造成资源浪费影响系统的运行性能

信号量实例——生产者消费者协同

  • 生产一个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void produce_one()
{
string msg;
while (!stop_signal.WaitOne(1))
{

mut_operate.WaitOne();
if (usable_num == max_num)
{
mut_operate.ReleaseMutex(); // 资源已满,释放互斥区
continue; // 重新排队
}
mut_operate.ReleaseMutex();
msg = Thread.CurrentThread.Name + "即将开始工作!\n\t当前资源总量为:" + usable_num + ",资源上限为:" + max_num + "\n\t正在工作...";
callback(msg, listname); // callback是上文提到的线程跨域访问的回调,用于在窗体上显示信息,下文的callback同理
Random rand = new Random();
Thread.Sleep(rand.Next(0, Produce_time_upper_bound)); // 模拟工作时间

while (true)
{
mut_operate.WaitOne();
if (usable_num == max_num)
{
mut_operate.ReleaseMutex();// 满了则释放,重新申请
continue;
}
else
{
break;
}
}
usable_num++; // 不满则放入货架

msg = Thread.CurrentThread.Name + "生产完成,当前资源总量为:" + usable_num + "\n";
callback(msg, listname);
callback_2.Invoke(usable_num);
semaphore.Release(); // 信号量新增一个货物
mut_operate.ReleaseMutex(); // 允许修改访问货架

}
msg = Thread.CurrentThread.Name + "结束";
callback(msg, listname);
}
  • 消费一个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void consume_one()
{
string msg;
while (!stop_signal.WaitOne(1))
{
semaphore.WaitOne(); // 向货架申请一个资源

msg = Thread.CurrentThread.Name + "即将开始工作!\n\t当前资源总量为:" + usable_num + ",资源上限为:" + max_num + "\n\t正在工作...";
callback(msg, listname);
Random rand = new Random();
Thread.Sleep(rand.Next(0, Consume_time_upper_bound));// 模拟消费
mut_operate.WaitOne();// 修改货架数据
usable_num--;
mut_operate.ReleaseMutex();// 释放货架
msg = Thread.CurrentThread.Name + "消费完成,当前资源总量为:" + usable_num + "\n";
callback(msg, listname);
callback_2.Invoke(usable_num);
}
msg = Thread.CurrentThread.Name + "结束";
callback(msg, listname);
}
  • 初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void start_simulation()
{
stop_signal = new ManualResetEvent(false);

int i = 0;
while (i < producer_num)
{
Thread thread = new Thread(new ThreadStart(produce_one));
thread.Name = "生产者" + i + "号";
thread.IsBackground = true;
thread.Start();
i++;
}
i = 0;
while (i < consumer_num)
{
Thread thread = new Thread(new ThreadStart(consume_one));
thread.Name = "消费者" + i + "号";
thread.IsBackground = true;
thread.Start();
i++;
}
}
  • 信号量初始化
1
2
3
4
5
6
7
semaphore = new Semaphore(usable_num , max_num); 
// usable_num : 初始的资源数量
// max_num : 最大可以存放的资源数量

// 这里的max_num并不能起到限制所用
// 如果semaphore超过max_num会直接抛出异常
// 所以可以稍微设大一些,比如max_num+1