Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First pass at implementing the operations API #1359

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

caass
Copy link
Member

@caass caass commented Sep 25, 2024

Description

Implement part of the operations API, specifically get, list, and wait.

Type of change

Please delete options that aren't relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to
    not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

TBD

Checklist

  • Updated documentation if needed
  • Tests added/amended
  • bazel test //... passes locally
  • PR is contained in a single commit, using git amend see some docs

This change is Reviewable

Copy link
Member Author

@caass caass left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+@allada +@adam-singer this is a first pass at implementing this API. How do you think I should test this? Aside from creating unit tests, I'm picturing writing some kind of shell script that sends GRPC requests to the server mid-build...

Reviewed 6 of 7 files at r1, 1 of 1 files at r6, all commit messages.
Reviewable status: 0 of 2 LGTMs obtained, and all files reviewed, and pending CI: Local / ubuntu-22.04, NativeLink.com Cloud / Remote Cache (Legacy Dockerfile Test), Remote / large-ubuntu-22.04, Web Platform Deployment / macos-14, Web Platform Deployment / ubuntu-24.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (22.04), integration-tests (22.04), pre-commit-checks, vale, and 7 discussions need to be resolved (waiting on @adam-singer and @allada)


-- commits line 1 at r6:
Nit these need to be squashed into a better commit message


nativelink-service/src/operations_server.rs line 22 at r6 (raw file):

use futures::{stream, StreamExt, TryStreamExt};
use lru::LruCache;
use nativelink_error::{Code, Error};

nit: move the nativelink_ imports below external libs


nativelink-service/src/operations_server.rs line 46 at r6 (raw file):

    seconds: 20,
    nanos: 0,
};

FYI This default comes from the default Hyper timeout, but I'm happy to pick a different number.

Code quote:

const WAIT_OPERATION_DEFAULT_TIMEOUT: prost_types::Duration = prost_types::Duration {
    seconds: 20,
    nanos: 0,
};

nativelink-service/src/operations_server.rs line 48 at r6 (raw file):

};

pub struct OpsServer {

#[derive(Debug)]


nativelink-service/src/operations_server.rs line 83 at r6 (raw file):

    ) -> Result<ListOperationsResponse, Error> {
        let mut action_state_results = if let Some(uuid) = page_uuid {
            self.cache.lock().await.pop(&uuid).ok_or_else(|| {

nit: use .err_tip_with_code


nativelink-service/src/operations_server.rs line 96 at r6 (raw file):

                    async move {
                        let operations = scheduler.filter_operations(filter).await?;
                        Ok(operations.collect::<Vec<_>>().await)

FYI there's an extraneous allocation here, we collect each scheduler's operations into a vec, and then fold those vecs into a vecdeque. This is to work around a lifetime bug where the lifetime of the operations stream (in let operations = scheduler.filter_operations(filter).await?;) is tied to the scheduler, which is dropped at the end of the async move block.


nativelink-service/src/operations_server.rs line 100 at r6 (raw file):

                })
                .try_fold(VecDeque::new(), |mut queue, mut operations| async move {
                    queue.extend(operations.drain(..));

Nit it's unclear there's any benefit to draining here, we could just copy and then drop the vec all at once which might be faster


nativelink-service/src/operations_server.rs line 160 at r6 (raw file):

    async fn wait_operation_inner(&self, operation_id: OperationId) -> Result<Operation, Error> {
        let mut action_state_result_maybe = None;
        for scheduler in self.schedulers.values() {

This is duplicated from the above function, and could be factored out into a async fn find_operation(&self, operation_id: OperationId) method.


nativelink-service/src/operations_server.rs line 228 at r6 (raw file):

            OperationFilter::default()
        } else {
            return Err(Status::unimplemented("filtering not implemented yet"));

It's a little bit unclear how to best implement this; there's an example here of the filtering syntax, but it seems...not the same as what we want.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants