Code changes for windows, mostly to get rid of unix socket listening because tokio doesn't support it

main
Brian Picciano 6 months ago
parent b84a60ba69
commit bd09a1ad7b
  1. 15
      Cargo.lock
  2. 6
      default.nix
  3. 2
      src/api/Cargo.toml
  4. 62
      src/api/generic_server.rs
  5. 6
      src/block/manager.rs
  6. 33
      src/rpc/system.rs
  7. 2
      src/web/Cargo.toml
  8. 60
      src/web/web_server.rs

15
Cargo.lock generated

@ -1271,7 +1271,6 @@ dependencies = [
"http-range", "http-range",
"httpdate", "httpdate",
"hyper", "hyper",
"hyperlocal",
"idna", "idna",
"md-5", "md-5",
"multer", "multer",
@ -1465,7 +1464,6 @@ dependencies = [
"garage_util", "garage_util",
"http", "http",
"hyper", "hyper",
"hyperlocal",
"opentelemetry", "opentelemetry",
"percent-encoding", "percent-encoding",
"tokio", "tokio",
@ -1778,19 +1776,6 @@ dependencies = [
"tokio-io-timeout", "tokio-io-timeout",
] ]
[[package]]
name = "hyperlocal"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fafdf7b2b2de7c9784f76e02c0935e65a8117ec3b768644379983ab333ac98c"
dependencies = [
"futures-util",
"hex",
"hyper",
"pin-project",
"tokio",
]
[[package]] [[package]]
name = "iana-time-zone" name = "iana-time-zone"
version = "0.1.57" version = "0.1.57"

@ -74,6 +74,7 @@ let
in in
fenix.combine [ fenix.combine [
(mkToolchain fenix).rustc (mkToolchain fenix).rustc
(mkToolchain fenix).rustfmt
(mkToolchain fenix).cargo (mkToolchain fenix).cargo
(mkToolchain fenix.targets.${rustTarget}).rust-std (mkToolchain fenix.targets.${rustTarget}).rust-std
]; ];
@ -148,6 +149,11 @@ let
pkgs.protobuf pkgs.protobuf
]; ];
buildInputs = [
pkgsCross.stdenv.cc
pkgsCross.windows.pthreads
];
OPENSSL_STATIC = "1"; OPENSSL_STATIC = "1";
OPENSSL_LIB_DIR = "${pkgsCross.pkgsStatic.openssl.out}/lib"; OPENSSL_LIB_DIR = "${pkgsCross.pkgsStatic.openssl.out}/lib";
OPENSSL_INCLUDE_DIR = "${pkgsCross.pkgsStatic.openssl.dev}/include"; OPENSSL_INCLUDE_DIR = "${pkgsCross.pkgsStatic.openssl.dev}/include";

@ -45,7 +45,7 @@ http = "0.2"
httpdate = "1.0" httpdate = "1.0"
http-range = "0.1" http-range = "0.1"
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } #hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] }
multer = "2.0" multer = "2.0"
percent-encoding = "2.1.0" percent-encoding = "2.1.0"
roxmltree = "0.18" roxmltree = "0.18"

