fix clippy warnings on model
This commit is contained in:
parent
f5a0cf0414
commit
4a1e079e8f
@ -1,4 +1,4 @@
|
|||||||
use std::path::PathBuf;
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@ -137,7 +137,7 @@ impl BlockManager {
|
|||||||
Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
|
Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
|
||||||
Message::GetBlock(h) => self.read_block(h).await,
|
Message::GetBlock(h) => self.read_block(h).await,
|
||||||
Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
|
Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
|
||||||
_ => Err(Error::BadRPC(format!("Unexpected RPC message"))),
|
_ => Err(Error::BadRPC("Unexpected RPC message".to_string())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -280,8 +280,8 @@ impl BlockManager {
|
|||||||
if let Err(e) = self.resync_iter(&mut must_exit).await {
|
if let Err(e) = self.resync_iter(&mut must_exit).await {
|
||||||
warn!("Error in block resync loop: {}", e);
|
warn!("Error in block resync loop: {}", e);
|
||||||
select! {
|
select! {
|
||||||
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => (),
|
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {},
|
||||||
_ = must_exit.changed().fuse() => (),
|
_ = must_exit.changed().fuse() => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -304,15 +304,15 @@ impl BlockManager {
|
|||||||
} else {
|
} else {
|
||||||
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
|
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
|
||||||
select! {
|
select! {
|
||||||
_ = delay.fuse() => (),
|
_ = delay.fuse() => {},
|
||||||
_ = self.resync_notify.notified().fuse() => (),
|
_ = self.resync_notify.notified().fuse() => {},
|
||||||
_ = must_exit.changed().fuse() => (),
|
_ = must_exit.changed().fuse() => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
select! {
|
select! {
|
||||||
_ = self.resync_notify.notified().fuse() => (),
|
_ = self.resync_notify.notified().fuse() => {},
|
||||||
_ = must_exit.changed().fuse() => (),
|
_ = must_exit.changed().fuse() => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -342,7 +342,7 @@ impl BlockManager {
|
|||||||
|
|
||||||
let mut who = self.replication.write_nodes(&hash);
|
let mut who = self.replication.write_nodes(&hash);
|
||||||
if who.len() < self.replication.write_quorum() {
|
if who.len() < self.replication.write_quorum() {
|
||||||
return Err(Error::Message(format!("Not trying to offload block because we don't have a quorum of nodes to write to")));
|
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
|
||||||
}
|
}
|
||||||
who.retain(|id| *id != self.system.id);
|
who.retain(|id| *id != self.system.id);
|
||||||
|
|
||||||
@ -362,14 +362,14 @@ impl BlockManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
return Err(Error::Message(format!(
|
return Err(Error::Message(
|
||||||
"Unexpected response to NeedBlockQuery RPC"
|
"Unexpected response to NeedBlockQuery RPC".to_string(),
|
||||||
)));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if need_nodes.len() > 0 {
|
if !need_nodes.is_empty() {
|
||||||
trace!(
|
trace!(
|
||||||
"Block {:?} needed by {} nodes, sending",
|
"Block {:?} needed by {} nodes, sending",
|
||||||
hash,
|
hash,
|
||||||
@ -478,7 +478,7 @@ impl BlockManager {
|
|||||||
|
|
||||||
fn repair_aux_read_dir_rec<'a>(
|
fn repair_aux_read_dir_rec<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
path: &'a PathBuf,
|
path: &'a Path,
|
||||||
must_exit: &'a watch::Receiver<bool>,
|
must_exit: &'a watch::Receiver<bool>,
|
||||||
) -> BoxFuture<'a, Result<(), Error>> {
|
) -> BoxFuture<'a, Result<(), Error>> {
|
||||||
// Lists all blocks on disk and adds them to the resync queue.
|
// Lists all blocks on disk and adds them to the resync queue.
|
||||||
|
@ -50,7 +50,7 @@ impl TableSchema for BlockRefTable {
|
|||||||
type Filter = DeletedFilter;
|
type Filter = DeletedFilter;
|
||||||
|
|
||||||
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
||||||
let block = &old.as_ref().or(new.as_ref()).unwrap().block;
|
let block = &old.as_ref().or_else(|| new.as_ref()).unwrap().block;
|
||||||
let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
|
let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
|
||||||
let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
|
let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
|
||||||
if is_after && !was_before {
|
if is_after && !was_before {
|
||||||
|
@ -49,13 +49,6 @@ pub struct BucketParams {
|
|||||||
pub website: crdt::LWW<bool>,
|
pub website: crdt::LWW<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CRDT for BucketParams {
|
|
||||||
fn merge(&mut self, o: &Self) {
|
|
||||||
self.authorized_keys.merge(&o.authorized_keys);
|
|
||||||
self.website.merge(&o.website);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BucketParams {
|
impl BucketParams {
|
||||||
/// Create an empty BucketParams with no authorized keys and no website accesss
|
/// Create an empty BucketParams with no authorized keys and no website accesss
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
@ -66,6 +59,19 @@ impl BucketParams {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl CRDT for BucketParams {
|
||||||
|
fn merge(&mut self, o: &Self) {
|
||||||
|
self.authorized_keys.merge(&o.authorized_keys);
|
||||||
|
self.website.merge(&o.website);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for BucketParams {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Bucket {
|
impl Bucket {
|
||||||
/// Initializes a new instance of the Bucket struct
|
/// Initializes a new instance of the Bucket struct
|
||||||
pub fn new(name: String) -> Self {
|
pub fn new(name: String) -> Self {
|
||||||
|
@ -95,7 +95,7 @@ impl Garage {
|
|||||||
BlockRefTable {
|
BlockRefTable {
|
||||||
block_manager: block_manager.clone(),
|
block_manager: block_manager.clone(),
|
||||||
},
|
},
|
||||||
data_rep_param.clone(),
|
data_rep_param,
|
||||||
system.clone(),
|
system.clone(),
|
||||||
&db,
|
&db,
|
||||||
"block_ref".to_string(),
|
"block_ref".to_string(),
|
||||||
@ -121,7 +121,7 @@ impl Garage {
|
|||||||
background: background.clone(),
|
background: background.clone(),
|
||||||
version_table: version_table.clone(),
|
version_table: version_table.clone(),
|
||||||
},
|
},
|
||||||
meta_rep_param.clone(),
|
meta_rep_param,
|
||||||
system.clone(),
|
system.clone(),
|
||||||
&db,
|
&db,
|
||||||
"object".to_string(),
|
"object".to_string(),
|
||||||
@ -141,7 +141,7 @@ impl Garage {
|
|||||||
info!("Initialize key_table_table...");
|
info!("Initialize key_table_table...");
|
||||||
let key_table = Table::new(
|
let key_table = Table::new(
|
||||||
KeyTable,
|
KeyTable,
|
||||||
control_rep_param.clone(),
|
control_rep_param,
|
||||||
system.clone(),
|
system.clone(),
|
||||||
&db,
|
&db,
|
||||||
"key".to_string(),
|
"key".to_string(),
|
||||||
@ -152,9 +152,9 @@ impl Garage {
|
|||||||
let garage = Arc::new(Self {
|
let garage = Arc::new(Self {
|
||||||
config,
|
config,
|
||||||
db,
|
db,
|
||||||
system: system.clone(),
|
|
||||||
block_manager,
|
|
||||||
background,
|
background,
|
||||||
|
system,
|
||||||
|
block_manager,
|
||||||
bucket_table,
|
bucket_table,
|
||||||
key_table,
|
key_table,
|
||||||
object_table,
|
object_table,
|
||||||
|
@ -40,6 +40,7 @@ impl Object {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Adds a version if it wasn't already present
|
/// Adds a version if it wasn't already present
|
||||||
|
#[allow(clippy::result_unit_err)]
|
||||||
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
|
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
|
||||||
match self
|
match self
|
||||||
.versions
|
.versions
|
||||||
@ -145,18 +146,12 @@ impl ObjectVersion {
|
|||||||
|
|
||||||
/// Is the object version currently being uploaded
|
/// Is the object version currently being uploaded
|
||||||
pub fn is_uploading(&self) -> bool {
|
pub fn is_uploading(&self) -> bool {
|
||||||
match self.state {
|
matches!(self.state, ObjectVersionState::Uploading(_))
|
||||||
ObjectVersionState::Uploading(_) => true,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Is the object version completely received
|
/// Is the object version completely received
|
||||||
pub fn is_complete(&self) -> bool {
|
pub fn is_complete(&self) -> bool {
|
||||||
match self.state {
|
matches!(self.state, ObjectVersionState::Complete(_))
|
||||||
ObjectVersionState::Complete(_) => true,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Is the object version available (received and not a tombstone)
|
/// Is the object version available (received and not a tombstone)
|
||||||
@ -207,8 +202,7 @@ impl CRDT for Object {
|
|||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.rev()
|
.rev()
|
||||||
.filter(|(_, v)| v.is_complete())
|
.find(|(_, v)| v.is_complete())
|
||||||
.next()
|
|
||||||
.map(|(vi, _)| vi);
|
.map(|(vi, _)| vi);
|
||||||
|
|
||||||
if let Some(last_vi) = last_complete {
|
if let Some(last_vi) = last_complete {
|
||||||
|
Loading…
Reference in New Issue
Block a user