Skip to content

Commit

Permalink
feat(destination): Use trace and metric (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
mohsenHa authored Jul 10, 2024
1 parent 990f8ca commit 85224aa
Show file tree
Hide file tree
Showing 31 changed files with 1,097 additions and 205 deletions.
40 changes: 38 additions & 2 deletions adapter/etcd/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package etcd
import (
"context"
"fmt"
"github.com/ormushq/ormus/adapter/otela"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"log/slog"
"time"

Expand All @@ -21,16 +24,29 @@ type Adapter struct {
}

func New(config Config) (Adapter, error) {
return NewWithContext(context.Background(), config)
}

func NewWithContext(ctx context.Context, config Config) (Adapter, error) {
tracer := otela.NewTracer("etcd")
_, span := tracer.Start(ctx, "etcd@NewWithContext", trace.WithAttributes(
attribute.String("config", fmt.Sprintf("%+v", config))))
defer span.End()

etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)},
DialTimeout: time.Duration(config.DialTimeoutSeconds) * time.Second,
})
if err != nil {
slog.Error("Error creating etcd client: ", err)
span.AddEvent("error-on-connect-to-etcd", trace.WithAttributes(
attribute.String("error", err.Error())))

return Adapter{}, err
}

span.AddEvent("connected-to-etcd-successfully")

return Adapter{
client: etcdClient,
}, nil
Expand All @@ -45,17 +61,37 @@ func (a Adapter) Close() error {
}

func (a Adapter) Lock(ctx context.Context, key string, ttl int64) (unlock func() error, err error) {
session, err := concurrency.NewSession(a.client, concurrency.WithTTL(int(ttl)))
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttl))

tracer := otela.NewTracer("etcd")
ctx, span := tracer.Start(ctx, "etcd@Lock", trace.WithAttributes(
attribute.String("key", key)))
defer span.End()

session, err := concurrency.NewSession(a.client, concurrency.WithTTL(int(ttl)), concurrency.WithContext(ctx))
if err != nil {
span.AddEvent("error-on-new-session", trace.WithAttributes(
attribute.String("error", err.Error())))
cancel()

return nil, err
}

mutex := concurrency.NewMutex(session, key)
if err := mutex.Lock(ctx); err != nil {
span.AddEvent("error-on-new-mutex", trace.WithAttributes(
attribute.String("error", err.Error())))
cancel()

return nil, err
}

span.AddEvent("key-locked")

return func() error {
return mutex.Unlock(ctx)
err = mutex.Unlock(ctx)
cancel()

return err
}, nil
}
48 changes: 39 additions & 9 deletions adapter/otela/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,35 @@ package otela
import (
"context"
"fmt"

"github.com/ormushq/ormus/logger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/metric"
sdkMetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"
)

func (opr *otelProvider) newMetricExporter(ctx context.Context) (metric.Exporter, error) {
return otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithInsecure(), otlpmetricgrpc.WithGRPCConn(opr.conn))
// Your preferred exporter: console, jaeger, zipkin, OTLP, etc.
func (opr *otelProvider) newMetricExporter(ctx context.Context) (sdkMetric.Exporter, error) {
switch opr.exporter {
case ExporterGrpc:
return otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithInsecure(), otlpmetricgrpc.WithGRPCConn(opr.conn))
case ExporterConsole:
return stdoutmetric.New()
default:
panic("unsupported")
}

}

