Rust进阶[part10]_并发-概念、线程、通信

Rust进阶[part10]_并发

基本概述

在Rust中,通过std::thread::spawn函数可以创建新线程,该函数接收一个闭包作为参数,闭包中包含线程要执行的代码。

基础线程创建

子线程的生命周期依赖于主线程:当主线程结束时,无论子线程是否执行完毕,都会被强制终止。

use std::thread;
use std::time::Duration;

fn main() {
    // 创建子线程
    thread::spawn(|| {
        for i in 1..10 {
            println!("spawn thread: {}", i);
            thread::sleep(Duration::from_millis(1)); // 模拟耗时操作
        }
    });

    // 主线程逻辑
    for i in 1..5 {
        println!("main thread: {}", i);
        thread::sleep(Duration::from_millis(1));
    }
}

输出结果(可能因调度顺序略有不同):

main thread: 1
spawn thread: 1
main thread: 2
spawn thread: 2
main thread: 3
spawn thread: 3
main thread: 4
spawn thread: 4
spawn thread: 5  // 主线程已结束,子线程可能被中断

等待线程完成:JoinHandle

通过thread::spawn返回的JoinHandle类型,可以让主线程等待子线程执行完毕。调用join方法会阻塞当前线程,直到子线程完成。

use std::thread;
use std::time::Duration;

fn main() {
    // 获取子线程的JoinHandle
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("spawn thread: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    // 主线程逻辑
    for i in 1..5 {
        println!("main thread: {}", i);
        thread::sleep(Duration::from_millis(1));
    }

    // 等待子线程完成(阻塞主线程)
    handle.join().unwrap(); // join返回Result,unwrap处理成功情况
}

输出结果:主线程会等待子线程打印到9后再结束。

使用move闭包

move闭包用于将外部变量的所有权转移到子线程中,解决多线程中变量生命周期的问题(避免子线程访问已被释放的变量)。

示例:不使用move导致的错误

use std::thread;

fn main() {
    let s = String::from("hello");
    // 错误:子线程可能比主线程生命周期长,s可能被提前释放
    thread::spawn(|| {
        println!("{}", s); // 编译错误:`s` does not live long enough
    });
}

使用move闭包解决

use std::thread;

fn main() {
    let s = String::from("hello");
    // move闭包将s的所有权转移到子线程
    thread::spawn(move || {
        println!("{}", s); // 正确:子线程拥有s的所有权
    }).join().unwrap();
}

多线程通信

Rust推荐通过消息传递实现安全并发,标准库提供的channel(通道)是实现这一机制的核心工具。

Channel(通道)基础

通道由发送端(Sender)接收端(Receiver) 组成,数据从发送端发送,接收端接收,实现线程间通信。

  • 导入:use std::sync::mpsc;(mpsc:多生产者单消费者,支持多个发送端向一个接收端发送数据)
  • 创建:let (tx, rx) = mpsc::channel();(返回包含发送端和接收端的元组)

基本使用示例

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    // 子线程发送数据
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx.send(val).unwrap(); // 发送数据,失败时unwrap panic
            thread::sleep(Duration::from_millis(500)); // 模拟间隔发送
        }
    });

    // 主线程接收数据
    for received in rx { // 迭代接收,直到发送端全部关闭
        println!("Got: {}", received);
    }
}

输出结果

Got: hi
Got: from
Got: the
Got: thread

发送端与接收端方法

  • 发送端(Sender)

    • send(T) -> Result<(), SendError<T>>:发送数据,成功返回Ok(()),失败(如接收端已关闭)返回Err
  • 接收端(Receiver)

    • recv() -> Result<T, RecvError>:阻塞当前线程,直到收到数据或发送端关闭(返回Err)。
    • try_recv() -> Result<T, TryRecvError>:非阻塞,立即返回:
      • 有数据:Ok(T)
      • 无数据:Err(TryRecvError::Empty)
      • 发送端关闭:Err(TryRecvError::Disconnected)

多生产者示例

通过Sender::clone可以创建多个发送端,实现多线程向同一接收端发送数据。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx2 = tx.clone(); // 克隆发送端

    // 线程1发送数据
    thread::spawn(move || {
        let vals = vec![
            String::from("thread1: hi"),
            String::from("thread1: hello"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(500));
        }
    });

    // 线程2发送数据
    thread::spawn(move || {
        let vals = vec![
            String::from("thread2: bye"),
            String::from("thread2: goodbye"),
        ];
        for val in vals {
            tx2.send(val).unwrap();
            thread::sleep(Duration::from_millis(500));
        }
    });

    // 接收数据
    for received in rx {
        println!("Got: {}", received);
    }
}

