Appearance
5.多线程通信
一、多发送者,单接收者
rust
use std::sync::mpsc;
use std::thread;
fn main() {
// 创建一个消息通道, 返回一个元组:(发送者,接收者)
let (tx, rx) = mpsc::channel();
// 创建线程,并发送消息
thread::spawn(move || {
// 发送一个数字1, send方法返回Result<T,E>,通过unwrap进行快速错误处理
tx.send(1).unwrap();
// 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
// tx.send(Some(1)).unwrap()
});
// 在主线程中接收子线程发送的消息并输出
println!("receive {}", rx.recv().unwrap());
}
use std::sync::mpsc;
use std::thread;
fn main() {
// 创建一个消息通道, 返回一个元组:(发送者,接收者)
let (tx, rx) = mpsc::channel();
// 创建线程,并发送消息
thread::spawn(move || {
// 发送一个数字1, send方法返回Result<T,E>,通过unwrap进行快速错误处理
tx.send(1).unwrap();
// 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
// tx.send(Some(1)).unwrap()
});
// 在主线程中接收子线程发送的消息并输出
println!("receive {}", rx.recv().unwrap());
}
- 接收消息的操作rx.recv()会阻塞当前线程,直到读取到值,或者通道被关闭
- 需要使用move将tx的所有权转移到子线程的闭包中
二、单发送者,多接收者
可以直接对接收者 for 循环,这样就不需要一个个的rec了
rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
三、多发送者,多接收者
使用发送者的 clone
方法,就可以拷贝一个发送者
rust
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
tx.send(String::from("hi from raw tx")).unwrap();
});
thread::spawn(move || {
tx1.send(String::from("hi from cloned tx")).unwrap();
});
for received in rx {
println!("Got: {}", received);
}
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
tx.send(String::from("hi from raw tx")).unwrap();
});
thread::spawn(move || {
tx1.send(String::from("hi from cloned tx")).unwrap();
});
for received in rx {
println!("Got: {}", received);
}
}
四、不阻塞的 try_recv 方法
在下面的代码中,try_recv
并不会阻塞主线程的执行,只有当接收到消息才会打印值。增加一点延时,就可以正常接收到消息了,有点类似异步?
rust
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(1).unwrap();
});
// 这里可以增加一点延时,就可以正常接收到消息了,有点类似异步?
// thread::sleep(Duration::from_millis(200));
println!("receive {:?}", rx.try_recv());
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(1).unwrap();
});
// 这里可以增加一点延时,就可以正常接收到消息了,有点类似异步?
// thread::sleep(Duration::from_millis(200));
println!("receive {:?}", rx.try_recv());
}
五、传输具有所有权的数据
rust
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let s = String::from("我,飞走咯!");
tx.send(s).unwrap();
println!("val is {}", s);
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let s = String::from("我,飞走咯!");
tx.send(s).unwrap();
println!("val is {}", s);
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
以上代码中,String底层的字符串是存储在堆上,并没有实现Copy特征,当它被发送后,会将所有权从发送端的s转移给接收端的received,之后s将无法被使用。
也就是说发送的数据,需要实现Copy
特征
六、同步channel
之前我们使用的都是异步通道:无论接收者是否正在接收消息,消息发送者在发送消息时都不会阻塞
使用mpsc::sync_channel
可以创建同步管道。必须等待接收到消息后,才可以继续执行:
rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx)= mpsc::sync_channel(0);
let handle = thread::spawn(move || {
println!("发送之前");
tx.send(1).unwrap(); // 此处会被阻塞,等到主线程接收到消息,才会执行下一行代码
println!("发送之后");
});
println!("睡眠之前");
thread::sleep(Duration::from_secs(3));
println!("睡眠之后");
println!("receive {}", rx.recv().unwrap());
handle.join().unwrap();
}
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx)= mpsc::sync_channel(0);
let handle = thread::spawn(move || {
println!("发送之前");
tx.send(1).unwrap(); // 此处会被阻塞,等到主线程接收到消息,才会执行下一行代码
println!("发送之后");
});
println!("睡眠之前");
thread::sleep(Duration::from_secs(3));
println!("睡眠之后");
println!("receive {}", rx.recv().unwrap());
handle.join().unwrap();
}
但是我们可以发现,sync_channel
有一个参数 0, 这是用来干嘛的呢? 该值可以用来指定同步通道的消息缓存条数,当你设定为N时,发送者就可以无阻塞的往通道中发送N条消息,当消息缓冲队列满了后,新的消息发送将被阻塞(如果没有接收者消费缓冲队列中的消息,那么第N+1条消息就将触发发送阻塞
因此,使用异步消息虽然能非常高效且不会造成发送线程的阻塞,但是存在消息未及时消费,最终内存过大的问题。在实际项目中,可以考虑使用一个带缓冲值的同步通道来避免这种风险。
七、关闭channel
通道关闭的两个条件:发送者全部drop或接收者被drop,要结束for循环显然是要求发送者全部drop,但是由于send自身没有被drop,会导致该循环永远无法结束,最终主线程会一直阻塞。
drop(send)
可以关闭通道
八、如何传输多种类型的数据?
let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel()
通过这一行,我们指定了,发送和接收的类型为一个枚举值,也就代表我们可以放进去任何类型的值
rust
use std::sync::mpsc::{self, Receiver, Sender};
enum Fruit {
Apple(u8),
Orange(String)
}
fn main() {
let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel();
tx.send(Fruit::Orange("sweet".to_string())).unwrap();
tx.send(Fruit::Apple(2)).unwrap();
for _ in 0..2 {
match rx.recv().unwrap() {
Fruit::Apple(count) => println!("received {} apples", count),
Fruit::Orange(flavor) => println!("received {} oranges", flavor),
}
}
}
use std::sync::mpsc::{self, Receiver, Sender};
enum Fruit {
Apple(u8),
Orange(String)
}
fn main() {
let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel();
tx.send(Fruit::Orange("sweet".to_string())).unwrap();
tx.send(Fruit::Apple(2)).unwrap();
for _ in 0..2 {
match rx.recv().unwrap() {
Fruit::Apple(count) => println!("received {} apples", count),
Fruit::Orange(flavor) => println!("received {} oranges", flavor),
}
}
}
九、常用的开源库
如果你需要 mpmc(多发送者,多接收者)或者需要更高的性能,可以考虑第三方库:
- crossbeam-channel, 老牌强库,功能较全,性能较强,之前是独立的库,但是后面合并到了
crossbeam
主仓库中 - flume, 官方给出的性能数据某些场景要比 crossbeam 更好些