|
|
|
@ -5,6 +5,16 @@ use std::{collections, sync}; |
|
|
|
|
#[derive(Clone)] |
|
|
|
|
struct DescrState { |
|
|
|
|
current_tree: gix_hash::ObjectId, |
|
|
|
|
tree_cache: collections::HashMap<gix_hash::ObjectId, gix_object::Tree>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl DescrState { |
|
|
|
|
fn new(current_tree: gix_hash::ObjectId) -> DescrState { |
|
|
|
|
DescrState { |
|
|
|
|
current_tree, |
|
|
|
|
tree_cache: Default::default(), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[derive(thiserror::Error, Clone, Debug, PartialEq)] |
|
|
|
@ -20,10 +30,6 @@ enum GetObjectError { |
|
|
|
|
pub struct Proxy { |
|
|
|
|
client: reqwest::Client, |
|
|
|
|
state: sync::RwLock<collections::HashMap<origin::Descr, DescrState>>, |
|
|
|
|
|
|
|
|
|
// to prevent syncing the same origin more than once at a time, but still allow hitting that
|
|
|
|
|
// origin in during a sync.
|
|
|
|
|
sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl Proxy { |
|
|
|
@ -183,23 +189,50 @@ impl Proxy { |
|
|
|
|
tree_hash: &gix_hash::ObjectId, |
|
|
|
|
entry_name: &str, |
|
|
|
|
) -> Result<gix_object::tree::Entry, origin::GetFileError> { |
|
|
|
|
let tree_object_bytes = self |
|
|
|
|
.get_object(descr, tree_hash, gix_object::Kind::Tree) |
|
|
|
|
.await |
|
|
|
|
.map_err(|e| match e { |
|
|
|
|
GetObjectError::Unavailable => origin::GetFileError::Unavailable, |
|
|
|
|
GetObjectError::Unexpected(_) => e.into_unexpected().into(), |
|
|
|
|
})? |
|
|
|
|
.read_to_end() |
|
|
|
|
.await |
|
|
|
|
.or(Err(origin::GetFileError::Unavailable))?; |
|
|
|
|
let cache_entry = { |
|
|
|
|
let states = self.state.read().unwrap(); |
|
|
|
|
states |
|
|
|
|
.get(descr) |
|
|
|
|
.expect("get_tree_entry called with unknown descr") |
|
|
|
|
.tree_cache |
|
|
|
|
.get(tree_hash) |
|
|
|
|
.cloned() |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let tree_object: gix_object::Tree = match cache_entry { |
|
|
|
|
Some(tree_object) => tree_object, |
|
|
|
|
None => { |
|
|
|
|
let tree_object_bytes = self |
|
|
|
|
.get_object(descr, tree_hash, gix_object::Kind::Tree) |
|
|
|
|
.await |
|
|
|
|
.map_err(|e| match e { |
|
|
|
|
GetObjectError::Unavailable => origin::GetFileError::Unavailable, |
|
|
|
|
GetObjectError::Unexpected(_) => e.into_unexpected().into(), |
|
|
|
|
})? |
|
|
|
|
.read_to_end() |
|
|
|
|
.await |
|
|
|
|
.or(Err(origin::GetFileError::Unavailable))?; |
|
|
|
|
|
|
|
|
|
let tree_object: gix_object::Tree = |
|
|
|
|
gix_object::TreeRef::from_bytes(tree_object_bytes.as_ref()) |
|
|
|
|
.or(Err(origin::GetFileError::Unavailable))? |
|
|
|
|
.into(); |
|
|
|
|
|
|
|
|
|
self.state |
|
|
|
|
.write() |
|
|
|
|
.unwrap() |
|
|
|
|
.get_mut(descr) |
|
|
|
|
.expect("get_tree_entry called with unknown descr") |
|
|
|
|
.tree_cache |
|
|
|
|
.insert(*tree_hash, tree_object.clone()); |
|
|
|
|
|
|
|
|
|
let tree_object = gix_object::TreeRef::from_bytes(tree_object_bytes.as_ref()) |
|
|
|
|
.or(Err(origin::GetFileError::Unavailable))?; |
|
|
|
|
tree_object |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
for entry in tree_object.entries { |
|
|
|
|
for entry in tree_object.entries.iter() { |
|
|
|
|
if entry.filename == entry_name { |
|
|
|
|
return Ok(entry.into()); |
|
|
|
|
return Ok(entry.clone()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -211,35 +244,24 @@ impl origin::Store for Proxy { |
|
|
|
|
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> { |
|
|
|
|
let descr = descr.clone(); |
|
|
|
|
Box::pin(async move { |
|
|
|
|
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
|
|
|
|
|
// isn't actually being held for the whole method duration.
|
|
|
|
|
let is_already_syncing = { |
|
|
|
|
self.sync_guard |
|
|
|
|
.lock() |
|
|
|
|
.unwrap() |
|
|
|
|
.insert(descr.clone(), ()) |
|
|
|
|
.is_some() |
|
|
|
|
}; |
|
|
|
|
let commit_hash = self.get_tip_commit_hash(&descr).await?; |
|
|
|
|
let current_tree = self.get_commit_tree(&descr, &commit_hash).await?; |
|
|
|
|
|
|
|
|
|
if is_already_syncing { |
|
|
|
|
return Err(origin::SyncError::AlreadyInProgress); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// perform the rest of the work within this closure, so we can be sure that the guard
|
|
|
|
|
// lock is released no matter what.
|
|
|
|
|
let res = async { |
|
|
|
|
let commit_hash = self.get_tip_commit_hash(&descr).await?; |
|
|
|
|
let current_tree = self.get_commit_tree(&descr, &commit_hash).await?; |
|
|
|
|
self.state |
|
|
|
|
.write() |
|
|
|
|
.unwrap() |
|
|
|
|
.insert(descr.clone(), DescrState { current_tree }); |
|
|
|
|
Ok(()) |
|
|
|
|
use std::collections::hash_map::Entry; |
|
|
|
|
match self.state.write().unwrap().entry(descr) { |
|
|
|
|
Entry::Vacant(entry) => { |
|
|
|
|
entry.insert(DescrState::new(current_tree)); |
|
|
|
|
} |
|
|
|
|
Entry::Occupied(mut entry) => { |
|
|
|
|
// if this is not the first sync then we don't want to overwrite the entry
|
|
|
|
|
// unless the tree has actually changed, in order to preserve the cache.
|
|
|
|
|
if entry.get().current_tree != current_tree { |
|
|
|
|
entry.insert(DescrState::new(current_tree)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
.await; |
|
|
|
|
|
|
|
|
|
self.sync_guard.lock().unwrap().remove(&descr); |
|
|
|
|
res |
|
|
|
|
Ok(()) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|