Skip to content

Commit

Permalink
feat(destination): add configurable TTL for task locking
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali-Farhadnia committed Jul 23, 2024
1 parent 38fa2a6 commit 2cfcd99
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 15 deletions.
6 changes: 3 additions & 3 deletions adapter/etcd/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ func (a Adapter) Close() error {
return a.client.Close()
}

func (a Adapter) Lock(ctx context.Context, key string, ttl int64) (unlock func() error, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttl))
func (a Adapter) Lock(ctx context.Context, key string, ttlSeconds int64) (unlock func() error, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttlSeconds))

tracer := otela.NewTracer("etcd")
ctx, span := tracer.Start(ctx, "etcd@Lock", trace.WithAttributes(
attribute.String("key", key)))
defer span.End()

session, err := concurrency.NewSession(a.client, concurrency.WithTTL(int(ttl)), concurrency.WithContext(ctx))
session, err := concurrency.NewSession(a.client, concurrency.WithTTL(int(ttlSeconds)), concurrency.WithContext(ctx))
if err != nil {
span.AddEvent("error-on-new-session", trace.WithAttributes(
attribute.String("error", err.Error())))
Expand Down
7 changes: 5 additions & 2 deletions cmd/destination/delivery_workers/webhook_delivery_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (
"go.opentelemetry.io/otel/trace"
)

const waitingAfterShutdownInSeconds = 2
const (
waitingAfterShutdownInSeconds = 2
taskLockTTLInSeconds = 10
)

func main() {
done := make(chan bool)
Expand Down Expand Up @@ -93,7 +96,7 @@ func main() {
}
span.AddEvent("etcd-created")

taskHandler := taskservice.New(taskIdempotency, taskRepo, distributedLocker)
taskHandler := taskservice.New(taskIdempotency, taskRepo, distributedLocker, taskLockTTLInSeconds)
span.AddEvent("task-handler-created")

// Register delivery handlers
Expand Down
21 changes: 11 additions & 10 deletions destination/taskservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,22 @@ type Idempotency interface {
}

type Locker interface {
Lock(ctx context.Context, key string, ttl int64) (unlock func() error, err error)
Lock(ctx context.Context, key string, ttlSeconds int64) (unlock func() error, err error)
}

type Service struct {
idempotency Idempotency
repo Repository
locker Locker
idempotency Idempotency
repo Repository
locker Locker
taskLockTTLSeconds int64
}

func New(idempotency Idempotency, repo Repository, l Locker) Service {
func New(idempotency Idempotency, repo Repository, l Locker, taskLockTTLSeconds int64) Service {
return Service{
idempotency: idempotency,
repo: repo,
locker: l,
idempotency: idempotency,
repo: repo,
locker: l,
taskLockTTLSeconds: taskLockTTLSeconds,
}
}

Expand All @@ -47,9 +49,8 @@ func (s Service) LockTaskByID(ctx context.Context, taskID string) (unlock func()
defer span.End()

lockKey := "task:" + taskID
const ttl = 10

return s.locker.Lock(ctx, lockKey, ttl)
return s.locker.Lock(ctx, lockKey, s.taskLockTTLSeconds)
}

func (s Service) GetTaskStatusByID(ctx context.Context, taskID string) (taskentity.IntegrationDeliveryStatus, error) {
Expand Down

0 comments on commit 2cfcd99

Please sign in to comment.