Rust进阶[part10]_并发-概念、线程、通信

Rust进阶[part10]_并发-概念、线程、通信
SoniaChenRust进阶[part10]_并发
基本概述
在Rust中,通过std::thread::spawn
函数可以创建新线程,该函数接收一个闭包作为参数,闭包中包含线程要执行的代码。
基础线程创建
子线程的生命周期依赖于主线程:当主线程结束时,无论子线程是否执行完毕,都会被强制终止。
use std::thread;
use std::time::Duration;
fn main() {
// 创建子线程
thread::spawn(|| {
for i in 1..10 {
println!("spawn thread: {}", i);
thread::sleep(Duration::from_millis(1)); // 模拟耗时操作
}
});
// 主线程逻辑
for i in 1..5 {
println!("main thread: {}", i);
thread::sleep(Duration::from_millis(1));
}
}
输出结果(可能因调度顺序略有不同):
main thread: 1
spawn thread: 1
main thread: 2
spawn thread: 2
main thread: 3
spawn thread: 3
main thread: 4
spawn thread: 4
spawn thread: 5 // 主线程已结束,子线程可能被中断
等待线程完成:JoinHandle
通过thread::spawn
返回的JoinHandle
类型,可以让主线程等待子线程执行完毕。调用join
方法会阻塞当前线程,直到子线程完成。
use std::thread;
use std::time::Duration;
fn main() {
// 获取子线程的JoinHandle
let handle = thread::spawn(|| {
for i in 1..10 {
println!("spawn thread: {}", i);
thread::sleep(Duration::from_millis(1));
}
});
// 主线程逻辑
for i in 1..5 {
println!("main thread: {}", i);
thread::sleep(Duration::from_millis(1));
}
// 等待子线程完成(阻塞主线程)
handle.join().unwrap(); // join返回Result,unwrap处理成功情况
}
输出结果:主线程会等待子线程打印到9后再结束。
使用move闭包
move
闭包用于将外部变量的所有权转移到子线程中,解决多线程中变量生命周期的问题(避免子线程访问已被释放的变量)。
示例:不使用move导致的错误
use std::thread;
fn main() {
let s = String::from("hello");
// 错误:子线程可能比主线程生命周期长,s可能被提前释放
thread::spawn(|| {
println!("{}", s); // 编译错误:`s` does not live long enough
});
}
使用move闭包解决
use std::thread;
fn main() {
let s = String::from("hello");
// move闭包将s的所有权转移到子线程
thread::spawn(move || {
println!("{}", s); // 正确:子线程拥有s的所有权
}).join().unwrap();
}
多线程通信
Rust推荐通过消息传递实现安全并发,标准库提供的channel
(通道)是实现这一机制的核心工具。
Channel(通道)基础
通道由发送端(Sender) 和接收端(Receiver) 组成,数据从发送端发送,接收端接收,实现线程间通信。
- 导入:
use std::sync::mpsc;
(mpsc:多生产者单消费者,支持多个发送端向一个接收端发送数据) - 创建:
let (tx, rx) = mpsc::channel();
(返回包含发送端和接收端的元组)
基本使用示例
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// 子线程发送数据
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap(); // 发送数据,失败时unwrap panic
thread::sleep(Duration::from_millis(500)); // 模拟间隔发送
}
});
// 主线程接收数据
for received in rx { // 迭代接收,直到发送端全部关闭
println!("Got: {}", received);
}
}
输出结果:
Got: hi
Got: from
Got: the
Got: thread
发送端与接收端方法
-
发送端(Sender):
send(T) -> Result<(), SendError<T>>
:发送数据,成功返回Ok(())
,失败(如接收端已关闭)返回Err
。
-
接收端(Receiver):
recv() -> Result<T, RecvError>
:阻塞当前线程,直到收到数据或发送端关闭(返回Err
)。try_recv() -> Result<T, TryRecvError>
:非阻塞,立即返回:- 有数据:
Ok(T)
- 无数据:
Err(TryRecvError::Empty)
- 发送端关闭:
Err(TryRecvError::Disconnected)
- 有数据:
多生产者示例
通过Sender::clone
可以创建多个发送端,实现多线程向同一接收端发送数据。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx2 = tx.clone(); // 克隆发送端
// 线程1发送数据
thread::spawn(move || {
let vals = vec![
String::from("thread1: hi"),
String::from("thread1: hello"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(500));
}
});
// 线程2发送数据
thread::spawn(move || {
let vals = vec![
String::from("thread2: bye"),
String::from("thread2: goodbye"),
];
for val in vals {
tx2.send(val).unwrap();
thread::sleep(Duration::from_millis(500));
}
});
// 接收数据
for received in rx {
println!("Got: {}", received);
}
}
练习
练习1——实现多线程文件处理器
任务描述
你需要编写一个多线程文件处理器,它从一个通道(channel)中接收文件路径,并在线程池中处理这些文件。文件处理的具体任务可以是读取文件内容并打印到控制台。你需要使用 Rust 的带缓冲区的 channel 来控制并发线程的数量,从而限制同时处理的文件数量。
具体要求
1.文件处理任务:定义一个函数process_file
,该函数接受一个文件路径,读取文件内容,并将内容打印到控制台。
2.多线程控制:
- 创建一个带缓冲区的
channel
,用于在主线程和工作线程之间传递文件路径。 - 使用多线程来实现文件处理的并发性,限制线程的并发数量(例如,最多同时处理 4 个文件)。
3.主线程作为生产者:主线程负责向通道发送文件路径。假设我们有 10 个文件路径要处理。
4.工作线程作为消费者:创建多个工作线程,每个线程从通道中接收文件路径,并调用process_file
函数来处理文件。
use crossbeam_channel::bounded;
use std::fs;
use std::path::Path;
// 实现多线程文件处理器,同时可以处理4个文件读取,说明有四个接受端 ,一个发送端
/* */
fn process_file(filename: &str) {
match fs::read_to_string(filename) {
Ok(content) => {
println!("Processing file: {}", filename);
println!("Content: {}", content);
}
Err(error) => {
eprintln!("Error reading file {}: {}", filename, error);
}
}
}
#[test]
fn test_thread_file_processor() {
let (tx, rx) = bounded(4);
let num_threads = 4;
let file_paths = vec![
"file1.txt",
"file2.txt",
"file3.txt",
"file4.txt",
"file5.txt",
"file6.txt",
"file7.txt",
"file8.txt",
"file9.txt",
"file10.txt",
];
let mut handles = vec![];
// 启动 4 个工作线程
for i in 0..num_threads {
let rx: crossbeam_channel::Receiver<String> = rx.clone();
let handle: thread::JoinHandle<()> = thread::spawn(move || {
for path in rx {
let content = fs::read_to_string(path).unwrap_or_else(|e| format!("Error: {}", e));
println!("Worker {}: {}", i, content);
}
});
handles.push(handle);
}
// 发送文件路径
for path in file_paths {
tx.send(path.to_string()).unwrap();
}
drop(tx);
for h in handles {
h.join().unwrap()
}
}
坑⚠️:
需要使用
use crossbeam_channel::bounded;
才能满足接收者可以克隆,mpsc::channel()
不能克隆接收者,只能克隆发送者let (tx, rx) = bounded(4);
需要
drop(tx);
来显式关闭发送端,使接收端退出for path in rx
循环
for path in rx
这是一个阻塞循环,只有在发送端关闭时才会退出
练习2——使用 Channel 实现程序的优雅停止
任务描述
- 任务描述
○ 在这道练习中,你需要编写一个多线程程序,该程序会创建多个工作线程,持续处理任务。在接收到停止信号时,所有工作线程应该优雅地停止工作,并确保所有未完成的任务都被处理完毕。
○ 你将使用 Rust 的 channel 来实现任务的调度和优雅停止机制。
具体要求
- 工作线程:
○ 创建一个工作线程池,工作线程从通道接收任务并处理。
○ 工作线程应能够响应停止信号,并在完成当前任务后优雅地退出。 - 任务结构:
○ 任务可以是简单的打印操作,模拟一些耗时工作,例如打印任务 ID 并暂停一段时间。 - 优雅停止:
○ 通过发送一个特殊的停止信号,通知所有工作线程停止接收新的任务,并在完成当前任务后退出。
○ 确保所有已接收的任务都被处理完毕。 - 主线程控制:
○ 主线程应当能够发送任务,也能够在适当的时候发送停止信号,触发工作线程的优雅停止。
答案:
- 优雅停止:工作线程处理完当前任务后退出,不中断正在执行的任务。
- 实现:通过特殊信号(如
Option::None
)通知线程停止,或关闭发送端触发接收端退出。
实现代码
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
// 任务类型:包含任务ID和模拟耗时
type Task = Option<u32>; // None作为停止信号
// 处理任务
fn process_task(task_id: u32) {
println!("处理任务{}...", task_id);
thread::sleep(Duration::from_secs(1)); // 模拟耗时1秒
println!("任务{}处理完成", task_id);
}
fn main() {
let (tx, rx) = bounded(4);
let num_workers = 3;
let mut handles = vec![];
// 创建工作线程
for i in 0..num_workers {
let rx = rx.clone();
let handle = thread::spawn(move || {
println!("工作线程{}启动", i);
// 循环接收任务,直到收到None(停止信号)
while let Ok(task) = rx.recv() {
match task {
Some(task_id) => process_task(task_id),
None => {
println!("工作线程{}收到停止信号,退出", i);
break; // 退出循环,线程结束
}
}
}
});
handles.push(handle);
}
// 主线程发送任务
for task_id in 1..=5 {
tx.send(Some(task_id)).unwrap();
}
// 发送停止信号(每个线程一个)
for _ in 0..num_workers {
tx.send(None).unwrap();
}
// 等待所有工作线程退出
for handle in handles {
handle.join().unwrap();
}
println!("所有线程已优雅退出");
}
✅ 总结
方案 | 支持克隆 Receiver | 是否推荐 | 说明 |
---|---|---|---|
mpsc::sync_channel |
❌ | ❌ | 不支持克隆 Receiver |
mpsc::channel |
❌ | ✅ | 可克隆 Sender ,适合多发送者单接收者 |
crossbeam-channel |
✅ | ✅✅ | 支持克隆 Receiver ,适合多消费者 |
Arc<Mutex<Receiver>> |
❌ | ❌ | 不推荐,性能差,易出错 |
select! + bounded |
✅ | ✅ | 适合多线程监听同一通道 |
📌 推荐方案
优先使用 crossbeam-channel
的 bounded
或 unbounded
通道,它不仅支持克隆 Receiver
,还提供了更强大的功能,如 select!
、try_recv
、recv_timeout
等
共享内存实现并发
除了消息传递,还可以通过共享内存实现并发。Rust通过Mutex
(互斥锁)和Arc
(原子引用计数)保证共享内存的线程安全。
Mutex<T>(互斥锁)
Mutex
确保同一时间只有一个线程能访问共享数据,通过“锁”机制实现:
- 访问数据前需调用
lock()
获取锁(返回MutexGuard
,智能指针)。 MutexGuard
离开作用域时自动释放锁,避免死锁。
基本使用
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5); // 创建包含i32的Mutex
{
let mut num = m.lock().unwrap(); // 获取锁,返回MutexGuard
*num = 6; // 通过解引用修改数据
} // MutexGuard离开作用域,自动释放锁
println!("m = {:?}", m.lock().unwrap()); // 输出:m = 6
}
Arc<T>(原子引用计数)
Rc<T>
不适合多线程(非线程安全),Arc<T>
是线程安全版本的引用计数指针,通过原子操作保证计数的线程安全。常与Mutex
配合,实现多线程共享数据。
多线程共享计数器示例
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0)); // Arc包裹Mutex,实现多线程共享
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter); // 克隆Arc,增加引用计数
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap(); // 获取锁
*num += 1; // 修改计数器
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap()); // 输出:Result: 10
}
Send 和 Sync trait
Rust通过这两个标记 trait 保证线程安全:
- Send:标记类型可以安全地转移到另一个线程(所有权转移)。
- Sync:标记类型可以安全地被多个线程共享(即
&T
是Send
的)。
默认实现:
- 基本类型(
i32
、bool
等)和大多数标准库类型(String
、Vec
等)都实现了Send
和Sync
。 Rc<T>
不实现Send
和Sync
(非线程安全)。Arc<T>
实现Send
和Sync
(线程安全)。Mutex<T>
当T
是Send
时,Mutex<T>
实现Sync
。
练习3——实现多线程任务调度器
任务描述
你需要编写一个简单的多线程任务调度器,它能够接收多个任务,并将这些任务分发到多个工作线程中执行。调度器使用 Channel 进行任务的分发和结果的收集。你需要使用 Rust 的Send
和Sync
特性来确保任务调度器在多线程环境中的安全性。
具体要求
定义一个Task
结构体,表示需要执行的任务。任务包含一个唯一的id
和一个用于执行的闭包。
调度器结构:
创建一个Scheduler
结构体,包含一个任务队列和一个线程池。调度器应当使用channel
来分发任务到不同的工作线程。
功能实现:
调度器应当具有以下功能:
- 添加任务:向调度器添加一个任务。
- 启动调度器:启动多个线程,开始从任务队列中获取任务并执行。
- 获取结果:在所有任务完成后,收集并打印每个任务的执行结果。
多线程安全:
- 通过使用
Arc
和Mutex
确保任务队列在多个线程之间的安全访问。 - 确保任务的结果能够正确地在线程之间传递和收集。
任务分析
- 核心:用channel分发任务,用
Arc
+Mutex
管理任务队列和结果收集。 - 结构:
Scheduler
包含任务发送端、结果接收端和工作线程句柄。
实现代码
use crossbeam_channel as channel;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
// ------------------实现多线程任务调度器------------------------
/**
* 多线程任务调度器,能够接收多个任务,并将多个任务分配给多个线程执行。需要使用send和sync trait
* 来确保任务调度器可以安全地被多个线程访问。
*/
use std::sync::Arc;
// 任务结构体:包含ID和执行闭包(返回结果)
struct Task {
id: u32,
func: Box<dyn FnOnce() -> String + Send + 'static>, // 闭包需实现Send
}
// 调度器结构体
struct Scheduler {
task_tx: Option<channel::Sender<Task>>, // 任务发送端 (使用Option便于take)
result_rx: Arc<Mutex<channel::Receiver<String>>>, // 结果接收端(共享)
workers: Option<Vec<thread::JoinHandle<()>>>, // 工作线程句柄队列 (使用Option便于take)
}
impl Scheduler {
// 创建新调度器
fn new(num_workers: usize) -> Self {
let (task_tx, task_rx) = channel::unbounded();
let (result_tx, result_rx) = channel::unbounded();
let result_rx = Arc::new(Mutex::new(result_rx));
let mut workers = vec![];
// 创建工作线程
for _ in 0..num_workers {
// 注意:crossbeam-channel的Receiver支持clone
let task_rx_clone: channel::Receiver<Task> = task_rx.clone();
let result_tx: channel::Sender<String> = result_tx.clone();
let handle = thread::spawn(move || {
// 循环接收任务并执行
while let Ok(task) = task_rx_clone.recv() {
println!("执行任务{}", task.id);
let result = (task.func)(); // 执行任务
result_tx.send(result).unwrap(); // 发送结果
}
});
workers.push(handle);
}
Scheduler {
task_tx: Some(task_tx),
result_rx,
workers: Some(workers),
}
}
// 添加任务
fn add_task(&self, id: u32, func: Box<dyn FnOnce() -> String + Send + 'static>) {
if let Some(ref tx) = self.task_tx {
tx.send(Task { id, func }).unwrap();
}
}
// 等待所有任务完成并收集结果
fn wait_and_collect_results(&mut self) -> Vec<String> {
// 关闭任务发送端,让工作线程接收完毕后退出
self.task_tx.take(); // 使用take获取所有权并使原位置为None
// 等待所有工作线程完成
if let Some(workers) = self.workers.take() {
// 使用take获取所有权
for handle in workers {
handle.join().unwrap();
}
}
// 收集结果
let mut results = vec![];
let result_rx = self.result_rx.lock().unwrap();
// 使用try_iter()来获取已有的结果,避免阻塞
for result in result_rx.try_iter() {
results.push(result);
}
results
}
}
fn main() {
let mut scheduler = Scheduler::new(2); // 创建2个工作线程
// 添加任务
for i in 1..=5 {
scheduler.add_task(
i,
Box::new(move || {
thread::sleep(Duration::from_secs(1)); // 模拟耗时
format!("任务{}执行完成", i)
}),
);
}
// 收集结果
let results = scheduler.wait_and_collect_results();
println!("所有任务结果:");
for res in results {
println!("{}", res);
}
}
总结
Rust通过所有权系统、Send
/Sync
trait、channel和Mutex
/Arc
等工具,在编译期保证并发安全,避免数据竞争。开发中应根据场景选择消息传递(channel)或共享内存(Mutex
+Arc
)模式,优先使用消息传递(更易保证安全)。