Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3007.x] The zmq socket poll method needs to be awaited #66926

Open
wants to merge 3 commits into
base: 3007.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog/65265.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Await the zmq monitor socket's poll method to fix publish server reliability in
environments with a large number of minions.
2 changes: 1 addition & 1 deletion salt/transport/zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ def start_io_loop(self, io_loop):
async def consume(self):
while self._running.is_set():
try:
if self._monitor_socket.poll():
if await self._monitor_socket.poll():
msg = await self._monitor_socket.recv_multipart()
self.monitor_callback(msg)
else:
Expand Down
82 changes: 82 additions & 0 deletions tests/pytests/scenarios/transport/test_zeromq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import asyncio
import logging
import multiprocessing
import time

import pytest

try:
import zmq

import salt.transport.zeromq
except ImportError:
zmq = None


log = logging.getLogger(__name__)


def clients(recieved):
    """
    Fire up 1000 publish socket clients and wait for a message.

    Every client is a SUB socket connected to tcp://127.0.0.1:5406 and
    subscribed with an empty prefix. A socket is closed and forgotten as soon
    as it has received one message.

    :param recieved: Shared counter exposing ``value`` and ``get_lock()``
        (the regression test passes a ``multiprocessing.Value("i", 0)``). It
        is incremented once for every client that receives a message.
    """
    log.debug("Clients start")
    context = zmq.asyncio.Context()
    sockets = {}
    for i in range(1000):
        socket = context.socket(zmq.SUB)
        socket.connect("tcp://127.0.0.1:5406")
        socket.setsockopt(zmq.SUBSCRIBE, b"")
        sockets[i] = socket
    log.debug("Clients connected")

    async def check():
        start = time.time()
        try:
            while time.time() - start < 60:
                # Iterate over a snapshot of the keys because sockets that
                # got their message are removed from the dict in the loop.
                for i in list(sockets):
                    # NOTE(review): poll() is awaited without a timeout;
                    # confirm whether it can block past the 60 second
                    # deadline when a client never receives a message.
                    if await sockets[i].poll():
                        msg = await sockets[i].recv()
                        # Count the message under the lock and log the
                        # updated value so the reported total is not stale.
                        with recieved.get_lock():
                            recieved.value += 1
                            total = recieved.value
                        log.debug(
                            "Client %d got message %s total %d", i, msg, total
                        )
                        sockets[i].close(0)
                        sockets.pop(i)
                await asyncio.sleep(0.3)
        finally:
            # Do not leave sockets open if the deadline passes (or an error
            # occurs) before every client has received its message.
            for leftover in sockets.values():
                leftover.close(0)
            sockets.clear()

    asyncio.run(check())


@pytest.mark.skipif(not zmq, reason="Zeromq not installed")
def test_issue_regression_65265():
    """
    Regression test for 65265. This test will not fail 100% of the time prior
    to the fix for 65265. However, it does pass reliably with the issue fixed.

    Three processes running ``clients`` (1000 subscribers each) are started
    before the publish server. One payload is then published and the shared
    counter is expected to reach 3000.
    """
    recieved = multiprocessing.Value("i", 0)
    process_manager = salt.utils.process.ProcessManager(wait_for_kill=5)
    opts = {"ipv6": False, "zmq_filtering": False, "zmq_backlog": 1000, "pub_hwm": 1000}
    # Everything that follows runs inside the try block so the child
    # processes are terminated even when server setup or publishing fails,
    # not only when the final assertion fails.
    try:
        for _ in range(3):
            process_manager.add_process(clients, args=(recieved,))
        # Give some time for all clients to start up before starting server.
        time.sleep(10)
        server = salt.transport.zeromq.PublishServer(
            opts, pub_host="127.0.0.1", pub_port=5406, pull_path="/tmp/pull.ipc"
        )
        process_manager.add_process(
            server.publish_daemon, args=(server.publish_payload,)
        )
        # Wait some more for the server to start up completely.
        time.sleep(10)
        asyncio.run(server.publish(b"asdf"))
        log.debug("After publish")
        # Give time for clients to receive their messages.
        time.sleep(10)
        with recieved.get_lock():
            total = recieved.value
        assert total == 3000
    finally:
        process_manager.terminate()
Loading