no script

你的浏览器禁用了JavaScript, 请开启后刷新浏览器获得更好的体验!

进入课程

Tokio的mpsc:异步任务通信的“超级快递”

今天咱们来聊聊 Tokio 的mpsc,这可是 Rust 异步编程里一个超级实用的编程工具!如果你在写异步代码,或者正在折腾 Rust 的异步生态,那mpsc绝对是你不能错过的好东西。它就像是异步任务之间的“超级快递”,能把消息从一个任务快速、安全地送到另一个任务那里。接下来,我就带大家好好了解一下mpsc的用法。

一、tokio mpsc 是啥玩意儿?

想象一下,你和一群朋友在玩传球游戏。你在这一头,朋友们在那一头,中间有一条看不见的线,你把球扔过去,朋友们就能接到。Tokio 的mpsc就像是这条看不见的线,它能让不同的异步任务之间传递消息。

mpsc是“多生产者单消费者”的缩写(Multi-Producer,Single-Consumer)。这意味着你可以有多个任务(生产者)往里面扔消息,但只有一个任务(消费者)来接收这些消息。这在异步编程里特别有用,比如你有多个任务在生成数据,然后有一个任务专门负责处理这些数据,mpsc就能完美地完成这个任务。

二、为啥要用tokio mpsc?

在异步编程里,任务们都在自己的小世界里跑,它们的执行顺序和时间都不确定。这就像是在高速公路上开车,你不知道旁边的车什么时候会变道,也不知道前面的车什么时候会刹车。如果没有一种可靠的方式来传递消息,任务们就会像迷失在黑暗中的小船,找不到方向。

mpsc解决了这个问题。它提供了一种安全、高效的方式来让任务们互相通信。你可以把mpsc想象成一个快递站,任务们把消息“寄”到mpsc里,mpsc再把消息“送”给另一个任务。而且,mpsc是线程安全的,不用担心数据会在传递过程中出错,也不用担心任务们会因为通信而撞车。

三、tokio mpsc 的用法

1. 在Cargo.toml中添加tokio依赖
tokio = {version = "1", features = ["full"]}

Tokio有很多功能(TCP,UDP,Unix 套接字,定时器(timers),同步工具(sync utilities),多种调度器类型(multiple scheduler types) 等等)。
不是所有的应用都需要所有的功能,当我们尝试去优化编译时间或者应用占用空间时, 应用程序可以去选择仅仅它需要使用的特性。比如,应用程序只需要导入特性来使用tokio::spawnTcpStream:

tokio = { version = "1", features = ["rt", "net"] }
2. 创建一个异步main函数

使用#[tokio::main] 函数宏,将 async fn main() 转换为一个初始化运行时实例且执行异步main函数的同步 fn main()

比如说下面的示例:

#[tokio::main]
async fn main() {
    println!("hello");
}

转换后的结果:

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}
3. 创建一个 mpsc Channel

创建一个mpscChannel 非常简单。Tokio 提供了一个mpsc::channel函数,你只需要调用它,就能得到一对东西:发送端(Sender)和接收端(Receiver)。发送端可以被克隆,让多个任务都能往 Channel 里发送消息,而接收端只能有一个,用来接收消息。

use tokio::sync::mpsc;

let (tx, rx) = mpsc::channel(32);

这里,32是 Channel 的缓冲区大小,也就是说它可以同时存储 32 条消息。如果你的消息量特别大,可以调整这个数字;如果消息量不大,也可以调小一点。

4. 发送消息

发送消息也很简单。只需要调用发送端的send方法,把消息扔进去就行。send方法是一个异步方法,所以你需要用.await来等待它完成。

tx.send("Hello, world!").await.unwrap();

这里,"Hello, world!"就是你要发送的消息。unwrap是用来处理可能的错误的,比如接收端已经关闭了,发送就会失败。

5. 接收消息

接收消息也很简单。只需要调用接收端的recv方法,它会返回一个Option,里面有消息的内容。如果 Channel 里有消息,它会返回Some(message);如果没有消息,它会返回None

while let Some(message) = rx.recv().await {
    println!("收到消息:{}", message);
}

这里,recv方法也是一个异步方法,所以你需要用.await来等待消息的到来。while let Some(message)是一个很常见的模式,用来不断地接收消息,直到 Channel 关闭。

四、tokio mpsc 的实战演练

好啦,光说不练假把式,咱们来写点代码,看看mpsc是怎么工作的。官网的redis命令发送与接受服务器,就是最好的tokio通道示例。

058fb697-af58-46ed-853c-b29ce9308ae8.png

1. 定义消息类型

在大多数情况下,使用消息传递时,接收消息的任务会响应多个命令。在我们的案例中,任务将响应GETSET 命令。我们首先定义一个 Command 的枚举,并为每种命令类型包含一个变体。

use bytes::Bytes;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}
2. 创建通道和命令发送任务

