Tokio的mpsc:异步任务通信的“超级快递”
今天咱们来聊聊 Tokio 的mpsc,这可是 Rust 异步编程里一个超级实用的编程工具!如果你在写异步代码,或者正在折腾 Rust 的异步生态,那mpsc绝对是你不能错过的好东西。它就像是异步任务之间的“超级快递”,能把消息从一个任务快速、安全地送到另一个任务那里。接下来,我就带大家好好了解一下mpsc的用法。
一、tokio mpsc 是啥玩意儿?
想象一下,你和一群朋友在玩传球游戏。你在这一头,朋友们在那一头,中间有一条看不见的线,你把球扔过去,朋友们就能接到。Tokio 的mpsc就像是这条看不见的线,它能让不同的异步任务之间传递消息。
mpsc是“多生产者单消费者”的缩写(Multi-Producer,Single-Consumer)。这意味着你可以有多个任务(生产者)往里面扔消息,但只有一个任务(消费者)来接收这些消息。这在异步编程里特别有用,比如你有多个任务在生成数据,然后有一个任务专门负责处理这些数据,mpsc就能完美地完成这个任务。
二、为啥要用tokio mpsc?
在异步编程里,任务们都在自己的小世界里跑,它们的执行顺序和时间都不确定。这就像是在高速公路上开车,你不知道旁边的车什么时候会变道,也不知道前面的车什么时候会刹车。如果没有一种可靠的方式来传递消息,任务们就会像迷失在黑暗中的小船,找不到方向。
mpsc解决了这个问题。它提供了一种安全、高效的方式来让任务们互相通信。你可以把mpsc想象成一个快递站,任务们把消息“寄”到mpsc里,mpsc再把消息“送”给另一个任务。而且,mpsc是线程安全的,不用担心数据会在传递过程中出错,也不用担心任务们会因为通信而撞车。
三、tokio mpsc 的用法
1. 在Cargo.toml中添加tokio依赖
tokio = {version = "1", features = ["full"]}
Tokio有很多功能(TCP,UDP,Unix 套接字,定时器(timers),同步工具(sync utilities),多种调度器类型(multiple scheduler types) 等等)。
不是所有的应用都需要所有的功能,当我们尝试去优化编译时间或者应用占用空间时, 应用程序可以去选择仅仅它需要使用的特性。比如,应用程序只需要导入特性来使用tokio::spawn和TcpStream:
tokio = { version = "1", features = ["rt", "net"] }
2. 创建一个异步main函数
使用#[tokio::main] 函数宏,将 async fn main() 转换为一个初始化运行时实例且执行异步main函数的同步 fn main()。
比如说下面的示例:
#[tokio::main]
async fn main() {
println!("hello");
}
转换后的结果:
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}
3. 创建一个 mpsc Channel
创建一个mpscChannel 非常简单。Tokio 提供了一个mpsc::channel函数,你只需要调用它,就能得到一对东西:发送端(Sender)和接收端(Receiver)。发送端可以被克隆,让多个任务都能往 Channel 里发送消息,而接收端只能有一个,用来接收消息。
use tokio::sync::mpsc;
let (tx, rx) = mpsc::channel(32);
这里,32是 Channel 的缓冲区大小,也就是说它可以同时存储 32 条消息。如果你的消息量特别大,可以调整这个数字;如果消息量不大,也可以调小一点。
4. 发送消息
发送消息也很简单。只需要调用发送端的send方法,把消息扔进去就行。send方法是一个异步方法,所以你需要用.await来等待它完成。
tx.send("Hello, world!").await.unwrap();
这里,"Hello, world!"就是你要发送的消息。unwrap是用来处理可能的错误的,比如接收端已经关闭了,发送就会失败。
5. 接收消息
接收消息也很简单。只需要调用接收端的recv方法,它会返回一个Option,里面有消息的内容。如果 Channel 里有消息,它会返回Some(message);如果没有消息,它会返回None。
while let Some(message) = rx.recv().await {
println!("收到消息:{}", message);
}
这里,recv方法也是一个异步方法,所以你需要用.await来等待消息的到来。while let Some(message)是一个很常见的模式,用来不断地接收消息,直到 Channel 关闭。
四、tokio mpsc 的实战演练
好啦,光说不练假把式,咱们来写点代码,看看mpsc是怎么工作的。官网的redis命令发送与接受服务器,就是最好的tokio通道示例。
1. 定义消息类型
在大多数情况下,使用消息传递时,接收消息的任务会响应多个命令。在我们的案例中,任务将响应GET 与SET 命令。我们首先定义一个 Command 的枚举,并为每种命令类型包含一个变体。
use bytes::Bytes;
#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}
2. 创建通道和命令发送任务
在 main 中 创建 mpsc 通道.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// 创建一个最大容量为32的通道
let (mut tx, mut rx) = mpsc::channel(32);
// ...
}
mpsc 通道被用来发送一个命令到消息管理任务中。 多生产者的能力是能让许多的任务发送消息。创建的通道返回两个值:一个是发送者(Sender),一个是接收者(Receiver)。它们两者被分开使用。他们可能被移动到不同的任务中去。
被创建的通道容量为32。 如果消息的发送速度大于接收的速度,通道会储存它们。 一旦通道中存了32条消息时,就会使调用send(...).await 进入睡眠状态,
直到接收者删除一条消息为止,当接收者有能力能再次处理消息时, 睡眠状态才会结束。
通过clone Sender 可以完成多个任务的发送:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
//创建Get命令发送任务
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "hello".to_string(),
};
tx.send(cmd).await.unwrap();
});
//创建Set命令发送任务
let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};
tx2.send(cmd).await.unwrap();
});
//创建消息管理任务...
//let manager = tokio::spawn(async move {...})
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
}
两条消息都发送到单个Receiver 处理。不可能克隆mpsc 通道中的接收者。
当每个 Sender 超出作用域范围或者被dropped时, 它不能再发送更多的消息到通道中。此时, Receiver 上的rev调用都将返回 None ,这意味着所有发送者都已经消失且通道已关闭。
3. 创建消息管理任务
创建一个管理任务来处理来自通道的消息。首先, 建立与Redis服务器的链接。然后,通过Redis链接发出从命令任务中接收到的命令消息。
let manager = tokio::spawn(async move {
// 建立与Server的链接
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// 开始接收消息
while let Some(cmd) = rx.recv().await {
use Command::*;
match cmd {
Get { key } => {
client.get(&key).await; //向Redis服务器发送命令
}
Set { key, val } => {
client.set(&key, val).await; //向Redis服务器发送命令
}
}
}
});
4. 接收Redis服务器的响应
为了传递响应,可以使用 oneshot 通道。 oneshot 通道是一个经过了优化的单生产者单消费者通道,用来发送单个值。与 mpsc 类似, oneshot 返回一个发送者(Sender)和一个接收者(Receiver)处理器。
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
与 mpsc 不同的是,oneshot 它不能指定任何容量,因为容量始终为1。另外, tx和rx都不能被克隆。
为了接收到来自管理任务的响应,在发送一个命令之前,一个 oneshot 通道将被创建。通道 Sender需要传送给管理任务,接收方Receiver保留在命令任务中,用来接收响应。
首先, 更新 Command 来包含一个Sender 。 为了方便, 为 Sender 定义一个类型别名:
use tokio::sync::oneshot;
use bytes::Bytes;
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>, //oneshot Sender
},
Set {
key: String,
val: Vec<u8>,
resp: Responder<()>, //oneshot Sender
},
}
更新Get和Set命令发送任务:
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "hello".to_string(),
resp: resp_tx,
};
// 发送 GET 请求
tx.send(cmd).await.unwrap();
// 等待响应结果
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: b"bar".to_vec(),
resp: resp_tx,
};
// 发送 GET 请求
tx2.send(cmd).await.unwrap();
// 等待响应结果
let res = resp_rx.await;
println!("GOT = {:?}", res)
});
更新消息管理任务:
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
let _ = resp.send(res); //忽略错误
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val.into()).await;
let _ = resp.send(res); //忽略错误
}
}
}
在 oneshot::Sender 上调用 send 会立即完成而不需要 .await 操作。这是因为在 oneshot 通道上的 send 总是立即失败或者成功,而没有任何等待。
如果oneshot Receiver当接收一半时dropped了,比如使用drop(resp_rx)删除了通道。这时 在 oneshot 通道上的发送会返回 Err 。这表明接收方不再对响应感兴趣,在这里,接收方的取消操作是可以被接受的事件。resp.send(...) 返回的 Err 可以做忽略处理。
五、写在最后
Tokio 的mpsc是 Rust 异步编程里的一个超级实用的工具,它让任务之间的通信变得简单又安全。你可以用它来实现任务之间的数据共享、任务调度、任务通知等各种功能。希望这篇文章能让你对mpsc有个更深入的了解。如果你在使用mpsc的过程中遇到了问题,或者发现了什么好玩的用法,欢迎随时和我交流。加油,让我们一起在 Rust 的世界里探索!