func (opr *otelProvider) newMetricProvider(exp metric.Exporter) *metric.MeterProvider {
func (opr *otelProvider) newMetricProvider(exp sdkMetric.Exporter) *sdkMetric.MeterProvider {
// Ensure default SDK resources and the required service name are set.
return metric.NewMeterProvider(
metric.WithReader(metric.NewPeriodicReader(exp)),
metric.WithResource(resource.NewWithAttributes(
return sdkMetric.NewMeterProvider(
sdkMetric.WithReader(sdkMetric.NewPeriodicReader(exp)),
sdkMetric.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(op.serviceName),
)),
Expand Down Expand Up @@ -50,3 +60,23 @@ func (opr *otelProvider) initMetric() error {

return nil
}

func AddFloat64Counter(ctx context.Context, meter metric.Meter, name, desc string, cv float64, options ...metric.Float64CounterOption) {
tracer := NewTracer("otela")
ctx, span := tracer.Start(ctx, "otela@metric")
options = append(options, metric.WithDescription(desc))

processedEventCounter, err := meter.Float64Counter(name, options...)
if err != nil {
span.AddEvent("error on create counter", trace.WithAttributes(
attribute.String("error", err.Error())))
logger.L().Error("error on create counter", "error", err.Error())
} else {
processedEventCounter.Add(ctx, cv)
}
}

func IncrementFloat64Counter(ctx context.Context, meter metric.Meter, name, desc string, options ...metric.Float64CounterOption) {
cv := 1.0
AddFloat64Counter(ctx, meter, name, desc, cv, options...)
}
46 changes: 28 additions & 18 deletions adapter/otela/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,6 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

type otelProvider struct {
wg *sync.WaitGroup
done <-chan bool
conn *grpc.ClientConn
isConfigure bool
tracerProvider trace.TracerProvider
metricProvider metric.MeterProvider
serviceName string
}

type Config struct {
Endpoint string
ServiceName string
EnableMetricExpose bool
MetricExposePath string
MetricExposePort int
}

var op otelProvider

func Configure(wg *sync.WaitGroup, done <-chan bool, cfg Config) error {
Expand All @@ -54,6 +36,7 @@ func Configure(wg *sync.WaitGroup, done <-chan bool, cfg Config) error {
tracerProvider: nil,
metricProvider: nil,
serviceName: cfg.ServiceName,
exporter: cfg.Exporter,
}
err = op.initTrace()
if err != nil {
Expand Down Expand Up @@ -108,3 +91,30 @@ func initConn(cfg Config) (*grpc.ClientConn, error) {

return conn, err
}

type Exporter string

const (
ExporterGrpc = Exporter("grpc")
ExporterConsole = Exporter("console")
)

type otelProvider struct {
wg *sync.WaitGroup
done <-chan bool
conn *grpc.ClientConn
isConfigure bool
tracerProvider trace.TracerProvider
metricProvider metric.MeterProvider
serviceName string
exporter Exporter
}

type Config struct {
Endpoint string
ServiceName string
EnableMetricExpose bool
MetricExposePath string
MetricExposePort int
Exporter Exporter
}
36 changes: 33 additions & 3 deletions adapter/otela/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,28 @@ package otela
import (
"context"
"fmt"
"go.opentelemetry.io/otel/propagation"

"github.com/ormushq/ormus/logger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"
)

func (opr *otelProvider) newTraceExporter(ctx context.Context) (sdktrace.SpanExporter, error) {
return otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure(), otlptracegrpc.WithGRPCConn(opr.conn))
switch opr.exporter {
case ExporterGrpc:
return otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure(), otlptracegrpc.WithGRPCConn(opr.conn))
case ExporterConsole:
return stdouttrace.New()
default:
panic("unsupported")
}

}

func (opr *otelProvider) newTraceProvider(exp sdktrace.SpanExporter) *sdktrace.TracerProvider {
Expand Down Expand Up @@ -54,10 +64,30 @@ func (opr *otelProvider) initTrace() error {
return nil
}

func NewTracer(name string) trace.Tracer {
func NewTracer(name string, options ...trace.TracerOption) trace.Tracer {
if !op.isConfigure {
panic("You must configure adapter before calling NewTracer")
}

return op.tracerProvider.Tracer(name)
return op.tracerProvider.Tracer(name, options...)
}

func GetCarrierFromContext(ctx context.Context) map[string]string {
propgator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})

carrier := propagation.MapCarrier{}
propgator.Inject(ctx, carrier)

return carrier
}

func GetContextFromCarrier(carrier map[string]string) context.Context {
propgator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
c := propagation.MapCarrier{}
for k, v := range carrier {
c.Set(k, v)
}
parentCtx := propgator.Extract(context.Background(), c)

return parentCtx
}
Loading

0 comments on commit 85224aa

Please sign in to comment.