Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix segfault due to invalid iterator #251

Merged
merged 2 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

namespace foxglove {

std::vector<uint8_t> connectClientAndReceiveMsg(const std::string& uri,
const std::string& topic_name);
std::future<std::vector<uint8_t>> waitForChannelMsg(ClientInterface* client,
SubscriptionId subscriptionId);

std::future<std::vector<Parameter>> waitForParameters(std::shared_ptr<ClientInterface> client,
const std::string& requestId = std::string());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1244,7 +1244,7 @@ void Server<ServerConfiguration>::handleUnsubscribe(const nlohmann::json& payloa
ChannelId chanId = sub->first;
_handlers.unsubscribeHandler(chanId, hdl);
std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
_clients.at(hdl).subscriptionsByChannel.erase(sub);
_clients.at(hdl).subscriptionsByChannel.erase(chanId);
}
}

Expand Down
54 changes: 16 additions & 38 deletions foxglove_bridge_base/src/test/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,46 +8,24 @@

namespace foxglove {

constexpr auto DEFAULT_TIMEOUT = std::chrono::seconds(5);

std::vector<uint8_t> connectClientAndReceiveMsg(const std::string& uri,
const std::string& topicName) {
// Set up text message handler to resolve the promise when the topic is advertised
auto wsClient = std::make_shared<foxglove::Client<websocketpp::config::asio_client>>();
std::promise<nlohmann::json> channelPromise;
auto channelFuture = waitForChannel(wsClient, topicName);

// Connect the client and wait for the channel future
if (std::future_status::ready != wsClient->connect(uri).wait_for(DEFAULT_TIMEOUT)) {
throw std::runtime_error("Client failed to connect");
} else if (std::future_status::ready != channelFuture.wait_for(DEFAULT_TIMEOUT)) {
throw std::runtime_error("Client failed to receive channel");
}
std::future<std::vector<uint8_t>> waitForChannelMsg(ClientInterface* client,
SubscriptionId subscriptionId) {
// Set up binary message handler to resolve when a binary message has been received
auto promise = std::make_shared<std::promise<std::vector<uint8_t>>>();
auto future = promise->get_future();

const auto channel = channelFuture.get();
const SubscriptionId subscriptionId = 1;
client->setBinaryMessageHandler(
[promise = std::move(promise), subscriptionId](const uint8_t* data, size_t dataLength) {
if (ReadUint32LE(data + 1) != subscriptionId) {
return;
}
const size_t offset = 1 + 4 + 8;
std::vector<uint8_t> dataCopy(dataLength - offset);
std::memcpy(dataCopy.data(), data + offset, dataLength - offset);
promise->set_value(std::move(dataCopy));
});

// Set up binary message handler to resolve when a binary message has been received
std::promise<std::vector<uint8_t>> msgPromise;
auto msgFuture = msgPromise.get_future();
wsClient->setBinaryMessageHandler([&msgPromise](const uint8_t* data, size_t dataLength) {
if (ReadUint32LE(data + 1) != subscriptionId) {
return;
}
const size_t offset = 1 + 4 + 8;
std::vector<uint8_t> dataCopy(dataLength - offset);
std::memcpy(dataCopy.data(), data + offset, dataLength - offset);
msgPromise.set_value(std::move(dataCopy));
});

// Subscribe to the channel that corresponds to the topic
wsClient->subscribe({{subscriptionId, channel.id}});

// Wait until we have received a binary message
if (std::future_status::ready != msgFuture.wait_for(DEFAULT_TIMEOUT)) {
throw std::runtime_error("Client failed to receive message");
}
return msgFuture.get();
return future;
}

std::future<std::vector<Parameter>> waitForParameters(std::shared_ptr<ClientInterface> client,
Expand Down
43 changes: 37 additions & 6 deletions ros1_foxglove_bridge/tests/smoke_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,24 @@ TEST(SmokeTest, testSubscription) {
// Connect a few clients and make sure that they receive the correct message
const auto clientCount = 3;
for (auto i = 0; i < clientCount; ++i) {
std::vector<uint8_t> msgData;
ASSERT_NO_THROW(msgData = foxglove::connectClientAndReceiveMsg(URI, topic_name));
// Set up a client and subscribe to the channel.
auto client = std::make_shared<foxglove::Client<websocketpp::config::asio_client>>();
auto channelFuture = foxglove::waitForChannel(client, topic_name);
ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(ONE_SECOND));
ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND));
const foxglove::Channel channel = channelFuture.get();
const foxglove::SubscriptionId subscriptionId = 1;

// Subscribe to the channel and confirm that the promise resolves
auto msgFuture = waitForChannelMsg(client.get(), subscriptionId);
client->subscribe({{subscriptionId, channel.id}});
ASSERT_EQ(std::future_status::ready, msgFuture.wait_for(ONE_SECOND));
const auto msgData = msgFuture.get();
ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size()));