在 main 中 创建 mpsc 通道.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 创建一个最大容量为32的通道
    let (mut tx, mut rx) = mpsc::channel(32);
    // ... 
}

mpsc 通道被用来发送一个命令到消息管理任务中。 多生产者的能力是能让许多的任务发送消息。创建的通道返回两个值:一个是发送者(Sender),一个是接收者(Receiver)。它们两者被分开使用。他们可能被移动到不同的任务中去。

被创建的通道容量为32。 如果消息的发送速度大于接收的速度,通道会储存它们。 一旦通道中存了32条消息时,就会使调用send(...).await 进入睡眠状态,
直到接收者删除一条消息为止,当接收者有能力能再次处理消息时, 睡眠状态才会结束。

通过clone Sender 可以完成多个任务的发送:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();
	//创建Get命令发送任务
    let t1 = tokio::spawn(async move {
        let cmd = Command::Get {
        	key: "hello".to_string(),
    	};
    	tx.send(cmd).await.unwrap();
    });
	//创建Set命令发送任务
    let t2 = tokio::spawn(async move {
       let cmd = Command::Set {
        	key: "foo".to_string(),
        	val: "bar".into(),
    	};
    	tx2.send(cmd).await.unwrap();
    });

    //创建消息管理任务...
    //let manager = tokio::spawn(async move {...})
    
    t1.await.unwrap();
    t2.await.unwrap();
    manager.await.unwrap();
}

两条消息都发送到单个Receiver 处理。不可能克隆mpsc 通道中的接收者。

当每个 Sender 超出作用域范围或者被dropped时, 它不能再发送更多的消息到通道中。此时, Receiver 上的rev调用都将返回 None ,这意味着所有发送者都已经消失且通道已关闭。

3. 创建消息管理任务

创建一个管理任务来处理来自通道的消息。首先, 建立与Redis服务器的链接。然后,通过Redis链接发出从命令任务中接收到的命令消息。

let manager = tokio::spawn(async move {
    // 建立与Server的链接
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();
    // 开始接收消息
    while let Some(cmd) = rx.recv().await {
        use Command::*;
        match cmd {
            Get { key } => {
                client.get(&key).await; //向Redis服务器发送命令
            }
            Set { key, val } => {
                client.set(&key, val).await; //向Redis服务器发送命令
            }
        }
    }
});
4. 接收Redis服务器的响应

为了传递响应,可以使用 oneshot 通道。 oneshot 通道是一个经过了优化的单生产者单消费者通道,用来发送单个值。与 mpsc 类似, oneshot 返回一个发送者(Sender)和一个接收者(Receiver)处理器。

use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();

mpsc 不同的是,oneshot 它不能指定任何容量,因为容量始终为1。另外, tx和rx都不能被克隆。

为了接收到来自管理任务的响应,在发送一个命令之前,一个 oneshot 通道将被创建。通道 Sender需要传送给管理任务,接收方Receiver保留在命令任务中,用来接收响应。

首先, 更新 Command 来包含一个Sender 。 为了方便, 为 Sender 定义一个类型别名:

use tokio::sync::oneshot;
use bytes::Bytes;

type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>, //oneshot Sender
    },
    Set {
        key: String,
        val: Vec<u8>,
        resp: Responder<()>, //oneshot Sender
    },
}

更新Get和Set命令发送任务:

let t1 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "hello".to_string(),
        resp: resp_tx,
    };
    // 发送 GET 请求
    tx.send(cmd).await.unwrap();
    // 等待响应结果
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: b"bar".to_vec(),
        resp: resp_tx,
    };
    // 发送 GET 请求
    tx2.send(cmd).await.unwrap();
    // 等待响应结果
    let res = resp_rx.await;
    println!("GOT = {:?}", res)
});

更新消息管理任务:

while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            let _ = resp.send(res); //忽略错误
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val.into()).await;
            let _ = resp.send(res); //忽略错误
        }
    }
}

oneshot::Sender 上调用 send 会立即完成而不需要 .await 操作。这是因为在 oneshot 通道上的 send 总是立即失败或者成功,而没有任何等待。

如果oneshot Receiver当接收一半时dropped了,比如使用drop(resp_rx)删除了通道。这时 在 oneshot 通道上的发送会返回 Err 。这表明接收方不再对响应感兴趣,在这里,接收方的取消操作是可以被接受的事件。resp.send(...) 返回的 Err 可以做忽略处理。

五、写在最后

Tokio 的mpsc是 Rust 异步编程里的一个超级实用的工具,它让任务之间的通信变得简单又安全。你可以用它来实现任务之间的数据共享、任务调度、任务通知等各种功能。希望这篇文章能让你对mpsc有个更深入的了解。如果你在使用mpsc的过程中遇到了问题,或者发现了什么好玩的用法,欢迎随时和我交流。加油,让我们一起在 Rust 的世界里探索!

0
chujiao_0a9e6**7125
盘丝大仙我的剑只有我的心上人才能拔出
  • 赞同
  • 威望

相关问题

    Copyright © 2025