@ -1,5 +1,4 @@
use std::fs::{self, Permissions}; //use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
@ -12,9 +11,9 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server}; use hyper::{Body, Request, Response, Server};
use hyper::{HeaderMap, StatusCode}; use hyper::{HeaderMap, StatusCode};
use hyperlocal::UnixServerExt; //use hyperlocal::UnixServerExt;
use tokio::net::UnixStream; //use tokio::net::UnixStream;
use opentelemetry::{ use opentelemetry::{
global, global,
@ -114,18 +113,18 @@ impl<A: ApiHandler> ApiServer<A> {
} }
}); });
let unix_service = make_service_fn(|_: &UnixStream| { //let unix_service = make_service_fn(|_: &UnixStream| {
let this = self.clone(); // let this = self.clone();
let path = bind_addr.to_string(); // let path = bind_addr.to_string();
async move { // async move {
Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { // Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
let this = this.clone(); // let this = this.clone();
this.handler(req, path.clone()) // this.handler(req, path.clone())
})) // }))
} // }
}); //});
info!( info!(
"{} API server listening on {}", "{} API server listening on {}",
@ -140,23 +139,24 @@ impl<A: ApiHandler> ApiServer<A> {
.with_graceful_shutdown(shutdown_signal) .with_graceful_shutdown(shutdown_signal)
.await? .await?
} }
UnixOrTCPSocketAddress::UnixSocket(ref path) => { UnixOrTCPSocketAddress::UnixSocket(_path) => {
if path.exists() { panic!("Unix sockets are not supported in this fork") // TODO(mediocregopher)
fs::remove_file(path)? } //UnixOrTCPSocketAddress::UnixSocket(ref path) => {
} // use std::os::unix::fs::PermissionsExt;
// remove_unix_socket_if_present(path).await?;
let bound = Server::bind_unix(path)?;
// let bound = Server::bind_unix(path)?;
fs::set_permissions(
path, // fs::set_permissions(
Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)), // path,
)?; // Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)),
// )?;
bound
.serve(unix_service) // bound
.with_graceful_shutdown(shutdown_signal) // .serve(unix_service)
.await?; // .with_graceful_shutdown(shutdown_signal)
} // .await?;
//}
}; };
Ok(()) Ok(())

@ -771,11 +771,7 @@ impl BlockManagerLocked {
// Now, we do an fsync on the containing directory, to ensure that the rename // Now, we do an fsync on the containing directory, to ensure that the rename
// is persisted properly. See: // is persisted properly. See:
// http://thedjbway.b0llix.net/qmail/syncdir.html // http://thedjbway.b0llix.net/qmail/syncdir.html
let dir = fs::OpenOptions::new() let dir = fs::OpenOptions::new().read(true).open(directory).await?;
.read(true)
.mode(0)
.open(directory)
.await?;
dir.sync_all().await?; dir.sync_all().await?;
drop(dir); drop(dir);
} }

