Appearance
线程同步:Atomic 原子类型与内存顺序
Mutex
用起来简单,但是无法并发读,RwLock
可以并发读,但是使用场景较为受限且性能不够,那么有没有一种全能性选手呢? 那就是Atomic
从 Rust1.34 版本后,就正式支持原子类型。原子指的是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核 CPU 下,当某个 CPU 核心开始运行原子操作时,会先暂停其它 CPU 内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰。
由于原子操作是通过指令提供的支持,因此它的性能相比锁和消息传递会好很多。相比较于锁而言,原子类型不需要开发者处理加锁和释放锁的问题,同时支持修改,读取等操作,还具备较高的并发性能,几乎所有的语言都支持原子类型。
可以看出原子类型是无锁类型,但是无锁不代表无需等待,因为原子类型内部使用了CAS循环,当大量的冲突发生时,该等待还是得等待!但是总归比锁要好。
一、使用 Atomic 作为全局变量
rust
use std::ops::Sub;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Instant;
const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;
static R: AtomicU64 = AtomicU64::new(0);
fn add_n_times(n: u64) -> JoinHandle<()> {
thread::spawn(move || {
for _ in 0..n {
R.fetch_add(1, Ordering::Relaxed);
}
})
}
fn main() {
let s = Instant::now();
let mut threads = Vec::with_capacity(N_THREADS);
for _ in 0..N_THREADS {
threads.push(add_n_times(N_TIMES));
}
for thread in threads {
thread.join().unwrap();
}
assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));
println!("{:?}",Instant::now().sub(s));
}
use std::ops::Sub;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Instant;
const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;
static R: AtomicU64 = AtomicU64::new(0);
fn add_n_times(n: u64) -> JoinHandle<()> {
thread::spawn(move || {
for _ in 0..n {
R.fetch_add(1, Ordering::Relaxed);
}
})
}
fn main() {
let s = Instant::now();
let mut threads = Vec::with_capacity(N_THREADS);
for _ in 0..N_THREADS {
threads.push(add_n_times(N_TIMES));
}
for thread in threads {
thread.join().unwrap();
}
assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));
println!("{:?}",Instant::now().sub(s));
}
以上代码启动了数个线程,每个线程都在疯狂对全局变量进行加 1 操作, 最后将它与线程数 * 加1次数进行比较,如果发生了因为多个线程同时修改导致了脏数据,那么这两个必将不相等。好在,它没有让我们失望,不仅快速的完成了任务,而且保证了 100%的并发安全性。
Atomic的值具有内部可变性,你无需将其声明为mut:
rust
use std::sync::Mutex;
use std::sync::atomic::{Ordering, AtomicU64};
struct Counter {
count: u64
}
fn main() {
let n = Mutex::new(Counter {
count: 0
});
n.lock().unwrap().count += 1;
let n = AtomicU64::new(0);
n.fetch_add(0, Ordering::Relaxed);
}
use std::sync::Mutex;
use std::sync::atomic::{Ordering, AtomicU64};
struct Counter {
count: u64
}
fn main() {
let n = Mutex::new(Counter {
count: 0
});
n.lock().unwrap().count += 1;
let n = AtomicU64::new(0);
n.fetch_add(0, Ordering::Relaxed);
}
这里有一个奇怪的枚举成员Ordering::Relaxed
, 看上去很像是排序作用,但是我们并没有做排序操作啊?实际上它用于控制原子操作使用的内存顺序
。
限定内存顺序的 5 个规则
在理解了内存顺序可能存在的改变后,你就可以明白为什么 Rust 提供了Ordering::Relaxed
用于限定内存顺序了,事实上,该枚举有 5 个成员:
- Relaxed, 这是最宽松的规则,它对编译器和 CPU 不做任何限制,可以乱序
- Release 释放,设定内存屏障(Memory barrier),保证它之前的操作永远在它之前,但是它后面的操作可能被重排到它前面
- Acquire 获取, 设定内存屏障,保证在它之后的访问永远在它之后,但是它之前的操作却有可能被重排到它后面,往往和
Release
在不同线程中联合使用 - AcqRel, 是 Acquire 和 Release 的结合,同时拥有它们俩提供的保证。比如你要对一个
atomic
自增 1,同时希望该操作之前和之后的读取或写入操作不会被重新排序 - SeqCst 顺序一致性,
SeqCst
就像是AcqRel
的加强版,它不管原子操作是属于读取还是写入的操作,只要某个线程有用到SeqCst
的原子操作,线程中该SeqCst
操作前的数据操作绝对不会被重新排在该SeqCst
操作之后,且该SeqCst
操作后的数据操作也绝对不会被重新排在SeqCst
操作前。
多线程中使用 Atomic
rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{hint, thread};
fn main() {
let spinlock = Arc::new(AtomicUsize::new(1));
let spinlock_clone = Arc::clone(&spinlock);
let thread = thread::spawn(move|| {
spinlock_clone.store(0, Ordering::SeqCst);
});
// 等待其它线程释放锁
while spinlock.load(Ordering::SeqCst) != 0 {
hint::spin_loop();
}
if let Err(panic) = thread.join() {
println!("Thread had an error: {:?}", panic);
}
}
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{hint, thread};
fn main() {
let spinlock = Arc::new(AtomicUsize::new(1));
let spinlock_clone = Arc::clone(&spinlock);
let thread = thread::spawn(move|| {
spinlock_clone.store(0, Ordering::SeqCst);
});
// 等待其它线程释放锁
while spinlock.load(Ordering::SeqCst) != 0 {
hint::spin_loop();
}
if let Err(panic) = thread.join() {
println!("Thread had an error: {:?}", panic);
}
}