use crate::error::unexpected::{self, Intoable, Mappable}; use crate::{origin, util}; use std::{collections, sync}; #[derive(Clone)] struct DescrState { current_tree: gix_hash::ObjectId, } #[derive(thiserror::Error, Clone, Debug, PartialEq)] enum GetObjectError { #[error("unavailable due to server-side issue")] Unavailable, #[error(transparent)] Unexpected(#[from] unexpected::Error), } #[derive(Default)] pub struct Proxy { client: reqwest::Client, state: sync::RwLock>, // to prevent syncing the same origin more than once at a time, but still allow hitting that // origin in during a sync. sync_guard: sync::Mutex>, } impl Proxy { pub fn new() -> Proxy { Proxy::default() } fn deconstruct_descr(descr: &origin::Descr) -> (&origin::descr::GitUrl, &str) { let origin::Descr::Git { ref url, ref branch_name, } = descr; (url, branch_name) } fn construct_url( url: &origin::descr::GitUrl, sub_path: &str, ) -> unexpected::Result { let mut url: reqwest::Url = { url.parsed .to_string() .parse() .or_unexpected_while("parsing url as reqwest url")? }; let new_path = url .path() .parse::() .or_unexpected_while("parsing url path")? .join(sub_path); url.set_path( new_path .to_str() .or_unexpected_while("converting new path to string")?, ); Ok(url) } async fn get_tip_commit_hash( &self, descr: &origin::Descr, ) -> Result { let (url, branch_name) = Self::deconstruct_descr(descr); let refs_url = Self::construct_url(url, "info/refs").or_unexpected_while("constructing refs url")?; // when fetching refs we assume that any issue indicates that the origin itself // (and therefore the URL) has some kind of issue. let refs = self .client .get(refs_url) .send() .await .or(Err(origin::SyncError::InvalidURL))? .error_for_status() .or(Err(origin::SyncError::InvalidURL))? .text() .await .or(Err(origin::SyncError::InvalidURL))?; let full_ref = format!("refs/heads/{}", branch_name); for line in refs.lines() { if !line.ends_with(full_ref.as_str()) { continue; } return gix_hash::ObjectId::from_hex( line.split_ascii_whitespace() .next() .ok_or(origin::SyncError::InvalidURL)? .as_bytes(), ) .or(Err(origin::SyncError::InvalidURL)); } Err(origin::SyncError::InvalidBranchName) } async fn get_object( &self, descr: &origin::Descr, oid: &gix_hash::ObjectId, expect_kind: gix_object::Kind, ) -> Result { let hex = oid.to_string(); let (url, _) = Self::deconstruct_descr(descr); let object_url = Self::construct_url(url, format!("objects/{}/{}", &hex[..2], &hex[2..]).as_str()) .or_unexpected_while("constructing refs url")?; let mut loose_object = self .client .get(object_url) .send() .await .or(Err(GetObjectError::Unavailable))? .error_for_status() .map(|res| { use async_compression::tokio::bufread::ZlibDecoder; use futures::stream::TryStreamExt; use std::io; tokio::io::BufReader::new(ZlibDecoder::new(tokio_util::io::StreamReader::new( res.bytes_stream() .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), ))) }) .or(Err(GetObjectError::Unavailable))?; use tokio::io::AsyncBufReadExt; let mut header = Vec::::new(); loose_object .read_until(0, &mut header) .await .or(Err(GetObjectError::Unavailable))?; let (kind, _, _) = gix_object::decode::loose_header(&header).or(Err(GetObjectError::Unavailable))?; if kind != expect_kind { return Err(GetObjectError::Unavailable); } Ok(util::BoxByteStream::from_async_read(loose_object)) } async fn get_commit_tree( &self, descr: &origin::Descr, commit_hash: &gix_hash::ObjectId, ) -> Result { let commit_object_bytes = self .get_object(descr, commit_hash, gix_object::Kind::Commit) .await .map_err(|e| match e { GetObjectError::Unavailable => origin::SyncError::Unavailable, GetObjectError::Unexpected(_) => e.into_unexpected().into(), })? .read_to_end() .await .or(Err(origin::SyncError::Unavailable))?; let commit_object = gix_object::CommitRef::from_bytes(commit_object_bytes.as_ref()) .or(Err(origin::SyncError::Unavailable))?; Ok(commit_object.tree()) } async fn get_tree_entry( &self, descr: &origin::Descr, tree_hash: &gix_hash::ObjectId, entry_name: &str, ) -> Result { let tree_object_bytes = self .get_object(descr, tree_hash, gix_object::Kind::Tree) .await .map_err(|e| match e { GetObjectError::Unavailable => origin::GetFileError::Unavailable, GetObjectError::Unexpected(_) => e.into_unexpected().into(), })? .read_to_end() .await .or(Err(origin::GetFileError::Unavailable))?; let tree_object = gix_object::TreeRef::from_bytes(tree_object_bytes.as_ref()) .or(Err(origin::GetFileError::Unavailable))?; for entry in tree_object.entries { if entry.filename == entry_name { return Ok(entry.into()); } } Err(origin::GetFileError::FileNotFound) } } impl origin::Store for Proxy { fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> { let descr = descr.clone(); Box::pin(async move { // attempt to lock this descr for syncing, doing so within a new scope so the mutex // isn't actually being held for the whole method duration. let is_already_syncing = { self.sync_guard .lock() .unwrap() .insert(descr.clone(), ()) .is_some() }; if is_already_syncing { return Err(origin::SyncError::AlreadyInProgress); } // perform the rest of the work within this closure, so we can be sure that the guard // lock is released no matter what. let res = async { let commit_hash = self.get_tip_commit_hash(&descr).await?; let current_tree = self.get_commit_tree(&descr, &commit_hash).await?; self.state .write() .unwrap() .insert(descr.clone(), DescrState { current_tree }); Ok(()) } .await; self.sync_guard.lock().unwrap().remove(&descr); res }) } fn get_file( &self, descr: &origin::Descr, path: &str, ) -> util::BoxFuture<'_, Result> { let descr = descr.clone(); let path = path.to_string(); Box::pin(async move { let current_state = self .state .read() .unwrap() .get(&descr) .ok_or(origin::GetFileError::DescrNotSynced)? .clone(); let path = path .as_str() .parse::() .or_unexpected_while("parsing path")?; let path_parts = path.iter().collect::>(); let path_parts_len = path_parts.len(); if path_parts_len < 2 { return Err(unexpected::Error::from("path has fewer than 2 parts").into()); } else if path_parts[0] != std::path::MAIN_SEPARATOR_STR { return Err(unexpected::Error::from(format!( "expected first path part to be separator, found {:?}", path_parts[0] )) .into()); } let mut tree_hash = current_state.current_tree; // The first part is "/" (main separator), and the last is the file name itself. // Everything in between (if any) should be directories, so navigate those. for dir_name in path_parts[1..path_parts_len - 1].iter() { let entry = self .get_tree_entry( &descr, &tree_hash, dir_name .to_str() .map_unexpected_while(|| format!("decoding dir name {dir_name:?}"))?, ) .await?; if !entry.mode.is_tree() { return Err(origin::GetFileError::FileNotFound); } tree_hash = entry.oid; } let file_name = { let file_name = path_parts[path_parts_len - 1]; file_name .to_str() .map_unexpected_while(|| format!("decoding file name {file_name:?}"))? }; let entry = self.get_tree_entry(&descr, &tree_hash, file_name).await?; // TODO handle symlinks if entry.mode.is_tree() { return Err(origin::GetFileError::PathIsDirectory); } else if !entry.mode.is_blob() { return Err(unexpected::Error::from(format!( "can't handle entry {} of mode {}", entry.filename, entry.mode.as_str() )) .into()); } self.get_object(&descr, &entry.oid, gix_object::Kind::Blob) .await .map_err(|e| match e { GetObjectError::Unavailable => origin::GetFileError::Unavailable, GetObjectError::Unexpected(_) => e .into_unexpected_while(format!("getting object for entry {:?}", entry)) .into(), }) }) } }