diff --git a/foxglove_bridge_base/include/foxglove_bridge/test/test_client.hpp b/foxglove_bridge_base/include/foxglove_bridge/test/test_client.hpp index 586328d..62742de 100644 --- a/foxglove_bridge_base/include/foxglove_bridge/test/test_client.hpp +++ b/foxglove_bridge_base/include/foxglove_bridge/test/test_client.hpp @@ -11,8 +11,8 @@ namespace foxglove { -std::vector connectClientAndReceiveMsg(const std::string& uri, - const std::string& topic_name); +std::future> waitForChannelMsg(ClientInterface* client, + SubscriptionId subscriptionId); std::future> waitForParameters(std::shared_ptr client, const std::string& requestId = std::string()); diff --git a/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp b/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp index d18d55a..5f6ee6e 100644 --- a/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp +++ b/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp @@ -1244,7 +1244,7 @@ void Server::handleUnsubscribe(const nlohmann::json& payloa ChannelId chanId = sub->first; _handlers.unsubscribeHandler(chanId, hdl); std::unique_lock clientsLock(_clientsMutex); - _clients.at(hdl).subscriptionsByChannel.erase(sub); + _clients.at(hdl).subscriptionsByChannel.erase(chanId); } } diff --git a/foxglove_bridge_base/src/test/test_client.cpp b/foxglove_bridge_base/src/test/test_client.cpp index 25457e8..5e5f612 100644 --- a/foxglove_bridge_base/src/test/test_client.cpp +++ b/foxglove_bridge_base/src/test/test_client.cpp @@ -8,46 +8,24 @@ namespace foxglove { -constexpr auto DEFAULT_TIMEOUT = std::chrono::seconds(5); - -std::vector connectClientAndReceiveMsg(const std::string& uri, - const std::string& topicName) { - // Set up text message handler to resolve the promise when the topic is advertised - auto wsClient = std::make_shared>(); - std::promise channelPromise; - auto channelFuture = waitForChannel(wsClient, topicName); - - // Connect the client and wait for the channel future - if (std::future_status::ready != wsClient->connect(uri).wait_for(DEFAULT_TIMEOUT)) { - throw std::runtime_error("Client failed to connect"); - } else if (std::future_status::ready != channelFuture.wait_for(DEFAULT_TIMEOUT)) { - throw std::runtime_error("Client failed to receive channel"); - } +std::future> waitForChannelMsg(ClientInterface* client, + SubscriptionId subscriptionId) { + // Set up binary message handler to resolve when a binary message has been received + auto promise = std::make_shared>>(); + auto future = promise->get_future(); - const auto channel = channelFuture.get(); - const SubscriptionId subscriptionId = 1; + client->setBinaryMessageHandler( + [promise = std::move(promise), subscriptionId](const uint8_t* data, size_t dataLength) { + if (ReadUint32LE(data + 1) != subscriptionId) { + return; + } + const size_t offset = 1 + 4 + 8; + std::vector dataCopy(dataLength - offset); + std::memcpy(dataCopy.data(), data + offset, dataLength - offset); + promise->set_value(std::move(dataCopy)); + }); - // Set up binary message handler to resolve when a binary message has been received - std::promise> msgPromise; - auto msgFuture = msgPromise.get_future(); - wsClient->setBinaryMessageHandler([&msgPromise](const uint8_t* data, size_t dataLength) { - if (ReadUint32LE(data + 1) != subscriptionId) { - return; - } - const size_t offset = 1 + 4 + 8; - std::vector dataCopy(dataLength - offset); - std::memcpy(dataCopy.data(), data + offset, dataLength - offset); - msgPromise.set_value(std::move(dataCopy)); - }); - - // Subscribe to the channel that corresponds to the topic - wsClient->subscribe({{subscriptionId, channel.id}}); - - // Wait until we have received a binary message - if (std::future_status::ready != msgFuture.wait_for(DEFAULT_TIMEOUT)) { - throw std::runtime_error("Client failed to receive message"); - } - return msgFuture.get(); + return future; } std::future> waitForParameters(std::shared_ptr client, diff --git a/ros1_foxglove_bridge/tests/smoke_test.cpp b/ros1_foxglove_bridge/tests/smoke_test.cpp index 60aada0..72afd91 100644 --- a/ros1_foxglove_bridge/tests/smoke_test.cpp +++ b/ros1_foxglove_bridge/tests/smoke_test.cpp @@ -80,10 +80,24 @@ TEST(SmokeTest, testSubscription) { // Connect a few clients and make sure that they receive the correct message const auto clientCount = 3; for (auto i = 0; i < clientCount; ++i) { - std::vector msgData; - ASSERT_NO_THROW(msgData = foxglove::connectClientAndReceiveMsg(URI, topic_name)); + // Set up a client and subscribe to the channel. + auto client = std::make_shared>(); + auto channelFuture = foxglove::waitForChannel(client, topic_name); + ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(ONE_SECOND)); + ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND)); + const foxglove::Channel channel = channelFuture.get(); + const foxglove::SubscriptionId subscriptionId = 1; + + // Subscribe to the channel and confirm that the promise resolves + auto msgFuture = waitForChannelMsg(client.get(), subscriptionId); + client->subscribe({{subscriptionId, channel.id}}); + ASSERT_EQ(std::future_status::ready, msgFuture.wait_for(ONE_SECOND)); + const auto msgData = msgFuture.get(); ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size()); EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size())); + + // Unsubscribe from the channel again. + client->unsubscribe({subscriptionId}); } } @@ -95,11 +109,24 @@ TEST(SmokeTest, testSubscriptionParallel) { pub.publish(std::string("hello world")); // Connect a few clients (in parallel) and make sure that they receive the correct message + const foxglove::SubscriptionId subscriptionId = 1; + auto clients = { + std::make_shared>(), + std::make_shared>(), + std::make_shared>(), + }; + std::vector>> futures; - const auto clientCount = 3; - for (auto i = 0; i < clientCount; ++i) { - futures.push_back( - std::async(std::launch::async, foxglove::connectClientAndReceiveMsg, URI, topic_name)); + for (auto client : clients) { + futures.push_back(waitForChannelMsg(client.get(), subscriptionId)); + } + + for (auto client : clients) { + auto channelFuture = foxglove::waitForChannel(client, topic_name); + ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(ONE_SECOND)); + ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND)); + const foxglove::Channel channel = channelFuture.get(); + client->subscribe({{subscriptionId, channel.id}}); } for (auto& future : futures) { @@ -108,6 +135,10 @@ TEST(SmokeTest, testSubscriptionParallel) { ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size()); EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size())); } + + for (auto client : clients) { + client->unsubscribe({subscriptionId}); + } } TEST(SmokeTest, testPublishing) { diff --git a/ros2_foxglove_bridge/tests/smoke_test.cpp b/ros2_foxglove_bridge/tests/smoke_test.cpp index d9699e1..17f639e 100644 --- a/ros2_foxglove_bridge/tests/smoke_test.cpp +++ b/ros2_foxglove_bridge/tests/smoke_test.cpp @@ -173,10 +173,24 @@ TEST(SmokeTest, testSubscription) { // Connect a few clients and make sure that they receive the correct message const auto clientCount = 3; for (auto i = 0; i < clientCount; ++i) { - std::vector msgData; - ASSERT_NO_THROW(msgData = foxglove::connectClientAndReceiveMsg(URI, topic_name)); + // Set up a client and subscribe to the channel. + auto client = std::make_shared>(); + auto channelFuture = foxglove::waitForChannel(client, topic_name); + ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(ONE_SECOND)); + ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND)); + const foxglove::Channel channel = channelFuture.get(); + const foxglove::SubscriptionId subscriptionId = 1; + + // Subscribe to the channel and confirm that the promise resolves + auto msgFuture = waitForChannelMsg(client.get(), subscriptionId); + client->subscribe({{subscriptionId, channel.id}}); + ASSERT_EQ(std::future_status::ready, msgFuture.wait_for(ONE_SECOND)); + const auto msgData = msgFuture.get(); ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size()); EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size())); + + // Unsubscribe from the channel again. + client->unsubscribe({subscriptionId}); } } @@ -193,11 +207,24 @@ TEST(SmokeTest, testSubscriptionParallel) { pub->publish(rosMsg); // Connect a few clients (in parallel) and make sure that they receive the correct message + const foxglove::SubscriptionId subscriptionId = 1; + auto clients = { + std::make_shared>(), + std::make_shared>(), + std::make_shared>(), + }; + std::vector>> futures; - const auto clientCount = 3; - for (auto i = 0; i < clientCount; ++i) { - futures.push_back( - std::async(std::launch::async, foxglove::connectClientAndReceiveMsg, URI, topic_name)); + for (auto client : clients) { + futures.push_back(waitForChannelMsg(client.get(), subscriptionId)); + } + + for (auto client : clients) { + auto channelFuture = foxglove::waitForChannel(client, topic_name); + ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(ONE_SECOND)); + ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND)); + const foxglove::Channel channel = channelFuture.get(); + client->subscribe({{subscriptionId, channel.id}}); } for (auto& future : futures) { @@ -206,6 +233,10 @@ TEST(SmokeTest, testSubscriptionParallel) { ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size()); EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size())); } + + for (auto client : clients) { + client->unsubscribe({subscriptionId}); + } } TEST(SmokeTest, testPublishing) {