Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return submission index to track down completion of async callbacks #6360

Open
wants to merge 8 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ By @bradwerth [#6216](https://github.com/gfx-rs/wgpu/pull/6216).
#### General

- Add `VideoFrame` to `ExternalImageSource` enum. By @jprochazk in [#6170](https://github.com/gfx-rs/wgpu/pull/6170)
- Return a submission index from `map_async` and `on_submitted_work_done` so that completion of their async callbacks can be tracked. By @eliemichel in [#6360](https://github.com/gfx-rs/wgpu/pull/6360)

#### Vulkan

Expand Down
30 changes: 12 additions & 18 deletions wgpu-core/src/device/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2147,33 +2147,27 @@ impl Global {
offset: BufferAddress,
size: Option<BufferAddress>,
op: BufferMapOperation,
) -> BufferAccessResult {
) -> Result<crate::SubmissionIndex, BufferAccessError> {
profiling::scope!("Buffer::map_async");
api_log!("Buffer::map_async {buffer_id:?} offset {offset:?} size {size:?} op: {op:?}");

let hub = &self.hub;

let op_and_err = 'error: {
let buffer = match hub.buffers.get(buffer_id).get() {
Ok(buffer) => buffer,
Err(e) => break 'error Some((op, e.into())),
};

buffer.map_async(offset, size, op).err()
let map_result = match hub.buffers.get(buffer_id).get() {
Ok(buffer) => buffer.map_async(offset, size, op),
Err(e) => Err((op, e.into())),
};

// User callbacks must not be called while holding `buffer.map_async`'s locks, so we
// defer the error callback if it needs to be called immediately (typically when running
// into errors).
if let Some((mut operation, err)) = op_and_err {
if let Some(callback) = operation.callback.take() {
callback.call(Err(err.clone()));
match map_result {
Ok(submission_index) => Ok(submission_index),
Err((mut operation, err)) => {
if let Some(callback) = operation.callback.take() {
callback.call(Err(err.clone()));
}
log::error!("Buffer::map_async error: {err}");
Err(err)
}
log::error!("Buffer::map_async error: {err}");
return Err(err);
}

Ok(())
}

pub fn buffer_get_mapped_range(
Expand Down
7 changes: 6 additions & 1 deletion wgpu-core/src/device/life.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,15 +304,20 @@ impl LifetimeTracker {
}
}

pub fn add_work_done_closure(&mut self, closure: SubmittedWorkDoneClosure) {
/// Queues `closure` in one of the tracker's work-done closure lists.
///
/// If `self.active` is non-empty, the closure is pushed onto the last entry's
/// `work_done_closures` and that entry's `index` is returned as `Some(index)`.
/// If `self.active` is empty, the closure is pushed onto the tracker's own
/// `work_done_closures` and `None` is returned, leaving the caller to pick a
/// fallback index.
pub fn add_work_done_closure(
&mut self,
closure: SubmittedWorkDoneClosure,
) -> Option<SubmissionIndex> {
match self.active.last_mut() {
Some(active) => {
active.work_done_closures.push(closure);
Some(active.index)
}
// We must defer the closure until all previously occurring map_async closures
// have fired. This is required by the spec.
None => {
self.work_done_closures.push(closure);
None
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions wgpu-core/src/device/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1334,12 +1334,19 @@ impl Global {
&self,
queue_id: QueueId,
closure: SubmittedWorkDoneClosure,
) {
) -> SubmissionIndex {
api_log!("Queue::on_submitted_work_done {queue_id:?}");

//TODO: flush pending writes
let queue = self.hub.queues.get(queue_id);
queue.device.lock_life().add_work_done_closure(closure);
let result = queue.device.lock_life().add_work_done_closure(closure);
match result {
Some(submission_index) => submission_index,
None => queue
.device
.last_successful_submission_index
.load(Ordering::Acquire),
}
}
}

Expand Down
13 changes: 9 additions & 4 deletions wgpu-core/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
resource_log,
snatch::{SnatchGuard, Snatchable},
track::{SharedTrackerIndexAllocator, TextureSelector, TrackerIndex},
Label, LabelHelpers,
Label, LabelHelpers, SubmissionIndex,
};

use smallvec::SmallVec;
Expand Down Expand Up @@ -303,7 +303,7 @@ impl BufferMapCallback {
// SAFETY: the contract of the call to from_c says that this unsafe is sound.
BufferMapCallbackInner::C { inner } => unsafe {
let status = match result {
Ok(()) => BufferMapAsyncStatus::Success,
Ok(_) => BufferMapAsyncStatus::Success,
Err(BufferAccessError::Device(_)) => BufferMapAsyncStatus::ContextLost,
Err(BufferAccessError::InvalidResource(_))
| Err(BufferAccessError::DestroyedResource(_)) => BufferMapAsyncStatus::Invalid,
Expand Down Expand Up @@ -537,7 +537,7 @@ impl Buffer {
offset: wgt::BufferAddress,
size: Option<wgt::BufferAddress>,
op: BufferMapOperation,
) -> Result<(), (BufferMapOperation, BufferAccessError)> {
) -> Result<SubmissionIndex, (BufferMapOperation, BufferAccessError)> {
let range_size = if let Some(size) = size {
size
} else if offset > self.size {
Expand Down Expand Up @@ -624,9 +624,14 @@ impl Buffer {
.buffers
.set_single(self, internal_use);

// TODO: should we increment last_successful_submission_index instead?
let submit_index = device
.active_submission_index
.fetch_add(1, core::sync::atomic::Ordering::SeqCst)
+ 1;
device.lock_life().map(self);

Ok(())
Ok(submit_index)
}

// Note: This must not be called while holding a lock.
Expand Down
8 changes: 5 additions & 3 deletions wgpu/src/api/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl<'a> BufferSlice<'a> {
&self,
mode: MapMode,
callback: impl FnOnce(Result<(), BufferAsyncError>) + WasmNotSend + 'static,
) {
) -> SubmissionIndex {
let mut mc = self.buffer.map_context.lock();
assert_eq!(mc.initial_range, 0..0, "Buffer is already mapped");
let end = match self.size {
Expand All @@ -346,13 +346,15 @@ impl<'a> BufferSlice<'a> {
};
mc.initial_range = self.offset..end;

DynContext::buffer_map_async(
let data = DynContext::buffer_map_async(
&*self.buffer.context,
self.buffer.data.as_ref(),
mode,
self.offset..end,
Box::new(callback),
)
);

SubmissionIndex { data }
}

/// Gain read-only access to the bytes of a [mapped] [`Buffer`].
Expand Down
10 changes: 7 additions & 3 deletions wgpu/src/api/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,15 @@ impl Queue {
/// has completed. There are no restrictions on the code you can run in the callback, however on native the
/// call to the function will not complete until the callback returns, so prefer keeping callbacks short
/// and used to set flags, send messages, etc.
pub fn on_submitted_work_done(&self, callback: impl FnOnce() + Send + 'static) {
DynContext::queue_on_submitted_work_done(
pub fn on_submitted_work_done(
&self,
callback: impl FnOnce() + Send + 'static,
) -> SubmissionIndex {
let data = DynContext::queue_on_submitted_work_done(
&*self.context,
self.data.as_ref(),
Box::new(callback),
)
);
SubmissionIndex { data }
}
}
11 changes: 6 additions & 5 deletions wgpu/src/backend/wgpu_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1391,7 +1391,7 @@ impl crate::Context for ContextWgpuCore {
mode: MapMode,
range: Range<wgt::BufferAddress>,
callback: crate::context::BufferMapCallback,
) {
) -> Self::SubmissionIndexData {
let operation = wgc::resource::BufferMapOperation {
host: match mode {
MapMode::Read => wgc::device::HostMap::Read,
Expand All @@ -1411,9 +1411,10 @@ impl crate::Context for ContextWgpuCore {
Some(range.end - range.start),
operation,
) {
Ok(()) => (),
Ok(index) => index,
Err(cause) => {
self.handle_error_nolabel(&buffer_data.error_sink, cause, "Buffer::map_async")
self.handle_error_nolabel(&buffer_data.error_sink, cause, "Buffer::map_async");
Self::SubmissionIndexData::MAX // invalid submission index
}
}
}
Expand Down Expand Up @@ -2095,9 +2096,9 @@ impl crate::Context for ContextWgpuCore {
&self,
queue_data: &Self::QueueData,
callback: crate::context::SubmittedWorkDoneCallback,
) {
) -> Self::SubmissionIndexData {
let closure = wgc::device::queue::SubmittedWorkDoneClosure::from_rust(callback);
self.0.queue_on_submitted_work_done(queue_data.id, closure);
self.0.queue_on_submitted_work_done(queue_data.id, closure)
}

fn device_start_capture(&self, device_data: &Self::DeviceData) {
Expand Down
18 changes: 10 additions & 8 deletions wgpu/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ pub trait Context: Debug + WasmNotSendSync + Sized {
mode: MapMode,
range: Range<BufferAddress>,
callback: BufferMapCallback,
);
) -> Self::SubmissionIndexData;
fn buffer_get_mapped_range(
&self,
buffer_data: &Self::BufferData,
Expand Down Expand Up @@ -413,7 +413,7 @@ pub trait Context: Debug + WasmNotSendSync + Sized {
&self,
queue_data: &Self::QueueData,
callback: SubmittedWorkDoneCallback,
);
) -> Self::SubmissionIndexData;

fn device_start_capture(&self, device_data: &Self::DeviceData);
fn device_stop_capture(&self, device_data: &Self::DeviceData);
Expand Down Expand Up @@ -908,7 +908,7 @@ pub(crate) trait DynContext: Debug + WasmNotSendSync {
mode: MapMode,
range: Range<BufferAddress>,
callback: BufferMapCallback,
);
) -> Arc<crate::Data>;
fn buffer_get_mapped_range(
&self,
buffer_data: &crate::Data,
Expand Down Expand Up @@ -1092,7 +1092,7 @@ pub(crate) trait DynContext: Debug + WasmNotSendSync {
&self,
queue_data: &crate::Data,
callback: SubmittedWorkDoneCallback,
);
) -> Arc<crate::Data>;

fn device_start_capture(&self, data: &crate::Data);
fn device_stop_capture(&self, data: &crate::Data);
Expand Down Expand Up @@ -1688,9 +1688,10 @@ where
mode: MapMode,
range: Range<BufferAddress>,
callback: BufferMapCallback,
) {
) -> Arc<crate::Data> {
let buffer_data = downcast_ref(buffer_data);
Context::buffer_map_async(self, buffer_data, mode, range, callback)
let data = Context::buffer_map_async(self, buffer_data, mode, range, callback);
Arc::new(data) as _
}

fn buffer_get_mapped_range(
Expand Down Expand Up @@ -2111,9 +2112,10 @@ where
&self,
queue_data: &crate::Data,
callback: SubmittedWorkDoneCallback,
) {
) -> Arc<crate::Data> {
let queue_data = downcast_ref(queue_data);
Context::queue_on_submitted_work_done(self, queue_data, callback)
let data = Context::queue_on_submitted_work_done(self, queue_data, callback);
Arc::new(data) as _
}

fn device_start_capture(&self, device_data: &crate::Data) {
Expand Down