Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Issue with RabbitMQ Message Consumption #134

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions pkg/channel/adapter/rabbitmq/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type rabbitmqChannel struct {
queue string
numberInstants int
maxRetryPolicy int
bufferSize int
}
type rabbitmqChannelParams struct {
mode channel.Mode
Expand Down Expand Up @@ -93,6 +94,7 @@ func newChannelWithContext(ctx context.Context, done <-chan bool, wg *sync.WaitG
maxRetryPolicy: rabbitmqChannelParams.maxRetryPolicy,
inputChannel: make(chan []byte, rabbitmqChannelParams.bufferSize),
outputChannel: make(chan channel.Message, rabbitmqChannelParams.bufferSize),
bufferSize: rabbitmqChannelParams.bufferSize,
}
rc.startWithContext(ctx)
span.AddEvent("rabbitmq-channel-started")
Expand Down Expand Up @@ -274,6 +276,14 @@ func (rc *rabbitmqChannel) startOutputWitContext(ctx context.Context) {
}
span.AddEvent("channel-opened")

err = ch.Qos(rc.bufferSize, 0, false)
if err != nil {
logger.L().Error(errmsg.ErrFailedToSetQosOnChannel)
rc.callMeNextTime(rc.startOutput)

return
}

defer func(ch *amqp.Channel) {
err = ch.Close()
if err != nil {
Expand Down Expand Up @@ -321,17 +331,12 @@ func (rc *rabbitmqChannel) startOutputWitContext(ctx context.Context) {

return
case msg := <-msgs:
rc.wg.Add(1)
go func() {
defer rc.wg.Done()

rc.outputChannel <- channel.Message{
Body: msg.Body,
Ack: func() error {
return msg.Ack(false)
},
}
}()
rc.outputChannel <- channel.Message{
Body: msg.Body,
Ack: func() error {
return msg.Ack(false)
},
}
}
}
}()
Expand Down
31 changes: 16 additions & 15 deletions pkg/errmsg/message.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package errmsg

const (
ErrJwtEmptyUser = "for generating a JWT token, email is required"
ErrWrongCredentials = "username or password isn't correct"
ErrSomeThingWentWrong = "some thing went wrong"
ErrAuthUserNotFound = "user not found"
ErrProjectNotFound = "project not found"
ErrEmailIsNotValid = "email is not valid"
ErrAuthUserExisting = "a user with this email is already registered"
ErrPasswordIsNotValid = "password is not valid"
ErrorMsgInvalidInput = "invalid input"
ErrBadRequest = "Bad request"
ErrUserNotFound = "user not found"
ErrChannelNotFound = "channel not found: %v"
ErrFailedToOpenChannel = "failed to open rabbitmq channel"
ErrFailedToCloseChannel = "failed to close rabbitmq channel"
ErrAccessDenied = "Access denied"
ErrJwtEmptyUser = "for generating a JWT token, email is required"
ErrWrongCredentials = "username or password isn't correct"
ErrSomeThingWentWrong = "some thing went wrong"
ErrAuthUserNotFound = "user not found"
ErrProjectNotFound = "project not found"
ErrEmailIsNotValid = "email is not valid"
ErrAuthUserExisting = "a user with this email is already registered"
ErrPasswordIsNotValid = "password is not valid"
ErrorMsgInvalidInput = "invalid input"
ErrBadRequest = "Bad request"
ErrUserNotFound = "user not found"
ErrChannelNotFound = "channel not found: %v"
ErrFailedToOpenChannel = "failed to open rabbitmq channel"
ErrFailedToSetQosOnChannel = "failed to set QOS on channel"
ErrFailedToCloseChannel = "failed to close rabbitmq channel"
ErrAccessDenied = "Access denied"
)
Loading