练习

练习1——实现多线程文件处理器

任务描述

你需要编写一个多线程文件处理器,它从一个通道(channel)中接收文件路径,并在线程池中处理这些文件。文件处理的具体任务可以是读取文件内容并打印到控制台。你需要使用 Rust 的带缓冲区的 channel 来控制并发线程的数量,从而限制同时处理的文件数量。

具体要求

1.文件处理任务:定义一个函数process_file,该函数接受一个文件路径,读取文件内容,并将内容打印到控制台。

2.多线程控制:

  • 创建一个带缓冲区的channel,用于在主线程和工作线程之间传递文件路径。
  • 使用多线程来实现文件处理的并发性,限制线程的并发数量(例如,最多同时处理 4 个文件)。

3.主线程作为生产者:主线程负责向通道发送文件路径。假设我们有 10 个文件路径要处理。

4.工作线程作为消费者:创建多个工作线程,每个线程从通道中接收文件路径,并调用process_file函数来处理文件。

use crossbeam_channel::bounded;
use std::fs;
use std::path::Path;

// 实现多线程文件处理器,同时可以处理4个文件读取,说明有四个接受端 ,一个发送端
/* */
fn process_file(filename: &str) {
    match fs::read_to_string(filename) {
        Ok(content) => {
            println!("Processing file: {}", filename);
            println!("Content: {}", content);
        }
        Err(error) => {
            eprintln!("Error reading file {}: {}", filename, error);
        }
    }
}

#[test]
fn test_thread_file_processor() {
    let (tx, rx) = bounded(4);
    let num_threads = 4;
    let file_paths = vec![
        "file1.txt",
        "file2.txt",
        "file3.txt",
        "file4.txt",
        "file5.txt",
        "file6.txt",
        "file7.txt",
        "file8.txt",
        "file9.txt",
        "file10.txt",
    ];

    let mut handles = vec![];
    // 启动 4 个工作线程
    for i in 0..num_threads {
        let rx: crossbeam_channel::Receiver<String> = rx.clone();
        let handle: thread::JoinHandle<()> = thread::spawn(move || {
            for path in rx {
                let content = fs::read_to_string(path).unwrap_or_else(|e| format!("Error: {}", e));
                println!("Worker {}: {}", i, content);
            }
        });
        handles.push(handle);
    }

    // 发送文件路径
    for path in file_paths {
        tx.send(path.to_string()).unwrap();
    }
  
    
    drop(tx);

    for h in handles {
        h.join().unwrap()
    }
}

坑⚠️:

需要使用use crossbeam_channel::bounded; 才能满足接收者可以克隆,mpsc::channel()不能克隆接收者,只能克隆发送者

let (tx, rx) = bounded(4);

需要 drop(tx);来显式关闭发送端,使接收端退出 for path in rx 循环

for path in rx这是一个阻塞循环,只有在发送端关闭时才会退出

练习2——使用 Channel 实现程序的优雅停止

任务描述

  • 任务描述
    ○ 在这道练习中,你需要编写一个多线程程序,该程序会创建多个工作线程,持续处理任务。在接收到停止信号时,所有工作线程应该优雅地停止工作,并确保所有未完成的任务都被处理完毕。
    ○ 你将使用 Rust 的 channel 来实现任务的调度和优雅停止机制。

具体要求

  • 工作线程:
    ○ 创建一个工作线程池,工作线程从通道接收任务并处理。
    ○ 工作线程应能够响应停止信号,并在完成当前任务后优雅地退出。
  • 任务结构:
    ○ 任务可以是简单的打印操作,模拟一些耗时工作,例如打印任务 ID 并暂停一段时间。
  • 优雅停止:
    ○ 通过发送一个特殊的停止信号,通知所有工作线程停止接收新的任务,并在完成当前任务后退出。
    ○ 确保所有已接收的任务都被处理完毕。
  • 主线程控制:
    ○ 主线程应当能够发送任务,也能够在适当的时候发送停止信号,触发工作线程的优雅停止。

答案:

  • 优雅停止:工作线程处理完当前任务后退出,不中断正在执行的任务。
  • 实现:通过特殊信号(如Option::None)通知线程停止,或关闭发送端触发接收端退出。