@ -3,7 +3,6 @@ use std::collections::HashMap;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -198,6 +197,20 @@ pub fn read_node_id(metadata_dir: &Path) -> Result<NodeID, Error> {
Ok(NodeID::from_slice(&key[..]).unwrap()) Ok(NodeID::from_slice(&key[..]).unwrap())
} }
#[cfg(not(windows))]
fn set_private_key_perms(path: &Path) -> Result<(), Error> {
use std::os::unix::fs::PermissionsExt;
let perm = std::fs::Permissions::from_mode(0o600);
std::fs::set_permissions(path, perm)?;
Ok(())
}
#[cfg(windows)]
fn set_private_key_perms(_path: &Path) -> Result<(), Error> {
// TODO(mediocregopher) figure out how to do this, but it's not strictly necessary
Ok(())
}
pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> { pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
let mut key_file = metadata_dir.to_path_buf(); let mut key_file = metadata_dir.to_path_buf();
key_file.push("node_key"); key_file.push("node_key");
@ -222,11 +235,8 @@ pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
let (pubkey, key) = ed25519::gen_keypair(); let (pubkey, key) = ed25519::gen_keypair();
{ {
use std::os::unix::fs::PermissionsExt;
let mut f = std::fs::File::create(key_file.as_path())?; let mut f = std::fs::File::create(key_file.as_path())?;
let mut perm = f.metadata()?.permissions(); set_private_key_perms(key_file.as_path())?;
perm.set_mode(0o600);
std::fs::set_permissions(key_file.as_path(), perm)?;
f.write_all(&key[..])?; f.write_all(&key[..])?;
} }
@ -890,6 +900,18 @@ impl NodeStatus {
} }
} }
#[cfg(windows)]
fn update_disk_usage(
&mut self,
_meta_dir: &Path,
_data_dir: &DataDirEnum,
_metrics: &SystemMetrics,
) {
// TODO(mediocregopher) it'd be nice to have this for windows too, but it seems to only be
// used for OpenTelemetry so it's not a real requirement.
}
#[cfg(not(windows))]
fn update_disk_usage( fn update_disk_usage(
&mut self, &mut self,
meta_dir: &Path, meta_dir: &Path,
@ -897,6 +919,7 @@ impl NodeStatus {
metrics: &SystemMetrics, metrics: &SystemMetrics,
) { ) {
use nix::sys::statvfs::statvfs; use nix::sys::statvfs::statvfs;
use std::sync::atomic::Ordering;
let mount_avail = |path: &Path| match statvfs(path) { let mount_avail = |path: &Path| match statvfs(path) {
Ok(x) => { Ok(x) => {
let avail = x.blocks_available() as u64 * x.fragment_size() as u64; let avail = x.blocks_available() as u64 * x.fragment_size() as u64;

@ -27,7 +27,7 @@ futures = "0.3"
http = "0.2" http = "0.2"
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } #hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] }
tokio = { version = "1.0", default-features = false, features = ["net"] } tokio = { version = "1.0", default-features = false, features = ["net"] }

@ -1,5 +1,5 @@
use std::fs::{self, Permissions}; //use std::fs::{self, Permissions};
use std::os::unix::prelude::PermissionsExt; //use std::os::unix::prelude::PermissionsExt;
use std::{convert::Infallible, sync::Arc}; use std::{convert::Infallible, sync::Arc};
use futures::future::Future; use futures::future::Future;
@ -11,9 +11,9 @@ use hyper::{
Body, Method, Request, Response, Server, StatusCode, Body, Method, Request, Response, Server, StatusCode,
}; };
use hyperlocal::UnixServerExt; //use hyperlocal::UnixServerExt;
use tokio::net::UnixStream; //use tokio::net::UnixStream;
use opentelemetry::{ use opentelemetry::{
global, global,
@ -100,18 +100,18 @@ impl WebServer {
} }
}); });
let unix_service = make_service_fn(|_: &UnixStream| { //let unix_service = make_service_fn(|_: &UnixStream| {
let web_server = web_server.clone(); // let web_server = web_server.clone();
let path = addr.to_string(); // let path = addr.to_string();
async move { // async move {
Ok::<_, Error>(service_fn(move |req: Request<Body>| { // remove_unix_socket_if_present(&path).await.expect("could not remove existing unix socket");
let web_server = web_server.clone(); // Ok::<_, Error>(service_fn(move |req: Request<Body>| {
// let web_server = web_server.clone();
web_server.handle_request(req, path.clone()) // web_server.handle_request(req, path.clone())
})) // }))
} // }
}); //});
info!("Web server listening on {}", addr); info!("Web server listening on {}", addr);
@ -122,20 +122,22 @@ impl WebServer {
.with_graceful_shutdown(shutdown_signal) .with_graceful_shutdown(shutdown_signal)
.await? .await?
} }
UnixOrTCPSocketAddress::UnixSocket(ref path) => { UnixOrTCPSocketAddress::UnixSocket(_path) => {
if path.exists() { panic!("Unix sockets are not supported in this fork") // TODO(mediocregopher)
fs::remove_file(path)? } //UnixOrTCPSocketAddress::UnixSocket(ref path) => {
} // if path.exists() {
// fs::remove_file(path)?
let bound = Server::bind_unix(path)?; // }
fs::set_permissions(path, Permissions::from_mode(0o222))?; // let bound = Server::bind_unix(path)?;
bound // fs::set_permissions(path, Permissions::from_mode(0o222))?;
.serve(unix_service)
.with_graceful_shutdown(shutdown_signal) // bound
.await?; // .serve(unix_service)
} // .with_graceful_shutdown(shutdown_signal)
// .await?;
//}
}; };
Ok(()) Ok(())

Loading…
Cancel
Save