Change debug prints a bit
This commit is contained in:
parent
2f3b1a072f
commit
2a05fd135a
@ -96,7 +96,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
|
|||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut must_exit: watch::Receiver<bool>,
|
mut must_exit: watch::Receiver<bool>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
tokio::time::delay_for(Duration::from_secs(10));
|
tokio::time::delay_for(Duration::from_secs(10)).await;
|
||||||
|
|
||||||
self.todo.lock().await.add_full_scan(&self.table);
|
self.todo.lock().await.add_full_scan(&self.table);
|
||||||
let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
|
let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
|
||||||
@ -111,12 +111,12 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
|
|||||||
select! {
|
select! {
|
||||||
_ = next_full_scan => {
|
_ = next_full_scan => {
|
||||||
next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
|
next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
|
||||||
eprintln!("Adding full scan to syncer todo list");
|
eprintln!("({}) Adding full scan to syncer todo list", self.table.name);
|
||||||
self.todo.lock().await.add_full_scan(&self.table);
|
self.todo.lock().await.add_full_scan(&self.table);
|
||||||
}
|
}
|
||||||
new_ring_r = s_ring_recv => {
|
new_ring_r = s_ring_recv => {
|
||||||
if let Some(new_ring) = new_ring_r {
|
if let Some(new_ring) = new_ring_r {
|
||||||
eprintln!("Adding ring difference to syncer todo list");
|
eprintln!("({}) Adding ring difference to syncer todo list", self.table.name);
|
||||||
self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
|
self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
|
||||||
prev_ring = new_ring;
|
prev_ring = new_ring;
|
||||||
}
|
}
|
||||||
@ -139,7 +139,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
|
|||||||
if let Some(partition) = self.todo.lock().await.pop_task() {
|
if let Some(partition) = self.todo.lock().await.pop_task() {
|
||||||
let res = self.clone().sync_partition(&partition, &mut must_exit).await;
|
let res = self.clone().sync_partition(&partition, &mut must_exit).await;
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
eprintln!("Error while syncing {:?}: {}", partition, e);
|
eprintln!("({}) Error while syncing {:?}: {}", self.table.name, partition, e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tokio::time::delay_for(Duration::from_secs(1)).await;
|
tokio::time::delay_for(Duration::from_secs(1)).await;
|
||||||
@ -149,9 +149,9 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
|
async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
|
||||||
eprintln!("Calculating root checksum for {:?}...", partition);
|
eprintln!("({}) Calculating root checksum for {:?}...", self.table.name, partition);
|
||||||
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?;
|
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?;
|
||||||
eprintln!("Root checksum for {:?}: {:?}", partition, root_cks);
|
eprintln!("({}) Root checksum for {:?}: {:?}", self.table.name, partition, root_cks);
|
||||||
|
|
||||||
let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor);
|
let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor);
|
||||||
let mut sync_futures = nodes.iter()
|
let mut sync_futures = nodes.iter()
|
||||||
@ -160,7 +160,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
|
|||||||
|
|
||||||
while let Some(r) = sync_futures.next().await {
|
while let Some(r) = sync_futures.next().await {
|
||||||
if let Err(e) = r {
|
if let Err(e) = r {
|
||||||
eprintln!("Sync error: {}", e);
|
eprintln!("({}) Sync error: {}", self.table.name, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !partition.retain {
|
if !partition.retain {
|
||||||
@ -198,7 +198,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
|
|||||||
let v = self.range_checksum_inner(&range, must_exit).await?;
|
let v = self.range_checksum_inner(&range, must_exit).await?;
|
||||||
|
|
||||||
let mut cache = self.cache[range.level].lock().await;
|
let mut cache = self.cache[range.level].lock().await;
|
||||||
eprintln!("Checksum for {:?}: {:?}", range, v);
|
eprintln!("({}) Checksum for {:?}: {:?}", self.table.name, range, v);
|
||||||
cache.insert(range.clone(), v.clone());
|
cache.insert(range.clone(), v.clone());
|
||||||
Ok(v)
|
Ok(v)
|
||||||
}.boxed()
|
}.boxed()
|
||||||
@ -281,7 +281,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
|
|||||||
todo.push_back(root_ck);
|
todo.push_back(root_ck);
|
||||||
|
|
||||||
while !todo.is_empty() && !*must_exit.borrow() {
|
while !todo.is_empty() && !*must_exit.borrow() {
|
||||||
eprintln!("Sync with {:?}: {} remaining", who, todo.len());
|
eprintln!("({}) Sync with {:?}: {} remaining", self.table.name, who, todo.len());
|
||||||
|
|
||||||
let end = std::cmp::min(16, todo.len());
|
let end = std::cmp::min(16, todo.len());
|
||||||
let step = todo.drain(..end).collect::<Vec<_>>();
|
let step = todo.drain(..end).collect::<Vec<_>>();
|
||||||
@ -308,7 +308,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
|
async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
|
||||||
eprintln!("Sending {} items to {:?}", item_list.len(), who);
|
eprintln!("({}) Sending {} items to {:?}", self.table.name, item_list.len(), who);
|
||||||
|
|
||||||
let mut values = vec![];
|
let mut values = vec![];
|
||||||
for item in item_list.iter() {
|
for item in item_list.iter() {
|
||||||
|
Loading…
Reference in New Issue
Block a user