From efda87e121b0e1e41d00922d073b93e0c9d35a30 Mon Sep 17 00:00:00 2001 From: ronaldoliu Date: Fri, 26 Jan 2024 14:52:06 +0800 Subject: [PATCH 01/16] feat: otlp trace --- .gitignore | 1 + lib/opentelemetry-proto/build.rs | 4 +- lib/opentelemetry-proto/src/convert.rs | 99 +++++++++++++++++++++++++- lib/opentelemetry-proto/src/proto.rs | 12 ++++ src/sources/opentelemetry/grpc.rs | 44 +++++++++--- src/sources/opentelemetry/http.rs | 85 ++++++++++++++++++---- src/sources/opentelemetry/mod.rs | 29 ++++++-- 7 files changed, 245 insertions(+), 29 deletions(-) diff --git a/.gitignore b/.gitignore index 25a85d5f6b361..135ee9b82dbf7 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ node_modules tests/data/wasm/*/target heaptrack.* massif.* +.vscode # tilt tilt_modules/ diff --git a/lib/opentelemetry-proto/build.rs b/lib/opentelemetry-proto/build.rs index a5eddbe8f2a58..5ecd955812ded 100644 --- a/lib/opentelemetry-proto/build.rs +++ b/lib/opentelemetry-proto/build.rs @@ -9,10 +9,12 @@ fn main() -> Result<(), Error> { "src/proto/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto", ], &["src/proto/opentelemetry-proto"], )?; Ok(()) -} +} \ No newline at end of file diff --git a/lib/opentelemetry-proto/src/convert.rs b/lib/opentelemetry-proto/src/convert.rs index d1f91798f44c6..e005431e4ed30 100644 --- a/lib/opentelemetry-proto/src/convert.rs +++ b/lib/opentelemetry-proto/src/convert.rs @@ -1,16 +1,19 @@ +use std::collections::BTreeMap; +use vrl::value::KeyString; use bytes::Bytes; use chrono::{DateTime, TimeZone, Utc}; use lookup::path; use ordered_float::NotNan; use vector_core::{ config::{log_schema, LegacyKey, LogNamespace}, - event::{Event, LogEvent}, + event::{Event, LogEvent, TraceEvent}, }; -use vrl::value::{ObjectMap, Value}; +use vrl::{event_path, value::{ObjectMap, Value}}; use super::proto::{ common::v1::{any_value::Value as PBValue, KeyValue}, logs::v1::{LogRecord, ResourceLogs, SeverityNumber}, + trace::v1::{ResourceSpans, Span, Status as SpanStatus, span::{Event as SpanEvent, Link}}, resource::v1::Resource, }; @@ -44,6 +47,24 @@ impl ResourceLogs { } } +impl ResourceSpans { + pub fn into_event_iter(self) -> impl Iterator { + let resource = self.resource; + let now = Utc::now(); + + self.scope_spans + .into_iter() + .flat_map(|instrumentation_library_spans| instrumentation_library_spans.spans) + .map(move |span| { + ResourceSpan { + resource: resource.clone(), + span, + } + .into_event(now) + }) + } +} + impl From for Value { fn from(av: PBValue) -> Self { match av { @@ -68,6 +89,11 @@ struct ResourceLog { log_record: LogRecord, } +struct ResourceSpan { + resource: Option, + span: Span, +} + fn kv_list_into_value(arr: Vec) -> Value { Value::Object( arr.into_iter() @@ -83,6 +109,43 @@ fn kv_list_into_value(arr: Vec) -> Value { ) } +// Unlike log events(log body + metadata), trace spans are just metadata, so we don't handle log_namespace here, +// insert all attributes into log root, just like what datadog_agent/traces does. +impl ResourceSpan { + fn into_event(self, now: DateTime) -> Event { + let mut trace = TraceEvent::default(); + let span = self.span; + trace.insert(event_path!("trace_id"), Value::from(span.trace_id)); + trace.insert(event_path!("span_id"), Value::from(span.span_id)); + trace.insert(event_path!("trace_state"), span.trace_state); + trace.insert(event_path!("parent_span_id"), Value::from(span.parent_span_id)); + trace.insert(event_path!("name"), span.name); + trace.insert(event_path!("kind"), span.kind); + trace.insert(event_path!("start_time_unix_nano"), Value::from(Utc.timestamp_nanos(span.start_time_unix_nano as i64))); + trace.insert(event_path!("end_time_unix_nano"), Value::from(Utc.timestamp_nanos(span.end_time_unix_nano as i64))); + if !span.attributes.is_empty() { + trace.insert(event_path!("attributes"), kv_list_into_value(span.attributes)); + } + trace.insert(event_path!("dropped_attributes_count"), Value::from(span.dropped_attributes_count)); + if !span.events.is_empty() { + trace.insert(event_path!("events"), Value::Array(span.events.into_iter().map(Into::into).collect())); + } + trace.insert(event_path!("dropped_events_count"), Value::from(span.dropped_events_count)); + if !span.links.is_empty() { + trace.insert(event_path!("links"), Value::Array(span.links.into_iter().map(Into::into).collect())); + } + trace.insert(event_path!("dropped_links_count"), Value::from(span.dropped_links_count)); + trace.insert(event_path!("status"), Value::from(span.status)); + if let Some(resource) = self.resource { + if !resource.attributes.is_empty() { + trace.insert(event_path!(RESOURCE_KEY), kv_list_into_value(resource.attributes)); + } + } + trace.insert(event_path!("ingest_timestamp"), Value::from(now)); + trace.into() + } +} + // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.15.0/specification/logs/data-model.md impl ResourceLog { fn into_event(self, log_namespace: LogNamespace, now: DateTime) -> Event { @@ -224,3 +287,35 @@ impl ResourceLog { log.into() } } + +impl From for Value { + fn from(ev: SpanEvent) -> Self { + let mut obj: BTreeMap = BTreeMap::new(); + obj.insert("name".into(), ev.name.into()); + obj.insert("time_unix_nano".into(), Value::Timestamp(Utc.timestamp_nanos(ev.time_unix_nano as i64))); + obj.insert("attributes".into(), kv_list_into_value(ev.attributes)); + obj.insert("dropped_attributes_count".into(), Value::Integer(ev.dropped_attributes_count as i64)); + Value::Object(obj) + } +} + +impl From for Value { + fn from(link: Link) -> Self { + let mut obj: BTreeMap = BTreeMap::new(); + obj.insert("trace_id".into(), Value::from(link.trace_id)); + obj.insert("span_id".into(), Value::from(link.span_id)); + obj.insert("trace_state".into(), link.trace_state.into()); + obj.insert("attributes".into(), kv_list_into_value(link.attributes)); + obj.insert("dropped_attributes_count".into(), Value::Integer(link.dropped_attributes_count as i64)); + Value::Object(obj) + } +} + +impl From for Value { + fn from(status: SpanStatus) -> Self { + let mut obj: BTreeMap = BTreeMap::new(); + obj.insert("message".into(), status.message.into()); + obj.insert("code".into(), status.code.into()); + Value::Object(obj) + } +} \ No newline at end of file diff --git a/lib/opentelemetry-proto/src/proto.rs b/lib/opentelemetry-proto/src/proto.rs index ac248a52225a3..e77748c1c15dc 100644 --- a/lib/opentelemetry-proto/src/proto.rs +++ b/lib/opentelemetry-proto/src/proto.rs @@ -1,5 +1,10 @@ /// Service stub and clients. pub mod collector { + pub mod trace { + pub mod v1 { + tonic::include_proto!("opentelemetry.proto.collector.trace.v1"); + } + } pub mod logs { pub mod v1 { tonic::include_proto!("opentelemetry.proto.collector.logs.v1"); @@ -21,6 +26,13 @@ pub mod logs { } } +/// Generated types used for trace. +pub mod trace { + pub mod v1 { + tonic::include_proto!("opentelemetry.proto.trace.v1"); + } +} + /// Generated types used in resources. pub mod resource { pub mod v1 { diff --git a/src/sources/opentelemetry/grpc.rs b/src/sources/opentelemetry/grpc.rs index a76ca1e53a3ab..e372335fc5536 100644 --- a/src/sources/opentelemetry/grpc.rs +++ b/src/sources/opentelemetry/grpc.rs @@ -1,8 +1,9 @@ use futures::TryFutureExt; use tonic::{Request, Response, Status}; use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered}; -use vector_lib::opentelemetry::proto::collector::logs::v1::{ - logs_service_server::LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse, +use vector_lib::opentelemetry::proto::collector::{ + trace::v1::{trace_service_server::TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse}, + logs::v1::{logs_service_server::LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse}, }; use vector_lib::{ config::LogNamespace, @@ -12,7 +13,7 @@ use vector_lib::{ use crate::{ internal_events::{EventsReceived, StreamClosedError}, - sources::opentelemetry::LOGS, + sources::opentelemetry::{LOGS, TRACES}, SourceSender, }; @@ -24,19 +25,48 @@ pub(super) struct Service { pub log_namespace: LogNamespace, } +#[tonic::async_trait] +impl TraceService for Service { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + let events: Vec = request + .into_inner() + .resource_spans + .into_iter() + .flat_map(|v| v.into_event_iter()) + .collect(); + self.handle_events(events, TRACES).await?; + + Ok(Response::new(ExportTraceServiceResponse { + partial_success: None, + })) + } +} + #[tonic::async_trait] impl LogsService for Service { async fn export( &self, request: Request, ) -> Result, Status> { - let mut events: Vec = request + let events: Vec = request .into_inner() .resource_logs .into_iter() .flat_map(|v| v.into_event_iter(self.log_namespace)) .collect(); + self.handle_events(events, LOGS).await?; + + Ok(Response::new(ExportLogsServiceResponse { + partial_success: None, + })) + } +} +impl Service { + async fn handle_events(&self, mut events: Vec, log_name: &'static str) -> Result<(), Status> { let count = events.len(); let byte_size = events.estimated_json_encoded_size_of(); self.events_received.emit(CountByteSize(count, byte_size)); @@ -45,7 +75,7 @@ impl LogsService for Service { self.pipeline .clone() - .send_batch_named(LOGS, events) + .send_batch_named(log_name, events) .map_err(|error| { let message = error.to_string(); emit!(StreamClosedError { count }); @@ -53,9 +83,7 @@ impl LogsService for Service { }) .and_then(|_| handle_batch_status(receiver)) .await?; - Ok(Response::new(ExportLogsServiceResponse { - partial_success: None, - })) + Ok(()) } } diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index ec0e0c020f70f..0ee3ca5cfecfe 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -13,8 +13,9 @@ use tracing::Span; use vector_lib::internal_event::{ ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered, }; -use vector_lib::opentelemetry::proto::collector::logs::v1::{ - ExportLogsServiceRequest, ExportLogsServiceResponse, +use vector_lib::opentelemetry::proto::collector::{ + logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse}, + trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse}, }; use vector_lib::tls::MaybeTlsIncomingStream; use vector_lib::{ @@ -86,6 +87,18 @@ pub(crate) fn build_warp_filter( out: SourceSender, bytes_received: Registered, events_received: Registered, +) -> BoxedFilter<(Response,)> { + let log_filters = build_warp_log_filter(acknowledgements, log_namespace, out.clone(), bytes_received.clone(), events_received.clone()); + let trace_filters = build_warp_trace_filter(acknowledgements, out.clone(), bytes_received, events_received); + log_filters.or(trace_filters).unify().boxed() +} + +fn build_warp_log_filter( + acknowledgements: bool, + log_namespace: LogNamespace, + out: SourceSender, + bytes_received: Registered, + events_received: Registered, ) -> BoxedFilter<(Response,)> { warp::post() .and(warp::path!("v1" / "logs")) @@ -98,15 +111,65 @@ pub(crate) fn build_warp_filter( .and_then(move |encoding_header: Option, body: Bytes| { let events = decode(encoding_header.as_deref(), body).and_then(|body| { bytes_received.emit(ByteSize(body.len())); - decode_body(body, log_namespace, &events_received) + decode_log_body(body, log_namespace, &events_received) + }); + + handle_request(events, acknowledgements, out.clone(), super::LOGS, ExportLogsServiceResponse::default()) + }) + .boxed() +} + +fn build_warp_trace_filter( + acknowledgements: bool, + out: SourceSender, + bytes_received: Registered, + events_received: Registered, +) -> BoxedFilter<(Response,)> { + warp::post() + .and(warp::path!("v1" / "traces")) + .and(warp::header::exact_ignore_case( + "content-type", + "application/x-protobuf", + )) + .and(warp::header::optional::("content-encoding")) + .and(warp::body::bytes()) + .and_then(move |encoding_header: Option, body: Bytes| { + let events = decode(encoding_header.as_deref(), body).and_then(|body| { + bytes_received.emit(ByteSize(body.len())); + decode_trace_body(body, &events_received) }); - handle_request(events, acknowledgements, out.clone(), super::LOGS) + handle_request(events, acknowledgements, out.clone(), super::TRACES, ExportTraceServiceResponse::default()) }) .boxed() } -fn decode_body( +fn decode_trace_body( + body: Bytes, + events_received: &Registered, +) -> Result, ErrorMessage> { + let request = ExportTraceServiceRequest::decode(body).map_err(|error| { + ErrorMessage::new( + StatusCode::BAD_REQUEST, + format!("Could not decode request: {}", error), + ) + })?; + + let events: Vec = request + .resource_spans + .into_iter() + .flat_map(|v| v.into_event_iter()) + .collect(); + + events_received.emit(CountByteSize( + events.len(), + events.estimated_json_encoded_size_of(), + )); + + Ok(events) +} + +fn decode_log_body( body: Bytes, log_namespace: LogNamespace, events_received: &Registered, @@ -137,7 +200,9 @@ async fn handle_request( acknowledgements: bool, mut out: SourceSender, output: &str, -) -> Result { + resp: impl Message, +) -> Result +{ match events { Ok(mut events) => { let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events); @@ -149,14 +214,10 @@ async fn handle_request( })?; match receiver { - None => Ok(protobuf(ExportLogsServiceResponse { - partial_success: None, - }) + None => Ok(protobuf(resp) .into_response()), Some(receiver) => match receiver.await { - BatchStatus::Delivered => Ok(protobuf(ExportLogsServiceResponse { - partial_success: None, - }) + BatchStatus::Delivered => Ok(protobuf(resp) .into_response()), BatchStatus::Errored => Err(warp::reject::custom(Status { code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here diff --git a/src/sources/opentelemetry/mod.rs b/src/sources/opentelemetry/mod.rs index 86cd2e23455c3..6a0ffa269c2cd 100644 --- a/src/sources/opentelemetry/mod.rs +++ b/src/sources/opentelemetry/mod.rs @@ -16,10 +16,14 @@ use vector_lib::opentelemetry::convert::{ ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY, RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY, }; +use tonic::codec::CompressionEncoding; use vector_lib::configurable::configurable_component; use vector_lib::internal_event::{BytesReceived, EventsReceived, Protocol}; -use vector_lib::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsServiceServer; +use vector_lib::opentelemetry::proto::collector::{ + trace::v1::trace_service_server::TraceServiceServer, + //logs::v1::logs_service_server::LogsServiceServer, +}; use vector_lib::{ config::{log_schema, LegacyKey, LogNamespace}, schema::Definition, @@ -42,6 +46,7 @@ use crate::{ }; pub const LOGS: &str = "logs"; +pub const TRACES: &str = "traces"; /// Configuration for the `opentelemetry` source. #[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))] @@ -129,6 +134,7 @@ impl GenerateConfig for OpentelemetryConfig { } } + #[async_trait::async_trait] #[typetag::serde(name = "opentelemetry")] impl SourceConfig for OpentelemetryConfig { @@ -138,20 +144,28 @@ impl SourceConfig for OpentelemetryConfig { let log_namespace = cx.log_namespace(self.log_namespace); let grpc_tls_settings = MaybeTlsSettings::from_config(&self.grpc.tls, true)?; - let grpc_service = LogsServiceServer::new(Service { + // let log_service = LogsServiceServer::new(Service { + // pipeline: cx.out.clone(), + // acknowledgements, + // log_namespace, + // events_received: events_received.clone(), + // }) + // .accept_compressed(CompressionEncoding::Gzip) + // .max_decoding_message_size(usize::MAX); + + let trace_service = TraceServiceServer::new(Service { pipeline: cx.out.clone(), acknowledgements, log_namespace, events_received: events_received.clone(), }) - .accept_compressed(tonic::codec::CompressionEncoding::Gzip) - // Tonic added a default of 4MB in 0.9. This replaces the old behavior. + .accept_compressed(CompressionEncoding::Gzip) .max_decoding_message_size(usize::MAX); let grpc_source = run_grpc_server( self.grpc.address, grpc_tls_settings, - grpc_service, + trace_service, cx.shutdown.clone(), ) .map_err(|error| { @@ -269,7 +283,10 @@ impl SourceConfig for OpentelemetryConfig { } }; - vec![SourceOutput::new_logs(DataType::Log, schema_definition).with_port(LOGS)] + vec![ + SourceOutput::new_logs(DataType::Log, schema_definition).with_port(LOGS), + SourceOutput::new_traces().with_port(TRACES), + ] } fn resources(&self) -> Vec { From 151cc80f27f2dea63ea1fe9be0632ab55b58ff3e Mon Sep 17 00:00:00 2001 From: ronaldoliu Date: Sat, 27 Jan 2024 10:33:25 +0800 Subject: [PATCH 02/16] feat: add tuple grpc services --- src/sources/opentelemetry/mod.rs | 25 ++++++++++--------- src/sources/util/grpc/mod.rs | 42 ++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 12 deletions(-) diff --git a/src/sources/opentelemetry/mod.rs b/src/sources/opentelemetry/mod.rs index 6a0ffa269c2cd..c8820b1abc112 100644 --- a/src/sources/opentelemetry/mod.rs +++ b/src/sources/opentelemetry/mod.rs @@ -22,7 +22,7 @@ use vector_lib::configurable::configurable_component; use vector_lib::internal_event::{BytesReceived, EventsReceived, Protocol}; use vector_lib::opentelemetry::proto::collector::{ trace::v1::trace_service_server::TraceServiceServer, - //logs::v1::logs_service_server::LogsServiceServer, + logs::v1::logs_service_server::LogsServiceServer, }; use vector_lib::{ config::{log_schema, LegacyKey, LogNamespace}, @@ -41,7 +41,7 @@ use crate::{ }, http::KeepaliveConfig, serde::bool_or_struct, - sources::{util::grpc::run_grpc_server, Source}, + sources::{util::grpc::run_grpc_tuple_server, Source}, tls::{MaybeTlsSettings, TlsEnableableConfig}, }; @@ -144,14 +144,15 @@ impl SourceConfig for OpentelemetryConfig { let log_namespace = cx.log_namespace(self.log_namespace); let grpc_tls_settings = MaybeTlsSettings::from_config(&self.grpc.tls, true)?; - // let log_service = LogsServiceServer::new(Service { - // pipeline: cx.out.clone(), - // acknowledgements, - // log_namespace, - // events_received: events_received.clone(), - // }) - // .accept_compressed(CompressionEncoding::Gzip) - // .max_decoding_message_size(usize::MAX); + + let log_service = LogsServiceServer::new(Service { + pipeline: cx.out.clone(), + acknowledgements, + log_namespace, + events_received: events_received.clone(), + }) + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(usize::MAX); let trace_service = TraceServiceServer::new(Service { pipeline: cx.out.clone(), @@ -162,10 +163,10 @@ impl SourceConfig for OpentelemetryConfig { .accept_compressed(CompressionEncoding::Gzip) .max_decoding_message_size(usize::MAX); - let grpc_source = run_grpc_server( + let grpc_source = run_grpc_tuple_server( self.grpc.address, grpc_tls_settings, - trace_service, + (log_service, trace_service), cx.shutdown.clone(), ) .map_err(|error| { diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index c1976a4ac8fb5..de4c263469d40 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -63,6 +63,48 @@ where Ok(()) } +// This is a bit of a ugly hack to allow us to run two services on the same port. +// I just don't know how to convert the generic type with associated types into a Vec>. +pub async fn run_grpc_tuple_server( + address: SocketAddr, + tls_settings: MaybeTlsSettings, + services: (S,T), + shutdown: ShutdownSignal, +) -> crate::Result<()> +where + S: Service, Response = Response, Error = Infallible> + + NamedService + + Clone + + Send + + 'static, + S::Future: Send + 'static, + T: Service, Response = Response, Error = Infallible> + + NamedService + + Clone + + Send + + 'static, + T::Future: Send + 'static, +{ + let span = Span::current(); + let (tx, rx) = tokio::sync::oneshot::channel::(); + let listener = tls_settings.bind(&address).await?; + let stream = listener.accept_stream(); + + info!(%address, "Building gRPC server."); + + Server::builder() + .layer(build_grpc_trace_layer(span.clone())) + .layer(DecompressionAndMetricsLayer) + .add_service(services.0) + .add_service(services.1) + .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap())) + .await?; + + drop(rx.await); + + Ok(()) +} + /// Builds a [TraceLayer] configured for a gRPC server. /// /// This layer emits gPRC specific telemetry for messages received/sent and handler duration. From 7cb2b528806cf868990ef56da2540d1ee98823fc Mon Sep 17 00:00:00 2001 From: caibirdme Date: Sat, 27 Jan 2024 11:03:20 +0800 Subject: [PATCH 03/16] fmt --- lib/opentelemetry-proto/build.rs | 2 +- lib/opentelemetry-proto/src/convert.rs | 87 +++++++++++++++++++------- src/sources/opentelemetry/grpc.rs | 14 ++++- src/sources/opentelemetry/http.rs | 40 +++++++++--- src/sources/opentelemetry/mod.rs | 7 +-- src/sources/util/grpc/mod.rs | 6 +- 6 files changed, 114 insertions(+), 42 deletions(-) diff --git a/lib/opentelemetry-proto/build.rs b/lib/opentelemetry-proto/build.rs index 5ecd955812ded..beaf9f4c417aa 100644 --- a/lib/opentelemetry-proto/build.rs +++ b/lib/opentelemetry-proto/build.rs @@ -17,4 +17,4 @@ fn main() -> Result<(), Error> { )?; Ok(()) -} \ No newline at end of file +} diff --git a/lib/opentelemetry-proto/src/convert.rs b/lib/opentelemetry-proto/src/convert.rs index e005431e4ed30..9bb3917477050 100644 --- a/lib/opentelemetry-proto/src/convert.rs +++ b/lib/opentelemetry-proto/src/convert.rs @@ -1,20 +1,26 @@ -use std::collections::BTreeMap; -use vrl::value::KeyString; use bytes::Bytes; use chrono::{DateTime, TimeZone, Utc}; use lookup::path; use ordered_float::NotNan; +use std::collections::BTreeMap; use vector_core::{ config::{log_schema, LegacyKey, LogNamespace}, event::{Event, LogEvent, TraceEvent}, }; -use vrl::{event_path, value::{ObjectMap, Value}}; +use vrl::value::KeyString; +use vrl::{ + event_path, + value::{ObjectMap, Value}, +}; use super::proto::{ common::v1::{any_value::Value as PBValue, KeyValue}, logs::v1::{LogRecord, ResourceLogs, SeverityNumber}, - trace::v1::{ResourceSpans, Span, Status as SpanStatus, span::{Event as SpanEvent, Link}}, resource::v1::Resource, + trace::v1::{ + span::{Event as SpanEvent, Link}, + ResourceSpans, Span, Status as SpanStatus, + }, }; const SOURCE_NAME: &str = "opentelemetry"; @@ -52,7 +58,7 @@ impl ResourceSpans { let resource = self.resource; let now = Utc::now(); - self.scope_spans + self.scope_spans .into_iter() .flat_map(|instrumentation_library_spans| instrumentation_library_spans.spans) .map(move |span| { @@ -118,27 +124,57 @@ impl ResourceSpan { trace.insert(event_path!("trace_id"), Value::from(span.trace_id)); trace.insert(event_path!("span_id"), Value::from(span.span_id)); trace.insert(event_path!("trace_state"), span.trace_state); - trace.insert(event_path!("parent_span_id"), Value::from(span.parent_span_id)); + trace.insert( + event_path!("parent_span_id"), + Value::from(span.parent_span_id), + ); trace.insert(event_path!("name"), span.name); trace.insert(event_path!("kind"), span.kind); - trace.insert(event_path!("start_time_unix_nano"), Value::from(Utc.timestamp_nanos(span.start_time_unix_nano as i64))); - trace.insert(event_path!("end_time_unix_nano"), Value::from(Utc.timestamp_nanos(span.end_time_unix_nano as i64))); + trace.insert( + event_path!("start_time_unix_nano"), + Value::from(Utc.timestamp_nanos(span.start_time_unix_nano as i64)), + ); + trace.insert( + event_path!("end_time_unix_nano"), + Value::from(Utc.timestamp_nanos(span.end_time_unix_nano as i64)), + ); if !span.attributes.is_empty() { - trace.insert(event_path!("attributes"), kv_list_into_value(span.attributes)); + trace.insert( + event_path!("attributes"), + kv_list_into_value(span.attributes), + ); } - trace.insert(event_path!("dropped_attributes_count"), Value::from(span.dropped_attributes_count)); + trace.insert( + event_path!("dropped_attributes_count"), + Value::from(span.dropped_attributes_count), + ); if !span.events.is_empty() { - trace.insert(event_path!("events"), Value::Array(span.events.into_iter().map(Into::into).collect())); + trace.insert( + event_path!("events"), + Value::Array(span.events.into_iter().map(Into::into).collect()), + ); } - trace.insert(event_path!("dropped_events_count"), Value::from(span.dropped_events_count)); + trace.insert( + event_path!("dropped_events_count"), + Value::from(span.dropped_events_count), + ); if !span.links.is_empty() { - trace.insert(event_path!("links"), Value::Array(span.links.into_iter().map(Into::into).collect())); + trace.insert( + event_path!("links"), + Value::Array(span.links.into_iter().map(Into::into).collect()), + ); } - trace.insert(event_path!("dropped_links_count"), Value::from(span.dropped_links_count)); + trace.insert( + event_path!("dropped_links_count"), + Value::from(span.dropped_links_count), + ); trace.insert(event_path!("status"), Value::from(span.status)); if let Some(resource) = self.resource { if !resource.attributes.is_empty() { - trace.insert(event_path!(RESOURCE_KEY), kv_list_into_value(resource.attributes)); + trace.insert( + event_path!(RESOURCE_KEY), + kv_list_into_value(resource.attributes), + ); } } trace.insert(event_path!("ingest_timestamp"), Value::from(now)); @@ -290,23 +326,32 @@ impl ResourceLog { impl From for Value { fn from(ev: SpanEvent) -> Self { - let mut obj: BTreeMap = BTreeMap::new(); + let mut obj: BTreeMap = BTreeMap::new(); obj.insert("name".into(), ev.name.into()); - obj.insert("time_unix_nano".into(), Value::Timestamp(Utc.timestamp_nanos(ev.time_unix_nano as i64))); + obj.insert( + "time_unix_nano".into(), + Value::Timestamp(Utc.timestamp_nanos(ev.time_unix_nano as i64)), + ); obj.insert("attributes".into(), kv_list_into_value(ev.attributes)); - obj.insert("dropped_attributes_count".into(), Value::Integer(ev.dropped_attributes_count as i64)); + obj.insert( + "dropped_attributes_count".into(), + Value::Integer(ev.dropped_attributes_count as i64), + ); Value::Object(obj) } } impl From for Value { fn from(link: Link) -> Self { - let mut obj: BTreeMap = BTreeMap::new(); + let mut obj: BTreeMap = BTreeMap::new(); obj.insert("trace_id".into(), Value::from(link.trace_id)); obj.insert("span_id".into(), Value::from(link.span_id)); obj.insert("trace_state".into(), link.trace_state.into()); obj.insert("attributes".into(), kv_list_into_value(link.attributes)); - obj.insert("dropped_attributes_count".into(), Value::Integer(link.dropped_attributes_count as i64)); + obj.insert( + "dropped_attributes_count".into(), + Value::Integer(link.dropped_attributes_count as i64), + ); Value::Object(obj) } } @@ -318,4 +363,4 @@ impl From for Value { obj.insert("code".into(), status.code.into()); Value::Object(obj) } -} \ No newline at end of file +} diff --git a/src/sources/opentelemetry/grpc.rs b/src/sources/opentelemetry/grpc.rs index e372335fc5536..600fdfdf0d437 100644 --- a/src/sources/opentelemetry/grpc.rs +++ b/src/sources/opentelemetry/grpc.rs @@ -2,8 +2,12 @@ use futures::TryFutureExt; use tonic::{Request, Response, Status}; use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered}; use vector_lib::opentelemetry::proto::collector::{ - trace::v1::{trace_service_server::TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse}, - logs::v1::{logs_service_server::LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse}, + logs::v1::{ + logs_service_server::LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse, + }, + trace::v1::{ + trace_service_server::TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse, + }, }; use vector_lib::{ config::LogNamespace, @@ -66,7 +70,11 @@ impl LogsService for Service { } impl Service { - async fn handle_events(&self, mut events: Vec, log_name: &'static str) -> Result<(), Status> { + async fn handle_events( + &self, + mut events: Vec, + log_name: &'static str, + ) -> Result<(), Status> { let count = events.len(); let byte_size = events.estimated_json_encoded_size_of(); self.events_received.emit(CountByteSize(count, byte_size)); diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index 0ee3ca5cfecfe..eb2aeeae231bb 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -88,8 +88,19 @@ pub(crate) fn build_warp_filter( bytes_received: Registered, events_received: Registered, ) -> BoxedFilter<(Response,)> { - let log_filters = build_warp_log_filter(acknowledgements, log_namespace, out.clone(), bytes_received.clone(), events_received.clone()); - let trace_filters = build_warp_trace_filter(acknowledgements, out.clone(), bytes_received, events_received); + let log_filters = build_warp_log_filter( + acknowledgements, + log_namespace, + out.clone(), + bytes_received.clone(), + events_received.clone(), + ); + let trace_filters = build_warp_trace_filter( + acknowledgements, + out.clone(), + bytes_received, + events_received, + ); log_filters.or(trace_filters).unify().boxed() } @@ -114,7 +125,13 @@ fn build_warp_log_filter( decode_log_body(body, log_namespace, &events_received) }); - handle_request(events, acknowledgements, out.clone(), super::LOGS, ExportLogsServiceResponse::default()) + handle_request( + events, + acknowledgements, + out.clone(), + super::LOGS, + ExportLogsServiceResponse::default(), + ) }) .boxed() } @@ -139,7 +156,13 @@ fn build_warp_trace_filter( decode_trace_body(body, &events_received) }); - handle_request(events, acknowledgements, out.clone(), super::TRACES, ExportTraceServiceResponse::default()) + handle_request( + events, + acknowledgements, + out.clone(), + super::TRACES, + ExportTraceServiceResponse::default(), + ) }) .boxed() } @@ -201,8 +224,7 @@ async fn handle_request( mut out: SourceSender, output: &str, resp: impl Message, -) -> Result -{ +) -> Result { match events { Ok(mut events) => { let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events); @@ -214,11 +236,9 @@ async fn handle_request( })?; match receiver { - None => Ok(protobuf(resp) - .into_response()), + None => Ok(protobuf(resp).into_response()), Some(receiver) => match receiver.await { - BatchStatus::Delivered => Ok(protobuf(resp) - .into_response()), + BatchStatus::Delivered => Ok(protobuf(resp).into_response()), BatchStatus::Errored => Err(warp::reject::custom(Status { code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here message: "Error delivering contents to sink".into(), diff --git a/src/sources/opentelemetry/mod.rs b/src/sources/opentelemetry/mod.rs index c8820b1abc112..338f4a72c1a6d 100644 --- a/src/sources/opentelemetry/mod.rs +++ b/src/sources/opentelemetry/mod.rs @@ -11,18 +11,18 @@ mod status; use std::net::SocketAddr; use futures::{future::join, FutureExt, TryFutureExt}; +use tonic::codec::CompressionEncoding; use vector_lib::lookup::{owned_value_path, OwnedTargetPath}; use vector_lib::opentelemetry::convert::{ ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY, RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY, }; -use tonic::codec::CompressionEncoding; use vector_lib::configurable::configurable_component; use vector_lib::internal_event::{BytesReceived, EventsReceived, Protocol}; use vector_lib::opentelemetry::proto::collector::{ - trace::v1::trace_service_server::TraceServiceServer, logs::v1::logs_service_server::LogsServiceServer, + trace::v1::trace_service_server::TraceServiceServer, }; use vector_lib::{ config::{log_schema, LegacyKey, LogNamespace}, @@ -134,7 +134,6 @@ impl GenerateConfig for OpentelemetryConfig { } } - #[async_trait::async_trait] #[typetag::serde(name = "opentelemetry")] impl SourceConfig for OpentelemetryConfig { @@ -144,7 +143,7 @@ impl SourceConfig for OpentelemetryConfig { let log_namespace = cx.log_namespace(self.log_namespace); let grpc_tls_settings = MaybeTlsSettings::from_config(&self.grpc.tls, true)?; - + let log_service = LogsServiceServer::new(Service { pipeline: cx.out.clone(), acknowledgements, diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index de4c263469d40..8aa456a52a4ba 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -63,12 +63,12 @@ where Ok(()) } -// This is a bit of a ugly hack to allow us to run two services on the same port. +// This is a bit of a ugly hack to allow us to run two services on the same port. // I just don't know how to convert the generic type with associated types into a Vec>. -pub async fn run_grpc_tuple_server( +pub async fn run_grpc_tuple_server( address: SocketAddr, tls_settings: MaybeTlsSettings, - services: (S,T), + services: (S, T), shutdown: ShutdownSignal, ) -> crate::Result<()> where From 247f4cac3879a46f8544d1a0a34ce305b371d6c4 Mon Sep 17 00:00:00 2001 From: Deen Liu Date: Fri, 19 Apr 2024 17:52:35 +0800 Subject: [PATCH 04/16] add changelog.d --- changelog.d/opentelemetry_trace.feature.md | 3 +++ src/sources/opentelemetry/mod.rs | 9 ++++++--- src/sources/util/grpc/mod.rs | 21 ++++----------------- 3 files changed, 13 insertions(+), 20 deletions(-) create mode 100644 changelog.d/opentelemetry_trace.feature.md diff --git a/changelog.d/opentelemetry_trace.feature.md b/changelog.d/opentelemetry_trace.feature.md new file mode 100644 index 0000000000000..db6fcee7990f2 --- /dev/null +++ b/changelog.d/opentelemetry_trace.feature.md @@ -0,0 +1,3 @@ +Support ingesting opentelemetry traces(Experimentally) + +authors: caibirdme \ No newline at end of file diff --git a/src/sources/opentelemetry/mod.rs b/src/sources/opentelemetry/mod.rs index 338f4a72c1a6d..a1f55faf100f4 100644 --- a/src/sources/opentelemetry/mod.rs +++ b/src/sources/opentelemetry/mod.rs @@ -29,6 +29,7 @@ use vector_lib::{ schema::Definition, }; use vrl::value::{kind::Collection, Kind}; +use tonic::transport::server::RoutesBuilder; use self::{ grpc::Service, @@ -41,7 +42,7 @@ use crate::{ }, http::KeepaliveConfig, serde::bool_or_struct, - sources::{util::grpc::run_grpc_tuple_server, Source}, + sources::{util::grpc::run_grpc_server_with_routes, Source}, tls::{MaybeTlsSettings, TlsEnableableConfig}, }; @@ -162,10 +163,12 @@ impl SourceConfig for OpentelemetryConfig { .accept_compressed(CompressionEncoding::Gzip) .max_decoding_message_size(usize::MAX); - let grpc_source = run_grpc_tuple_server( + let mut builder = RoutesBuilder::default(); + builder.add_service(log_service).add_service(trace_service); + let grpc_source = run_grpc_server_with_routes( self.grpc.address, grpc_tls_settings, - (log_service, trace_service), + builder.routes(), cx.shutdown.clone(), ) .map_err(|error| { diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index 8aa456a52a4ba..b944550e2feba 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -17,6 +17,7 @@ use tower_http::{ trace::TraceLayer, }; use tracing::Span; +use tonic::transport::server::Routes; mod decompression; pub use self::decompression::{DecompressionAndMetrics, DecompressionAndMetricsLayer}; @@ -65,25 +66,12 @@ where // This is a bit of a ugly hack to allow us to run two services on the same port. // I just don't know how to convert the generic type with associated types into a Vec>. -pub async fn run_grpc_tuple_server( +pub async fn run_grpc_server_with_routes( address: SocketAddr, tls_settings: MaybeTlsSettings, - services: (S, T), + routes: Routes, shutdown: ShutdownSignal, ) -> crate::Result<()> -where - S: Service, Response = Response, Error = Infallible> - + NamedService - + Clone - + Send - + 'static, - S::Future: Send + 'static, - T: Service, Response = Response, Error = Infallible> - + NamedService - + Clone - + Send - + 'static, - T::Future: Send + 'static, { let span = Span::current(); let (tx, rx) = tokio::sync::oneshot::channel::(); @@ -95,8 +83,7 @@ where Server::builder() .layer(build_grpc_trace_layer(span.clone())) .layer(DecompressionAndMetricsLayer) - .add_service(services.0) - .add_service(services.1) + .add_routes(routes) .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap())) .await?; From 7b38451964f77731ab47ae34980c2b55758d77c1 Mon Sep 17 00:00:00 2001 From: Deen Liu Date: Fri, 19 Apr 2024 18:40:59 +0800 Subject: [PATCH 05/16] add intergration tests --- .../opentelemetry/integration_tests.rs | 82 +++++++++++++++++++ tests/data/opentelemetry/config.yaml | 3 + 2 files changed, 85 insertions(+) diff --git a/src/sources/opentelemetry/integration_tests.rs b/src/sources/opentelemetry/integration_tests.rs index 9fc735b21abdc..915e23b295561 100644 --- a/src/sources/opentelemetry/integration_tests.rs +++ b/src/sources/opentelemetry/integration_tests.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use itertools::Itertools; use serde_json::json; use crate::{ @@ -11,6 +12,12 @@ use crate::{ retry_until, wait_for_tcp, }, }; +use prost::Message; + +use vector_lib::opentelemetry::proto::{ + collector::trace::v1::ExportTraceServiceRequest, + trace::v1::{ScopeSpans, ResourceSpans, Span} +}; use super::{tests::new_source, GrpcConfig, HttpConfig, OpentelemetryConfig}; @@ -97,6 +104,81 @@ async fn receive_logs_legacy_namespace() { .await; } +#[tokio::test] +async fn receive_trace() { + // generate a trace request + let req = ExportTraceServiceRequest{ + resource_spans: vec![ResourceSpans{ + resource: None, + scope_spans: vec![ScopeSpans{ + scope: None, + spans: vec![Span{ + trace_id: (1..17).collect_vec(), + span_id: (1..9).collect_vec(), + parent_span_id: (1..9).collect_vec(), + name: "span".to_string(), + kind: 1, + start_time_unix_nano: 123456789, + end_time_unix_nano: 987654321, + attributes: vec![], + dropped_attributes_count: 0, + events: vec![], + dropped_events_count: 0, + links: vec![], + dropped_links_count: 0, + status: None, + trace_state: "".to_string(), + }], + schema_url: "world".to_string(), + }], + schema_url: "hello".to_string(), + }], + }; + let body = req.encode_to_vec(); + + assert_source_compliance(&SOURCE_TAGS, async { + wait_ready(otel_health_url()).await; + + let config = OpentelemetryConfig { + grpc: GrpcConfig { + address: source_grpc_address().parse().unwrap(), + tls: Default::default(), + }, + http: HttpConfig { + address: source_http_address().parse().unwrap(), + tls: Default::default(), + keepalive: Default::default(), + }, + acknowledgements: Default::default(), + log_namespace: Default::default(), + }; + + let (sender, trace_output, _) = new_source(EventStatus::Delivered); + let server = config + .build(SourceContext::new_test(sender, None)) + .await + .unwrap(); + tokio::spawn(server); + wait_for_tcp(source_grpc_address()).await; + wait_for_tcp(source_http_address()).await; + + let client = reqwest::Client::new(); + let _res = client + .post(format!("{}/v1/traces", otel_otlp_url())) + .header(reqwest::header::CONTENT_TYPE, "application/x-protobuf") + .body(body) + .send() + .await + .expect("Failed to send traces to Opentelemetry Collector."); + + // The Opentelemetry Collector is configured to send to both the gRPC and HTTP endpoints + // so we should expect to collect two events from the single log sent. + let events = collect_n(trace_output, 2).await; + assert_eq!(events.len(), 2); + }) + .await; +} + async fn wait_ready(address: String) { retry_until( || async { diff --git a/tests/data/opentelemetry/config.yaml b/tests/data/opentelemetry/config.yaml index 84b5a89372356..1bb4ab7021b62 100644 --- a/tests/data/opentelemetry/config.yaml +++ b/tests/data/opentelemetry/config.yaml @@ -26,3 +26,6 @@ service: logs: receivers: [otlp] exporters: [otlp,otlphttp] + traces: + receivers: [otlp] + exporters: [otlp,otlphttp] From 4cbaa229d0f34d897f585ec8eed107d562dbeb68 Mon Sep 17 00:00:00 2001 From: Deen Liu Date: Fri, 19 Apr 2024 18:43:37 +0800 Subject: [PATCH 06/16] remove .vscode in gitignore --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index 65279205dd3de..24b0593d51d6c 100644 --- a/.gitignore +++ b/.gitignore @@ -16,7 +16,6 @@ node_modules tests/data/wasm/*/target heaptrack.* massif.* -.vscode # tilt tilt_modules/ From 441b5fff1ab30afd670b825cc4b85f66f2cb91dc Mon Sep 17 00:00:00 2001 From: Deen Liu Date: Fri, 19 Apr 2024 18:53:47 +0800 Subject: [PATCH 07/16] cue --- .../components/sources/opentelemetry.cue | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/website/cue/reference/components/sources/opentelemetry.cue b/website/cue/reference/components/sources/opentelemetry.cue index 415039485ced6..b2ac00d8fe942 100644 --- a/website/cue/reference/components/sources/opentelemetry.cue +++ b/website/cue/reference/components/sources/opentelemetry.cue @@ -41,7 +41,7 @@ components: sources: opentelemetry: { requirements: [] warnings: [ """ - The `opentelemetry` source only supports log events at this time. + The `opentelemetry` source only supports log and trace events at this time. """, ] notices: [] @@ -60,6 +60,12 @@ components: sources: opentelemetry: { Received log events will go to this output stream. Use `.logs` as an input to downstream transforms and sinks. """ }, + { + name: "traces" + description: """ + Received trace events will go to this output stream. Use `.traces` as an input to downstream transforms and sinks. + """ + }, ] output: { @@ -203,6 +209,13 @@ components: sources: opentelemetry: { [OpenSSL configuration file](\(urls.openssl_conf)). The file location defaults to `/usr/local/ssl/openssl.cnf` or can be specified with the `OPENSSL_CONF` environment variable. """ + }, + traces: { + title: "Ingest OTLP traces" + body: """ + Vector now support ingest opentelemetry trace in proto format(JSON is not supported). The trace + data is then converted as log, and you can use vrl to remap it to whatever you want. + """ } } From 80cb66a3f31eee23b2b2b52ad5f8bb2dbb7ec07c Mon Sep 17 00:00:00 2001 From: Deen Liu Date: Fri, 19 Apr 2024 19:17:03 +0800 Subject: [PATCH 08/16] add int test --- .../opentelemetry/integration_tests.rs | 24 +++++++++---------- src/sources/opentelemetry/mod.rs | 2 +- src/sources/util/grpc/mod.rs | 5 ++-- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/src/sources/opentelemetry/integration_tests.rs b/src/sources/opentelemetry/integration_tests.rs index 915e23b295561..f72f4a5e7b822 100644 --- a/src/sources/opentelemetry/integration_tests.rs +++ b/src/sources/opentelemetry/integration_tests.rs @@ -16,7 +16,7 @@ use prost::Message; use vector_lib::opentelemetry::proto::{ collector::trace::v1::ExportTraceServiceRequest, - trace::v1::{ScopeSpans, ResourceSpans, Span} + trace::v1::{ResourceSpans, ScopeSpans, Span}, }; use super::{tests::new_source, GrpcConfig, HttpConfig, OpentelemetryConfig}; @@ -107,19 +107,19 @@ async fn receive_logs_legacy_namespace() { #[tokio::test] async fn receive_trace() { // generate a trace request - let req = ExportTraceServiceRequest{ - resource_spans: vec![ResourceSpans{ + let req = ExportTraceServiceRequest { + resource_spans: vec![ResourceSpans { resource: None, - scope_spans: vec![ScopeSpans{ + scope_spans: vec![ScopeSpans { scope: None, - spans: vec![Span{ - trace_id: (1..17).collect_vec(), - span_id: (1..9).collect_vec(), - parent_span_id: (1..9).collect_vec(), + spans: vec![Span { + trace_id: (1..17).collect_vec(), //trace_id [u8;16] + span_id: (1..9).collect_vec(), // span_id [u8;8] + parent_span_id: (1..9).collect_vec(), // parent_span_id [u8;8] name: "span".to_string(), kind: 1, - start_time_unix_nano: 123456789, - end_time_unix_nano: 987654321, + start_time_unix_nano: 1713525203000000000, + end_time_unix_nano: 1713525205000000000, attributes: vec![], dropped_attributes_count: 0, events: vec![], @@ -129,9 +129,9 @@ async fn receive_trace() { status: None, trace_state: "".to_string(), }], - schema_url: "world".to_string(), + schema_url: "".to_string(), }], - schema_url: "hello".to_string(), + schema_url: "".to_string(), }], }; let body = req.encode_to_vec(); diff --git a/src/sources/opentelemetry/mod.rs b/src/sources/opentelemetry/mod.rs index a1f55faf100f4..74ad6c6547d88 100644 --- a/src/sources/opentelemetry/mod.rs +++ b/src/sources/opentelemetry/mod.rs @@ -18,6 +18,7 @@ use vector_lib::opentelemetry::convert::{ SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY, }; +use tonic::transport::server::RoutesBuilder; use vector_lib::configurable::configurable_component; use vector_lib::internal_event::{BytesReceived, EventsReceived, Protocol}; use vector_lib::opentelemetry::proto::collector::{ @@ -29,7 +30,6 @@ use vector_lib::{ schema::Definition, }; use vrl::value::{kind::Collection, Kind}; -use tonic::transport::server::RoutesBuilder; use self::{ grpc::Service, diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index b944550e2feba..1052318538bc6 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -7,6 +7,7 @@ use futures::FutureExt; use http::{Request, Response}; use hyper::Body; use std::{convert::Infallible, net::SocketAddr, time::Duration}; +use tonic::transport::server::Routes; use tonic::{ body::BoxBody, transport::server::{NamedService, Server}, @@ -17,7 +18,6 @@ use tower_http::{ trace::TraceLayer, }; use tracing::Span; -use tonic::transport::server::Routes; mod decompression; pub use self::decompression::{DecompressionAndMetrics, DecompressionAndMetricsLayer}; @@ -71,8 +71,7 @@ pub async fn run_grpc_server_with_routes( tls_settings: MaybeTlsSettings, routes: Routes, shutdown: ShutdownSignal, -) -> crate::Result<()> -{ +) -> crate::Result<()> { let span = Span::current(); let (tx, rx) = tokio::sync::oneshot::channel::(); let listener = tls_settings.bind(&address).await?; From 35eb17e481b2e5ca40c6899d812a6186cff1b6dd Mon Sep 17 00:00:00 2001 From: caibirdme Date: Fri, 19 Apr 2024 21:28:45 +0800 Subject: [PATCH 09/16] hex trace_id span_id --- lib/opentelemetry-proto/src/convert.rs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/lib/opentelemetry-proto/src/convert.rs b/lib/opentelemetry-proto/src/convert.rs index 9bb3917477050..d59d043319515 100644 --- a/lib/opentelemetry-proto/src/convert.rs +++ b/lib/opentelemetry-proto/src/convert.rs @@ -115,18 +115,25 @@ fn kv_list_into_value(arr: Vec) -> Value { ) } +fn to_hex(d: &[u8]) -> String { + if d.is_empty() { + return "".to_string(); + } + hex::encode(d) +} + // Unlike log events(log body + metadata), trace spans are just metadata, so we don't handle log_namespace here, // insert all attributes into log root, just like what datadog_agent/traces does. impl ResourceSpan { fn into_event(self, now: DateTime) -> Event { let mut trace = TraceEvent::default(); let span = self.span; - trace.insert(event_path!("trace_id"), Value::from(span.trace_id)); - trace.insert(event_path!("span_id"), Value::from(span.span_id)); + trace.insert(event_path!(TRACE_ID_KEY), Value::from(to_hex(&span.trace_id))); + trace.insert(event_path!(SPAN_ID_KEY), Value::from(to_hex(&span.span_id))); trace.insert(event_path!("trace_state"), span.trace_state); trace.insert( event_path!("parent_span_id"), - Value::from(span.parent_span_id), + Value::from(to_hex(&span.parent_span_id)), ); trace.insert(event_path!("name"), span.name); trace.insert(event_path!("kind"), span.kind); @@ -140,12 +147,12 @@ impl ResourceSpan { ); if !span.attributes.is_empty() { trace.insert( - event_path!("attributes"), + event_path!(ATTRIBUTES_KEY), kv_list_into_value(span.attributes), ); } trace.insert( - event_path!("dropped_attributes_count"), + event_path!(DROPPED_ATTRIBUTES_COUNT_KEY), Value::from(span.dropped_attributes_count), ); if !span.events.is_empty() { @@ -229,7 +236,7 @@ impl ResourceLog { &mut log, Some(LegacyKey::Overwrite(path!(TRACE_ID_KEY))), path!(TRACE_ID_KEY), - Bytes::from(hex::encode(self.log_record.trace_id)), + Bytes::from(to_hex(&self.log_record.trace_id)), ); } if !self.log_record.span_id.is_empty() { @@ -238,7 +245,7 @@ impl ResourceLog { &mut log, Some(LegacyKey::Overwrite(path!(SPAN_ID_KEY))), path!(SPAN_ID_KEY), - Bytes::from(hex::encode(self.log_record.span_id)), + Bytes::from(to_hex(&self.log_record.span_id)), ); } if !self.log_record.severity_text.is_empty() { @@ -344,8 +351,8 @@ impl From for Value { impl From for Value { fn from(link: Link) -> Self { let mut obj: BTreeMap = BTreeMap::new(); - obj.insert("trace_id".into(), Value::from(link.trace_id)); - obj.insert("span_id".into(), Value::from(link.span_id)); + obj.insert("trace_id".into(), Value::from(to_hex(&link.trace_id))); + obj.insert("span_id".into(), Value::from(to_hex(&link.span_id))); obj.insert("trace_state".into(), link.trace_state.into()); obj.insert("attributes".into(), kv_list_into_value(link.attributes)); obj.insert( From 3d1eed01e489e02cfe37571c42c34c2df08c67bb Mon Sep 17 00:00:00 2001 From: caibirdme Date: Fri, 19 Apr 2024 21:37:40 +0800 Subject: [PATCH 10/16] fix int test --- lib/opentelemetry-proto/src/convert.rs | 5 ++++- src/sources/opentelemetry/integration_tests.rs | 9 +++++---- src/sources/opentelemetry/tests.rs | 11 ++++++----- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/lib/opentelemetry-proto/src/convert.rs b/lib/opentelemetry-proto/src/convert.rs index d59d043319515..dc789d27fdd95 100644 --- a/lib/opentelemetry-proto/src/convert.rs +++ b/lib/opentelemetry-proto/src/convert.rs @@ -128,7 +128,10 @@ impl ResourceSpan { fn into_event(self, now: DateTime) -> Event { let mut trace = TraceEvent::default(); let span = self.span; - trace.insert(event_path!(TRACE_ID_KEY), Value::from(to_hex(&span.trace_id))); + trace.insert( + event_path!(TRACE_ID_KEY), + Value::from(to_hex(&span.trace_id)), + ); trace.insert(event_path!(SPAN_ID_KEY), Value::from(to_hex(&span.span_id))); trace.insert(event_path!("trace_state"), span.trace_state); trace.insert( diff --git a/src/sources/opentelemetry/integration_tests.rs b/src/sources/opentelemetry/integration_tests.rs index f72f4a5e7b822..a775e01ed5d07 100644 --- a/src/sources/opentelemetry/integration_tests.rs +++ b/src/sources/opentelemetry/integration_tests.rs @@ -3,6 +3,7 @@ use std::time::Duration; use itertools::Itertools; use serde_json::json; +use super::{LOGS, TRACES}; use crate::{ config::{log_schema, SourceConfig, SourceContext}, event::EventStatus, @@ -56,7 +57,7 @@ async fn receive_logs_legacy_namespace() { log_namespace: Default::default(), }; - let (sender, logs_output, _) = new_source(EventStatus::Delivered); + let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); let server = config .build(SourceContext::new_test(sender, None)) .await @@ -113,8 +114,8 @@ async fn receive_trace() { scope_spans: vec![ScopeSpans { scope: None, spans: vec![Span { - trace_id: (1..17).collect_vec(), //trace_id [u8;16] - span_id: (1..9).collect_vec(), // span_id [u8;8] + trace_id: (1..17).collect_vec(), //trace_id [u8;16] + span_id: (1..9).collect_vec(), // span_id [u8;8] parent_span_id: (1..9).collect_vec(), // parent_span_id [u8;8] name: "span".to_string(), kind: 1, @@ -153,7 +154,7 @@ async fn receive_trace() { log_namespace: Default::default(), }; - let (sender, trace_output, _) = new_source(EventStatus::Delivered); + let (sender, trace_output, _) = new_source(EventStatus::Delivered, TRACES.to_string()); let server = config .build(SourceContext::new_test(sender, None)) .await diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 628b92bb3d47b..5831f0a7beb96 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -57,7 +57,7 @@ async fn receive_grpc_logs_vector_namespace() { .remove(0) .schema_definition(true); - let (sender, logs_output, _) = new_source(EventStatus::Delivered); + let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); let server = source .build(SourceContext::new_test(sender, None)) .await @@ -195,7 +195,7 @@ async fn receive_grpc_logs_legacy_namespace() { .remove(0) .schema_definition(true); - let (sender, logs_output, _) = new_source(EventStatus::Delivered); + let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); let server = source .build(SourceContext::new_test(sender, None)) .await @@ -285,16 +285,17 @@ async fn receive_grpc_logs_legacy_namespace() { pub(super) fn new_source( status: EventStatus, + event_name: String, ) -> ( SourceSender, impl Stream, impl Stream, ) { let (mut sender, recv) = SourceSender::new_test_finalize(status); - let logs_output = sender - .add_outputs(status, LOGS.to_string()) + let output = sender + .add_outputs(status, event_name) .flat_map(into_event_stream); - (sender, logs_output, recv) + (sender, output, recv) } fn str_into_hex_bytes(s: &str) -> Vec { From 073c3851b472938c981fa91c38398e177f98a691 Mon Sep 17 00:00:00 2001 From: caibirdme Date: Tue, 23 Apr 2024 11:44:27 +0800 Subject: [PATCH 11/16] add a newline to pass the check --- changelog.d/opentelemetry_trace.feature.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/changelog.d/opentelemetry_trace.feature.md b/changelog.d/opentelemetry_trace.feature.md index db6fcee7990f2..9800fa111919b 100644 --- a/changelog.d/opentelemetry_trace.feature.md +++ b/changelog.d/opentelemetry_trace.feature.md @@ -1,3 +1,4 @@ Support ingesting opentelemetry traces(Experimentally) -authors: caibirdme \ No newline at end of file +authors: caibirdme + From be004f852fff230f4256b926b3a740c718ca0ac9 Mon Sep 17 00:00:00 2001 From: Deen Date: Fri, 26 Apr 2024 10:09:22 +0800 Subject: [PATCH 12/16] Update changelog.d/opentelemetry_trace.feature.md Co-authored-by: Stephen Wakely --- changelog.d/opentelemetry_trace.feature.md | 1 - 1 file changed, 1 deletion(-) diff --git a/changelog.d/opentelemetry_trace.feature.md b/changelog.d/opentelemetry_trace.feature.md index 9800fa111919b..544ffa86933ed 100644 --- a/changelog.d/opentelemetry_trace.feature.md +++ b/changelog.d/opentelemetry_trace.feature.md @@ -1,4 +1,3 @@ Support ingesting opentelemetry traces(Experimentally) authors: caibirdme - From 633290a9b5758e06ff1542bf65644b1609cf774a Mon Sep 17 00:00:00 2001 From: Deen Date: Fri, 26 Apr 2024 10:09:56 +0800 Subject: [PATCH 13/16] Update website/cue/reference/components/sources/opentelemetry.cue Co-authored-by: Jesse Szwedko --- website/cue/reference/components/sources/opentelemetry.cue | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/website/cue/reference/components/sources/opentelemetry.cue b/website/cue/reference/components/sources/opentelemetry.cue index b2ac00d8fe942..68a2593930ffc 100644 --- a/website/cue/reference/components/sources/opentelemetry.cue +++ b/website/cue/reference/components/sources/opentelemetry.cue @@ -213,8 +213,7 @@ components: sources: opentelemetry: { traces: { title: "Ingest OTLP traces" body: """ - Vector now support ingest opentelemetry trace in proto format(JSON is not supported). The trace - data is then converted as log, and you can use vrl to remap it to whatever you want. + Trace support is experimental and subject to change as Vector has no strongly-typed structure for traces internally. Instead traces are stored as a key/value map similar to logs. This may change in the future to be a structured format. """ } } From 79b3399258e7b386700208fb010b08f4485fe84d Mon Sep 17 00:00:00 2001 From: Deen Date: Sat, 11 May 2024 09:42:55 +0800 Subject: [PATCH 14/16] Update website/cue/reference/components/sources/opentelemetry.cue Co-authored-by: Stephen Wakely --- website/cue/reference/components/sources/opentelemetry.cue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/cue/reference/components/sources/opentelemetry.cue b/website/cue/reference/components/sources/opentelemetry.cue index 68a2593930ffc..37833d9445036 100644 --- a/website/cue/reference/components/sources/opentelemetry.cue +++ b/website/cue/reference/components/sources/opentelemetry.cue @@ -214,7 +214,7 @@ components: sources: opentelemetry: { title: "Ingest OTLP traces" body: """ Trace support is experimental and subject to change as Vector has no strongly-typed structure for traces internally. Instead traces are stored as a key/value map similar to logs. This may change in the future to be a structured format. - """ + """ } } From 72fe781f7e6f18aa0e20cd66ba8807ae4a0376e6 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Thu, 30 May 2024 10:35:20 +0100 Subject: [PATCH 15/16] Fix cue error Signed-off-by: Stephen Wakely --- website/cue/reference/components/sources/opentelemetry.cue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/cue/reference/components/sources/opentelemetry.cue b/website/cue/reference/components/sources/opentelemetry.cue index 37833d9445036..3f767b5c449bb 100644 --- a/website/cue/reference/components/sources/opentelemetry.cue +++ b/website/cue/reference/components/sources/opentelemetry.cue @@ -209,7 +209,7 @@ components: sources: opentelemetry: { [OpenSSL configuration file](\(urls.openssl_conf)). The file location defaults to `/usr/local/ssl/openssl.cnf` or can be specified with the `OPENSSL_CONF` environment variable. """ - }, + } traces: { title: "Ingest OTLP traces" body: """ From 37ae74ca22c45065bee8889fc0a5426c63265015 Mon Sep 17 00:00:00 2001 From: Deen Date: Tue, 4 Jun 2024 21:01:39 +0800 Subject: [PATCH 16/16] Update trace.proto fix ident --- .../opentelemetry/proto/trace/v1/trace.proto | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/opentelemetry-proto/src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto b/lib/opentelemetry-proto/src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto index cd519f5b92584..864253ee41d87 100644 --- a/lib/opentelemetry-proto/src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto +++ b/lib/opentelemetry-proto/src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto @@ -170,10 +170,10 @@ message Span { // attributes is a collection of key/value pairs. Note, global attributes // like server name can be set using the resource API. Examples of attributes: // - // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" - // "/http/server_latency": 300 - // "example.com/my_attribute": true - // "example.com/score": 10.239 + // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" + // "/http/server_latency": 300 + // "example.com/my_attribute": true + // "example.com/score": 10.239 // // The OpenTelemetry API specification further restricts the allowed value types: // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/README.md#attribute