Implement /health admin API endpoint to check node health

This commit is contained in:
Alex Auvolat 2022-12-05 14:59:15 +01:00
parent 243b7c9a1c
commit 2065f011ca
No known key found for this signature in database
GPG Key ID: 0E496D15096376BE
4 changed files with 103 additions and 0 deletions

View File

@ -1,3 +1,5 @@
use std::collections::HashMap;
use std::fmt::Write;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@ -15,6 +17,8 @@ use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder}; use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_rpc::layout::NodeRoleV;
use garage_util::data::Uuid;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use crate::generic_server::*; use crate::generic_server::*;
@ -76,6 +80,94 @@ impl AdminApiServer {
.body(Body::empty())?) .body(Body::empty())?)
} }
fn handle_health(&self) -> Result<Response<Body>, Error> {
let ring: Arc<_> = self.garage.system.ring.borrow().clone();
let quorum = self.garage.replication_mode.write_quorum();
let replication_factor = self.garage.replication_mode.replication_factor();
let nodes = self
.garage
.system
.get_known_nodes()
.into_iter()
.map(|n| (n.id, n))
.collect::<HashMap<Uuid, _>>();
let n_nodes_connected = nodes.iter().filter(|(_, n)| n.is_up).count();
let storage_nodes = ring
.layout
.roles
.items()
.iter()
.filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
.collect::<Vec<_>>();
let n_storage_nodes_ok = storage_nodes
.iter()
.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count();
let partitions = ring.partitions();
let partitions_n_up = partitions
.iter()
.map(|(_, h)| {
let pn = ring.get_nodes(h, ring.replication_factor);
pn.iter()
.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count()
})
.collect::<Vec<usize>>();
let n_partitions_full_ok = partitions_n_up
.iter()
.filter(|c| **c == replication_factor)
.count();
let n_partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();
let (status, status_str) = if n_partitions_quorum == partitions.len()
&& n_storage_nodes_ok == storage_nodes.len()
{
(StatusCode::OK, "Garage is fully operational")
} else if n_partitions_quorum == partitions.len() {
(
StatusCode::OK,
"Garage is operational but some storage nodes are unavailable",
)
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
"Quorum is not available for some/all partitions, reads and writes will fail",
)
};
let mut buf = status_str.to_string();
writeln!(
&mut buf,
"\nAll nodes: {} connected, {} known",
n_nodes_connected,
nodes.len()
)
.unwrap();
writeln!(
&mut buf,
"Storage nodes: {} connected, {} in layout",
n_storage_nodes_ok,
storage_nodes.len()
)
.unwrap();
writeln!(&mut buf, "Number of partitions: {}", partitions.len()).unwrap();
writeln!(&mut buf, "Partitions with quorum: {}", n_partitions_quorum).unwrap();
writeln!(
&mut buf,
"Partitions with all nodes available: {}",
n_partitions_full_ok
)
.unwrap();
Ok(Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, "text/plain")
.body(Body::from(buf))?)
}
fn handle_metrics(&self) -> Result<Response<Body>, Error> { fn handle_metrics(&self) -> Result<Response<Body>, Error> {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
{ {
@ -124,6 +216,7 @@ impl ApiHandler for AdminApiServer {
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let expected_auth_header = let expected_auth_header =
match endpoint.authorization_type() { match endpoint.authorization_type() {
Authorization::None => None,
Authorization::MetricsToken => self.metrics_token.as_ref(), Authorization::MetricsToken => self.metrics_token.as_ref(),
Authorization::AdminToken => match &self.admin_token { Authorization::AdminToken => match &self.admin_token {
None => return Err(Error::forbidden( None => return Err(Error::forbidden(
@ -147,6 +240,7 @@ impl ApiHandler for AdminApiServer {
match endpoint { match endpoint {
Endpoint::Options => self.handle_options(&req), Endpoint::Options => self.handle_options(&req),
Endpoint::Health => self.handle_health(),
Endpoint::Metrics => self.handle_metrics(), Endpoint::Metrics => self.handle_metrics(),
Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await, Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await,

View File

@ -6,6 +6,7 @@ use crate::admin::error::*;
use crate::router_macros::*; use crate::router_macros::*;
pub enum Authorization { pub enum Authorization {
None,
MetricsToken, MetricsToken,
AdminToken, AdminToken,
} }
@ -16,6 +17,7 @@ router_match! {@func
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum Endpoint { pub enum Endpoint {
Options, Options,
Health,
Metrics, Metrics,
GetClusterStatus, GetClusterStatus,
ConnectClusterNodes, ConnectClusterNodes,
@ -88,6 +90,7 @@ impl Endpoint {
let res = router_match!(@gen_path_parser (req.method(), path, query) [ let res = router_match!(@gen_path_parser (req.method(), path, query) [
OPTIONS _ => Options, OPTIONS _ => Options,
GET "/health" => Health,
GET "/metrics" => Metrics, GET "/metrics" => Metrics,
GET "/v0/status" => GetClusterStatus, GET "/v0/status" => GetClusterStatus,
POST "/v0/connect" => ConnectClusterNodes, POST "/v0/connect" => ConnectClusterNodes,
@ -130,6 +133,7 @@ impl Endpoint {
/// Get the kind of authorization which is required to perform the operation. /// Get the kind of authorization which is required to perform the operation.
pub fn authorization_type(&self) -> Authorization { pub fn authorization_type(&self) -> Authorization {
match self { match self {
Self::Health => Authorization::None,
Self::Metrics => Authorization::MetricsToken, Self::Metrics => Authorization::MetricsToken,
_ => Authorization::AdminToken, _ => Authorization::AdminToken,
} }

View File

@ -34,6 +34,9 @@ pub struct Garage {
/// The parsed configuration Garage is running /// The parsed configuration Garage is running
pub config: Config, pub config: Config,
/// The replication mode of this cluster
pub replication_mode: ReplicationMode,
/// The local database /// The local database
pub db: db::Db, pub db: db::Db,
/// A background job runner /// A background job runner
@ -258,6 +261,7 @@ impl Garage {
// -- done -- // -- done --
Ok(Arc::new(Self { Ok(Arc::new(Self {
config, config,
replication_mode,
db, db,
background, background,
system, system,

View File

@ -1,3 +1,4 @@
#[derive(Clone, Copy)]
pub enum ReplicationMode { pub enum ReplicationMode {
None, None,
TwoWay, TwoWay,