Skip to content

Commit

Permalink
feat(opentelemetry source): support trace ingestion (vectordotdev#19728)
Browse files Browse the repository at this point in the history
* feat: otlp trace

* feat: add tuple grpc services

* fmt

* add changelog.d

* add intergration tests

* remove .vscode in gitignore

* cue

* add int test

* hex trace_id span_id

* fix int test

* add a newline to pass the check

* Update changelog.d/opentelemetry_trace.feature.md

Co-authored-by: Stephen Wakely <[email protected]>

* Update website/cue/reference/components/sources/opentelemetry.cue

Co-authored-by: Jesse Szwedko <[email protected]>

* Update website/cue/reference/components/sources/opentelemetry.cue

Co-authored-by: Stephen Wakely <[email protected]>

* Fix cue error

Signed-off-by: Stephen Wakely <[email protected]>

* Update trace.proto

fix ident

---------

Signed-off-by: Stephen Wakely <[email protected]>
Co-authored-by: Stephen Wakely <[email protected]>
Co-authored-by: Jesse Szwedko <[email protected]>
Co-authored-by: Stephen Wakely <[email protected]>
  • Loading branch information
4 people authored and AndrooTheChen committed Sep 23, 2024
1 parent 6c9b059 commit c0f991b
Show file tree
Hide file tree
Showing 13 changed files with 475 additions and 44 deletions.
3 changes: 3 additions & 0 deletions changelog.d/opentelemetry_trace.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Support ingesting opentelemetry traces(Experimentally)

authors: caibirdme
2 changes: 2 additions & 0 deletions lib/opentelemetry-proto/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ fn main() -> Result<(), Error> {
"src/proto/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto",
],
&["src/proto/opentelemetry-proto"],
Expand Down
158 changes: 154 additions & 4 deletions lib/opentelemetry-proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,25 @@ use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use lookup::path;
use ordered_float::NotNan;
use std::collections::BTreeMap;
use vector_core::{
config::{log_schema, LegacyKey, LogNamespace},
event::{Event, LogEvent},
event::{Event, LogEvent, TraceEvent},
};
use vrl::value::KeyString;
use vrl::{
event_path,
value::{ObjectMap, Value},
};
use vrl::value::{ObjectMap, Value};

use super::proto::{
common::v1::{any_value::Value as PBValue, KeyValue},
logs::v1::{LogRecord, ResourceLogs, SeverityNumber},
resource::v1::Resource,
trace::v1::{
span::{Event as SpanEvent, Link},
ResourceSpans, Span, Status as SpanStatus,
},
};

const SOURCE_NAME: &str = "opentelemetry";
Expand Down Expand Up @@ -44,6 +53,24 @@ impl ResourceLogs {
}
}

impl ResourceSpans {
pub fn into_event_iter(self) -> impl Iterator<Item = Event> {
let resource = self.resource;
let now = Utc::now();

self.scope_spans
.into_iter()
.flat_map(|instrumentation_library_spans| instrumentation_library_spans.spans)
.map(move |span| {
ResourceSpan {
resource: resource.clone(),
span,
}
.into_event(now)
})
}
}

