|
|
|
@ -2,16 +2,19 @@ |
|
|
|
|
use std::sync::Arc; |
|
|
|
|
use std::time::{Duration, UNIX_EPOCH}; |
|
|
|
|
|
|
|
|
|
use futures::stream::*; |
|
|
|
|
use futures::future; |
|
|
|
|
use futures::stream::{self, StreamExt}; |
|
|
|
|
use http::header::{ |
|
|
|
|
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, |
|
|
|
|
IF_NONE_MATCH, LAST_MODIFIED, RANGE, |
|
|
|
|
}; |
|
|
|
|
use hyper::{Body, Request, Response, StatusCode}; |
|
|
|
|
use tokio::sync::mpsc; |
|
|
|
|
|
|
|
|
|
use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; |
|
|
|
|
use garage_table::EmptyKey; |
|
|
|
|
use garage_util::data::*; |
|
|
|
|
use garage_util::error::OkOrMessage; |
|
|
|
|
|
|
|
|
|
use garage_model::garage::Garage; |
|
|
|
|
use garage_model::s3::object_table::*; |
|
|
|
@ -242,43 +245,56 @@ pub async fn handle_get( |
|
|
|
|
Ok(resp_builder.body(body)?) |
|
|
|
|
} |
|
|
|
|
ObjectVersionData::FirstBlock(_, first_block_hash) => { |
|
|
|
|
let order_stream = OrderTag::stream(); |
|
|
|
|
|
|
|
|
|
let read_first_block = garage |
|
|
|
|
.block_manager |
|
|
|
|
.rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0))); |
|
|
|
|
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); |
|
|
|
|
let (tx, rx) = mpsc::channel(2); |
|
|
|
|
|
|
|
|
|
let (first_block_stream, version) = |
|
|
|
|
futures::try_join!(read_first_block, get_next_blocks)?; |
|
|
|
|
let version = version.ok_or(Error::NoSuchKey)?; |
|
|
|
|
let order_stream = OrderTag::stream(); |
|
|
|
|
let first_block_hash = *first_block_hash; |
|
|
|
|
let version_uuid = last_v.uuid; |
|
|
|
|
|
|
|
|
|
tokio::spawn(async move { |
|
|
|
|
match async { |
|
|
|
|
let garage2 = garage.clone(); |
|
|
|
|
let version_fut = tokio::spawn(async move { |
|
|
|
|
garage2.version_table.get(&version_uuid, &EmptyKey).await |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
let stream_block_0 = garage |
|
|
|
|
.block_manager |
|
|
|
|
.rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0))) |
|
|
|
|
.await?; |
|
|
|
|
tx.send(stream_block_0) |
|
|
|
|
.await |
|
|
|
|
.ok_or_message("channel closed")?; |
|
|
|
|
|
|
|
|
|
let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?; |
|
|
|
|
for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) { |
|
|
|
|
let stream_block_i = garage |
|
|
|
|
.block_manager |
|
|
|
|
.rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64))) |
|
|
|
|
.await?; |
|
|
|
|
tx.send(stream_block_i) |
|
|
|
|
.await |
|
|
|
|
.ok_or_message("channel closed")?; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let mut blocks = version |
|
|
|
|
.blocks |
|
|
|
|
.items() |
|
|
|
|
.iter() |
|
|
|
|
.map(|(_, vb)| (vb.hash, None)) |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
blocks[0].1 = Some(first_block_stream); |
|
|
|
|
|
|
|
|
|
let body_stream = futures::stream::iter(blocks) |
|
|
|
|
.enumerate() |
|
|
|
|
.map(move |(i, (hash, stream_opt))| { |
|
|
|
|
let garage = garage.clone(); |
|
|
|
|
async move { |
|
|
|
|
if let Some(stream) = stream_opt { |
|
|
|
|
stream |
|
|
|
|
} else { |
|
|
|
|
garage |
|
|
|
|
.block_manager |
|
|
|
|
.rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64))) |
|
|
|
|
.await |
|
|
|
|
.unwrap_or_else(|e| error_stream(i, e)) |
|
|
|
|
} |
|
|
|
|
Ok::<(), Error>(()) |
|
|
|
|
} |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok(()) => (), |
|
|
|
|
Err(e) => { |
|
|
|
|
let err = std::io::Error::new( |
|
|
|
|
std::io::ErrorKind::Other, |
|
|
|
|
format!("Error while getting object data: {}", e), |
|
|
|
|
); |
|
|
|
|
let _ = tx |
|
|
|
|
.send(Box::pin(stream::once(future::ready(Err(err))))) |
|
|
|
|
.await; |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
.buffered(2) |
|
|
|
|
.flatten(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten(); |
|
|
|
|
|
|
|
|
|
let body = hyper::body::Body::wrap_stream(body_stream); |
|
|
|
|
Ok(resp_builder.body(body)?) |
|
|
|
|