
[3007.x] The zmq socket poll method needs to be awaited #66926

Open
dwoz wants to merge 3 commits into 3007.x

Conversation

@dwoz (Contributor) commented Sep 26, 2024

What does this PR do?

When using zmq.asyncio.Context, the socket's poll method is a coroutine. This PR awaits the poll call so that its result, rather than the always-truthy coroutine object, is what gets checked.

What issues does this PR fix or reference?

Fixes #65265

Previous Behavior

The unawaited poll() returned a coroutine object, which always evaluates as True, so recv_multipart() could be awaited even when no message was pending.

New Behavior

poll() is awaited, and recv_multipart() is only reached when poll reports the socket is readable.

Merge requirements satisfied?

[NOTICE] Bug fixes or features added to Salt require tests. This PR adds a test for the publish channel.
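
For illustration, a minimal sketch of the shape of the fix (names here are illustrative, not the exact Salt code):

```python
import zmq
import zmq.asyncio

async def read_events(sock: zmq.asyncio.Socket):
    # Broken: with zmq.asyncio.Context, poll() returns a coroutine.
    # A coroutine object is always truthy, so the branch below would
    # always be taken and recv_multipart() could hang waiting for a
    # message that never arrives:
    #
    #     if sock.poll(timeout=1000):
    #         return await sock.recv_multipart()
    #
    # Fixed: await poll() so the actual readiness result is tested.
    if await sock.poll(timeout=1000):
        return await sock.recv_multipart()
    return None
```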

dmurphy18 previously approved these changes Sep 26, 2024
twangboy previously approved these changes Sep 26, 2024
@Sxderp commented Sep 27, 2024

Just to clarify what I think is going on here. The async method is not awaited and thus returns the coroutine object. This object always evaluates to True, so we hit the await recv_multipart(), which either hangs, fails, or both, causing issues.
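
To see the truthiness pitfall in isolation (a standalone sketch, not Salt code):

```python
import asyncio

async def poll():
    return 0  # pretend no events are pending

async def main():
    result = poll()            # missing await: result is a coroutine object
    print(bool(result))        # True, no matter what poll() would return
    print(bool(await result))  # False: the real poll result
                               # (awaiting it also avoids the "never awaited" warning)

asyncio.run(main())
```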

Anyway. Some things that may be worth considering:

If self._monitor_socket.poll() must be awaited, then shouldn't the call in start_poll be awaited as well?

The new test seems very similar to these:

(zeromq)

```python
async def test_pub_channel(master_opts, io_loop):
    server = salt.transport.zeromq.PublishServer(
        master_opts,
        pub_host="127.0.0.1",
        pub_port=4506,
        pull_path=os.path.join(master_opts["sock_dir"], "publish_pull.ipc"),
    )
    payloads = []

    async def publish_payload(payload):
        await server.publish_payload(payload)
        payloads.append(payload)

    io_loop.add_callback(
        server.publisher,
        publish_payload,
        ioloop=io_loop,
    )
    await asyncio.sleep(3)
    await server.publish(salt.payload.dumps({"meh": "bah"}))
    start = time.monotonic()
    try:
        while not payloads:
            await asyncio.sleep(0.3)
            if time.monotonic() - start > 30:
                assert False, "No message received after 30 seconds"
        assert payloads
    finally:
        server.close()
```

(tcp)

```python
async def test_pub_channel(master_opts, minion_opts, io_loop):
    def presence_callback(client):
        pass

    def remove_presence_callback(client):
        pass

    master_opts["transport"] = "tcp"
    minion_opts.update(master_ip="127.0.0.1", transport="tcp")
    server = salt.transport.tcp.TCPPublishServer(
        master_opts,
        pub_host="127.0.0.1",
        pub_port=master_opts["publish_port"],
        pull_path=os.path.join(master_opts["sock_dir"], "publish_pull.ipc"),
    )
    client = salt.transport.tcp.TCPPubClient(
        minion_opts,
        io_loop,
        host="127.0.0.1",
        port=master_opts["publish_port"],
    )
    payloads = []
    publishes = []

    async def publish_payload(payload, callback):
        await server.publish_payload(payload)
        payloads.append(payload)

    async def on_recv(message):
        publishes.append(message)

    io_loop.add_callback(
        server.publisher, publish_payload, presence_callback, remove_presence_callback
    )
    # Wait for socket to bind.
    await tornado.gen.sleep(3)
    await client.connect(master_opts["publish_port"])
    client.on_recv(on_recv)
    await server.publish({"meh": "bah"})
    start = time.monotonic()
    try:
        while not publishes:
            await tornado.gen.sleep(0.3)
            if time.monotonic() - start > 30:
                assert False, "Message not published after 30 seconds"
    finally:
        server.close()
        server.pub_server.close()
        client.close()
```

With a few key differences.

  1. The existing ZeroMQ test does not check for a response, while the TCP one and this PR do. The error might have been caught if that test had been expanded.
  2. This PR spins up more clients.
  3. The existing tests use PublishClient rather than the raw transport (probably a good thing?).

Does it make more sense to write a more generic "transports" test that exercises ALL the transports (WebSocket is missing these tests) and ensures that PublishClient / PublishServer work with both inet and unix sockets (the ipc_mode config) for each transport? Something along the lines of the sketch below.
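
A rough sketch of what that could look like (a sketch only: the factory calls assume Salt's channel factories dispatch on opts["transport"], the fixtures follow the quoted tests, and an async-capable pytest runner is assumed):

```python
import pytest

import salt.channel.client
import salt.channel.server

# "ws" assumes a WebSocket transport entry exists alongside zeromq and tcp.
@pytest.mark.parametrize("transport", ["zeromq", "tcp", "ws"])
@pytest.mark.parametrize("ipc_mode", ["ipc", "tcp"])
async def test_pub_channel(master_opts, minion_opts, io_loop, transport, ipc_mode):
    master_opts.update(transport=transport, ipc_mode=ipc_mode)
    minion_opts.update(master_ip="127.0.0.1", transport=transport, ipc_mode=ipc_mode)
    server = salt.channel.server.PubServerChannel.factory(master_opts)
    client = salt.channel.client.AsyncPubChannel.factory(minion_opts, io_loop=io_loop)
    # ... publish through the server, assert the client receives the message,
    # and close both, as in the TCP test quoted above.
    ...
```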
