Skip to content

Commit

Permalink
Use async fn for Service::ready() and Service::shutdown() (#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored May 28, 2024
1 parent dec6ab0 commit b04bdf4
Show file tree
Hide file tree
Showing 33 changed files with 282 additions and 296 deletions.
8 changes: 4 additions & 4 deletions ntex-async-std/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-async-std"
version = "0.4.0"
version = "0.5.0"
authors = ["ntex contributors <[email protected]>"]
description = "async-std intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -16,9 +16,9 @@ name = "ntex_async_std"
path = "src/lib.rs"

[dependencies]
ntex-bytes = "0.1.21"
ntex-io = "1.0.0"
ntex-util = "1.0.0"
ntex-bytes = "0.1"
ntex-io = "2.0"
ntex-util = "2.0"
log = "0.4"
async-std = { version = "1", features = ["unstable"] }
oneshot = { version = "0.1", default-features = false, features = ["async"] }
8 changes: 4 additions & 4 deletions ntex-glommio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-glommio"
version = "0.4.0"
version = "0.5.0"
authors = ["ntex contributors <[email protected]>"]
description = "glommio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -16,9 +16,9 @@ name = "ntex_glommio"
path = "src/lib.rs"

[dependencies]
ntex-bytes = "0.1.24"
ntex-io = "1.0.0"
ntex-util = "1.0.0"
ntex-bytes = "0.1"
ntex-io = "2.0"
ntex-util = "2.0"
futures-lite = "2.2"
log = "0.4"
oneshot = { version = "0.1", default-features = false, features = ["async"] }
Expand Down
4 changes: 4 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.0.0] - 2024-05-28

* Use async fn for Service::ready() and Service::shutdown()

## [1.2.0] - 2024-05-12

