加入收藏 | 设为首页 | 会员中心 | 我要投稿 | RSS
您当前的位置:首页 > 学习资料

Rust tokio 如何以异步非阻塞方式运行大量任务

时间:2023-02-13 00:15:02  来源:  作者:
tokio 官方给了一个完整的例子:手动构建 runtime ,利用 block_on 来运行多个任务。tokio 的任务是由 之类的函数产生的 类型,而且是个 。f0K华陈数据科技
f0K华陈数据科技
而下面利用 和 await 编写了等价的版本(为了直观对比任务完成的实际顺序和总耗时,我对 sleep 的时间做了一些简化):f0K华陈数据科技
 
f0K华陈数据科技
usestd::time::Instant;f0K华陈数据科技
usetokio::time::;f0K华陈数据科技
#[tokio::main]f0K华陈数据科技
asyncfnmain() -> std::io::Result {f0K华陈数据科技
  letnow = Instant::now();f0K华陈数据科技
  letmuthandles =Vec::with_capacity(10);f0K华陈数据科技
  foriin..10{f0K华陈数据科技
    handles.push(tokio::spawn(my_bg_task(i)));f0K华陈数据科技
  }f0K华陈数据科技
  // Do something time-consuming while the background tasks execute.f0K华陈数据科技
  std::thread::sleep(Duration::from_millis(120));f0K华陈数据科技
  println!("Finished time-consuming task.");f0K华陈数据科技
  // Wait for all of them to complete.f0K华陈数据科技
  forhandleinhandles {f0K华陈数据科技
    handle.await?;f0K华陈数据科技
  }f0K华陈数据科技
  println!("总耗时:{} ms", now.elapsed().as_millis());f0K华陈数据科技
  Ok(())f0K华陈数据科技
}f0K华陈数据科技
asyncfnmy_bg_task(i:u64) {f0K华陈数据科技
  letmillis =100;f0K华陈数据科技
  println!("Task {} sleeping for {} ms.", i, millis);f0K华陈数据科技
  sleep(Duration::from_millis(millis)).await;f0K华陈数据科技
  println!("Task {} stopping.", i);f0K华陈数据科技
}
f0K华陈数据科技
f0K华陈数据科技
输出结果:
Tasksleepingfor100ms.f0K华陈数据科技
Task1sleepingfor100ms.f0K华陈数据科技
Task2sleepingfor100ms.f0K华陈数据科技
Task3sleepingfor100ms.f0K华陈数据科技
Task4sleepingfor100ms.f0K华陈数据科技
Task5sleepingfor100ms.f0K华陈数据科技
Task6sleepingfor100ms.f0K华陈数据科技
Task7sleepingfor100ms.f0K华陈数据科技
Task8sleepingfor100ms.f0K华陈数据科技
Task9sleepingfor100ms.f0K华陈数据科技
Task9stopping.f0K华陈数据科技
Taskstopping.f0K华陈数据科技
Task1stopping.f0K华陈数据科技
Task2stopping.f0K华陈数据科技
Task3stopping.f0K华陈数据科技
Task4stopping.f0K华陈数据科技
Task5stopping.f0K华陈数据科技
Task6stopping.f0K华陈数据科技
Task7stopping.f0K华陈数据科技
Task8stopping.f0K华陈数据科技
Finishedtime-consuming task.
总耗时:120msf0K华陈数据科技
如果把主线程的的 sleep 时间改成 100 ms: 则产生下面的结果:f0K华陈数据科技
 