impl From<PBValue> for Value {
fn from(av: PBValue) -> Self {
match av {
Expand All @@ -68,6 +95,11 @@ struct ResourceLog {
log_record: LogRecord,
}

struct ResourceSpan {
resource: Option<Resource>,
span: Span,
}

fn kv_list_into_value(arr: Vec<KeyValue>) -> Value {
Value::Object(
arr.into_iter()
Expand All @@ -83,6 +115,83 @@ fn kv_list_into_value(arr: Vec<KeyValue>) -> Value {
)
}

fn to_hex(d: &[u8]) -> String {
if d.is_empty() {
return "".to_string();
}
hex::encode(d)
}

// Unlike log events(log body + metadata), trace spans are just metadata, so we don't handle log_namespace here,
// insert all attributes into log root, just like what datadog_agent/traces does.
impl ResourceSpan {
fn into_event(self, now: DateTime<Utc>) -> Event {
let mut trace = TraceEvent::default();
let span = self.span;
trace.insert(
event_path!(TRACE_ID_KEY),
Value::from(to_hex(&span.trace_id)),
);
trace.insert(event_path!(SPAN_ID_KEY), Value::from(to_hex(&span.span_id)));
trace.insert(event_path!("trace_state"), span.trace_state);
trace.insert(
event_path!("parent_span_id"),
Value::from(to_hex(&span.parent_span_id)),
);
trace.insert(event_path!("name"), span.name);
trace.insert(event_path!("kind"), span.kind);
trace.insert(
event_path!("start_time_unix_nano"),
Value::from(Utc.timestamp_nanos(span.start_time_unix_nano as i64)),
);
trace.insert(
event_path!("end_time_unix_nano"),
Value::from(Utc.timestamp_nanos(span.end_time_unix_nano as i64)),
);
if !span.attributes.is_empty() {
trace.insert(
event_path!(ATTRIBUTES_KEY),
kv_list_into_value(span.attributes),
);
}
trace.insert(
event_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
Value::from(span.dropped_attributes_count),
);
if !span.events.is_empty() {
trace.insert(
event_path!("events"),
Value::Array(span.events.into_iter().map(Into::into).collect()),
);
}
trace.insert(
event_path!("dropped_events_count"),
Value::from(span.dropped_events_count),
);
if !span.links.is_empty() {
trace.insert(
event_path!("links"),
Value::Array(span.links.into_iter().map(Into::into).collect()),
);
}
trace.insert(
event_path!("dropped_links_count"),
Value::from(span.dropped_links_count),
);
trace.insert(event_path!("status"), Value::from(span.status));
if let Some(resource) = self.resource {
if !resource.attributes.is_empty() {
trace.insert(
event_path!(RESOURCE_KEY),
kv_list_into_value(resource.attributes),
);
}
}
trace.insert(event_path!("ingest_timestamp"), Value::from(now));
trace.into()
}
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.15.0/specification/logs/data-model.md
impl ResourceLog {
fn into_event(self, log_namespace: LogNamespace, now: DateTime<Utc>) -> Event {
Expand Down Expand Up @@ -130,7 +239,7 @@ impl ResourceLog {
&mut log,
Some(LegacyKey::Overwrite(path!(TRACE_ID_KEY))),
path!(TRACE_ID_KEY),
Bytes::from(hex::encode(self.log_record.trace_id)),
Bytes::from(to_hex(&self.log_record.trace_id)),
);
}
if !self.log_record.span_id.is_empty() {
Expand All @@ -139,7 +248,7 @@ impl ResourceLog {
&mut log,
Some(LegacyKey::Overwrite(path!(SPAN_ID_KEY))),
path!(SPAN_ID_KEY),
Bytes::from(hex::encode(self.log_record.span_id)),
Bytes::from(to_hex(&self.log_record.span_id)),
);
}
if !self.log_record.severity_text.is_empty() {
Expand Down Expand Up @@ -224,3 +333,44 @@ impl ResourceLog {
log.into()
}
}

impl From<SpanEvent> for Value {
fn from(ev: SpanEvent) -> Self {
let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
obj.insert("name".into(), ev.name.into());
obj.insert(
"time_unix_nano".into(),
Value::Timestamp(Utc.timestamp_nanos(ev.time_unix_nano as i64)),
);
obj.insert("attributes".into(), kv_list_into_value(ev.attributes));
obj.insert(
"dropped_attributes_count".into(),
Value::Integer(ev.dropped_attributes_count as i64),
);
Value::Object(obj)
}
}

impl From<Link> for Value {
fn from(link: Link) -> Self {
let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
obj.insert("trace_id".into(), Value::from(to_hex(&link.trace_id)));
obj.insert("span_id".into(), Value::from(to_hex(&link.span_id)));
obj.insert("trace_state".into(), link.trace_state.into());
obj.insert("attributes".into(), kv_list_into_value(link.attributes));
obj.insert(
"dropped_attributes_count".into(),
Value::Integer(link.dropped_attributes_count as i64),
);
Value::Object(obj)
}
}

impl From<SpanStatus> for Value {
fn from(status: SpanStatus) -> Self {
let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
obj.insert("message".into(), status.message.into());
obj.insert("code".into(), status.code.into());
Value::Object(obj)
}
}
12 changes: 12 additions & 0 deletions lib/opentelemetry-proto/src/proto.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
/// Service stub and clients.
pub mod collector {
pub mod trace {
pub mod v1 {
tonic::include_proto!("opentelemetry.proto.collector.trace.v1");
}
}
pub mod logs {
pub mod v1 {
tonic::include_proto!("opentelemetry.proto.collector.logs.v1");
Expand All @@ -21,6 +26,13 @@ pub mod logs {
}
}

/// Generated types used for trace.
pub mod trace {
pub mod v1 {
tonic::include_proto!("opentelemetry.proto.trace.v1");
}
}

/// Generated types used in resources.
pub mod resource {
pub mod v1 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ message Span {
// attributes is a collection of key/value pairs. Note, global attributes
// like server name can be set using the resource API. Examples of attributes:
//
// "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
// "/http/server_latency": 300
// "example.com/my_attribute": true
// "example.com/score": 10.239
// "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
// "/http/server_latency": 300
// "example.com/my_attribute": true
// "example.com/score": 10.239
//
// The OpenTelemetry API specification further restricts the allowed value types:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/README.md#attribute
Expand Down
52 changes: 44 additions & 8 deletions src/sources/opentelemetry/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use futures::TryFutureExt;
use tonic::{Request, Response, Status};
use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered};
use vector_lib::opentelemetry::proto::collector::logs::v1::{
logs_service_server::LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse,
use vector_lib::opentelemetry::proto::collector::{
logs::v1::{
logs_service_server::LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse,
},
trace::v1::{
trace_service_server::TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse,
},
};
use vector_lib::{
config::LogNamespace,
Expand All @@ -12,7 +17,7 @@ use vector_lib::{

use crate::{
internal_events::{EventsReceived, StreamClosedError},
sources::opentelemetry::LOGS,
sources::opentelemetry::{LOGS, TRACES},
SourceSender,
};

Expand All @@ -24,19 +29,52 @@ pub(super) struct Service {
pub log_namespace: LogNamespace,
}

#[tonic::async_trait]
impl TraceService for Service {
async fn export(
&self,
request: Request<ExportTraceServiceRequest>,
) -> Result<Response<ExportTraceServiceResponse>, Status> {
let events: Vec<Event> = request
.into_inner()
.resource_spans
.into_iter()
.flat_map(|v| v.into_event_iter())
.collect();
self.handle_events(events, TRACES).await?;

Ok(Response::new(ExportTraceServiceResponse {
partial_success: None,
}))
}
}

#[tonic::async_trait]
impl LogsService for Service {
async fn export(
&self,
request: Request<ExportLogsServiceRequest>,
) -> Result<Response<ExportLogsServiceResponse>, Status> {
let mut events: Vec<Event> = request
let events: Vec<Event> = request
.into_inner()
.resource_logs
.into_iter()
.flat_map(|v| v.into_event_iter(self.log_namespace))
.collect();
self.handle_events(events, LOGS).await?;

Ok(Response::new(ExportLogsServiceResponse {
partial_success: None,
}))
}
}

impl Service {
async fn handle_events(
&self,
mut events: Vec<Event>,
log_name: &'static str,
) -> Result<(), Status> {
let count = events.len();
let byte_size = events.estimated_json_encoded_size_of();
self.events_received.emit(CountByteSize(count, byte_size));
Expand All @@ -45,17 +83,15 @@ impl LogsService for Service {

self.pipeline
.clone()
.send_batch_named(LOGS, events)
.send_batch_named(log_name, events)
.map_err(|error| {
let message = error.to_string();
emit!(StreamClosedError { count });
Status::unavailable(message)
})
.and_then(|_| handle_batch_status(receiver))
.await?;
Ok(Response::new(ExportLogsServiceResponse {
partial_success: None,
}))
Ok(())
}
}

Expand Down
Loading

0 comments on commit c0f991b

Please sign in to comment.