Implemented sync method of new git_proxy module
This module is a WIP, intended to replace the existing git store with one that doesn't require any locally managed state.
This commit is contained in:
parent
57ee5ff30e
commit
98ddefad4f
513
Cargo.lock
generated
513
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -42,11 +42,13 @@ log = "0.4.19"
|
|||||||
env_logger = "0.10.0"
|
env_logger = "0.10.0"
|
||||||
serde_yaml = "0.9.22"
|
serde_yaml = "0.9.22"
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
reqwest = "0.11.18"
|
|
||||||
hyper-reverse-proxy = "0.5.2-dev"
|
hyper-reverse-proxy = "0.5.2-dev"
|
||||||
gemini = "0.0.5"
|
gemini = "0.0.5"
|
||||||
bytes = "1.4.0"
|
bytes = "1.4.0"
|
||||||
hyper-trust-dns = "0.5.0"
|
hyper-trust-dns = "0.5.0"
|
||||||
|
gix-hash = "0.14.1"
|
||||||
|
reqwest = { version = "0.11.23", features = ["gzip", "deflate"] }
|
||||||
|
gix-object = "0.41.0"
|
||||||
|
|
||||||
[patch.crates-io]
|
[patch.crates-io]
|
||||||
|
|
||||||
|
188
src/origin/git_proxy.rs
Normal file
188
src/origin/git_proxy.rs
Normal file
@ -0,0 +1,188 @@
|
|||||||
|
use crate::error::unexpected::{self, Mappable};
|
||||||
|
use crate::{origin, util};
|
||||||
|
use std::{collections, sync};
|
||||||
|
|
||||||
|
struct DescrState {
|
||||||
|
current_tree: gix_hash::ObjectId,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct Proxy {
|
||||||
|
client: reqwest::Client,
|
||||||
|
state: sync::RwLock<collections::HashMap<origin::Descr, DescrState>>,
|
||||||
|
|
||||||
|
// to prevent syncing the same origin more than once at a time, but still allow hitting that
|
||||||
|
// origin in during a sync.
|
||||||
|
sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Proxy {
|
||||||
|
pub fn new() -> Proxy {
|
||||||
|
Proxy::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deconstruct_descr(descr: &origin::Descr) -> (&origin::descr::GitUrl, &str) {
|
||||||
|
let origin::Descr::Git {
|
||||||
|
ref url,
|
||||||
|
ref branch_name,
|
||||||
|
} = descr;
|
||||||
|
(url, branch_name)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn construct_url(
|
||||||
|
url: &origin::descr::GitUrl,
|
||||||
|
sub_path: &str,
|
||||||
|
) -> unexpected::Result<reqwest::Url> {
|
||||||
|
let mut url: reqwest::Url = {
|
||||||
|
url.parsed
|
||||||
|
.to_string()
|
||||||
|
.parse()
|
||||||
|
.or_unexpected_while("parsing url as reqwest url")?
|
||||||
|
};
|
||||||
|
|
||||||
|
let base_path = match url.to_file_path() {
|
||||||
|
Ok(path) => path,
|
||||||
|
Err(()) => return Err(unexpected::Error::from("extracting path from url")),
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_path = base_path.join(sub_path);
|
||||||
|
|
||||||
|
url.set_path(
|
||||||
|
new_path
|
||||||
|
.to_str()
|
||||||
|
.or_unexpected_while("converting new path to string")?,
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(url)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_tip_commit_hash(
|
||||||
|
&self,
|
||||||
|
descr: &origin::Descr,
|
||||||
|
) -> Result<gix_hash::ObjectId, origin::SyncError> {
|
||||||
|
let (url, branch_name) = Self::deconstruct_descr(&descr);
|
||||||
|
|
||||||
|
let refs_url =
|
||||||
|
Self::construct_url(&url, "/info/refs").or_unexpected_while("constructing refs url")?;
|
||||||
|
|
||||||
|
// when fetching refs we assume that any issue indicates that the origin itself
|
||||||
|
// (and therefore the URL) has some kind of issue.
|
||||||
|
let refs = self
|
||||||
|
.client
|
||||||
|
.get(refs_url)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.or(Err(origin::SyncError::InvalidURL))?
|
||||||
|
.error_for_status()
|
||||||
|
.or(Err(origin::SyncError::InvalidURL))?
|
||||||
|
.text()
|
||||||
|
.await
|
||||||
|
.or(Err(origin::SyncError::InvalidURL))?;
|
||||||
|
|
||||||
|
let full_ref = format!("refs/heads/{}", branch_name);
|
||||||
|
for line in refs.lines() {
|
||||||
|
if !line.ends_with(full_ref.as_str()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
return gix_hash::ObjectId::from_hex(
|
||||||
|
line.split_ascii_whitespace()
|
||||||
|
.next()
|
||||||
|
.ok_or(origin::SyncError::InvalidURL)?
|
||||||
|
.as_bytes(),
|
||||||
|
)
|
||||||
|
.or(Err(origin::SyncError::InvalidURL));
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(origin::SyncError::InvalidBranchName)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_commit_tree(
|
||||||
|
&self,
|
||||||
|
descr: &origin::Descr,
|
||||||
|
commit_hash: &gix_hash::ObjectId,
|
||||||
|
) -> Result<gix_hash::ObjectId, origin::SyncError> {
|
||||||
|
let hex = commit_hash.to_string();
|
||||||
|
let (url, _) = Self::deconstruct_descr(&descr);
|
||||||
|
|
||||||
|
let commit_object_url = Self::construct_url(
|
||||||
|
&url,
|
||||||
|
format!("/objects/{}/{}", &hex[..2], &hex[2..]).as_str(),
|
||||||
|
)
|
||||||
|
.or_unexpected_while("constructing refs url")?;
|
||||||
|
|
||||||
|
let commit_object_bytes = self
|
||||||
|
.client
|
||||||
|
.get(commit_object_url)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.or(Err(origin::SyncError::InvalidURL))?
|
||||||
|
.error_for_status()
|
||||||
|
.or(Err(origin::SyncError::InvalidURL))?
|
||||||
|
.bytes()
|
||||||
|
.await
|
||||||
|
.or(Err(origin::SyncError::InvalidURL))?;
|
||||||
|
|
||||||
|
let commit_object = gix_object::ObjectRef::from_loose(commit_object_bytes.as_ref())
|
||||||
|
.or(Err(origin::SyncError::InvalidURL))?
|
||||||
|
.into_commit()
|
||||||
|
.ok_or(origin::SyncError::InvalidURL)?;
|
||||||
|
|
||||||
|
Ok(commit_object.tree())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl origin::Store for Proxy {
|
||||||
|
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
|
||||||
|
let descr = descr.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
|
||||||
|
// isn't actually being held for the whole method duration.
|
||||||
|
let is_already_syncing = {
|
||||||
|
self.sync_guard
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.insert(descr.clone(), ())
|
||||||
|
.is_some()
|
||||||
|
};
|
||||||
|
|
||||||
|
if is_already_syncing {
|
||||||
|
return Err(origin::SyncError::AlreadyInProgress);
|
||||||
|
}
|
||||||
|
|
||||||
|
// perform the rest of the work within this closure, so we can be sure that the guard
|
||||||
|
// lock is released no matter what.
|
||||||
|
let res: Result<(), origin::SyncError> = (|| async {
|
||||||
|
let commit_hash = self.get_tip_commit_hash(&descr).await?;
|
||||||
|
let current_tree = self.get_commit_tree(&descr, &commit_hash).await?;
|
||||||
|
self.state
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.insert(descr.clone(), DescrState { current_tree });
|
||||||
|
Ok(())
|
||||||
|
})()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
self.sync_guard.lock().unwrap().remove(&descr);
|
||||||
|
res
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
|
||||||
|
Ok(self
|
||||||
|
.state
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.keys()
|
||||||
|
.map(|d| d.clone())
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_file(
|
||||||
|
&self,
|
||||||
|
_descr: &origin::Descr,
|
||||||
|
_path: &str,
|
||||||
|
) -> Result<util::BoxByteStream, origin::GetFileError> {
|
||||||
|
panic!("TODO")
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user