Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for mergePreferred #3229

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

shagoon
Copy link

@shagoon shagoon commented Jun 2, 2023

This PR adds support for mergePreferred like akka provides.

Internally two bounded CE Queues are used, mainly because they provide tryTake for peeking the queue of the preferred stream. If no data is available on the preferred stream, a race is started on both queues. The fibre of the looser will be reused when retrieving the next element.

Testing is kind of hard because race() is not deterministic. Since tests are missing atm, I created this as a draft PR. It's ready for review, but not ready to be merged. Please comment.

@armanbilge
Copy link
Member

Testing is kind of hard because race() is not deterministic.

If you use the mock runtime in TestControl.executeEmbed(...) then they will be deterministic in the sense that all sleeps will be precise e.g. in race(sleep(1.milli), sleep(2.millis)) the left side will always win. Ties will be non-deterministic.

@shagoon
Copy link
Author

shagoon commented Jun 2, 2023

TestControl.executeEmbed

I added some tests, atm without TestControl.executeEmbed. test("mergePreferred prefers") actually cheats by delaying the non-preferred stream. Without the delay, when fetching the first element, a race will be started (since preferredQueue probably would have nothing to offer at that moment), resulting in non-deterministic behaviour (i.e. non-preferred might win the race).

@shagoon
Copy link
Author

shagoon commented Jun 5, 2023

I adjusted the test of the behaviour of mergePreferred to work without any tweaks of timing (i.e. no delay, metered or TestControl.executeEmbed, etc.). It tolerates some (currently 2%) of the elements of the resulting stream to originate from the non preferred stream. This can happen, if the preferred stream did not yet offer an element to its queue, resulting in a race() on both queues, which the non preferred queue might win.

@shagoon shagoon marked this pull request as ready for review June 5, 2023 06:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants