Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bulksubscribe http #478

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions examples/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ name: Run Subscriber Server
output_match_mode: substring
expected_stdout_lines:
- 'event - PubsubName: messages, Topic: neworder'
- 'event - PubsubName: messages, Topic: newbulkorder'
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
- 'event - PubsubName: messages, Topic: newbulkorder'
background: true
sleep: 15
-->

#### Note: sub/sub.go contains both AddTopicEventHandler (used for subscribe of messages) and AddBulkTopicEventHandler (used for bulksubscribe of messages)

Comment on lines +28 to +29
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This folder contains two Go files that use the Go SDK to invoke the Dapr Pub/Sub API.

Remove this and modify the above to include something along the lines of:

This folder contains a publisher and subscriber that demonstrates both standard pubsub and bulk-pubsub

```bash
dapr run --app-id sub \
--app-protocol http \
Expand All @@ -44,10 +48,9 @@ expected_stdout_lines:
background: true
sleep: 15
-->
#### Note: pub/pub.go contains both PublishEvents (used for publish of messages) and PublishEvent (used for bulkPublish of messages)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above


```bash
export DAPR_PUBSUB_NAME=messages

dapr run --app-id pub \
--log-level debug \
--resources-path ./config \
Expand Down Expand Up @@ -76,6 +79,6 @@ dapr stop --app-id sub

```shell
== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: 82427280-1c18-4fab-b901-c7e68d295d31, Data: ping
== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: cc13829c-af77-4303-a4d7-55cdc0b0fa7d, Data: multi-pong
== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: 0147f10a-d6c3-4b16-ad5a-6776956757dd, Data: multi-ping
== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: newbulkorder, ID: cc13829c-af77-4303-a4d7-55cdc0b0fa7d, Data: multi-pong
== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: newbulkorder, ID: 0147f10a-d6c3-4b16-ad5a-6776956757dd, Data: multi-ping
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
```
8 changes: 4 additions & 4 deletions examples/pubsub/pub/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ package main
import (
"context"
"fmt"
"os"

dapr "github.com/dapr/go-sdk/client"
)

var (
// set the environment as instructions.
pubsubName = os.Getenv("DAPR_PUBSUB_NAME")
topicName = "neworder"
pubsubName = "messages"
topicName = "neworder"
bulkTopicName = "newbulkorder"
)

func main() {
Expand All @@ -44,7 +44,7 @@ func main() {
}

// Publish multiple events
if res := client.PublishEvents(ctx, pubsubName, topicName, publishEventsData); res.Error != nil {
if res := client.PublishEvents(ctx, pubsubName, bulkTopicName, publishEventsData); res.Error != nil {
panic(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem entirely right, you're panicking here with an unrelated (potentially) nil error

}

Expand Down
15 changes: 4 additions & 11 deletions examples/pubsub/sub/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@ var defaultSubscription = &common.Subscription{
Route: "/orders",
}

var importantSubscription = &common.Subscription{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see the importantSubscription re-implemented as a way to validate the route being correct for a message on the same topic alongside match/priority

var bulkSubscription = &common.Subscription{
PubsubName: "messages",
Topic: "neworder",
Route: "/important",
Match: `event.type == "important"`,
Priority: 1,
Topic: "newbulkorder",
Route: "/bulkorders",
}

func main() {
Expand All @@ -50,7 +48,7 @@ func main() {
log.Fatalf("error adding topic subscription: %v", err)
}

if err := s.AddTopicEventHandler(importantSubscription, importantEventHandler); err != nil {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
if err := s.AddBulkTopicEventHandler(bulkSubscription, eventHandler, 10, 100); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}

Expand All @@ -63,8 +61,3 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}

func importantEventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("important event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}
2 changes: 2 additions & 0 deletions service/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Service interface {
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error
// AddBulkTopicEventHandler appends provided event handler with its topic along with configuring maxMessagesCount, maxAwaitDurationMs for bulk handling and optional metadata to the service.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: break this comment up so it doesn't wrap/exceed a reasonable character count (~140ish)

AddBulkTopicEventHandler(sub *Subscription, fn TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk
Expand Down
28 changes: 24 additions & 4 deletions service/grpc/topic.go
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,25 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
return s.topicRegistrar.AddSubscription(sub, fn)
}

func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As my previous review - the validation of arguments passed to these parameters should be implemented as per the implementation spec. I do think that this is something we need to address both sdk-side and in the runtime explicitly as part of best practice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what default values you suggest if nil values are given?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error-out if either are X <= 0 this would prevent a negative value, a nil value is not possible for the int type.

if sub == nil {
return errors.New("subscription required")
}

return s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs)
}

// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to.
func (s *Server) ListTopicSubscriptions(ctx context.Context, in *emptypb.Empty) (*runtimev1pb.ListTopicSubscriptionsResponse, error) {
subs := make([]*runtimev1pb.TopicSubscription, 0)
for _, v := range s.topicRegistrar {
s := v.Subscription
sub := &runtimev1pb.TopicSubscription{
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
BulkSubscribe: convertBulkSubscribe(s.BulkSubscribe),
}
subs = append(subs, sub)
}
Expand All @@ -74,6 +83,17 @@ func convertRoutes(routes *internal.TopicRoutes) *runtimev1pb.TopicRoutes {
}
}

func convertBulkSubscribe(bulkSubscribe *internal.BulkSubscribeOptions) *runtimev1pb.BulkSubscribeConfig {
if bulkSubscribe == nil {
return nil
}
return &runtimev1pb.BulkSubscribeConfig{
Enabled: bulkSubscribe.Enabled,
MaxMessagesCount: bulkSubscribe.MaxMessagesCount,
MaxAwaitDurationMs: bulkSubscribe.MaxAwaitDurationMs,
}
}

// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed.
// Dapr sends published messages in a CloudEvents v1.0 envelope.
func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) {
Expand Down
Loading
Loading