Skip to content

Commit

Permalink
enhancement(nats source): add support for multiple URLs
Browse files Browse the repository at this point in the history
  • Loading branch information
benjamin-awd committed Sep 30, 2024
1 parent 438bc8b commit d47eda4
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 2 deletions.
3 changes: 3 additions & 0 deletions changelog.d/21385-nats-source-multiple-url.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Adds support for multiple URLs in the NATS source. This allows for greater fault tolerance as compared to reading from a single server.

authors: benjamin-awd
66 changes: 65 additions & 1 deletion src/sources/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct NatsSourceConfig {
/// If the port is not specified it defaults to 4222.
#[configurable(metadata(docs::examples = "nats://demo.nats.io"))]
#[configurable(metadata(docs::examples = "nats://127.0.0.1:4242"))]
#[configurable(metadata(docs::examples = "nats://localhost:4222,nats://localhost:5222,nats://localhost:6222"))]
url: String,

/// A [name][nats_connection_name] assigned to the NATS connection.
Expand Down Expand Up @@ -185,7 +186,18 @@ impl SourceConfig for NatsSourceConfig {
impl NatsSourceConfig {
async fn connect(&self) -> Result<async_nats::Client, BuildError> {
let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?;
options.connect(&self.url).await.context(ConnectSnafu)

let server_addrs = self.parse_server_addresses()?;
options.connect(server_addrs).await.context(ConnectSnafu)
}

fn parse_server_addresses(&self) -> Result<Vec<async_nats::ServerAddr>, BuildError> {
self.url.split(",")
.map(|url| url.parse::<async_nats::ServerAddr>()
.map_err(|_| BuildError::Connect {
source: async_nats::ConnectErrorKind::ServerParse.into(),
}))
.collect()
}
}

Expand Down Expand Up @@ -846,4 +858,56 @@ mod integration_tests {
r
);
}

#[tokio::test]
async fn nats_multiple_urls_valid() {
let subject = format!("test-{}", random_string(10));

let conf = NatsSourceConfig {
connection_name: "".to_owned(),
subject: subject.clone(),
url: "nats://nats:4222,nats://demo.nats.io:4222".to_string(),
queue: None,
framing: default_framing_message_based(),
decoding: default_decoding(),
tls: None,
auth: None,
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
assert!(
r.is_ok(),
"publish_and_check failed for multiple URLs, expected Ok(()), got: {:?}",
r
);
}

#[tokio::test]
async fn nats_multiple_urls_invalid() {
let subject = format!("test-{}", random_string(10));

let conf = NatsSourceConfig {
connection_name: "".to_owned(),
subject: subject.clone(),
url: "http://invalid-url,nats://:invalid@localhost:4222".to_string(),
queue: None,
framing: default_framing_message_based(),
decoding: default_decoding(),
tls: None,
auth: None,
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
assert!(
matches!(r, Err(BuildError::Connect { .. })),
"publish_and_check failed for bad URLs, expected BuildError::Connect, got: {:?}",
r
);
}
}
2 changes: 1 addition & 1 deletion website/cue/reference/components/sources/base/nats.cue
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,6 @@ base: components: sources: nats: configuration: {
If the port is not specified it defaults to 4222.
"""
required: true
type: string: examples: ["nats://demo.nats.io", "nats://127.0.0.1:4242"]
type: string: examples: ["nats://demo.nats.io", "nats://127.0.0.1:4242", "nats://localhost:4222,nats://localhost:5222,nats://localhost:6222"]
}
}

0 comments on commit d47eda4

Please sign in to comment.