[Consensus] use dynamic timeouts in commit sync (#19705)
## Description 

Using a flat, high timeout in commit sync can stall fetching for too long
and hurt throughput: slow hosts can hold up block fetches for the full
maximum timeout. Timeouts that are too short are problematic as well.
Instead, start the request timeout at 10s and gradually increase it with
each retry.
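
To illustrate the schedule, here is a minimal sketch (not the code in this commit; `BASE_TIMEOUT`, `MAX_MULTIPLIER`, and `request_timeout` are hypothetical names, and the real values live in `commit_syncer.rs`):

```rust
use std::time::Duration;

// Hypothetical constants mirroring the approach described above.
const BASE_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_MULTIPLIER: u32 = 12;

/// Timeout for the `attempt`-th try (1-based): 10s, 20s, ..., capped at 120s.
fn request_timeout(attempt: u32) -> Duration {
    BASE_TIMEOUT * attempt.min(MAX_MULTIPLIER)
}

fn main() {
    for attempt in [1, 2, 6, 12, 13] {
        println!("attempt {attempt}: {:?}", request_timeout(attempt));
    }
}
```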

## Test plan 

CI
PT

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
mwtian committed Oct 4, 2024
1 parent 9d23ec4 commit 40d9ec7
Showing 1 changed file with 62 additions and 18 deletions.
80 changes: 62 additions & 18 deletions consensus/core/src/commit_syncer.rs
@@ -373,6 +373,15 @@ impl<C: NetworkClient> CommitSyncer<C> {
inner: Arc<Inner<C>>,
commit_range: CommitRange,
) -> (CommitIndex, Vec<TrustedCommit>, Vec<VerifiedBlock>) {
// Individual request base timeout.
const TIMEOUT: Duration = Duration::from_secs(10);
// Max per-request timeout will be the base timeout times a multiplier.
// At the extreme, this means a 120s timeout to fetch max_blocks_per_fetch blocks.
const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
// timeout * max number of targets should be reasonably small, so the
// system can adjust to a slow network or large data sizes quickly.
const MAX_NUM_TARGETS: usize = 24;
let mut timeout_multiplier = 0;
let _timer = inner
.context
.metrics
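
As a back-of-the-envelope check on these constants (a standalone sketch; the ×4 headroom comes from the `fetch_timeout` computation added later in this diff):

```rust
use std::time::Duration;

// Constants from the hunk above.
const TIMEOUT: Duration = Duration::from_secs(10);
const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
const MAX_NUM_TARGETS: usize = 24;

fn main() {
    // First pass (multiplier 1): 10s per request, x4 headroom => 40s per attempt.
    let first_fetch = TIMEOUT * 4;
    // At the cap: 120s per request, 480s per fetch_once() attempt.
    let capped_fetch = TIMEOUT * MAX_TIMEOUT_MULTIPLIER * 4;
    println!("first attempt budget: {first_fetch:?}");
    println!("capped attempt budget: {capped_fetch:?}");
    // Worst case for one full pass over all targets at the cap.
    println!("worst-case pass: {:?}", capped_fetch * MAX_NUM_TARGETS as u32);
}
```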
@@ -381,7 +390,8 @@ impl<C: NetworkClient> CommitSyncer<C> {
.start_timer();
info!("Starting to fetch commits in {commit_range:?} ...",);
loop {
let mut all_authorities = inner
// Attempt to fetch commits and blocks through min(committee size, MAX_NUM_TARGETS) peers.
let mut target_authorities = inner
.context
.committee
.authorities()
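
The hunk below randomizes the peer list and caps it at `MAX_NUM_TARGETS`. A self-contained sketch of that selection pattern, assuming the `rand` crate and a hypothetical committee of 100 authority indices:

```rust
use rand::rngs::ThreadRng;
use rand::seq::SliceRandom;

const MAX_NUM_TARGETS: usize = 24;

fn main() {
    // Hypothetical committee: indices 0..100, excluding our own index.
    let own_index = 0usize;
    let mut target_authorities: Vec<usize> =
        (0..100).filter(|&i| i != own_index).collect();
    // Randomize peer order, then cap how many peers this round may try,
    // mirroring the shuffle + truncate in the hunk below.
    target_authorities.shuffle(&mut ThreadRng::default());
    target_authorities.truncate(MAX_NUM_TARGETS);
    println!("trying {} peers: {:?}", target_authorities.len(), target_authorities);
}
```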
@@ -393,21 +403,42 @@ impl<C: NetworkClient> CommitSyncer<C> {
}
})
.collect_vec();
all_authorities.shuffle(&mut ThreadRng::default());
for authority in all_authorities {
match Self::fetch_once(inner.clone(), authority, commit_range.clone()).await {
Ok((commits, blocks)) => {
target_authorities.shuffle(&mut ThreadRng::default());
target_authorities.truncate(MAX_NUM_TARGETS);
// Increase timeout multiplier for each loop until MAX_TIMEOUT_MULTIPLIER.
timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
let request_timeout = TIMEOUT * timeout_multiplier;
// Give enough overall timeout for fetching commits and blocks.
// - Timeout for fetching commits and commit certifying blocks.
// - Timeout for fetching blocks referenced by the commits.
// - Time spent on pipelining requests to fetch blocks.
// - Another headroom to allow fetch_once() to timeout gracefully if possible.
let fetch_timeout = request_timeout * 4;
// Try fetching from each selected target authority in turn.
for authority in target_authorities {
match tokio::time::timeout(
fetch_timeout,
Self::fetch_once(
inner.clone(),
authority,
commit_range.clone(),
request_timeout,
),
)
.await
{
Ok(Ok((commits, blocks))) => {
info!("Finished fetching commits in {commit_range:?}",);
return (commit_range.end(), commits, blocks);
}
Err(e) => {
Ok(Err(e)) => {
let hostname = inner
.context
.committee
.authority(authority)
.hostname
.clone();
warn!("Failed to fetch from {hostname}: {}", e);
warn!("Failed to fetch {commit_range:?} from {hostname}: {}", e);
let error: &'static str = e.into();
inner
.context
Expand All @@ -417,6 +448,22 @@ impl<C: NetworkClient> CommitSyncer<C> {
.with_label_values(&[&hostname, error])
.inc();
}
Err(_) => {
let hostname = inner
.context
.committee
.authority(authority)
.hostname
.clone();
warn!("Timed out fetching {commit_range:?} from {authority}",);
inner
.context
.metrics
.node_metrics
.commit_sync_fetch_once_errors
.with_label_values(&[&hostname, "FetchTimeout"])
.inc();
}
}
}
}
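
The three-way match above falls out of wrapping a fallible future in `tokio::time::timeout`, which nests the inner `Result`. A minimal sketch of that control flow, assuming only the `tokio` crate (`fetch_once` here is a stand-in, not the real method):

```rust
use std::time::Duration;
use tokio::time::{sleep, timeout};

// Stand-in for fetch_once(): may succeed, fail, or just be slow.
async fn fetch_once(delay: Duration) -> Result<&'static str, &'static str> {
    sleep(delay).await;
    Ok("commits + blocks")
}

#[tokio::main]
async fn main() {
    let fetch_timeout = Duration::from_millis(50);
    // timeout() wraps the inner Result, so there are three outcomes.
    match timeout(fetch_timeout, fetch_once(Duration::from_millis(10))).await {
        Ok(Ok(payload)) => println!("fetched: {payload}"),
        Ok(Err(e)) => println!("peer returned an error: {e}"),
        Err(_elapsed) => println!("outer timeout fired"),
    }
    // A peer slower than the budget trips the outer timeout instead of
    // blocking the loop indefinitely.
    if timeout(fetch_timeout, fetch_once(Duration::from_millis(500)))
        .await
        .is_err()
    {
        println!("slow peer timed out; move on to the next target");
    }
}
```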
@@ -429,10 +476,8 @@ impl<C: NetworkClient> CommitSyncer<C> {
inner: Arc<Inner<C>>,
target_authority: AuthorityIndex,
commit_range: CommitRange,
timeout: Duration,
) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
const FETCH_COMMITS_TIMEOUT: Duration = Duration::from_secs(15);
const FETCH_BLOCKS_TIMEOUT: Duration = Duration::from_secs(120);

let _timer = inner
.context
.metrics
@@ -443,11 +488,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
// 1. Fetch commits in the commit range from the target authority.
let (serialized_commits, serialized_blocks) = inner
.network_client
.fetch_commits(
target_authority,
commit_range.clone(),
FETCH_COMMITS_TIMEOUT,
)
.fetch_commits(target_authority, commit_range.clone(), timeout)
.await?;

// 2. Verify the response contains blocks that can certify the last returned commit,
@@ -462,23 +503,26 @@ impl<C: NetworkClient> CommitSyncer<C> {

// 3. Fetch blocks referenced by the commits, from the same authority.
let block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
let num_chunks = block_refs
.len()
.div_ceil(inner.context.parameters.max_blocks_per_fetch)
as u32;
let mut requests: FuturesOrdered<_> = block_refs
.chunks(inner.context.parameters.max_blocks_per_fetch)
.enumerate()
.map(|(i, request_block_refs)| {
let i = i as u32;
let inner = inner.clone();
async move {
// 4. Send out pipelined fetch requests to avoid overloading the target authority.
sleep(Duration::from_millis(200) * i).await;
sleep(timeout * i as u32 / num_chunks).await;
// TODO: add some retries.
let serialized_blocks = inner
.network_client
.fetch_blocks(
target_authority,
request_block_refs.to_vec(),
vec![],
FETCH_BLOCKS_TIMEOUT,
timeout,
)
.await?;
// 5. Verify the same number of blocks are returned as requested.
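The staggered `sleep(timeout * i as u32 / num_chunks)` spreads the chunked block requests evenly over one timeout window instead of firing them all at once. A runnable sketch of the pattern, assuming the `futures` and `tokio` crates (`fetch_chunk` is a hypothetical stand-in for `fetch_blocks`):

```rust
use std::time::Duration;

use futures::stream::{FuturesOrdered, StreamExt};
use tokio::time::sleep;

// Hypothetical stand-in for one chunked fetch_blocks() call.
async fn fetch_chunk(i: u32) -> Vec<u32> {
    vec![i * 10, i * 10 + 1]
}

#[tokio::main]
async fn main() {
    let timeout = Duration::from_millis(400); // per-request timeout window
    let num_chunks: u32 = 4;
    // Delay chunk i by timeout * i / num_chunks so all chunks start
    // within one timeout window, mirroring step 4 in the diff above.
    let mut requests: FuturesOrdered<_> = (0..num_chunks)
        .map(|i| async move {
            sleep(timeout * i / num_chunks).await;
            fetch_chunk(i).await
        })
        .collect();
    // Results come back in request order.
    while let Some(blocks) = requests.next().await {
        println!("got chunk: {blocks:?}");
    }
}
```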