// Unsubscribe from the channel again.
client->unsubscribe({subscriptionId});
}
}

Expand All @@ -95,11 +109,24 @@ TEST(SmokeTest, testSubscriptionParallel) {
pub.publish(std::string("hello world"));

// Connect a few clients (in parallel) and make sure that they receive the correct message
const foxglove::SubscriptionId subscriptionId = 1;
auto clients = {
std::make_shared<foxglove::Client<websocketpp::config::asio_client>>(),
std::make_shared<foxglove::Client<websocketpp::config::asio_client>>(),
std::make_shared<foxglove::Client<websocketpp::config::asio_client>>(),
};

std::vector<std::future<std::vector<uint8_t>>> futures;
const auto clientCount = 3;
for (auto i = 0; i < clientCount; ++i) {
futures.push_back(
std::async(std::launch::async, foxglove::connectClientAndReceiveMsg, URI, topic_name));
for (auto client : clients) {
futures.push_back(waitForChannelMsg(client.get(), subscriptionId));
}

for (auto client : clients) {
auto channelFuture = foxglove::waitForChannel(client, topic_name);
ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(ONE_SECOND));
ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND));
const foxglove::Channel channel = channelFuture.get();
client->subscribe({{subscriptionId, channel.id}});
}

for (auto& future : futures) {
Expand All @@ -108,6 +135,10 @@ TEST(SmokeTest, testSubscriptionParallel) {
ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size()));
}

for (auto client : clients) {
client->unsubscribe({subscriptionId});
}
}

TEST(SmokeTest, testPublishing) {
Expand Down
43 changes: 37 additions & 6 deletions ros2_foxglove_bridge/tests/smoke_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,24 @@ TEST(SmokeTest, testSubscription) {
// Connect a few clients and make sure that they receive the correct message
const auto clientCount = 3;
for (auto i = 0; i < clientCount; ++i) {
std::vector<uint8_t> msgData;
ASSERT_NO_THROW(msgData = foxglove::connectClientAndReceiveMsg(URI, topic_name));
// Set up a client and subscribe to the channel.
auto client = std::make_shared<foxglove::Client<websocketpp::config::asio_client>>();
auto channelFuture = foxglove::waitForChannel(client, topic_name);
ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(ONE_SECOND));
ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND));
const foxglove::Channel channel = channelFuture.get();
const foxglove::SubscriptionId subscriptionId = 1;

// Subscribe to the channel and confirm that the promise resolves
auto msgFuture = waitForChannelMsg(client.get(), subscriptionId);
client->subscribe({{subscriptionId, channel.id}});
ASSERT_EQ(std::future_status::ready, msgFuture.wait_for(ONE_SECOND));
const auto msgData = msgFuture.get();
ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size()));

// Unsubscribe from the channel again.
client->unsubscribe({subscriptionId});
}
}

Expand All @@ -193,11 +207,24 @@ TEST(SmokeTest, testSubscriptionParallel) {
pub->publish(rosMsg);

// Connect a few clients (in parallel) and make sure that they receive the correct message
const foxglove::SubscriptionId subscriptionId = 1;
auto clients = {
std::make_shared<foxglove::Client<websocketpp::config::asio_client>>(),
std::make_shared<foxglove::Client<websocketpp::config::asio_client>>(),
std::make_shared<foxglove::Client<websocketpp::config::asio_client>>(),
};

std::vector<std::future<std::vector<uint8_t>>> futures;
const auto clientCount = 3;
for (auto i = 0; i < clientCount; ++i) {
futures.push_back(
std::async(std::launch::async, foxglove::connectClientAndReceiveMsg, URI, topic_name));
for (auto client : clients) {
futures.push_back(waitForChannelMsg(client.get(), subscriptionId));
}

for (auto client : clients) {
auto channelFuture = foxglove::waitForChannel(client, topic_name);
ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(ONE_SECOND));
ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND));
const foxglove::Channel channel = channelFuture.get();
client->subscribe({{subscriptionId, channel.id}});
}

for (auto& future : futures) {
Expand All @@ -206,6 +233,10 @@ TEST(SmokeTest, testSubscriptionParallel) {
ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size()));
}

for (auto client : clients) {
client->unsubscribe({subscriptionId});
}
}

TEST(SmokeTest, testPublishing) {
Expand Down