实现代码

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// 任务类型:包含任务ID和模拟耗时
type Task = Option<u32>; // None作为停止信号

// 处理任务
fn process_task(task_id: u32) {
    println!("处理任务{}...", task_id);
    thread::sleep(Duration::from_secs(1)); // 模拟耗时1秒
    println!("任务{}处理完成", task_id);
}

fn main() {
    let (tx, rx) = bounded(4);
    let num_workers = 3;
    let mut handles = vec![];

    // 创建工作线程
    for i in 0..num_workers {
        let rx = rx.clone();
        let handle = thread::spawn(move || {
            println!("工作线程{}启动", i);
            // 循环接收任务,直到收到None(停止信号)
            while let Ok(task) = rx.recv() {
                match task {
                    Some(task_id) => process_task(task_id),
                    None => {
                        println!("工作线程{}收到停止信号,退出", i);
                        break; // 退出循环,线程结束
                    }
                }
            }
        });
        handles.push(handle);
    }

    // 主线程发送任务
    for task_id in 1..=5 {
        tx.send(Some(task_id)).unwrap();
    }

    // 发送停止信号(每个线程一个)
    for _ in 0..num_workers {
        tx.send(None).unwrap();
    }

    // 等待所有工作线程退出
    for handle in handles {
        handle.join().unwrap();
    }
    println!("所有线程已优雅退出");
}

✅ 总结

方案 支持克隆 Receiver 是否推荐 说明
mpsc::sync_channel 不支持克隆 Receiver
mpsc::channel 可克隆 Sender,适合多发送者单接收者
crossbeam-channel ✅✅ 支持克隆 Receiver,适合多消费者
Arc<Mutex<Receiver>> 不推荐,性能差,易出错
select! + bounded 适合多线程监听同一通道

📌 推荐方案

优先使用 crossbeam-channelboundedunbounded 通道,它不仅支持克隆 Receiver,还提供了更强大的功能,如 select!try_recvrecv_timeout

共享内存实现并发

除了消息传递,还可以通过共享内存实现并发。Rust通过Mutex(互斥锁)和Arc(原子引用计数)保证共享内存的线程安全。

Mutex<T>(互斥锁)

Mutex确保同一时间只有一个线程能访问共享数据,通过“锁”机制实现:

  • 访问数据前需调用lock()获取锁(返回MutexGuard,智能指针)。
  • MutexGuard离开作用域时自动释放锁,避免死锁。

基本使用

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5); // 创建包含i32的Mutex

    {
        let mut num = m.lock().unwrap(); // 获取锁,返回MutexGuard
        *num = 6; // 通过解引用修改数据
    } // MutexGuard离开作用域,自动释放锁

    println!("m = {:?}", m.lock().unwrap()); // 输出:m = 6
}

Arc<T>(原子引用计数)

Rc<T>不适合多线程(非线程安全),Arc<T>是线程安全版本的引用计数指针,通过原子操作保证计数的线程安全。常与Mutex配合,实现多线程共享数据。

多线程共享计数器示例

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0)); // Arc包裹Mutex,实现多线程共享
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter); // 克隆Arc,增加引用计数
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap(); // 获取锁
            *num += 1; // 修改计数器
        });
        handles.push(handle);
    }

    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap()); // 输出:Result: 10
}

Send 和 Sync trait

Rust通过这两个标记 trait 保证线程安全:

  • Send:标记类型可以安全地转移到另一个线程(所有权转移)。
  • Sync:标记类型可以安全地被多个线程共享(即&TSend的)。

默认实现

  • 基本类型(i32bool等)和大多数标准库类型(StringVec等)都实现了SendSync
  • Rc<T>不实现SendSync(非线程安全)。
  • Arc<T>实现SendSync(线程安全)。
  • Mutex<T>TSend时,Mutex<T>实现Sync

练习3——实现多线程任务调度器

任务描述

你需要编写一个简单的多线程任务调度器,它能够接收多个任务,并将这些任务分发到多个工作线程中执行。调度器使用 Channel 进行任务的分发和结果的收集。你需要使用 Rust 的SendSync特性来确保任务调度器在多线程环境中的安全性。

具体要求

定义一个Task结构体,表示需要执行的任务。任务包含一个唯一的id和一个用于执行的闭包。

调度器结构:

