消息传递
消息传递
消息传递是并发模型里面大家比较推崇的模式,不仅仅是因为使用起来比较简单,关键还在于它可以减少数据竞争,提高并发效率,为此值得深入学习。
通道(Channel)
channel
std::sync::mpsc
。通道的两端分别是发送者Sender
Receiver
use std::sync::mpsc;
use std::thread;
fn main() {
// 创建一个通道
let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) =
mpsc::channel();
// 创建线程用于发送消息
thread::spawn(move || {
// 发送一个消息,此处是数字id
tx.send(1).unwrap();
});
// 在主线程中接收子线程发送的消息并输出
println!("receive {}", rx.recv().unwrap());
}
程序说明参见代码中的注释,程序执行结果为:
receive 1
结果表明 main
所在的主线程接收到了新建线程发送的消息,用Rust
的 Channel
的几个问题:
- 通道能保证消息的顺序吗?是否先发送的消息,先接收?
- 通道能缓存消息吗?如果能的话能缓存多少?
- 通道的发送者和接收者支持
N:1 ,1:N,N:M 模式吗? - 通道能发送任何数据吗?
- 发送后的数据,在线程中继续使用没有问题吗?
消息类型
上面的例子中,我们传递的消息类型为
use std::fmt;
use std::sync::mpsc;
use std::thread;
use std::rc::Rc;
pub struct Student {
id: u32
}
impl fmt::Display for Student {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "student {}", self.id)
}
}
fn main() {
// 创建一个通道
let (tx, rx): (mpsc::Sender<Rc<Student>>, mpsc::Receiver<Rc<Student>>) =
mpsc::channel();
// 创建线程用于发送消息
thread::spawn(move || {
// 发送一个消息,此处是数字id
tx.send(Rc::new(Student{
id: 1,
})).unwrap();
});
// 在主线程中接收子线程发送的消息并输出
println!("receive {}", rx.recv().unwrap());
}
error: the trait `core::marker::Send` is not
implemented for the type `alloc::rc::Rc<Student>` [E0277]
note: `alloc::rc::Rc<Student>` cannot be sent between threads safely
并不是所有类型的消息都可以通过通道发送,消息类型必须实现
不过由于
对于不是Send
的情况(!Send
- 原始指针,包括
*mut T
和*const T
,因为不同线程通过指针都可以访问数据,从而可能引发线程安全问题。 Rc
和Weak
也不是,因为引用计数会被共享,但是并没有做并发控制。
虽然有这些!Send
的情况,但是逃不过编译器的火眼金睛,只要你错误地使用了消息类型,编译器都会给出类似于上面的错误提示。我们要担心的不是这些,因为错误更容易出现在新创建的自定义类,有下面两点需要注意:
- 如果自定义类的所有字段都是
Send
,那么这个自定义类也是Send
。反之,如果有一个字段是!Send
,那么这个自定义类也是!Send
。如果类的字段存在递归包含的情况,按照该原则以此类推来推论类是Send
还是!Send
。 - 在为一个自定义类实现
Send
或者!Send
时,必须确保符合它的约定。
异步通道(Channel)
use std::sync::mpsc;
use std::thread;
// 线程数量
const THREAD_COUNT :i32 = 2;
fn main() {
// 创建一个通道
let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) = mpsc::channel();
// 创建线程用于发送消息
for id in 0..THREAD_COUNT {
// 注意Sender是可以clone的,这样就可以支持多个发送者
let thread_tx = tx.clone();
thread::spawn(move || {
// 发送一个消息,此处是数字id
thread_tx.send(id + 1).unwrap();
println!("send {}", id + 1);
});
}
thread::sleep_ms(2000);
println!("wake up");
// 在主线程中接收子线程发送的消息并输出
for _ in 0..THREAD_COUNT {
println!("receive {}", rx.recv().unwrap());
}
}
send 1
send 2
wake up
receive 1
receive 2
在代码中,我们故意让main
所在的主线程睡眠
clone
的方式来实现。这类似于Rc
的共享机制。其实从Channel
所在的库名std::sync::mpsc
也可以知道这点。因为mpsc
就是多生产者单消费者
use std::sync::mpsc;
use std::thread;
fn main() {
// 创建一个通道
let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) = mpsc::channel();
// 创建线程用于发送消息
let new_thread = thread::spawn(move || {
// 发送无穷多个消息
let mut i = 0;
loop {
i = i + 1;
// add code here
println!("send {}", i);
match tx.send(i) {
Ok(_) => (),
Err(e) => {
println!("send error: {}, count: {}", e, i);
return;
},
}
}
});
// 在主线程中接收子线程发送的消息并输出
new_thread.join().unwrap();
println!("receive {}", rx.recv().unwrap());
}
最后的结果就是耗费内存为止。
上面介绍的内容大多是关于发送者和通道的,下面开始考察一下接收端。通过上面的几个例子,细心一点的可能已经发现接收者的recv
方法应该会阻塞当前线程,如果不阻塞,在多线程的情况下,发送的消息就不可能接收完全。所以没有发送者发送消息,那么接收者将会一直等待,这一点要谨记。在某些场景下,一直等待是符合实际需求的。但某些情况下并不需一直等待,那么就可以考虑释放通道,只要通道释放了,recv
方法就会立即返回。
同步通道
同步通道在使用上同异步通道一样,接收端也是一样的,唯一的区别在于发送端,我们先来看下面的例子:
use std::sync::mpsc;
use std::thread;
fn main() {
// 创建一个同步通道
let (tx, rx): (mpsc::SyncSender<i32>, mpsc::Receiver<i32>) = mpsc::sync_channel(0);
// 创建线程用于发送消息
let new_thread = thread::spawn(move || {
// 发送一个消息,此处是数字id
println!("before send");
tx.send(1).unwrap();
println!("after send");
});
println!("before sleep");
thread::sleep_ms(5000);
println!("after sleep");
// 在主线程中接收子线程发送的消息并输出
println!("receive {}", rx.recv().unwrap());
new_thread.join().unwrap();
}
运行结果:
before sleep
before send
after sleep
receive 1
after send
除了多了一些输出代码之外,上面这段代码几乎和前面的异步通道的没有什么区别,唯一不同的在于创建同步通道的那行代码。同步通道是sync_channel
,对应的发送者也变成了SyncSender
。为了显示出同步通道的区别,故意添加了一些打印。和异步通道相比,存在两点不同:
- 同步通道是需要指定缓存的消息个数的,但需要注意的是,最小可以是
0 ,表示没有缓存。 - 发送者是会被阻塞的。当通道的缓存队列不能再缓存消息时,发送者发送消息时,就会被阻塞。
对照上面两点和运行结果来分析,由于主线程在接收消息前先睡眠了,从而子线程这个时候会被调度执行发送消息,由于通道能缓存的消息为tx.send(1).unwrap()
就会阻塞子线程,直到主线程接收消息,即执行println!("receive {}", rx.recv().unwrap());
。运行结果印证了这点,要是没阻塞,那么在before send
之后就应该是after send
了。
相比较而言,异步通道更没有责任感一些,因为消息发送者一股脑的只管发送,不管接收者是否能快速处理。这样就可能出现通道里面缓存大量的消息得不到处理,从而占用大量的内存,最终导致内存耗尽。而同步通道则能避免这种问题,把接受者的压力能传递到发送者,从而一直传递下去。