Skip to content

Commit

Permalink
refactor: docker-compose files (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
PouriaSeyfi authored Jul 17, 2024
1 parent 85224aa commit 5abba61
Show file tree
Hide file tree
Showing 51 changed files with 279 additions and 119 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ destination/worker_log.json
temp
.env
ormus.iml

vendor
6 changes: 3 additions & 3 deletions adapter/etcd/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package etcd
import (
"context"
"fmt"
"github.com/ormushq/ormus/adapter/otela"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"log/slog"
"time"

"github.com/ormushq/ormus/adapter/otela"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type Config struct {
Expand Down
4 changes: 2 additions & 2 deletions adapter/otela/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package otela
import (
"context"
"fmt"

"github.com/ormushq/ormus/logger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -11,7 +12,7 @@ import (
"go.opentelemetry.io/otel/metric"
sdkMetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/semconv/v1.25.0"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -24,7 +25,6 @@ func (opr *otelProvider) newMetricExporter(ctx context.Context) (sdkMetric.Expor
default:
panic("unsupported")
}

}

func (opr *otelProvider) newMetricProvider(exp sdkMetric.Exporter) *sdkMetric.MeterProvider {
Expand Down
1 change: 1 addition & 0 deletions adapter/otela/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var op otelProvider

func Configure(wg *sync.WaitGroup, done <-chan bool, cfg Config) error {
if cfg.EnableMetricExpose && !op.isConfigure {
fmt.Println(cfg)
serveMetric(cfg, wg, done)
}

Expand Down
5 changes: 2 additions & 3 deletions adapter/otela/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package otela
import (
"context"
"fmt"
"go.opentelemetry.io/otel/propagation"

"github.com/ormushq/ormus/logger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv/v1.25.0"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -24,7 +24,6 @@ func (opr *otelProvider) newTraceExporter(ctx context.Context) (sdktrace.SpanExp
default:
panic("unsupported")
}

}

func (opr *otelProvider) newTraceProvider(exp sdktrace.SpanExporter) *sdktrace.TracerProvider {
Expand Down
10 changes: 5 additions & 5 deletions cmd/destination/delivery_workers/webhook_delivery_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@ package main

import (
"context"
"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/adapter/redis"
"github.com/ormushq/ormus/destination/taskservice/adapter/idempotency/redistaskidempotency"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"log"
"log/slog"
"os"
Expand All @@ -15,17 +10,22 @@ import (
"time"

"github.com/ormushq/ormus/adapter/etcd"
"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/adapter/redis"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/destination/dconfig"
"github.com/ormushq/ormus/destination/taskdelivery"
"github.com/ormushq/ormus/destination/taskdelivery/adapters/fakedeliveryhandler"
"github.com/ormushq/ormus/destination/taskmanager/adapter/rabbitmqchanneltaskmanager"
"github.com/ormushq/ormus/destination/taskservice"
"github.com/ormushq/ormus/destination/taskservice/adapter/idempotency/redistaskidempotency"
"github.com/ormushq/ormus/destination/taskservice/adapter/repository/inmemorytaskrepo"
"github.com/ormushq/ormus/destination/worker"
"github.com/ormushq/ormus/logger"
"github.com/ormushq/ormus/pkg/channel"
rbbitmqchannel "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmq"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const waitingAfterShutdownInSeconds = 2
Expand Down
10 changes: 5 additions & 5 deletions cmd/destination/faker/fake_processed_event_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/pkg/metricname"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"log"
"log/slog"
"math/big"
"os"
"sync"
"time"

"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/event"
"github.com/ormushq/ormus/logger"
"github.com/ormushq/ormus/manager/entity"
"github.com/ormushq/ormus/pkg/metricname"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const timeoutSeconds = 5
Expand Down
6 changes: 3 additions & 3 deletions cmd/destination/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ package main

import (
"context"
"github.com/ormushq/ormus/adapter/otela"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"log"
"log/slog"
"os"
"os/signal"
"sync"
"time"

"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/destination/dconfig"
"github.com/ormushq/ormus/destination/processedevent/adapter/rabbitmqconsumer"
Expand All @@ -21,6 +19,8 @@ import (
"github.com/ormushq/ormus/manager/entity"
"github.com/ormushq/ormus/pkg/channel"
rbbitmqchannel "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmq"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const waitingAfterShutdownInSeconds = 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/example/otel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package main
import (
"context"
"fmt"
"go.opentelemetry.io/otel/metric"
"sync"
"time"

"github.com/ormushq/ormus/adapter/otela"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -45,6 +45,7 @@ func main() {
close(done)
wg.Wait()
}

func startService1(c chan<- context.Context) {
tracer := otela.NewTracer("test-tracer")

Expand Down
9 changes: 5 additions & 4 deletions cmd/example/otel/rabbitmq/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package main
import (
"encoding/json"
"fmt"
"os"
"os/signal"
"sync"

"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/destination/dconfig"
"github.com/ormushq/ormus/pkg/channel"
rbbitmqchannel "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmq"
"os"
"os/signal"
"sync"
)

type MyMessage struct {
Expand Down Expand Up @@ -61,7 +62,7 @@ func main() {
select {
case msg := <-outputChannel:
func() {
fmt.Printf("recived message : %s\n", msg.Body)
fmt.Printf("received message : %s\n", msg.Body)
var decode MyMessage
err = json.Unmarshal(msg.Body, &decode)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions cmd/example/otel/rabbitmq/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/destination/dconfig"
"github.com/ormushq/ormus/pkg/channel"
rbbitmqchannel "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmq"
"os"
"os/signal"
"sync"
"time"

"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/destination/dconfig"
"github.com/ormushq/ormus/pkg/channel"
rbbitmqchannel "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmq"
)

type MyMessage struct {
Expand Down
10 changes: 5 additions & 5 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,31 @@ scylladb:
keyspace: ormus
redis:
port: 6379
host: localhost
host: redis
db: 0
password: ""
etcd:
port: 2379
host: localhost
host: etcd
dial_timeout: 5
destination:
debug_mode: true
consumer_topic: "pe.#" # pe stands for processed event. and # substitute for zero or more words.
rabbitmq_consumer_connection:
user: guest
password: guest
host: localhost
host: rabbitmq
port: 5672
vhost:
reconnect_second: 5
rabbitmq_task_manager_connection:
user: guest
password: guest
host: localhost
host: rabbitmq
port: 5672
redis_task_idempotency:
port: 6379
host: localhost
host: redis
db: 0
password: ""
prefix: "destination-task-status::"
Expand Down
13 changes: 6 additions & 7 deletions destination/processedevent/adapter/rabbitmqconsumer/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@ package rabbitmqconsumer

import (
"context"
"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/pkg/metricname"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

//"context"
"fmt"
"log"
"log/slog"
"sync"

"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/destination/dconfig"
"github.com/ormushq/ormus/destination/entity/taskentity"
"github.com/ormushq/ormus/event"
"github.com/ormushq/ormus/pkg/metricname"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type Consumer struct {
Expand Down Expand Up @@ -48,6 +46,7 @@ func (c *Consumer) Consume(done <-chan bool, wg *sync.WaitGroup) (<-chan event.P
defer close(events)
_, span := tracer.Start(c.ctx, "rabbitmqconsumer@consume")

fmt.Println(c.connectionConfig)
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", c.connectionConfig.User, c.connectionConfig.Password, c.connectionConfig.Host, c.connectionConfig.Port))
failOnError(err, "Failed to connect to RabbitMQ")
defer func(conn *amqp.Connection) {
Expand Down
10 changes: 5 additions & 5 deletions destination/taskcoordinator/adapter/dtcoordinator/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package dtcoordinator

import (
"fmt"
"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/pkg/metricname"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"log/slog"
"sync"

"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/destination/taskmanager"
"github.com/ormushq/ormus/event"
"github.com/ormushq/ormus/manager/entity"
"github.com/ormushq/ormus/pkg/metricname"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type TaskPublisherMap map[entity.DestinationType]taskmanager.Publisher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package fakedeliveryhandler

import (
"fmt"
"github.com/ormushq/ormus/adapter/otela"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"log/slog"
"time"

"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/destination/entity/taskentity"
"github.com/ormushq/ormus/destination/taskdelivery/param"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type FakeHandler struct{}
Expand Down
6 changes: 3 additions & 3 deletions destination/taskmanager/adapter/inmemorytaskmanager/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package inmemorytaskmanager
import (
"sync"

"github.com/ormushq/ormus/destination/taskmanager"
"github.com/ormushq/ormus/destination/worker"
)

type Worker struct {
TaskManager *TaskManager
Handler taskmanager.TaskHandler
Handler worker.TaskHandler
}

func NewWorker(tm *TaskManager, h taskmanager.TaskHandler) *Worker {
func NewWorker(tm *TaskManager, h worker.TaskHandler) *Worker {
return &Worker{
TaskManager: tm,
Handler: h,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package rabbitmqchanneltaskmanager
import (
"context"
"fmt"
"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/pkg/metricname"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"sync"

"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/destination/entity/taskentity"
"github.com/ormushq/ormus/event"
"github.com/ormushq/ormus/logger"
"github.com/ormushq/ormus/pkg/channel"
"github.com/ormushq/ormus/pkg/metricname"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type Consumer struct {
Expand Down
Loading

0 comments on commit 5abba61

Please sign in to comment.