Skip to content

Commit

Permalink
enhancement(gcp_cloud_storage): Make API endpoint configurable (#21158)
Browse files Browse the repository at this point in the history
* enhancement(gcp_cloud_storage): Make API endpoint configurable

Addresses #21137

Adds an `endpoint` config parameter, following the same pattern as other sources/sinks.

* Update changelog.d/21137_configurable_gcs_sink_endpoint.enhancement.md

* Fix GcsSinkConfig instantiation

* cuefmt

Signed-off-by: Jesse Szwedko <[email protected]>

---------

Signed-off-by: Jesse Szwedko <[email protected]>
Co-authored-by: Jesse Szwedko <[email protected]>
Co-authored-by: Jesse Szwedko <[email protected]>
  • Loading branch information
3 people authored Oct 4, 2024
1 parent 4588cec commit 88d6fe7
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Adds support for configuring `gcp_cloud_storage` sink API `endpoint`.

authors: movinfinex
12 changes: 10 additions & 2 deletions src/sinks/gcp/cloud_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use crate::{
sinks::{
gcs_common::{
config::{
build_healthcheck, GcsPredefinedAcl, GcsRetryLogic, GcsStorageClass, BASE_URL,
build_healthcheck, default_endpoint, GcsPredefinedAcl, GcsRetryLogic,
GcsStorageClass,
},
service::{GcsRequest, GcsRequestSettings, GcsService},
sink::GcsSink,
Expand Down Expand Up @@ -155,6 +156,12 @@ pub struct GcsSinkConfig {
#[serde(default)]
batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

/// API endpoint for Google Cloud Storage
#[configurable(metadata(docs::examples = "http://localhost:9000"))]
#[configurable(validation(format = "uri"))]
#[serde(default = "default_endpoint")]
endpoint: String,

#[configurable(derived)]
#[serde(default)]
request: TowerRequestConfig<GcsTowerRequestConfigDefaults>,
Expand Down Expand Up @@ -196,6 +203,7 @@ fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
encoding,
compression: Compression::gzip_default(),
batch: Default::default(),
endpoint: Default::default(),
request: Default::default(),
auth: Default::default(),
tls: Default::default(),
Expand All @@ -221,7 +229,7 @@ impl GenerateConfig for GcsSinkConfig {
impl SinkConfig for GcsSinkConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let auth = self.auth.build(Scope::DevStorageReadWrite).await?;
let base_url = format!("{}{}/", BASE_URL, self.bucket);
let base_url = format!("{}/{}/", self.endpoint, self.bucket);
let tls = TlsSettings::from_options(&self.tls)?;
let client = HttpClient::new(tls, cx.proxy())?;
let healthcheck = build_healthcheck(
Expand Down
4 changes: 3 additions & 1 deletion src/sinks/gcs_common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use crate::{
},
};

pub const BASE_URL: &str = "https://storage.googleapis.com/";
pub fn default_endpoint() -> String {
"https://storage.googleapis.com".to_string()
}

/// GCS Predefined ACLs.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,14 @@ base: components: sinks: gcp_cloud_storage: configuration: {
}
}
}
endpoint: {
description: "API endpoint for Google Cloud Storage"
required: false
type: string: {
default: "https://storage.googleapis.com"
examples: ["http://localhost:9000"]
}
}
filename_append_uuid: {
description: """
Whether or not to append a UUID v4 token to the end of the object key.
Expand Down

0 comments on commit 88d6fe7

Please sign in to comment.