创建一个Scheduler结构体,包含一个任务队列和一个线程池。调度器应当使用channel来分发任务到不同的工作线程。

功能实现:

调度器应当具有以下功能:

  • 添加任务:向调度器添加一个任务。
  • 启动调度器:启动多个线程,开始从任务队列中获取任务并执行。
  • 获取结果:在所有任务完成后,收集并打印每个任务的执行结果。

多线程安全:

  • 通过使用ArcMutex确保任务队列在多个线程之间的安全访问。
  • 确保任务的结果能够正确地在线程之间传递和收集。

任务分析

  • 核心:用channel分发任务,用Arc+Mutex管理任务队列和结果收集。
  • 结构:Scheduler包含任务发送端、结果接收端和工作线程句柄。

实现代码

use crossbeam_channel as channel;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

// ------------------实现多线程任务调度器------------------------
/**
 * 多线程任务调度器,能够接收多个任务,并将多个任务分配给多个线程执行。需要使用send和sync trait
 * 来确保任务调度器可以安全地被多个线程访问。
 */
use std::sync::Arc;
// 任务结构体:包含ID和执行闭包(返回结果)
struct Task {
    id: u32,
    func: Box<dyn FnOnce() -> String + Send + 'static>, // 闭包需实现Send
}

// 调度器结构体
struct Scheduler {
    task_tx: Option<channel::Sender<Task>>, // 任务发送端 (使用Option便于take)
    result_rx: Arc<Mutex<channel::Receiver<String>>>, // 结果接收端(共享)
    workers: Option<Vec<thread::JoinHandle<()>>>, // 工作线程句柄队列 (使用Option便于take)
}

impl Scheduler {
    // 创建新调度器
    fn new(num_workers: usize) -> Self {
        let (task_tx, task_rx) = channel::unbounded();
        let (result_tx, result_rx) = channel::unbounded();
        let result_rx = Arc::new(Mutex::new(result_rx));
        let mut workers = vec![];

        // 创建工作线程
        for _ in 0..num_workers {
            // 注意:crossbeam-channel的Receiver支持clone
            let task_rx_clone: channel::Receiver<Task> = task_rx.clone();
            let result_tx: channel::Sender<String> = result_tx.clone();

            let handle = thread::spawn(move || {
                // 循环接收任务并执行
                while let Ok(task) = task_rx_clone.recv() {
                    println!("执行任务{}", task.id);
                    let result = (task.func)(); // 执行任务
                    result_tx.send(result).unwrap(); // 发送结果
                }
            });
            workers.push(handle);
        }

        Scheduler {
            task_tx: Some(task_tx),
            result_rx,
            workers: Some(workers),
        }
    }

    // 添加任务
    fn add_task(&self, id: u32, func: Box<dyn FnOnce() -> String + Send + 'static>) {
        if let Some(ref tx) = self.task_tx {
            tx.send(Task { id, func }).unwrap();
        }
    }

    // 等待所有任务完成并收集结果
    fn wait_and_collect_results(&mut self) -> Vec<String> {
        // 关闭任务发送端,让工作线程接收完毕后退出
        self.task_tx.take(); // 使用take获取所有权并使原位置为None

        // 等待所有工作线程完成
        if let Some(workers) = self.workers.take() {
            // 使用take获取所有权
            for handle in workers {
                handle.join().unwrap();
            }
        }

        // 收集结果
        let mut results = vec![];
        let result_rx = self.result_rx.lock().unwrap();
        // 使用try_iter()来获取已有的结果,避免阻塞
        for result in result_rx.try_iter() {
            results.push(result);
        }
        results
    }
}

fn main() {
    let mut scheduler = Scheduler::new(2); // 创建2个工作线程

    // 添加任务
    for i in 1..=5 {
        scheduler.add_task(
            i,
            Box::new(move || {
                thread::sleep(Duration::from_secs(1)); // 模拟耗时
                format!("任务{}执行完成", i)
            }),
        );
    }

    // 收集结果
    let results = scheduler.wait_and_collect_results();
    println!("所有任务结果:");
    for res in results {
        println!("{}", res);
    }
}

总结

Rust通过所有权系统、Send/Sync trait、channel和Mutex/Arc等工具,在编译期保证并发安全,避免数据竞争。开发中应根据场景选择消息传递(channel)或共享内存(Mutex+Arc)模式,优先使用消息传递(更易保证安全)。