diff --git a/changelog.d/21385-nats-source-multiple-url.enhancement.md b/changelog.d/21385-nats-source-multiple-url.enhancement.md new file mode 100644 index 0000000000000..c5fa309c560ec --- /dev/null +++ b/changelog.d/21385-nats-source-multiple-url.enhancement.md @@ -0,0 +1,3 @@ +Adds support for multiple URLs in the NATS source. This allows for greater fault tolerance as compared to reading from a single server. + +authors: benjamin-awd diff --git a/src/sources/nats.rs b/src/sources/nats.rs index 2fe4277e1d074..296c4afd88b3a 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -51,6 +51,7 @@ pub struct NatsSourceConfig { /// If the port is not specified it defaults to 4222. #[configurable(metadata(docs::examples = "nats://demo.nats.io"))] #[configurable(metadata(docs::examples = "nats://127.0.0.1:4242"))] + #[configurable(metadata(docs::examples = "nats://localhost:4222,nats://localhost:5222,nats://localhost:6222"))] url: String, /// A [name][nats_connection_name] assigned to the NATS connection. @@ -185,7 +186,18 @@ impl SourceConfig for NatsSourceConfig { impl NatsSourceConfig { async fn connect(&self) -> Result { let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?; - options.connect(&self.url).await.context(ConnectSnafu) + + let server_addrs = self.parse_server_addresses()?; + options.connect(server_addrs).await.context(ConnectSnafu) + } + + fn parse_server_addresses(&self) -> Result, BuildError> { + self.url.split(",") + .map(|url| url.parse::() + .map_err(|_| BuildError::Connect { + source: async_nats::ConnectErrorKind::ServerParse.into(), + })) + .collect() } } @@ -846,4 +858,56 @@ mod integration_tests { r ); } + + #[tokio::test] + async fn nats_multiple_urls_valid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://nats:4222,nats://demo.nats.io:4222".to_string(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: None, + auth: None, + log_namespace: None, + subject_key_field: default_subject_key_field(), + ..Default::default() + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed for multiple URLs, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_multiple_urls_invalid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "http://invalid-url,nats://:invalid@localhost:4222".to_string(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: None, + auth: None, + log_namespace: None, + subject_key_field: default_subject_key_field(), + ..Default::default() + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Connect { .. })), + "publish_and_check failed for bad URLs, expected BuildError::Connect, got: {:?}", + r + ); + } } diff --git a/website/cue/reference/components/sources/base/nats.cue b/website/cue/reference/components/sources/base/nats.cue index c1c582d5c2d30..230a557c1ae57 100644 --- a/website/cue/reference/components/sources/base/nats.cue +++ b/website/cue/reference/components/sources/base/nats.cue @@ -592,6 +592,6 @@ base: components: sources: nats: configuration: { If the port is not specified it defaults to 4222. """ required: true - type: string: examples: ["nats://demo.nats.io", "nats://127.0.0.1:4242"] + type: string: examples: ["nats://demo.nats.io", "nats://127.0.0.1:4242", "nats://localhost:4222,nats://localhost:5222,nats://localhost:6222"] } }