Tasksleepingfor100ms.f0K华陈数据科技
Task1sleepingfor100ms.f0K华陈数据科技
Task2sleepingfor100ms.f0K华陈数据科技
Task3sleepingfor100ms.f0K华陈数据科技
Task4sleepingfor100ms.f0K华陈数据科技
Task5sleepingfor100ms.f0K华陈数据科技
Task6sleepingfor100ms.f0K华陈数据科技
Task7sleepingfor100ms.f0K华陈数据科技
Task8sleepingfor100ms.f0K华陈数据科技
Task9sleepingfor100ms.f0K华陈数据科技
Finishedtime-consuming task.f0K华陈数据科技
Task3stopping.f0K华陈数据科技
Taskstopping.f0K华陈数据科技
Task1stopping.f0K华陈数据科技
Task2stopping.f0K华陈数据科技
Task9stopping.f0K华陈数据科技
Task4stopping.f0K华陈数据科技
Task5stopping.f0K华陈数据科技
Task6stopping.f0K华陈数据科技
Task7stopping.f0K华陈数据科技
Task8stopping.
f0K华陈数据科技
总耗时:103msf0K华陈数据科技
可以看到, 实际是异步非阻塞执行的 :f0K华陈数据科技
异步:因为每个任务不必等待其结果就可以开始下一个任务,即;f0K华陈数据科技
// 异步f0K华陈数据科技
Tasksleepingfor100ms.f0K华陈数据科技
Task1sleepingfor100ms.f0K华陈数据科技
...f0K华陈数据科技
// 同步f0K华陈数据科技
Tasksleepingfor100ms.f0K华陈数据科技
Taskstopping.f0K华陈数据科技
Task1sleepingfor100ms.f0K华陈数据科技
Task1stopping.f0K华陈数据科技
...f0K华陈数据科技
f0K华陈数据科技
非阻塞:每个任务之间可以快速切换,不必等待其他任务完成才切换,这个例子表现在:f0K华陈数据科技
任务 0-9 以乱序方式 stopf0K华陈数据科技
与 的打印顺序只与任务各自的运行 (sleep) 时间有关,与源代码的声明执行顺序无关。只有任务之间快速切换才能做到这一点。回顾官网的例子:10 个任务的 sleep 时间线性递减 (),从 6 个任务开始小于主线程 sleep 任务的时间(750 ms),而等待 10 个任务执行的语句 显然位于 之后,所以任务之间非阻塞执行的话,打印结果为 sleep 时间越短的任务先完成,时间越长的任务后完成,总耗时为任务中的最长耗时:f0K华陈数据科技
 
Tasksleepingfor1000ms.f0K华陈数据科技
Task1sleepingfor950ms.f0K华陈数据科技
Task2sleepingfor900ms.f0K华陈数据科技
Task3sleepingfor850ms.f0K华陈数据科技
Task4sleepingfor800ms.f0K华陈数据科技
Task5sleepingfor750ms.f0K华陈数据科技
Task6sleepingfor700ms.f0K华陈数据科技
Task7sleepingfor650ms.f0K华陈数据科技
Task8sleepingfor600ms.f0K华陈数据科技
Task9sleepingfor550ms.f0K华陈数据科技
Task9stopping.f0K华陈数据科技
Task8stopping.f0K华陈数据科技
Task7stopping.f0K华陈数据科技
Task6stopping.f0K华陈数据科技
Finished time-consuming task.f0K华陈数据科技
Task5stopping.f0K华陈数据科技
Task4stopping.f0K华陈数据科技
Task3stopping.f0K华陈数据科技
Task2stopping.f0K华陈数据科技
Task1stopping.f0K华陈数据科技
Taskstopping.
f0K华陈数据科技
总耗时:1001ms// 非常完美f0K华陈数据科技
一般情况下,对于 async block/fn 你至少有以下一些做法:f0K华陈数据科技
对 async block/fn 调用 来等待结果;f0K华陈数据科技
对可列举的少数 Future 调用 或者 来同时等待多个结果 或者 等待多个分支的第一个结果;f0K华陈数据科技
对大量 Future 调用 join 或者 select 一类支持传入 Vec / iter 参数类型的函数,比如这个例子中的 部分就可以改写成 ;f0K华陈数据科技
把 async block/fn 变成任务,然后调用 (等价地,对任务 await)来执行许多任务。f0K华陈数据科技
容易犯的错误是,希望异步非阻塞时,对所有 async block/fn 进行了 await,而没有进行任务化处理(即 把 Future 通过 spwan 函数转化成任务):
f0K华陈数据科技
usestd::time::Instant;f0K华陈数据科技
usetokio::time::;f0K华陈数据科技
#[tokio::main]f0K华陈数据科技
asyncfnmain() {f0K华陈数据科技
letnow = Instant::now();f0K华陈数据科技
letmuthandles =Vec::with_capacity(10);f0K华陈数据科技
foriin..10{f0K华陈数据科技
handles.push(my_bg_task(i));// 没有把 Future 变成任务f0K华陈数据科技
}f0K华陈数据科技
std::thread::sleep(Duration::from_millis(120));f0K华陈数据科技
println!("Finished time-consuming task.");f0K华陈数据科技
forhandleinhandles {f0K华陈数据科技
handle.await;// 而且每个 handle 必须执行完才能执行下一个 handlef0K华陈数据科技
}f0K华陈数据科技
println!("总耗时:{} ms", now.elapsed().as_millis());f0K华陈数据科技
}f0K华陈数据科技
asyncfnmy_bg_task(i:u64) {f0K华陈数据科技
letmillis =100;f0K华陈数据科技
println!("Task {} sleeping for {} ms.", i, millis);f0K华陈数据科技
sleep(Duration::from_millis(millis)).await;f0K华陈数据科技
println!("Task {} stopping.", i);f0K华陈数据科技
}
f0K华陈数据科技
运行结果:同步阻塞f0K华陈数据科技
 
