Skip to content

Commit

Permalink
Add TryAcquireClient and AsyncAcquireClient (#67)
Browse files Browse the repository at this point in the history
* Optimize(linux): Use `/dev/fd/$fd` to switch to fifo when pipe is received

Fifo uses a cloned file descriptor, which can set
`O_NONBLOCK1 without affecing other users' of josberver.

Signed-off-by: Jiahao XU <[email protected]>

* Impl `TryacquireClient`

Signed-off-by: Jiahao XU <[email protected]>

* Impl `TryAcquireClient::into_inner`

Signed-off-by: Jiahao XU <[email protected]>

* Fix linux optimization in `unix::Client::from_pipe

Signed-off-by: Jiahao XU <[email protected]>

* Impl `try_acquire` for wasm and windows impl

Signed-off-by: Jiahao XU <[email protected]>

* Fix `Client::into_try_acquire_client`

The `TryAcquireClient` returned, whether as `Ok` or `Err`,
must have `O_NONBLOCK` set.

Signed-off-by: Jiahao XU <[email protected]>

* Fix linux optimization in `unix::Client::from_pipe`

Signed-off-by: Jiahao XU <[email protected]>

* Impl `AsyncAcquireClient`

Signed-off-by: Jiahao XU <[email protected]>

* Fix msrv on windows

Signed-off-by: Jiahao XU <[email protected]>

* Fix compilation on windows

Signed-off-by: Jiahao XU <[email protected]>

* Impl `AsyncAcquireClient::{acquire, acquire_owned}`

And move `AsyncAcquireClient` into a separate module

Signed-off-by: Jiahao XU <[email protected]>

* Add CI checking for wasm32-wasi

Signed-off-by: Jiahao XU <[email protected]>

* Add missing `async_client.rs`

Forgot to commit

Signed-off-by: Jiahao XU <[email protected]>

* Fix msrv-wasm

Signed-off-by: Jiahao XU <[email protected]>

* FIx msrv-wasm

Signed-off-by: Jiahao XU <[email protected]>

* Impl `AsyncAcquireClient` for wasm targets

Signed-off-by: Jiahao XU <[email protected]>

* Fix `wasm.rs`

Signed-off-by: Jiahao XU <[email protected]>

* Add tests

Signed-off-by: Jiahao XU <[email protected]>

* Fix `TryAcquireClient`: Set non-blocking on first instance of client

And clear it on last instance of client.

Signed-off-by: Jiahao XU <[email protected]>

* Optimize `wasm` client impl

Rm unused `Arc`

Signed-off-by: Jiahao XU <[email protected]>

* Optimize `unix::Client::acquire`

Avoid call to `poll` unless `EAGAIN` is returned

Signed-off-by: Jiahao XU <[email protected]>

---------

Signed-off-by: Jiahao XU <[email protected]>
  • Loading branch information
NobodyXu authored Mar 17, 2024
1 parent 1bff0ce commit 1dd92b6
Show file tree
Hide file tree
Showing 9 changed files with 555 additions and 99 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,14 @@ jobs:
rustup toolchain add stable --no-self-update --component rustfmt
rustup default stable
- run: cargo fmt -- --check

wasm:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: |
rustup toolchain add stable --no-self-update --target wasm32-wasi
rustup default stable
- uses: Swatinem/rust-cache@v2
- run: cargo check --target wasm32-wasi
17 changes: 17 additions & 0 deletions .github/workflows/msrv.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,20 @@ jobs:
cargo +nightly -Zminimal-versions update
- uses: Swatinem/rust-cache@v2
- run: cargo check --all-features

msrv-wasm:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Install Rust
run: |
rustup toolchain add 1.58 --no-self-update --target wasm32-wasi
rustup toolchain add nightly --no-self-update
rustup default 1.58
- name: Use minimal version and create Cargo.lock
run: |
./avoid-dev-deps.sh
cargo +nightly -Zminimal-versions update
- uses: Swatinem/rust-cache@v2
- run: cargo check --target wasm32-wasi
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ rustdoc-args = ["--cfg", "docsrs"]
cfg-if = "1.0.0"
tokio = { version = "1", default-features = false, features = [
"process",
"net",
], optional = true }
scopeguard = "1.1.0"
derive_destructure2 = "0.1.2"

[target.'cfg(any(unix, windows))'.dependencies]
# Features:
Expand Down
126 changes: 126 additions & 0 deletions src/async_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::{
fmt,
future::Future,
io, ops,
pin::Pin,
task::{Context, Poll},
};

#[cfg(unix)]
use tokio::io::{unix::AsyncFd, Interest};

use crate::{Acquired, TryAcquireClient};

#[cfg(unix)]
type AsyncAcquireClientInner = AsyncFd<TryAcquireClient>;

#[cfg(not(unix))]
type AsyncAcquireClientInner = TryAcquireClient;

/// Extension of [`Client`] that supports async acquire.
#[derive(Debug)]
pub struct AsyncAcquireClient(AsyncAcquireClientInner);

impl ops::Deref for AsyncAcquireClient {
type Target = TryAcquireClient;

fn deref(&self) -> &Self::Target {
#[cfg(unix)]
return self.0.get_ref();

#[cfg(not(unix))]
return &self.0;
}
}

impl AsyncAcquireClient {
/// Create async acquire client
pub fn new(try_acquire_client: TryAcquireClient) -> io::Result<Self> {
#[cfg(unix)]
return AsyncFd::with_interest(try_acquire_client, Interest::READABLE).map(Self);

#[cfg(not(unix))]
return Ok(Self(try_acquire_client));
}

/// Deregisters and returns [`TryAcquireClient`]
pub fn into_inner(self) -> TryAcquireClient {
#[cfg(unix)]
return self.0.into_inner();

#[cfg(not(unix))]
return self.0;
}

/// Async poll version of [`crate::Client::acquire`]
pub fn poll_acquire(&self, cx: &mut Context<'_>) -> Poll<io::Result<Acquired>> {
#[cfg(unix)]
return loop {
let mut ready_guard = match self.0.poll_read_ready(cx) {
Poll::Pending => break Poll::Pending,
Poll::Ready(res) => res?,
};

if let Some(acquired) = self.try_acquire()? {
break Poll::Ready(Ok(acquired));
} else {
ready_guard.clear_ready();
}
};

#[cfg(not(unix))]
return self
.0
.0
.0
.inner
.poll_acquire(cx)
.map_ok(|data| Acquired::new(&self.0, data));
}

/// Async version of [`crate::Client::acquire`]
pub fn acquire(&self) -> impl Future<Output = io::Result<Acquired>> + Send + Sync + Unpin + '_ {
poll_fn(move |cx| self.poll_acquire(cx))
}

/// Async owned version of [`crate::Client::acquire`]
pub fn acquire_owned(
self,
) -> impl Future<Output = io::Result<Acquired>> + Send + Sync + Unpin + 'static {
poll_fn(move |cx| self.poll_acquire(cx))
}
}

// Code below is copied from https://doc.rust-lang.org/nightly/src/core/future/poll_fn.rs.html#143-153

fn poll_fn<T, F>(f: F) -> PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
PollFn { f }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
struct PollFn<F> {
f: F,
}

impl<F: Unpin> Unpin for PollFn<F> {}

impl<F> fmt::Debug for PollFn<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollFn").finish()
}
}

impl<T, F> Future for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
type Output = T;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
// SAFETY: We are not moving out of the pinned field.
(unsafe { &mut self.get_unchecked_mut().f })(cx)
}
}
Loading

0 comments on commit 1dd92b6

Please sign in to comment.