diff --git a/adapter/etcd/adapter.go b/adapter/etcd/adapter.go index 3009ea0..b2a3c01 100644 --- a/adapter/etcd/adapter.go +++ b/adapter/etcd/adapter.go @@ -60,15 +60,15 @@ func (a Adapter) Close() error { return a.client.Close() } -func (a Adapter) Lock(ctx context.Context, key string, ttl int64) (unlock func() error, err error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttl)) +func (a Adapter) Lock(ctx context.Context, key string, ttlSeconds int64) (unlock func() error, err error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttlSeconds)) tracer := otela.NewTracer("etcd") ctx, span := tracer.Start(ctx, "etcd@Lock", trace.WithAttributes( attribute.String("key", key))) defer span.End() - session, err := concurrency.NewSession(a.client, concurrency.WithTTL(int(ttl)), concurrency.WithContext(ctx)) + session, err := concurrency.NewSession(a.client, concurrency.WithTTL(int(ttlSeconds)), concurrency.WithContext(ctx)) if err != nil { span.AddEvent("error-on-new-session", trace.WithAttributes( attribute.String("error", err.Error()))) diff --git a/cmd/destination/delivery_workers/webhook_delivery_worker.go b/cmd/destination/delivery_workers/webhook_delivery_worker.go index 083b925..5aa72b0 100644 --- a/cmd/destination/delivery_workers/webhook_delivery_worker.go +++ b/cmd/destination/delivery_workers/webhook_delivery_worker.go @@ -28,7 +28,10 @@ import ( "go.opentelemetry.io/otel/trace" ) -const waitingAfterShutdownInSeconds = 2 +const ( + waitingAfterShutdownInSeconds = 2 + taskLockTTLInSeconds = 10 +) func main() { done := make(chan bool) @@ -93,7 +96,7 @@ func main() { } span.AddEvent("etcd-created") - taskHandler := taskservice.New(taskIdempotency, taskRepo, distributedLocker) + taskHandler := taskservice.New(taskIdempotency, taskRepo, distributedLocker, taskLockTTLInSeconds) span.AddEvent("task-handler-created") // Register delivery handlers diff --git a/destination/taskservice/service.go b/destination/taskservice/service.go index d6a569b..d0a0eb9 100644 --- a/destination/taskservice/service.go +++ b/destination/taskservice/service.go @@ -22,20 +22,22 @@ type Idempotency interface { } type Locker interface { - Lock(ctx context.Context, key string, ttl int64) (unlock func() error, err error) + Lock(ctx context.Context, key string, ttlSeconds int64) (unlock func() error, err error) } type Service struct { - idempotency Idempotency - repo Repository - locker Locker + idempotency Idempotency + repo Repository + locker Locker + taskLockTTLSeconds int64 } -func New(idempotency Idempotency, repo Repository, l Locker) Service { +func New(idempotency Idempotency, repo Repository, l Locker, taskLockTTLSeconds int64) Service { return Service{ - idempotency: idempotency, - repo: repo, - locker: l, + idempotency: idempotency, + repo: repo, + locker: l, + taskLockTTLSeconds: taskLockTTLSeconds, } } @@ -47,9 +49,8 @@ func (s Service) LockTaskByID(ctx context.Context, taskID string) (unlock func() defer span.End() lockKey := "task:" + taskID - const ttl = 10 - return s.locker.Lock(ctx, lockKey, ttl) + return s.locker.Lock(ctx, lockKey, s.taskLockTTLSeconds) } func (s Service) GetTaskStatusByID(ctx context.Context, taskID string) (taskentity.IntegrationDeliveryStatus, error) {