From 6385da580c157da2da6c31236a3225c7f63e535e Mon Sep 17 00:00:00 2001 From: Mohsen Haghgoo Date: Tue, 8 Oct 2024 08:31:30 +0330 Subject: [PATCH] fix: some update for channel package. --- pkg/channel/adapter/rabbitmq/channel.go | 27 ++++++++++++--------- pkg/errmsg/message.go | 31 +++++++++++++------------ 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/pkg/channel/adapter/rabbitmq/channel.go b/pkg/channel/adapter/rabbitmq/channel.go index b890ee9..991fc4c 100644 --- a/pkg/channel/adapter/rabbitmq/channel.go +++ b/pkg/channel/adapter/rabbitmq/channel.go @@ -27,6 +27,7 @@ type rabbitmqChannel struct { queue string numberInstants int maxRetryPolicy int + bufferSize int } type rabbitmqChannelParams struct { mode channel.Mode @@ -93,6 +94,7 @@ func newChannelWithContext(ctx context.Context, done <-chan bool, wg *sync.WaitG maxRetryPolicy: rabbitmqChannelParams.maxRetryPolicy, inputChannel: make(chan []byte, rabbitmqChannelParams.bufferSize), outputChannel: make(chan channel.Message, rabbitmqChannelParams.bufferSize), + bufferSize: rabbitmqChannelParams.bufferSize, } rc.startWithContext(ctx) span.AddEvent("rabbitmq-channel-started") @@ -274,6 +276,14 @@ func (rc *rabbitmqChannel) startOutputWitContext(ctx context.Context) { } span.AddEvent("channel-opened") + err = ch.Qos(rc.bufferSize, 0, false) + if err != nil { + logger.L().Error(errmsg.ErrFailedToSetQosOnChannel) + rc.callMeNextTime(rc.startOutput) + + return + } + defer func(ch *amqp.Channel) { err = ch.Close() if err != nil { @@ -321,17 +331,12 @@ func (rc *rabbitmqChannel) startOutputWitContext(ctx context.Context) { return case msg := <-msgs: - rc.wg.Add(1) - go func() { - defer rc.wg.Done() - - rc.outputChannel <- channel.Message{ - Body: msg.Body, - Ack: func() error { - return msg.Ack(false) - }, - } - }() + rc.outputChannel <- channel.Message{ + Body: msg.Body, + Ack: func() error { + return msg.Ack(false) + }, + } } } }() diff --git a/pkg/errmsg/message.go b/pkg/errmsg/message.go index a1e89e6..80da452 100644 --- a/pkg/errmsg/message.go +++ b/pkg/errmsg/message.go @@ -1,19 +1,20 @@ package errmsg const ( - ErrJwtEmptyUser = "for generating a JWT token, email is required" - ErrWrongCredentials = "username or password isn't correct" - ErrSomeThingWentWrong = "some thing went wrong" - ErrAuthUserNotFound = "user not found" - ErrProjectNotFound = "project not found" - ErrEmailIsNotValid = "email is not valid" - ErrAuthUserExisting = "a user with this email is already registered" - ErrPasswordIsNotValid = "password is not valid" - ErrorMsgInvalidInput = "invalid input" - ErrBadRequest = "Bad request" - ErrUserNotFound = "user not found" - ErrChannelNotFound = "channel not found: %v" - ErrFailedToOpenChannel = "failed to open rabbitmq channel" - ErrFailedToCloseChannel = "failed to close rabbitmq channel" - ErrAccessDenied = "Access denied" + ErrJwtEmptyUser = "for generating a JWT token, email is required" + ErrWrongCredentials = "username or password isn't correct" + ErrSomeThingWentWrong = "some thing went wrong" + ErrAuthUserNotFound = "user not found" + ErrProjectNotFound = "project not found" + ErrEmailIsNotValid = "email is not valid" + ErrAuthUserExisting = "a user with this email is already registered" + ErrPasswordIsNotValid = "password is not valid" + ErrorMsgInvalidInput = "invalid input" + ErrBadRequest = "Bad request" + ErrUserNotFound = "user not found" + ErrChannelNotFound = "channel not found: %v" + ErrFailedToOpenChannel = "failed to open rabbitmq channel" + ErrFailedToSetQosOnChannel = "failed to set QOS on channel" + ErrFailedToCloseChannel = "failed to close rabbitmq channel" + ErrAccessDenied = "Access denied" )