Appearance
线程同步:锁、Condvar 和信号量
一、单线程中使用 Mutex
使用方法m.lock()向m申请一个锁, 该方法会阻塞当前线程,直到获取到锁,因此当多个线程同时访问该数据时,只有一个线程能获取到锁,其它线程只能阻塞着等待,这样就保证了数据能被安全的修改!
rust
use std::sync::Mutex;
fn main() {
// 使用`Mutex`结构体的关联函数创建新的互斥锁实例
let m = Mutex::new(5);
{
// 获取锁,然后deref为`m`的引用
// lock返回的是Result
let mut num = m.lock().unwrap();
*num = 6;
// 锁自动被drop
}
println!("m = {:?}", m);
}
use std::sync::Mutex;
fn main() {
// 使用`Mutex`结构体的关联函数创建新的互斥锁实例
let m = Mutex::new(5);
{
// 获取锁,然后deref为`m`的引用
// lock返回的是Result
let mut num = m.lock().unwrap();
*num = 6;
// 锁自动被drop
}
println!("m = {:?}", m);
}
m.lock()
方法也有可能报错,例如当前正在持有锁的线程panic了。在这种情况下,其它线程不可能再获得锁,因此lock方法会返回一个错误。m.lock()
返回一个智能指针MutexGuard<T>
它实现了Deref特征,会被自动解引用后获得一个引用类型,该引用指向Mutex内部的数据, 还实现了Drop特征,在超出作用域后,自动释放锁,以便其它线程能继续获取锁
二、多线程中使用 Mutex
由于Mutex<T>
可以支持修改内部数据,当结合Arc<T>
一起使用时,可以实现多线程的内部可变性。
rust
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
- 在使用数据前必须先获取锁
- 在数据使用完成后,必须及时的释放锁,比如文章开头的例子,使用内部语句块的目的就是为了及时的释放锁
三、单线程死锁
在一个线程中,当获取一个锁之后,线程会被阻塞,除非这个锁被释放了。所以下面这个例子会被死锁
rust
use std::sync::Mutex;
fn main() {
let data = Mutex::new(0);
let d1 = data.lock();
let d2 = data.lock();
} // d1锁在此处释放
use std::sync::Mutex;
fn main() {
let data = Mutex::new(0);
let d1 = data.lock();
let d2 = data.lock();
} // d1锁在此处释放
四、多线程死锁
当我们拥有两个锁,且两个线程各自使用了其中一个锁,然后试图去访问另一个锁时,就可能发生死锁
rust
use std::{sync::{Mutex, MutexGuard}, thread};
use std::thread::sleep;
use std::time::Duration;
use lazy_static::lazy_static;
lazy_static! {
static ref MUTEX1: Mutex<i64> = Mutex::new(0);
static ref MUTEX2: Mutex<i64> = Mutex::new(0);
}
fn main() {
// 存放子线程的句柄
let mut children = vec![];
for i_thread in 0..2 {
children.push(thread::spawn(move || {
for _ in 0..1 {
// 线程1
if i_thread % 2 == 0 {
// 锁住MUTEX1
let guard: MutexGuard<i64> = MUTEX1.lock().unwrap();
println!("线程 {} 锁住了MUTEX1,接着准备去锁MUTEX2 !", i_thread);
// 当前线程睡眠一小会儿,等待线程2锁住MUTEX2
sleep(Duration::from_millis(10));
// 去锁MUTEX2
let guard = MUTEX2.lock().unwrap();
// 线程2
} else {
// 锁住MUTEX2
let _guard = MUTEX2.lock().unwrap();
println!("线程 {} 锁住了MUTEX2, 准备去锁MUTEX1", i_thread);
let _guard = MUTEX1.lock().unwrap();
}
}
}));
}
// 等子线程完成
for child in children {
let _ = child.join();
}
println!("死锁没有发生");
}
use std::{sync::{Mutex, MutexGuard}, thread};
use std::thread::sleep;
use std::time::Duration;
use lazy_static::lazy_static;
lazy_static! {
static ref MUTEX1: Mutex<i64> = Mutex::new(0);
static ref MUTEX2: Mutex<i64> = Mutex::new(0);
}
fn main() {
// 存放子线程的句柄
let mut children = vec![];
for i_thread in 0..2 {
children.push(thread::spawn(move || {
for _ in 0..1 {
// 线程1
if i_thread % 2 == 0 {
// 锁住MUTEX1
let guard: MutexGuard<i64> = MUTEX1.lock().unwrap();
println!("线程 {} 锁住了MUTEX1,接着准备去锁MUTEX2 !", i_thread);
// 当前线程睡眠一小会儿,等待线程2锁住MUTEX2
sleep(Duration::from_millis(10));
// 去锁MUTEX2
let guard = MUTEX2.lock().unwrap();
// 线程2
} else {
// 锁住MUTEX2
let _guard = MUTEX2.lock().unwrap();
println!("线程 {} 锁住了MUTEX2, 准备去锁MUTEX1", i_thread);
let _guard = MUTEX1.lock().unwrap();
}
}
}));
}
// 等子线程完成
for child in children {
let _ = child.join();
}
println!("死锁没有发生");
}
线程 1 锁住了MUTEX1, 并且线程2锁住了MUTEX2,然后线程 1 试图去访问MUTEX2,同时线程2试图去访问MUTEX1,就会死锁。 因为线程 2 需要等待线程 1 释放MUTEX1后,才会释放MUTEX2,而与此同时,线程 1 需要等待线程 2 释放MUTEX2后才能释放MUTEX1,这种情况造成了两个线程都无法释放对方需要的锁,最终死锁。
五、使用try_lock防止死锁
try_lock
会尝试去获取一次锁,如果无法获取会返回一个错误,因此不会发生阻塞:
rust
use std::{sync::{Mutex, MutexGuard}, thread};
use std::thread::sleep;
use std::time::Duration;
use lazy_static::lazy_static;
lazy_static! {
static ref MUTEX1: Mutex<i64> = Mutex::new(0);
static ref MUTEX2: Mutex<i64> = Mutex::new(0);
}
fn main() {
// 存放子线程的句柄
let mut children = vec![];
for i_thread in 0..2 {
children.push(thread::spawn(move || {
for _ in 0..1 {
// 线程1
if i_thread % 2 == 0 {
// 锁住MUTEX1
let guard: MutexGuard<i64> = MUTEX1.lock().unwrap();
println!("线程 {} 锁住了MUTEX1,接着准备去锁MUTEX2 !", i_thread);
// 当前线程睡眠一小会儿,等待线程2锁住MUTEX2
sleep(Duration::from_millis(10));
// 去锁MUTEX2
let guard = MUTEX2.try_lock();
println!("线程 {} 获取 MUTEX2 锁的结果: {:?}", i_thread, guard);
// 线程2
} else {
// 锁住MUTEX2
let _guard = MUTEX2.lock().unwrap();
println!("线程 {} 锁住了MUTEX2, 准备去锁MUTEX1", i_thread);
sleep(Duration::from_millis(10));
let guard = MUTEX1.try_lock();
println!("线程 {} 获取 MUTEX1 锁的结果: {:?}", i_thread, guard);
}
}
}));
}
// 等子线程完成
for child in children {
let _ = child.join();
}
println!("死锁没有发生");
}
use std::{sync::{Mutex, MutexGuard}, thread};
use std::thread::sleep;
use std::time::Duration;
use lazy_static::lazy_static;
lazy_static! {
static ref MUTEX1: Mutex<i64> = Mutex::new(0);
static ref MUTEX2: Mutex<i64> = Mutex::new(0);
}
fn main() {
// 存放子线程的句柄
let mut children = vec![];
for i_thread in 0..2 {
children.push(thread::spawn(move || {
for _ in 0..1 {
// 线程1
if i_thread % 2 == 0 {
// 锁住MUTEX1
let guard: MutexGuard<i64> = MUTEX1.lock().unwrap();
println!("线程 {} 锁住了MUTEX1,接着准备去锁MUTEX2 !", i_thread);
// 当前线程睡眠一小会儿,等待线程2锁住MUTEX2
sleep(Duration::from_millis(10));
// 去锁MUTEX2
let guard = MUTEX2.try_lock();
println!("线程 {} 获取 MUTEX2 锁的结果: {:?}", i_thread, guard);
// 线程2
} else {
// 锁住MUTEX2
let _guard = MUTEX2.lock().unwrap();
println!("线程 {} 锁住了MUTEX2, 准备去锁MUTEX1", i_thread);
sleep(Duration::from_millis(10));
let guard = MUTEX1.try_lock();
println!("线程 {} 获取 MUTEX1 锁的结果: {:?}", i_thread, guard);
}
}
}));
}
// 等子线程完成
for child in children {
let _ = child.join();
}
println!("死锁没有发生");
}
当try_lock失败时,会报出一个错误:Err("WouldBlock"),接着线程中的剩余代码会继续执行,不会被阻塞。
六、读写锁 RwLock
Mutex会对每次读写都进行加锁,但某些时候,我们需要大量的并发读,Mutex就无法满足需求了,此时就可以使用RwLock:
rust
use std::sync::RwLock;
fn main() {
let lock = RwLock::new(5);
// 同一时间允许多个读
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
assert_eq!(*r1, 5);
assert_eq!(*r2, 5);
} // 读锁在此处被drop
// 同一时间只允许一个写
{
let mut w = lock.write().unwrap();
*w += 1;
assert_eq!(*w, 6);
// 以下代码会阻塞发生死锁,因为读和写不允许同时存在
// 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中
// let r1 = lock.read();
// println!("{:?}",r1);
}// 写锁在此处被drop
}
use std::sync::RwLock;
fn main() {
let lock = RwLock::new(5);
// 同一时间允许多个读
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
assert_eq!(*r1, 5);
assert_eq!(*r2, 5);
} // 读锁在此处被drop
// 同一时间只允许一个写
{
let mut w = lock.write().unwrap();
*w += 1;
assert_eq!(*w, 6);
// 以下代码会阻塞发生死锁,因为读和写不允许同时存在
// 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中
// let r1 = lock.read();
// println!("{:?}",r1);
}// 写锁在此处被drop
}
首先简单性上Mutex
完胜,因为使用RwLock
你得操心几个问题:
- 读和写不能同时发生,如果使用
try_xxx
解决,就必须做大量的错误处理和失败重试机制 - 当读多写少时,写操作可能会因为一直无法获得锁导致连续多次失败(writer starvation)
- RwLock 其实是操作系统提供的,实现原理要比
Mutex
复杂的多,因此单就锁的性能而言,比不上原生实现的Mutex
七、三方库推荐
- parking_lot, 功能更完善、稳定,社区较为活跃,star 较多,更新较为活跃
- spin, 在多数场景中性能比
parking_lot
高一点,最近没怎么更新
八、使用信号量 Semaphore 控制最大并发量
rust
use std::sync::Arc;
use tokio::sync::Semaphore;
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(3));
let mut join_handles = Vec::new();
for _ in 0..5 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_handles.push(tokio::spawn(async move {
//
// 在这里执行任务...
//
drop(permit);
}));
}
for handle in join_handles {
handle.await.unwrap();
}
}
use std::sync::Arc;
use tokio::sync::Semaphore;
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(3));
let mut join_handles = Vec::new();
for _ in 0..5 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_handles.push(tokio::spawn(async move {
//
// 在这里执行任务...
//
drop(permit);
}));
}
for handle in join_handles {
handle.await.unwrap();
}
}
上面代码创建了一个容量为 3 的信号量,当正在执行的任务超过 3 时,剩下的任务需要等待正在执行任务完成并减少信号量后到 3 以内时,才能继续执行。