Well they still have to exit when we're exiting though

This commit is contained in:
Alex Auvolat 2020-04-22 17:04:33 +00:00
parent e8214cb180
commit 8971f34c81
3 changed files with 12 additions and 12 deletions

View File

@ -2,6 +2,8 @@ use core::future::Future;
use std::pin::Pin; use std::pin::Pin;
use futures::future::join_all; use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::sync::{mpsc, watch, Notify}; use tokio::sync::{mpsc, watch, Notify};
@ -88,7 +90,7 @@ impl BackgroundRunner {
} }
async fn runner(self: Arc<Self>, i: usize) { async fn runner(self: Arc<Self>, i: usize) {
let stop_signal = self.stop_signal.clone(); let mut stop_signal = self.stop_signal.clone();
loop { loop {
let must_exit: bool = *stop_signal.borrow(); let must_exit: bool = *stop_signal.borrow();
if let Some(job) = self.dequeue_job(must_exit).await { if let Some(job) = self.dequeue_job(must_exit).await {
@ -100,7 +102,10 @@ impl BackgroundRunner {
info!("Background runner {} exiting", i); info!("Background runner {} exiting", i);
return; return;
} }
self.job_notify.notified().await; select! {
_ = self.job_notify.notified().fuse() => (),
_ = stop_signal.recv().fuse() => (),
}
} }
} }
} }

View File

@ -263,4 +263,3 @@ impl RpcHttpClient {
} }
} }
} }

View File

@ -47,16 +47,13 @@ where
let begin_time = Instant::now(); let begin_time = Instant::now();
let whole_body = hyper::body::to_bytes(req.into_body()).await?; let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?; let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?;
let req_str = debug_serialize(&msg);
match handler(msg, sockaddr).await { match handler(msg, sockaddr).await {
Ok(resp) => { Ok(resp) => {
let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?; let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?;
trace!( let rpc_duration = (Instant::now() - begin_time).as_millis();
"]RPC:{},ok ({} ms), request: {}", if rpc_duration > 100 {
name, debug!("RPC {} ok, took long: {} ms", name, rpc_duration,);
(Instant::now() - begin_time).as_millis(), }
req_str,
);
Ok(Response::new(Body::from(resp_bytes))) Ok(Response::new(Body::from(resp_bytes)))
} }
Err(e) => { Err(e) => {
@ -65,11 +62,10 @@ where
let mut err_response = Response::new(Body::from(rep_bytes)); let mut err_response = Response::new(Body::from(rep_bytes));
*err_response.status_mut() = e.http_status_code(); *err_response.status_mut() = e.http_status_code();
warn!( warn!(
"RPC error ({}): {} ({} ms), request: {}", "RPC error ({}): {} ({} ms)",
name, name,
e, e,
(Instant::now() - begin_time).as_millis(), (Instant::now() - begin_time).as_millis(),
req_str,
); );
Ok(err_response) Ok(err_response)
} }