Fix S3Store retry might cause poisoned data
If S3Store + VerifyStore is in use, the S3 store could receive data
that VerifyStore deemed invalid, due to the retry logic.

This only affects S3Store + VerifyStore because of the way the AwsS3Sdk
crate works: we need to hold recent data in the BufChannel so uploads
can be retried. When VerifyStore detected an invalid hash, the retry
logic in S3Store would trigger, but instead of the upload being treated
as invalid, the BufChannel had already stored the sent data and would
re-send it, so S3 still received the invalid data. This PR makes the
BufChannel logic set a flag so that the next read in S3Store always
triggers the error condition.
allada committed Oct 2, 2024
1 parent b5a4d92 commit d6801d2
Showing 3 changed files with 62 additions and 29 deletions.
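To make the failure mode and the fix concrete, here is a minimal, self-contained sketch (hypothetical names, not the real nativelink DropCloserReadHalf): the reader keeps a replay buffer so a retry can re-send recent data, but once the stream errors it latches that error, so a retry can never replay bytes from a stream that already failed.

use std::collections::VecDeque;

// Hypothetical stand-in for the read half of the channel.
#[derive(Default)]
struct SketchReader {
    queued: VecDeque<Vec<u8>>, // chunks waiting to be read
    recent: Vec<Vec<u8>>,      // replay buffer kept for retries
    last_err: Option<String>,  // sticky error: once set, every read fails
}

impl SketchReader {
    fn recv(&mut self) -> Result<Vec<u8>, String> {
        if let Some(err) = &self.last_err {
            // The point of the fix: a poisoned stream stays poisoned.
            return Err(err.clone());
        }
        match self.queued.pop_front() {
            Some(chunk) => {
                self.recent.push(chunk.clone());
                Ok(chunk)
            }
            None => {
                // Stream broke before EOF: clear the replay buffer and latch
                // the error so a retry cannot re-send stale data.
                let err = "sender dropped before sending EOF".to_string();
                self.recent.clear();
                self.last_err = Some(err.clone());
                Err(err)
            }
        }
    }

    fn reset_for_retry(&mut self) {
        // Push replayable chunks back to the front of the queue.
        for chunk in self.recent.drain(..).rev() {
            self.queued.push_front(chunk);
        }
    }
}

fn main() {
    let mut reader = SketchReader::default();
    reader.queued.push_back(b"hello".to_vec());
    assert_eq!(reader.recv().unwrap(), b"hello".to_vec());
    assert!(reader.recv().is_err()); // stream broke before EOF
    reader.reset_for_retry();
    assert!(reader.recv().is_err()); // retry sees the error, not stale data
}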
6 changes: 3 additions & 3 deletions nativelink-store/src/s3_store.rs
@@ -427,16 +427,16 @@ where
// Note(allada) If the upload size is not known, we go down the multipart upload path.
// This is not very efficient, but it greatly reduces the complexity of the code.
if max_size < MIN_MULTIPART_SIZE && matches!(upload_size, UploadSizeInfo::ExactSize(_)) {
let UploadSizeInfo::ExactSize(sz) = upload_size else {
unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
};
reader.set_max_recent_data_size(
u64::try_from(self.max_retry_buffer_per_request)
.err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
);
return self
.retrier
.retry(unfold(reader, move |mut reader| async move {
let UploadSizeInfo::ExactSize(sz) = upload_size else {
unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
};
// We need to make a new pair here because the aws sdk does not give us
// back the body after we send it in order to retry.
let (mut tx, rx) = make_buf_channel_pair();
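The comment kept in this hunk is the root of the problem: the AWS SDK consumes the request body on every send, so each retry attempt must build a fresh channel pair and refill it from the outer reader. A rough, hypothetical model of that shape (stub types, not the real retrier or reader), showing how a bounded replay buffer feeds a per-attempt body:

use std::collections::VecDeque;

// Stub for the outer reader; in the real code the replay data is bounded by
// set_max_recent_data_size, as configured in the hunk above.
struct OuterReader {
    replay: VecDeque<Vec<u8>>,
}

impl OuterReader {
    // Copy the replayable chunks into a body the "SDK" may consume.
    fn fill_attempt_body(&self) -> Vec<Vec<u8>> {
        self.replay.iter().cloned().collect()
    }
}

fn send_to_s3(body: &[Vec<u8>], fail: bool) -> Result<(), String> {
    if fail {
        Err(format!("transient error after {} chunks", body.len()))
    } else {
        Ok(())
    }
}

fn main() {
    let reader = OuterReader {
        replay: VecDeque::from(vec![b"chunk".to_vec()]),
    };
    let mut result = Err("not attempted".to_string());
    for attempt in 0..3 {
        // A new body per attempt, because the previous one was consumed.
        let body = reader.fill_attempt_body();
        result = send_to_s3(&body, attempt == 0);
        if result.is_ok() {
            break;
        }
    }
    result.unwrap();
}

Before this commit, that replay could succeed even after the data had already been rejected downstream; the buf_channel.rs change below is what breaks that cycle.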
56 changes: 30 additions & 26 deletions nativelink-util/src/buf_channel.rs
@@ -49,6 +49,7 @@ pub fn make_buf_channel_pair() -> (DropCloserWriteHalf, DropCloserReadHalf) {
DropCloserReadHalf {
rx,
queued_data: VecDeque::new(),
last_err: None,
eof_sent,
bytes_received: 0,
recent_data: Vec::new(),
@@ -59,7 +60,7 @@ pub fn make_buf_channel_pair() -> (DropCloserWriteHalf, DropCloserReadHalf) {

/// Writer half of the pair.
pub struct DropCloserWriteHalf {
tx: Option<mpsc::Sender<Result<Bytes, Error>>>,
tx: Option<mpsc::Sender<Bytes>>,
bytes_written: u64,
eof_sent: Arc<AtomicBool>,
}
@@ -93,16 +94,15 @@ impl DropCloserWriteHalf {
buf,
));
}
if let Err(err) = tx.send(Ok(buf)).await {
if let Err(err) = tx.send(buf).await {
// Close our channel.
self.tx = None;
return Err((
make_err!(
Code::Internal,
"Failed to write to data, receiver disconnected"
),
err.0
.expect("Data should never be Err in send_get_bytes_on_error()"),
err.0,
));
}
self.bytes_written += buf_len;
@@ -141,7 +141,7 @@ impl DropCloserWriteHalf {
match self.send_get_bytes_on_error(chunk).await {
Ok(()) => {}
Err(e) => {
reader.queued_data.push_front(Ok(e.1));
reader.queued_data.push_front(e.1);
return Err(e.0).err_tip(|| "In DropCloserWriteHalf::bind_buffered::send");
}
}
@@ -185,13 +185,15 @@ impl DropCloserWriteHalf {

/// Reader half of the pair.
pub struct DropCloserReadHalf {
rx: mpsc::Receiver<Result<Bytes, Error>>,
rx: mpsc::Receiver<Bytes>,
/// Number of bytes received over the stream.
bytes_received: u64,
eof_sent: Arc<AtomicBool>,
/// If there was an error in the stream, this will be set to the last error.
last_err: Option<Error>,
/// If not empty, this is the data that needs to be sent out before
/// data from the underlying channel can should be sent.
queued_data: VecDeque<Result<Bytes, Error>>,
queued_data: VecDeque<Bytes>,
/// As data is being read from the stream, this buffer will be filled
/// with the most recent data. Once `max_recent_data_size` is reached
/// this buffer will be cleared and no longer be populated.
@@ -210,17 +212,16 @@ impl DropCloserReadHalf {
self.rx.is_empty()
}

fn recv_inner(&mut self, data: Result<Bytes, Error>) -> Result<Bytes, Error> {
let chunk = data
.map_err(|e| make_err!(Code::Internal, "Received erroneous queued_data chunk: {e}"))?;

fn recv_inner(&mut self, chunk: Bytes) -> Result<Bytes, Error> {
// `queued_data` is allowed to have empty bytes that represent EOF
if chunk.is_empty() {
if !self.eof_sent.load(Ordering::Acquire) {
return Err(make_err!(
Code::Internal,
"Sender dropped before sending EOF"
));
let err = make_err!(Code::Internal, "Sender dropped before sending EOF");
self.queued_data.clear();
self.recent_data.clear();
self.bytes_received = 0;
self.last_err = Some(err.clone());
return Err(err);
};

self.maybe_populate_recent_data(&ZERO_DATA);
@@ -234,9 +235,10 @@ impl DropCloserReadHalf {

/// Try to receive a chunk of data, returning `None` if none is available.
pub fn try_recv(&mut self) -> Option<Result<Bytes, Error>> {
self.queued_data
.pop_front()
.map(|result| self.recv_inner(result))
if let Some(err) = &self.last_err {
return Some(Err(err.clone()));
}
self.queued_data.pop_front().map(Ok)
}

/// Receive a chunk of data, waiting asynchronously until some is available.
@@ -245,7 +247,7 @@ impl DropCloserReadHalf {
result
} else {
// `None` here indicates EOF, which we represent as Zero data
let data = self.rx.recv().await.unwrap_or(Ok(ZERO_DATA));
let data = self.rx.recv().await.unwrap_or(ZERO_DATA);
self.recv_inner(data)
}
}
@@ -285,7 +287,7 @@ impl DropCloserReadHalf {
let mut data_sum = 0;
for chunk in self.recent_data.drain(..).rev() {
data_sum += chunk.len() as u64;
self.queued_data.push_front(Ok(chunk));
self.queued_data.push_front(chunk);
}
assert!(self.recent_data.is_empty(), "Recent_data should be empty");
// Ensure the sum of the bytes in recent_data is equal to the bytes_received.
@@ -313,14 +315,16 @@ impl DropCloserReadHalf {
}

/// Peek the next set of bytes in the stream without consuming them.
pub async fn peek(&mut self) -> &Result<Bytes, Error> {
pub async fn peek(&mut self) -> Result<Bytes, Error> {
if self.queued_data.is_empty() {
let chunk = self.recv().await;
let chunk = self.recv().await.err_tip(|| "In buf_channel::peek")?;
self.queued_data.push_front(chunk);
}
self.queued_data
Ok(self
.queued_data
.front()
.expect("Should have data in the queue")
.cloned()
.expect("Should have data in the queue"))
}

/// The number of bytes received over this stream so far.
@@ -345,7 +349,7 @@ impl DropCloserReadHalf {
}
if chunk.len() > size {
let remaining = chunk.split_off(size);
self.queued_data.push_front(Ok(remaining));
self.queued_data.push_front(remaining);
// No need to read EOF if we are a partial chunk.
return Ok(chunk);
}
@@ -376,7 +380,7 @@ impl DropCloserReadHalf {
if output.len() + chunk.len() > size {
// Slice off the extra data and put it back into the queue. We are done.
let remaining = chunk.split_off(size - output.len());
self.queued_data.push_front(Ok(remaining));
self.queued_data.push_front(remaining);
}
output.extend_from_slice(&chunk);
if output.len() == size {
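One smaller API change in this file: peek now returns an owned Result<Bytes, Error> (via .cloned()) instead of a reference into the queue, so callers no longer hold a borrow of the reader while inspecting the next chunk. This is cheap because cloning Bytes only bumps a reference count; a tiny illustration, assuming the bytes crate this code already uses:

use bytes::Bytes;

fn main() {
    let original = Bytes::from_static(b"retry me");
    let peeked = original.clone(); // no byte copy; the buffer is shared
    assert_eq!(peeked, original);
    assert_eq!(peeked.as_ptr(), original.as_ptr()); // same underlying storage
}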
29 changes: 29 additions & 0 deletions nativelink-util/tests/buf_channel_test.rs
@@ -311,3 +311,32 @@ async fn eof_can_send_twice() -> Result<(), Error> {
tx.send_eof().unwrap();
Ok(())
}

#[nativelink_test]
async fn set_max_recent_data_size_no_eof_then_retry_test() -> Result<(), Error> {
let (mut tx, mut rx) = make_buf_channel_pair();
{
rx.set_max_recent_data_size(1024);
tx.send(DATA1.into()).await.unwrap();
drop(tx);
assert_eq!(rx.recv().await.unwrap(), Bytes::from(DATA1));
assert_eq!(
rx.recv().await,
Err(make_err!(
Code::Internal,
"Sender dropped before sending EOF"
))
);
}
{
rx.try_reset_stream().unwrap();
assert_eq!(
rx.recv().await,
Err(make_err!(
Code::Internal,
"Sender dropped before sending EOF"
))
);
}
Ok(())
}
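A hypothetical companion test, not part of this commit and untested: it assumes the same imports as the test above, that a cleanly finished stream yields an empty Bytes at EOF, and that try_reset_stream replays the chunks held in recent_data. It would cover the happy path, where the stream completed and a retry is allowed to rewind:

#[nativelink_test]
async fn set_max_recent_data_size_eof_then_replay_test() -> Result<(), Error> {
    let (mut tx, mut rx) = make_buf_channel_pair();
    rx.set_max_recent_data_size(1024);
    tx.send(DATA1.into()).await?;
    tx.send_eof()?;
    drop(tx);
    assert_eq!(rx.recv().await?, Bytes::from(DATA1));
    // Assumption: EOF is surfaced as an empty chunk rather than an error.
    assert_eq!(rx.recv().await?, Bytes::new());
    // Everything was received and fits under the recent-data cap, so a retry
    // may rewind and read the same bytes again.
    rx.try_reset_stream()?;
    assert_eq!(rx.recv().await?, Bytes::from(DATA1));
    Ok(())
}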