* Better write back-pressure handling
Expand Down
6 changes: 3 additions & 3 deletions ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "1.2.0"
version = "2.0.0"
authors = ["ntex contributors <[email protected]>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -18,8 +18,8 @@ path = "src/lib.rs"
[dependencies]
ntex-codec = "0.6.2"
ntex-bytes = "0.1.24"
ntex-util = "1.0"
ntex-service = "2.0"
ntex-util = "2.0"
ntex-service = "3.0"

bitflags = "2"
log = "0.4"
Expand Down
40 changes: 11 additions & 29 deletions ntex-io/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
#![allow(clippy::let_underscore_future)]
use std::{cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};

use ntex_bytes::Pool;
use ntex_codec::{Decoder, Encoder};
use ntex_service::{IntoService, Pipeline, PipelineCall, Service};
use ntex_service::{IntoService, Pipeline, PipelineBinding, PipelineCall, Service};
use ntex_util::{future::Either, ready, spawn, time::Seconds};

use crate::{Decoded, DispatchItem, IoBoxed, IoStatusUpdate, RecvError};
Expand Down Expand Up @@ -144,7 +143,6 @@ where
flags: Flags,
shared: Rc<DispatcherShared<S, U>>,
response: Option<PipelineCall<S, DispatchItem<U>>>,
pool: Pool,
cfg: DispatcherConfig,
read_remains: u32,
read_remains_prev: u32,
Expand All @@ -158,7 +156,7 @@ where
{
io: IoBoxed,
codec: U,
service: Pipeline<S>,
service: PipelineBinding<S, DispatchItem<U>>,
error: Cell<Option<DispatcherError<S::Error, <U as Encoder>::Error>>>,
inflight: Cell<usize>,
}
Expand Down Expand Up @@ -194,7 +192,7 @@ impl<S, U> From<Either<S, U>> for DispatcherError<S, U> {

impl<S, U> Dispatcher<S, U>
where
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
U: Decoder + Encoder + 'static,
{
/// Construct new `Dispatcher` instance.
Expand All @@ -217,18 +215,16 @@ where
Flags::KA_ENABLED
};

let pool = io.memory_pool().pool();
let shared = Rc::new(DispatcherShared {
io,
codec,
error: Cell::new(None),
inflight: Cell::new(0),
service: Pipeline::new(service.into_service()),
service: Pipeline::new(service.into_service()).bind(),
});

Dispatcher {
inner: DispatcherInner {
pool,
shared,
flags,
cfg: cfg.clone(),
Expand Down Expand Up @@ -284,14 +280,6 @@ where
}
}

// handle memory pool pressure
if slf.pool.poll_ready(cx).is_pending() {
slf.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
slf.shared.io.stop_timer();
slf.shared.io.pause();
return Poll::Pending;
}

loop {
match slf.st {
DispatcherState::Processing => {
Expand Down Expand Up @@ -434,7 +422,7 @@ where
U: Decoder + Encoder + 'static,
{
fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem<U>) {
let mut fut = self.shared.service.call_static(item);
let mut fut = self.shared.service.call(item);
self.shared.inflight.set(self.shared.inflight.get() + 1);

// optimize first call
Expand Down Expand Up @@ -682,26 +670,21 @@ mod tests {
U: Decoder + Encoder + 'static,
{
/// Construct new `Dispatcher` instance
pub(crate) fn debug<T: IoStream, F: IntoService<S, DispatchItem<U>>>(
io: T,
codec: U,
service: F,
) -> (Self, State) {
pub(crate) fn debug<T: IoStream>(io: T, codec: U, service: S) -> (Self, State) {
let cfg = DispatcherConfig::default()
.set_keepalive_timeout(Seconds(1))
.clone();
Self::debug_cfg(io, codec, service, cfg)
}

/// Construct new `Dispatcher` instance
pub(crate) fn debug_cfg<T: IoStream, F: IntoService<S, DispatchItem<U>>>(
pub(crate) fn debug_cfg<T: IoStream>(
io: T,
codec: U,
service: F,
service: S,
cfg: DispatcherConfig,
) -> (Self, State) {
let state = Io::new(io);
let pool = state.memory_pool().pool();
state.set_disconnect_timeout(cfg.disconnect_timeout());
state.set_tag("DBG");

Expand All @@ -719,7 +702,7 @@ mod tests {
io: state.into(),
error: Cell::new(None),
inflight: Cell::new(0),
service: Pipeline::new(service.into_service()),
service: Pipeline::new(service).bind(),
});

(
Expand All @@ -731,7 +714,6 @@ mod tests {
read_remains: 0,
read_remains_prev: 0,
read_max_timeout: Seconds::ZERO,
pool,
shared,
cfg,
flags,
Expand Down Expand Up @@ -864,9 +846,9 @@ mod tests {
type Response = Option<Response<BytesCodec>>;
type Error = ();

fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), ()> {
self.0.set(self.0.get() + 1);
Poll::Ready(Err(()))
Err(())
}

async fn call(
Expand Down
4 changes: 4 additions & 0 deletions ntex-net/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.0.0] - 2024-05-28

* Use async fn for Service::ready() and Service::shutdown()

## [1.0.2] - 2024-03-30

* Fix glommio compat feature #327
Expand Down
16 changes: 8 additions & 8 deletions ntex-net/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-net"
version = "1.0.2"
version = "2.0.0"
authors = ["ntex contributors <[email protected]>"]
description = "ntexwork utils for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand Down Expand Up @@ -28,16 +28,16 @@ glommio = ["ntex-rt/glommio", "ntex-glommio"]
async-std = ["ntex-rt/async-std", "ntex-async-std"]

[dependencies]
ntex-service = "2.0"
ntex-bytes = "0.1.24"
ntex-service = "3.0"
ntex-bytes = "0.1"
ntex-http = "0.1"
ntex-io = "1.0"
ntex-io = "2.0"
ntex-rt = "0.4.11"
ntex-util = "1.0"
ntex-util = "2.0"

ntex-tokio = { version = "0.4.0", optional = true }
ntex-glommio = { version = "0.4.0", optional = true }
ntex-async-std = { version = "0.4.0", optional = true }
ntex-tokio = { version = "0.5.0", optional = true }
ntex-glommio = { version = "0.5.0", optional = true }
ntex-async-std = { version = "0.5.0", optional = true }

log = "0.4"
thiserror = "1.0"
Expand Down
9 changes: 5 additions & 4 deletions ntex-net/src/connect/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use ntex_util::future::Either;

use super::{Address, Connect, ConnectError};

#[derive(Copy)]
/// DNS Resolver Service
pub struct Resolver<T>(marker::PhantomData<T>);

Expand All @@ -17,6 +16,8 @@ impl<T> Resolver<T> {
}
}

impl<T> Copy for Resolver<T> {}

impl<T: Address> Resolver<T> {
/// Lookup ip addresses for provided host
pub async fn lookup(&self, req: Connect<T>) -> Result<Connect<T>, ConnectError> {
Expand Down Expand Up @@ -100,7 +101,7 @@ impl<T> Default for Resolver<T> {

impl<T> Clone for Resolver<T> {
fn clone(&self) -> Self {
Resolver(marker::PhantomData)
*self
}
}

Expand All @@ -117,7 +118,7 @@ impl<T: Address, C> ServiceFactory<Connect<T>, C> for Resolver<T> {
type InitError = ();

async fn create(&self, _: C) -> Result<Self::Service, Self::InitError> {
Ok(self.clone())
Ok(*self)
}
}

Expand All @@ -144,7 +145,7 @@ mod tests {
async fn resolver() {
let resolver = Resolver::default().clone();
assert!(format!("{:?}", resolver).contains("Resolver"));
let srv = resolver.pipeline(()).await.unwrap();
let srv = resolver.pipeline(()).await.unwrap().bind();
assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready());

let res = srv.call(Connect::new("www.rust-lang.org")).await;
Expand Down
12 changes: 5 additions & 7 deletions ntex-net/src/connect/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ use ntex_util::future::{BoxFuture, Either};
use super::{Address, Connect, ConnectError, Resolver};
use crate::tcp_connect_in;

#[derive(Copy)]
/// Basic tcp stream connector
pub struct Connector<T> {
resolver: Resolver<T>,
pool: PoolRef,
tag: &'static str,
}

impl<T> Copy for Connector<T> {}

impl<T> Connector<T> {
/// Construct new connect service with default dns resolver
pub fn new() -> Self {
Expand Down Expand Up @@ -85,11 +87,7 @@ impl<T> Default for Connector<T> {

impl<T> Clone for Connector<T> {
fn clone(&self) -> Self {
Connector {
resolver: self.resolver.clone(),
tag: self.tag,
pool: self.pool,
}
*self
}
}

Expand All @@ -110,7 +108,7 @@ impl<T: Address, C> ServiceFactory<Connect<T>, C> for Connector<T> {
type InitError = ();

async fn create(&self, _: C) -> Result<Self::Service, Self::InitError> {
Ok(self.clone())
Ok(*self)
}
}

Expand Down
4 changes: 4 additions & 0 deletions ntex-server/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.0.0] - 2024-05-28

* Use async fn for Service::ready() and Service::shutdown()

## [1.0.5] - 2024-04-02

* Fix external configuration handling
Expand Down
12 changes: 6 additions & 6 deletions ntex-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-server"
version = "1.0.5"
version = "2.0.0"
authors = ["ntex contributors <[email protected]>"]
description = "Server for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -16,11 +16,11 @@ name = "ntex_server"
path = "src/lib.rs"

[dependencies]
ntex-bytes = "0.1.24"
ntex-net = "1.0"
ntex-service = "2.0"
ntex-rt = "0.4.13"
ntex-util = "1.0"
ntex-bytes = "0.1"
ntex-net = "2.0"
ntex-service = "3.0"
ntex-rt = "0.4"
ntex-util = "2.0"

async-channel = "2"
async-broadcast = "0.7"
Expand Down
13 changes: 10 additions & 3 deletions ntex-server/src/net/counter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{cell::Cell, rc::Rc, task};
use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll};

use ntex_util::task::LocalWaker;

Expand Down Expand Up @@ -30,8 +30,15 @@ impl Counter {

/// Check if counter is not at capacity. If counter at capacity
/// it registers notification for current task.
pub(super) fn available(&self, cx: &mut task::Context<'_>) -> bool {
self.0.available(cx)
pub(super) async fn available(&self) {
poll_fn(|cx| {
if self.0.available(cx) {
Poll::Ready(())
} else {
Poll::Pending
}
})
.await
}

/// Get total number of acquired counts
Expand Down
Loading

0 comments on commit b04bdf4

Please sign in to comment.