From c0f991b9620e26c9f0454d53627bb72314556f95 Mon Sep 17 00:00:00 2001 From: Deen Date: Tue, 4 Jun 2024 22:28:52 +0800 Subject: [PATCH] feat(opentelemetry source): support trace ingestion (#19728) * feat: otlp trace * feat: add tuple grpc services * fmt * add changelog.d * add integration tests * remove .vscode in gitignore * cue * add int test * hex trace_id span_id * fix int test * add a newline to pass the check * Update changelog.d/opentelemetry_trace.feature.md Co-authored-by: Stephen Wakely * Update website/cue/reference/components/sources/opentelemetry.cue Co-authored-by: Jesse Szwedko * Update website/cue/reference/components/sources/opentelemetry.cue Co-authored-by: Stephen Wakely * Fix cue error Signed-off-by: Stephen Wakely * Update trace.proto fix ident --------- Signed-off-by: Stephen Wakely Co-authored-by: Stephen Wakely Co-authored-by: Jesse Szwedko Co-authored-by: Stephen Wakely --- changelog.d/opentelemetry_trace.feature.md | 3 + lib/opentelemetry-proto/build.rs | 2 + lib/opentelemetry-proto/src/convert.rs | 158 +++++++++++++++++- lib/opentelemetry-proto/src/proto.rs | 12 ++ .../opentelemetry/proto/trace/v1/trace.proto | 8 +- src/sources/opentelemetry/grpc.rs | 52 +++++- src/sources/opentelemetry/http.rs | 107 ++++++++++-- .../opentelemetry/integration_tests.rs | 85 +++++++++- src/sources/opentelemetry/mod.rs | 36 +++- src/sources/opentelemetry/tests.rs | 11 +- src/sources/util/grpc/mod.rs | 28 ++++ tests/data/opentelemetry/config.yaml | 3 + .../components/sources/opentelemetry.cue | 14 +- 13 files changed, 475 insertions(+), 44 deletions(-) create mode 100644 changelog.d/opentelemetry_trace.feature.md diff --git a/changelog.d/opentelemetry_trace.feature.md b/changelog.d/opentelemetry_trace.feature.md new file mode 100644 index 0000000000000..544ffa86933ed --- /dev/null +++ b/changelog.d/opentelemetry_trace.feature.md @@ -0,0 +1,3 @@ +Support ingesting OpenTelemetry traces (experimental) + +authors: caibirdme diff --git
a/lib/opentelemetry-proto/build.rs b/lib/opentelemetry-proto/build.rs index a5eddbe8f2a58..beaf9f4c417aa 100644 --- a/lib/opentelemetry-proto/build.rs +++ b/lib/opentelemetry-proto/build.rs @@ -9,6 +9,8 @@ fn main() -> Result<(), Error> { "src/proto/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto", ], &["src/proto/opentelemetry-proto"], diff --git a/lib/opentelemetry-proto/src/convert.rs b/lib/opentelemetry-proto/src/convert.rs index d1f91798f44c6..dc789d27fdd95 100644 --- a/lib/opentelemetry-proto/src/convert.rs +++ b/lib/opentelemetry-proto/src/convert.rs @@ -2,16 +2,25 @@ use bytes::Bytes; use chrono::{DateTime, TimeZone, Utc}; use lookup::path; use ordered_float::NotNan; +use std::collections::BTreeMap; use vector_core::{ config::{log_schema, LegacyKey, LogNamespace}, - event::{Event, LogEvent}, + event::{Event, LogEvent, TraceEvent}, +}; +use vrl::value::KeyString; +use vrl::{ + event_path, + value::{ObjectMap, Value}, }; -use vrl::value::{ObjectMap, Value}; use super::proto::{ common::v1::{any_value::Value as PBValue, KeyValue}, logs::v1::{LogRecord, ResourceLogs, SeverityNumber}, resource::v1::Resource, + trace::v1::{ + span::{Event as SpanEvent, Link}, + ResourceSpans, Span, Status as SpanStatus, + }, }; const SOURCE_NAME: &str = "opentelemetry"; @@ -44,6 +53,24 @@ impl ResourceLogs { } } +impl ResourceSpans { + pub fn into_event_iter(self) -> impl Iterator { + let resource = self.resource; + let now = Utc::now(); + + self.scope_spans + .into_iter() + .flat_map(|instrumentation_library_spans| instrumentation_library_spans.spans) + 
.map(move |span| { + ResourceSpan { + resource: resource.clone(), + span, + } + .into_event(now) + }) + } +} + impl From for Value { fn from(av: PBValue) -> Self { match av { @@ -68,6 +95,11 @@ struct ResourceLog { log_record: LogRecord, } +struct ResourceSpan { + resource: Option, + span: Span, +} + fn kv_list_into_value(arr: Vec) -> Value { Value::Object( arr.into_iter() @@ -83,6 +115,83 @@ fn kv_list_into_value(arr: Vec) -> Value { ) } +fn to_hex(d: &[u8]) -> String { + if d.is_empty() { + return "".to_string(); + } + hex::encode(d) +} + +// Unlike log events(log body + metadata), trace spans are just metadata, so we don't handle log_namespace here, +// insert all attributes into log root, just like what datadog_agent/traces does. +impl ResourceSpan { + fn into_event(self, now: DateTime) -> Event { + let mut trace = TraceEvent::default(); + let span = self.span; + trace.insert( + event_path!(TRACE_ID_KEY), + Value::from(to_hex(&span.trace_id)), + ); + trace.insert(event_path!(SPAN_ID_KEY), Value::from(to_hex(&span.span_id))); + trace.insert(event_path!("trace_state"), span.trace_state); + trace.insert( + event_path!("parent_span_id"), + Value::from(to_hex(&span.parent_span_id)), + ); + trace.insert(event_path!("name"), span.name); + trace.insert(event_path!("kind"), span.kind); + trace.insert( + event_path!("start_time_unix_nano"), + Value::from(Utc.timestamp_nanos(span.start_time_unix_nano as i64)), + ); + trace.insert( + event_path!("end_time_unix_nano"), + Value::from(Utc.timestamp_nanos(span.end_time_unix_nano as i64)), + ); + if !span.attributes.is_empty() { + trace.insert( + event_path!(ATTRIBUTES_KEY), + kv_list_into_value(span.attributes), + ); + } + trace.insert( + event_path!(DROPPED_ATTRIBUTES_COUNT_KEY), + Value::from(span.dropped_attributes_count), + ); + if !span.events.is_empty() { + trace.insert( + event_path!("events"), + Value::Array(span.events.into_iter().map(Into::into).collect()), + ); + } + trace.insert( + 
event_path!("dropped_events_count"), + Value::from(span.dropped_events_count), + ); + if !span.links.is_empty() { + trace.insert( + event_path!("links"), + Value::Array(span.links.into_iter().map(Into::into).collect()), + ); + } + trace.insert( + event_path!("dropped_links_count"), + Value::from(span.dropped_links_count), + ); + trace.insert(event_path!("status"), Value::from(span.status)); + if let Some(resource) = self.resource { + if !resource.attributes.is_empty() { + trace.insert( + event_path!(RESOURCE_KEY), + kv_list_into_value(resource.attributes), + ); + } + } + trace.insert(event_path!("ingest_timestamp"), Value::from(now)); + trace.into() + } +} + // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.15.0/specification/logs/data-model.md impl ResourceLog { fn into_event(self, log_namespace: LogNamespace, now: DateTime) -> Event { @@ -130,7 +239,7 @@ impl ResourceLog { &mut log, Some(LegacyKey::Overwrite(path!(TRACE_ID_KEY))), path!(TRACE_ID_KEY), - Bytes::from(hex::encode(self.log_record.trace_id)), + Bytes::from(to_hex(&self.log_record.trace_id)), ); } if !self.log_record.span_id.is_empty() { @@ -139,7 +248,7 @@ impl ResourceLog { &mut log, Some(LegacyKey::Overwrite(path!(SPAN_ID_KEY))), path!(SPAN_ID_KEY), - Bytes::from(hex::encode(self.log_record.span_id)), + Bytes::from(to_hex(&self.log_record.span_id)), ); } if !self.log_record.severity_text.is_empty() { @@ -224,3 +333,44 @@ impl ResourceLog { log.into() } } + +impl From for Value { + fn from(ev: SpanEvent) -> Self { + let mut obj: BTreeMap = BTreeMap::new(); + obj.insert("name".into(), ev.name.into()); + obj.insert( + "time_unix_nano".into(), + Value::Timestamp(Utc.timestamp_nanos(ev.time_unix_nano as i64)), + ); + obj.insert("attributes".into(), kv_list_into_value(ev.attributes)); + obj.insert( + "dropped_attributes_count".into(), + Value::Integer(ev.dropped_attributes_count as i64), + ); + Value::Object(obj) + } +} + +impl From for Value { + fn from(link: Link) -> Self { + let 
mut obj: BTreeMap = BTreeMap::new(); + obj.insert("trace_id".into(), Value::from(to_hex(&link.trace_id))); + obj.insert("span_id".into(), Value::from(to_hex(&link.span_id))); + obj.insert("trace_state".into(), link.trace_state.into()); + obj.insert("attributes".into(), kv_list_into_value(link.attributes)); + obj.insert( + "dropped_attributes_count".into(), + Value::Integer(link.dropped_attributes_count as i64), + ); + Value::Object(obj) + } +} + +impl From for Value { + fn from(status: SpanStatus) -> Self { + let mut obj: BTreeMap = BTreeMap::new(); + obj.insert("message".into(), status.message.into()); + obj.insert("code".into(), status.code.into()); + Value::Object(obj) + } +} diff --git a/lib/opentelemetry-proto/src/proto.rs b/lib/opentelemetry-proto/src/proto.rs index ac248a52225a3..e77748c1c15dc 100644 --- a/lib/opentelemetry-proto/src/proto.rs +++ b/lib/opentelemetry-proto/src/proto.rs @@ -1,5 +1,10 @@ /// Service stub and clients. pub mod collector { + pub mod trace { + pub mod v1 { + tonic::include_proto!("opentelemetry.proto.collector.trace.v1"); + } + } pub mod logs { pub mod v1 { tonic::include_proto!("opentelemetry.proto.collector.logs.v1"); @@ -21,6 +26,13 @@ pub mod logs { } } +/// Generated types used for trace. +pub mod trace { + pub mod v1 { + tonic::include_proto!("opentelemetry.proto.trace.v1"); + } +} + /// Generated types used in resources. pub mod resource { pub mod v1 { diff --git a/lib/opentelemetry-proto/src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto b/lib/opentelemetry-proto/src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto index cd519f5b92584..864253ee41d87 100644 --- a/lib/opentelemetry-proto/src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto +++ b/lib/opentelemetry-proto/src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto @@ -170,10 +170,10 @@ message Span { // attributes is a collection of key/value pairs. 
Note, global attributes // like server name can be set using the resource API. Examples of attributes: // - // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" - // "/http/server_latency": 300 - // "example.com/my_attribute": true - // "example.com/score": 10.239 + // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" + // "/http/server_latency": 300 + // "example.com/my_attribute": true + // "example.com/score": 10.239 // // The OpenTelemetry API specification further restricts the allowed value types: // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/README.md#attribute diff --git a/src/sources/opentelemetry/grpc.rs b/src/sources/opentelemetry/grpc.rs index a76ca1e53a3ab..600fdfdf0d437 100644 --- a/src/sources/opentelemetry/grpc.rs +++ b/src/sources/opentelemetry/grpc.rs @@ -1,8 +1,13 @@ use futures::TryFutureExt; use tonic::{Request, Response, Status}; use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered}; -use vector_lib::opentelemetry::proto::collector::logs::v1::{ - logs_service_server::LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse, +use vector_lib::opentelemetry::proto::collector::{ + logs::v1::{ + logs_service_server::LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse, + }, + trace::v1::{ + trace_service_server::TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse, + }, }; use vector_lib::{ config::LogNamespace, @@ -12,7 +17,7 @@ use vector_lib::{ use crate::{ internal_events::{EventsReceived, StreamClosedError}, - sources::opentelemetry::LOGS, + sources::opentelemetry::{LOGS, TRACES}, SourceSender, }; @@ -24,19 +29,52 @@ pub(super) struct Service { pub log_namespace: LogNamespace, } +#[tonic::async_trait] +impl TraceService for Service { + async fn 
export( + &self, + request: Request, + ) -> Result, Status> { + let events: Vec = request + .into_inner() + .resource_spans + .into_iter() + .flat_map(|v| v.into_event_iter()) + .collect(); + self.handle_events(events, TRACES).await?; + + Ok(Response::new(ExportTraceServiceResponse { + partial_success: None, + })) + } +} + #[tonic::async_trait] impl LogsService for Service { async fn export( &self, request: Request, ) -> Result, Status> { - let mut events: Vec = request + let events: Vec = request .into_inner() .resource_logs .into_iter() .flat_map(|v| v.into_event_iter(self.log_namespace)) .collect(); + self.handle_events(events, LOGS).await?; + Ok(Response::new(ExportLogsServiceResponse { + partial_success: None, + })) + } +} + +impl Service { + async fn handle_events( + &self, + mut events: Vec, + log_name: &'static str, + ) -> Result<(), Status> { let count = events.len(); let byte_size = events.estimated_json_encoded_size_of(); self.events_received.emit(CountByteSize(count, byte_size)); @@ -45,7 +83,7 @@ impl LogsService for Service { self.pipeline .clone() - .send_batch_named(LOGS, events) + .send_batch_named(log_name, events) .map_err(|error| { let message = error.to_string(); emit!(StreamClosedError { count }); @@ -53,9 +91,7 @@ impl LogsService for Service { }) .and_then(|_| handle_batch_status(receiver)) .await?; - Ok(Response::new(ExportLogsServiceResponse { - partial_success: None, - })) + Ok(()) } } diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index ec0e0c020f70f..eb2aeeae231bb 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -13,8 +13,9 @@ use tracing::Span; use vector_lib::internal_event::{ ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered, }; -use vector_lib::opentelemetry::proto::collector::logs::v1::{ - ExportLogsServiceRequest, ExportLogsServiceResponse, +use vector_lib::opentelemetry::proto::collector::{ + 
logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse}, + trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse}, }; use vector_lib::tls::MaybeTlsIncomingStream; use vector_lib::{ @@ -86,6 +87,29 @@ pub(crate) fn build_warp_filter( out: SourceSender, bytes_received: Registered, events_received: Registered, +) -> BoxedFilter<(Response,)> { + let log_filters = build_warp_log_filter( + acknowledgements, + log_namespace, + out.clone(), + bytes_received.clone(), + events_received.clone(), + ); + let trace_filters = build_warp_trace_filter( + acknowledgements, + out.clone(), + bytes_received, + events_received, + ); + log_filters.or(trace_filters).unify().boxed() +} + +fn build_warp_log_filter( + acknowledgements: bool, + log_namespace: LogNamespace, + out: SourceSender, + bytes_received: Registered, + events_received: Registered, ) -> BoxedFilter<(Response,)> { warp::post() .and(warp::path!("v1" / "logs")) @@ -98,15 +122,77 @@ pub(crate) fn build_warp_filter( .and_then(move |encoding_header: Option, body: Bytes| { let events = decode(encoding_header.as_deref(), body).and_then(|body| { bytes_received.emit(ByteSize(body.len())); - decode_body(body, log_namespace, &events_received) + decode_log_body(body, log_namespace, &events_received) + }); + + handle_request( + events, + acknowledgements, + out.clone(), + super::LOGS, + ExportLogsServiceResponse::default(), + ) + }) + .boxed() +} + +fn build_warp_trace_filter( + acknowledgements: bool, + out: SourceSender, + bytes_received: Registered, + events_received: Registered, +) -> BoxedFilter<(Response,)> { + warp::post() + .and(warp::path!("v1" / "traces")) + .and(warp::header::exact_ignore_case( + "content-type", + "application/x-protobuf", + )) + .and(warp::header::optional::("content-encoding")) + .and(warp::body::bytes()) + .and_then(move |encoding_header: Option, body: Bytes| { + let events = decode(encoding_header.as_deref(), body).and_then(|body| { + bytes_received.emit(ByteSize(body.len())); + 
decode_trace_body(body, &events_received) }); - handle_request(events, acknowledgements, out.clone(), super::LOGS) + handle_request( + events, + acknowledgements, + out.clone(), + super::TRACES, + ExportTraceServiceResponse::default(), + ) }) .boxed() } -fn decode_body( +fn decode_trace_body( + body: Bytes, + events_received: &Registered, +) -> Result, ErrorMessage> { + let request = ExportTraceServiceRequest::decode(body).map_err(|error| { + ErrorMessage::new( + StatusCode::BAD_REQUEST, + format!("Could not decode request: {}", error), + ) + })?; + + let events: Vec = request + .resource_spans + .into_iter() + .flat_map(|v| v.into_event_iter()) + .collect(); + + events_received.emit(CountByteSize( + events.len(), + events.estimated_json_encoded_size_of(), + )); + + Ok(events) +} + +fn decode_log_body( body: Bytes, log_namespace: LogNamespace, events_received: &Registered, @@ -137,6 +223,7 @@ async fn handle_request( acknowledgements: bool, mut out: SourceSender, output: &str, + resp: impl Message, ) -> Result { match events { Ok(mut events) => { @@ -149,15 +236,9 @@ async fn handle_request( })?; match receiver { - None => Ok(protobuf(ExportLogsServiceResponse { - partial_success: None, - }) - .into_response()), + None => Ok(protobuf(resp).into_response()), Some(receiver) => match receiver.await { - BatchStatus::Delivered => Ok(protobuf(ExportLogsServiceResponse { - partial_success: None, - }) - .into_response()), + BatchStatus::Delivered => Ok(protobuf(resp).into_response()), BatchStatus::Errored => Err(warp::reject::custom(Status { code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here message: "Error delivering contents to sink".into(), diff --git a/src/sources/opentelemetry/integration_tests.rs b/src/sources/opentelemetry/integration_tests.rs index 9fc735b21abdc..a775e01ed5d07 100644 --- a/src/sources/opentelemetry/integration_tests.rs +++ b/src/sources/opentelemetry/integration_tests.rs @@ -1,7 +1,9 @@ use 
std::time::Duration; +use itertools::Itertools; use serde_json::json; +use super::{LOGS, TRACES}; use crate::{ config::{log_schema, SourceConfig, SourceContext}, event::EventStatus, @@ -11,6 +13,12 @@ use crate::{ retry_until, wait_for_tcp, }, }; +use prost::Message; + +use vector_lib::opentelemetry::proto::{ + collector::trace::v1::ExportTraceServiceRequest, + trace::v1::{ResourceSpans, ScopeSpans, Span}, +}; use super::{tests::new_source, GrpcConfig, HttpConfig, OpentelemetryConfig}; @@ -49,7 +57,7 @@ async fn receive_logs_legacy_namespace() { log_namespace: Default::default(), }; - let (sender, logs_output, _) = new_source(EventStatus::Delivered); + let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); let server = config .build(SourceContext::new_test(sender, None)) .await @@ -97,6 +105,81 @@ async fn receive_logs_legacy_namespace() { .await; } +#[tokio::test] +async fn receive_trace() { + // generate a trace request + let req = ExportTraceServiceRequest { + resource_spans: vec![ResourceSpans { + resource: None, + scope_spans: vec![ScopeSpans { + scope: None, + spans: vec![Span { + trace_id: (1..17).collect_vec(), //trace_id [u8;16] + span_id: (1..9).collect_vec(), // span_id [u8;8] + parent_span_id: (1..9).collect_vec(), // parent_span_id [u8;8] + name: "span".to_string(), + kind: 1, + start_time_unix_nano: 1713525203000000000, + end_time_unix_nano: 1713525205000000000, + attributes: vec![], + dropped_attributes_count: 0, + events: vec![], + dropped_events_count: 0, + links: vec![], + dropped_links_count: 0, + status: None, + trace_state: "".to_string(), + }], + schema_url: "".to_string(), + }], + schema_url: "".to_string(), + }], + }; + let body = req.encode_to_vec(); + + assert_source_compliance(&SOURCE_TAGS, async { + wait_ready(otel_health_url()).await; + + let config = OpentelemetryConfig { + grpc: GrpcConfig { + address: source_grpc_address().parse().unwrap(), + tls: Default::default(), + }, + http: HttpConfig { + address: 
source_http_address().parse().unwrap(), + tls: Default::default(), + keepalive: Default::default(), + }, + acknowledgements: Default::default(), + log_namespace: Default::default(), + }; + + let (sender, trace_output, _) = new_source(EventStatus::Delivered, TRACES.to_string()); + let server = config + .build(SourceContext::new_test(sender, None)) + .await + .unwrap(); + tokio::spawn(server); + wait_for_tcp(source_grpc_address()).await; + wait_for_tcp(source_http_address()).await; + + let client = reqwest::Client::new(); + let _res = client + .post(format!("{}/v1/traces", otel_otlp_url())) + .header(reqwest::header::CONTENT_TYPE, "application/x-protobuf") + .body(body) + .send() + .await + .expect("Failed to send traces to Opentelemetry Collector."); + + // The Opentelemetry Collector is configured to send to both the gRPC and HTTP endpoints + // so we should expect to collect two events from the single trace sent. + let events = collect_n(trace_output, 2).await; + assert_eq!(events.len(), 2); + }) + .await; +} + async fn wait_ready(address: String) { retry_until( || async { diff --git a/src/sources/opentelemetry/mod.rs b/src/sources/opentelemetry/mod.rs index 86cd2e23455c3..74ad6c6547d88 100644 --- a/src/sources/opentelemetry/mod.rs +++ b/src/sources/opentelemetry/mod.rs @@ -11,15 +11,20 @@ mod status; use std::net::SocketAddr; use futures::{future::join, FutureExt, TryFutureExt}; +use tonic::codec::CompressionEncoding; use vector_lib::lookup::{owned_value_path, OwnedTargetPath}; use vector_lib::opentelemetry::convert::{ ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY, RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY, }; +use tonic::transport::server::RoutesBuilder; use vector_lib::configurable::configurable_component; use vector_lib::internal_event::{BytesReceived, EventsReceived, Protocol}; -use vector_lib::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsServiceServer; +use
vector_lib::opentelemetry::proto::collector::{ + logs::v1::logs_service_server::LogsServiceServer, + trace::v1::trace_service_server::TraceServiceServer, +}; use vector_lib::{ config::{log_schema, LegacyKey, LogNamespace}, schema::Definition, @@ -37,11 +42,12 @@ use crate::{ }, http::KeepaliveConfig, serde::bool_or_struct, - sources::{util::grpc::run_grpc_server, Source}, + sources::{util::grpc::run_grpc_server_with_routes, Source}, tls::{MaybeTlsSettings, TlsEnableableConfig}, }; pub const LOGS: &str = "logs"; +pub const TRACES: &str = "traces"; /// Configuration for the `opentelemetry` source. #[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))] @@ -138,20 +144,31 @@ impl SourceConfig for OpentelemetryConfig { let log_namespace = cx.log_namespace(self.log_namespace); let grpc_tls_settings = MaybeTlsSettings::from_config(&self.grpc.tls, true)?; - let grpc_service = LogsServiceServer::new(Service { + + let log_service = LogsServiceServer::new(Service { pipeline: cx.out.clone(), acknowledgements, log_namespace, events_received: events_received.clone(), }) - .accept_compressed(tonic::codec::CompressionEncoding::Gzip) - // Tonic added a default of 4MB in 0.9. This replaces the old behavior. 
+ .accept_compressed(CompressionEncoding::Gzip) .max_decoding_message_size(usize::MAX); - let grpc_source = run_grpc_server( + let trace_service = TraceServiceServer::new(Service { + pipeline: cx.out.clone(), + acknowledgements, + log_namespace, + events_received: events_received.clone(), + }) + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(usize::MAX); + + let mut builder = RoutesBuilder::default(); + builder.add_service(log_service).add_service(trace_service); + let grpc_source = run_grpc_server_with_routes( self.grpc.address, grpc_tls_settings, - grpc_service, + builder.routes(), cx.shutdown.clone(), ) .map_err(|error| { @@ -269,7 +286,10 @@ impl SourceConfig for OpentelemetryConfig { } }; - vec![SourceOutput::new_logs(DataType::Log, schema_definition).with_port(LOGS)] + vec![ + SourceOutput::new_logs(DataType::Log, schema_definition).with_port(LOGS), + SourceOutput::new_traces().with_port(TRACES), + ] } fn resources(&self) -> Vec { diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 628b92bb3d47b..5831f0a7beb96 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -57,7 +57,7 @@ async fn receive_grpc_logs_vector_namespace() { .remove(0) .schema_definition(true); - let (sender, logs_output, _) = new_source(EventStatus::Delivered); + let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); let server = source .build(SourceContext::new_test(sender, None)) .await @@ -195,7 +195,7 @@ async fn receive_grpc_logs_legacy_namespace() { .remove(0) .schema_definition(true); - let (sender, logs_output, _) = new_source(EventStatus::Delivered); + let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); let server = source .build(SourceContext::new_test(sender, None)) .await @@ -285,16 +285,17 @@ async fn receive_grpc_logs_legacy_namespace() { pub(super) fn new_source( status: EventStatus, + event_name: String, ) -> ( 
SourceSender, impl Stream, impl Stream, ) { let (mut sender, recv) = SourceSender::new_test_finalize(status); - let logs_output = sender - .add_outputs(status, LOGS.to_string()) + let output = sender + .add_outputs(status, event_name) .flat_map(into_event_stream); - (sender, logs_output, recv) + (sender, output, recv) } fn str_into_hex_bytes(s: &str) -> Vec { diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index c1976a4ac8fb5..1052318538bc6 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -7,6 +7,7 @@ use futures::FutureExt; use http::{Request, Response}; use hyper::Body; use std::{convert::Infallible, net::SocketAddr, time::Duration}; +use tonic::transport::server::Routes; use tonic::{ body::BoxBody, transport::server::{NamedService, Server}, @@ -63,6 +64,33 @@ where Ok(()) } +// This is a bit of an ugly hack to allow us to run two services on the same port. +// I just don't know how to convert the generic type with associated types into a Vec>. +pub async fn run_grpc_server_with_routes( + address: SocketAddr, + tls_settings: MaybeTlsSettings, + routes: Routes, + shutdown: ShutdownSignal, +) -> crate::Result<()> { + let span = Span::current(); + let (tx, rx) = tokio::sync::oneshot::channel::(); + let listener = tls_settings.bind(&address).await?; + let stream = listener.accept_stream(); + + info!(%address, "Building gRPC server."); + + Server::builder() + .layer(build_grpc_trace_layer(span.clone())) + .layer(DecompressionAndMetricsLayer) + .add_routes(routes) + .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap())) + .await?; + + drop(rx.await); + + Ok(()) +} + /// Builds a [TraceLayer] configured for a gRPC server. /// /// This layer emits gPRC specific telemetry for messages received/sent and handler duration. 
diff --git a/tests/data/opentelemetry/config.yaml b/tests/data/opentelemetry/config.yaml index 84b5a89372356..1bb4ab7021b62 100644 --- a/tests/data/opentelemetry/config.yaml +++ b/tests/data/opentelemetry/config.yaml @@ -26,3 +26,6 @@ service: logs: receivers: [otlp] exporters: [otlp,otlphttp] + traces: + receivers: [otlp] + exporters: [otlp,otlphttp] diff --git a/website/cue/reference/components/sources/opentelemetry.cue b/website/cue/reference/components/sources/opentelemetry.cue index 415039485ced6..3f767b5c449bb 100644 --- a/website/cue/reference/components/sources/opentelemetry.cue +++ b/website/cue/reference/components/sources/opentelemetry.cue @@ -41,7 +41,7 @@ components: sources: opentelemetry: { requirements: [] warnings: [ """ - The `opentelemetry` source only supports log events at this time. + The `opentelemetry` source only supports log and trace events at this time. """, ] notices: [] @@ -60,6 +60,12 @@ components: sources: opentelemetry: { Received log events will go to this output stream. Use `.logs` as an input to downstream transforms and sinks. """ }, + { + name: "traces" + description: """ + Received trace events will go to this output stream. Use `.traces` as an input to downstream transforms and sinks. + """ + }, ] output: { @@ -204,6 +210,12 @@ components: sources: opentelemetry: { `/usr/local/ssl/openssl.cnf` or can be specified with the `OPENSSL_CONF` environment variable. """ } + traces: { + title: "Ingest OTLP traces" + body: """ + Trace support is experimental and subject to change as Vector has no strongly-typed structure for traces internally. Instead, traces are stored as a key/value map similar to logs. This may change in the future to a structured format. + """ + } } telemetry: metrics: {