f0K华陈数据科技
Finishedtime-consuming task.f0K华陈数据科技
Tasksleepingfor100ms.f0K华陈数据科技
Taskstopping.f0K华陈数据科技
Task1sleepingfor100ms.f0K华陈数据科技
Task1stopping.f0K华陈数据科技
Task2sleepingfor100ms.f0K华陈数据科技
Task2stopping.f0K华陈数据科技
Task3sleepingfor100ms.f0K华陈数据科技
Task3stopping.f0K华陈数据科技
Task4sleepingfor100ms.f0K华陈数据科技
Task4stopping.f0K华陈数据科技
Task5sleepingfor100ms.f0K华陈数据科技
Task5stopping.f0K华陈数据科技
Task6sleepingfor100ms.f0K华陈数据科技
Task6stopping.f0K华陈数据科技
Task7sleepingfor100ms.f0K华陈数据科技
Task7stopping.f0K华陈数据科技
Task8sleepingfor100ms.f0K华陈数据科技
Task8stopping.f0K华陈数据科技
Task9sleepingfor100ms.f0K华陈数据科技
Task9stopping.
f0K华陈数据科技
总耗时:1130msf0K华陈数据科技
或者像这样:f0K华陈数据科技
 
f0K华陈数据科技
usestd::time::Instant;f0K华陈数据科技
usetokio::time::;f0K华陈数据科技
#[tokio::main]f0K华陈数据科技
asyncfnmain() {f0K华陈数据科技
letnow = Instant::now();f0K华陈数据科技
letmuthandles =Vec::with_capacity(10);f0K华陈数据科技
foriin..10{f0K华陈数据科技
handles.push(my_bg_task(i));// 没有把 Future 变成任务f0K华陈数据科技
}f0K华陈数据科技
std::thread::sleep(Duration::from_millis(120));f0K华陈数据科技
println!("Finished time-consuming task.");f0K华陈数据科技
futures::future::join_all(handles).await;// 但是 join_all 会等待所有 Future 并发执行完f0K华陈数据科技
println!("总耗时:{} ms", now.elapsed().as_millis());f0K华陈数据科技
}f0K华陈数据科技
asyncfnmy_bg_task(i:u64) {f0K华陈数据科技
letmillis =100;f0K华陈数据科技
println!("Task {} sleeping for {} ms.", i, millis);f0K华陈数据科技
sleep(Duration::from_millis(millis)).await;f0K华陈数据科技
println!("Task {} stopping.", i);f0K华陈数据科技
}
f0K华陈数据科技
运行结果:异步阻塞f0K华陈数据科技
 
f0K华陈数据科技
Finishedtime-consuming task.f0K华陈数据科技
Tasksleepingfor100ms.f0K华陈数据科技
Task1sleepingfor100ms.f0K华陈数据科技
Task2sleepingfor100ms.f0K华陈数据科技
Task3sleepingfor100ms.f0K华陈数据科技
Task4sleepingfor100ms.f0K华陈数据科技
Task5sleepingfor100ms.f0K华陈数据科技
Task6sleepingfor100ms.f0K华陈数据科技
Task7sleepingfor100ms.f0K华陈数据科技
Task8sleepingfor100ms.f0K华陈数据科技
Task9sleepingfor100ms.f0K华陈数据科技
Taskstopping.f0K华陈数据科技
Task1stopping.f0K华陈数据科技
Task2stopping.f0K华陈数据科技
Task3stopping.f0K华陈数据科技
Task4stopping.f0K华陈数据科技
Task5stopping.f0K华陈数据科技
Task6stopping.f0K华陈数据科技
Task7stopping.f0K华陈数据科技
Task8stopping.f0K华陈数据科技
Task9stopping.
f0K华陈数据科技
总耗时:221msf0K华陈数据科技
f0K华陈数据科技
P.S. 关于代码中 和 的区别,参考这篇文章 Async: What is blocking? (by Alice Ryhl) 。f0K华陈数据科技
 
来顶一下
返回首页
返回首页
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表
推荐资讯
实现php间隔一段时间执行一次某段代码
实现php间隔一段时间
相关文章
    无相关信息
栏目更新
栏目热门