Skip to content

Commit

Permalink
fix: config api (#200)
Browse files Browse the repository at this point in the history
* Fix: Finished Configuration API

Signed-off-by: LaurenceLiZhixin <[email protected]>

* Fix: add configuration validation

Signed-off-by: LaurenceLiZhixin <[email protected]>

* Fix: fix validation test

Signed-off-by: LaurenceLiZhixin <[email protected]>

* Fix: remove validation

Signed-off-by: LaurenceLiZhixin <[email protected]>

* add ut

Signed-off-by: LaurenceLiZhixin <[email protected]>

* Fix: comment

Signed-off-by: LaurenceLiZhixin <[email protected]>
  • Loading branch information
LaurenceLiZhixin authored Apr 6, 2022
1 parent f0e0931 commit 6f2ae7d
Show file tree
Hide file tree
Showing 12 changed files with 688 additions and 13 deletions.
4 changes: 2 additions & 2 deletions actor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func GetConfigFromOptions(opts ...Option) *ActorConfig {
conf := &ActorConfig{
SerializerType: constant.DefaultSerializerType,
}
for _, opt := range opts {
opt(conf)
for _, o := range opts {
o(conf)
}
return conf
}
12 changes: 12 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ type Client interface {
// ExecuteStateTransaction provides way to execute multiple operations on a specified store.
ExecuteStateTransaction(ctx context.Context, storeName string, meta map[string]string, ops []*StateOperation) error

// GetConfigurationItem can get target configuration item by storeName and key
GetConfigurationItem(ctx context.Context, storeName, key string, opts ...ConfigurationOpt) (*ConfigurationItem, error)

// GetConfigurationItems can get a list of configuration item by storeName and keys
GetConfigurationItems(ctx context.Context, storeName string, keys []string, opts ...ConfigurationOpt) ([]*ConfigurationItem, error)

// SubscribeConfigurationItems can subscribe the change of configuration items by storeName and keys, and return subscription id
SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) error

// UnsubscribeConfigurationItems can stop the subscription with target store's and id
UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error

// DeleteBulkState deletes content for multiple keys from store.
DeleteBulkState(ctx context.Context, storeName string, keys []string, meta map[string]string) error

Expand Down
75 changes: 71 additions & 4 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ import (
"fmt"
"net"
"os"
"strconv"
"sync"
"testing"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -130,7 +134,8 @@ func TestShutdown(t *testing.T) {
func getTestClient(ctx context.Context) (client Client, closer func()) {
s := grpc.NewServer()
pb.RegisterDaprServer(s, &testDaprServer{
state: make(map[string][]byte),
state: make(map[string][]byte),
configurationSubscriptionID: map[string]chan struct{}{},
})

l := bufconn.Listen(testBufSize)
Expand Down Expand Up @@ -161,7 +166,8 @@ func getTestClient(ctx context.Context) (client Client, closer func()) {
func getTestClientWithSocket(ctx context.Context) (client Client, closer func()) {
s := grpc.NewServer()
pb.RegisterDaprServer(s, &testDaprServer{
state: make(map[string][]byte),
state: make(map[string][]byte),
configurationSubscriptionID: map[string]chan struct{}{},
})

var lc net.ListenConfig
Expand Down Expand Up @@ -191,7 +197,9 @@ func getTestClientWithSocket(ctx context.Context) (client Client, closer func())

type testDaprServer struct {
pb.UnimplementedDaprServer
state map[string][]byte
state map[string][]byte
configurationSubscriptionIDMapLoc sync.Mutex
configurationSubscriptionID map[string]chan struct{}
}

func (s *testDaprServer) InvokeService(ctx context.Context, req *pb.InvokeServiceRequest) (*commonv1pb.InvokeResponse, error) {
Expand Down Expand Up @@ -348,3 +356,62 @@ func (s *testDaprServer) UnregisterActorTimer(context.Context, *pb.UnregisterAct
func (s *testDaprServer) Shutdown(ctx context.Context, req *empty.Empty) (*empty.Empty, error) {
return &empty.Empty{}, nil
}

func (s *testDaprServer) GetConfigurationAlpha1(ctx context.Context, in *pb.GetConfigurationRequest) (*pb.GetConfigurationResponse, error) {
if in.GetStoreName() == "" {
return &pb.GetConfigurationResponse{}, errors.New("store name notfound")
}
items := make([]*commonv1pb.ConfigurationItem, 0)
for _, v := range in.GetKeys() {
items = append(items, &commonv1pb.ConfigurationItem{
Key: v,
Value: v + valueSuffix,
})
}
return &pb.GetConfigurationResponse{
Items: items,
}, nil
}

func (s *testDaprServer) SubscribeConfigurationAlpha1(in *pb.SubscribeConfigurationRequest, server pb.Dapr_SubscribeConfigurationAlpha1Server) error {
stopCh := make(chan struct{})
id, _ := uuid.NewUUID()
s.configurationSubscriptionIDMapLoc.Lock()
s.configurationSubscriptionID[id.String()] = stopCh
s.configurationSubscriptionIDMapLoc.Unlock()
for i := 0; i < 5; i++ {
select {
case <-stopCh:
return nil
default:
}
items := make([]*commonv1pb.ConfigurationItem, 0)
for _, v := range in.GetKeys() {
items = append(items, &commonv1pb.ConfigurationItem{
Key: v,
Value: v + "_" + strconv.Itoa(i),
},
)
}
if err := server.Send(&pb.SubscribeConfigurationResponse{
Id: id.String(),
Items: items,
}); err != nil {
return err
}
time.Sleep(time.Second)
}
return nil
}

func (s *testDaprServer) UnsubscribeConfigurationAlpha1(ctx context.Context, in *pb.UnsubscribeConfigurationRequest) (*pb.UnsubscribeConfigurationResponse, error) {
s.configurationSubscriptionIDMapLoc.Lock()
defer s.configurationSubscriptionIDMapLoc.Unlock()
ch, ok := s.configurationSubscriptionID[in.Id]
if !ok {
return &pb.UnsubscribeConfigurationResponse{Ok: true}, nil
}
close(ch)
delete(s.configurationSubscriptionID, in.Id)
return &pb.UnsubscribeConfigurationResponse{Ok: true}, nil
}
126 changes: 126 additions & 0 deletions client/configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package client

import (
"context"
"fmt"
"io"

"github.com/pkg/errors"

pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
)

type ConfigurationItem struct {
Key string
Value string
Version string
Metadata map[string]string
}

type ConfigurationOpt func(map[string]string)

func WithConfigurationMetadata(key, value string) ConfigurationOpt {
return func(m map[string]string) {
m[key] = value
}
}

func (c *GRPCClient) GetConfigurationItem(ctx context.Context, storeName, key string, opts ...ConfigurationOpt) (*ConfigurationItem, error) {
items, err := c.GetConfigurationItems(ctx, storeName, []string{key}, opts...)
if err != nil {
return nil, err
}
if len(items) == 0 {
return nil, nil
}
return items[0], nil
}

func (c *GRPCClient) GetConfigurationItems(ctx context.Context, storeName string, keys []string, opts ...ConfigurationOpt) ([]*ConfigurationItem, error) {
metadata := make(map[string]string)
for _, opt := range opts {
opt(metadata)
}
rsp, err := c.protoClient.GetConfigurationAlpha1(ctx, &pb.GetConfigurationRequest{
StoreName: storeName,
Keys: keys,
Metadata: metadata,
})
if err != nil {
return nil, err
}

configItems := make([]*ConfigurationItem, 0)
for _, v := range rsp.Items {
configItems = append(configItems, &ConfigurationItem{
Key: v.Key,
Value: v.Value,
Version: v.Version,
Metadata: v.Metadata,
})
}
return configItems, nil
}

type ConfigurationHandleFunction func(string, []*ConfigurationItem)

func (c *GRPCClient) SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) error {
metadata := make(map[string]string)
for _, opt := range opts {
opt(metadata)
}

client, err := c.protoClient.SubscribeConfigurationAlpha1(ctx, &pb.SubscribeConfigurationRequest{
StoreName: storeName,
Keys: keys,
Metadata: metadata,
})
if err != nil {
return errors.Errorf("subscribe configuration failed with error = %s", err)
}

var subcribeID string
stopCh := make(chan struct{})
go func() {
for {
rsp, err := client.Recv()
if err == io.EOF || rsp == nil {
// receive goroutine would close if unsubscribe is called
fmt.Println("dapr configuration subscribe finished.")
close(stopCh)
break
}
subcribeID = rsp.Id
configurationItems := make([]*ConfigurationItem, 0)
for _, v := range rsp.Items {
configurationItems = append(configurationItems, &ConfigurationItem{
Key: v.Key,
Value: v.Value,
Version: v.Version,
Metadata: v.Metadata,
})
}
handler(rsp.Id, configurationItems)
}
}()
select {
case <-ctx.Done():
return c.UnsubscribeConfigurationItems(context.Background(), storeName, subcribeID)
case <-stopCh:
return nil
}
}

func (c *GRPCClient) UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error {
alpha1, err := c.protoClient.UnsubscribeConfigurationAlpha1(ctx, &pb.UnsubscribeConfigurationRequest{
StoreName: storeName,
Id: id,
})
if err != nil {
return err
}
if !alpha1.Ok {
return errors.Errorf("unsubscribe error message = %s", alpha1.GetMessage())
}
return nil
}
98 changes: 98 additions & 0 deletions client/configuration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package client

import (
"context"
"strconv"
"testing"
"time"

"go.uber.org/atomic"

"github.com/stretchr/testify/assert"
)

const (
valueSuffix = "_value"
)

func TestGetConfigurationItem(t *testing.T) {
ctx := context.Background()

t.Run("get configuration item", func(t *testing.T) {
resp, err := testClient.GetConfigurationItem(ctx, "example-config", "mykey")
assert.Nil(t, err)
assert.Equal(t, "mykey"+valueSuffix, resp.Value)
})

t.Run("get configuration item with invalid storeName", func(t *testing.T) {
_, err := testClient.GetConfigurationItem(ctx, "", "mykey")
assert.NotNil(t, err)
})
}

func TestGetConfigurationItems(t *testing.T) {
ctx := context.Background()

t.Run("Test get configuration items", func(t *testing.T) {
resp, err := testClient.GetConfigurationItems(ctx, "example-config", []string{"mykey1", "mykey2", "mykey3"})
assert.Nil(t, err)
for i, v := range resp {
assert.Equal(t, "mykey"+strconv.Itoa(i+1)+valueSuffix, v.Value)
}
})
}

func TestSubscribeConfigurationItems(t *testing.T) {
ctx := context.Background()

counter := 0
totalCounter := 0
t.Run("Test subscribe configuration items", func(t *testing.T) {
err := testClient.SubscribeConfigurationItems(ctx, "example-config",
[]string{"mykey", "mykey2", "mykey3"}, func(s string, items []*ConfigurationItem) {
counter++
for _, v := range items {
assert.Equal(t, v.Value, v.Key+"_"+strconv.Itoa(counter-1))
totalCounter++
}
})
assert.Nil(t, err)
})
time.Sleep(time.Second*5 + time.Millisecond*500)
assert.Equal(t, 5, counter)
assert.Equal(t, 15, totalCounter)
}

func TestUnSubscribeConfigurationItems(t *testing.T) {
ctx := context.Background()

counter := atomic.Int32{}
totalCounter := atomic.Int32{}
t.Run("Test unsubscribe configuration items", func(t *testing.T) {
subscribeID := ""
subscribeIDChan := make(chan string)
go func() {
err := testClient.SubscribeConfigurationItems(ctx, "example-config",
[]string{"mykey", "mykey2", "mykey3"}, func(id string, items []*ConfigurationItem) {
counter.Inc()
for _, v := range items {
assert.Equal(t, v.Value, v.Key+"_"+strconv.Itoa(int(counter.Load()-1)))
totalCounter.Inc()
}
select {
case subscribeIDChan <- id:
default:
}
})
assert.Nil(t, err)
}()
subscribeID = <-subscribeIDChan
time.Sleep(time.Second * 2)
time.Sleep(time.Millisecond * 500)
err := testClient.UnsubscribeConfigurationItems(ctx, "example-config", subscribeID)
assert.Nil(t, err)
})
time.Sleep(time.Second * 5)
assert.Equal(t, 3, int(counter.Load()))
assert.Equal(t, 9, int(totalCounter.Load()))
}
4 changes: 2 additions & 2 deletions client/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (c *GRPCClient) PublishEvent(ctx context.Context, pubsubName, topicName str
PubsubName: pubsubName,
Topic: topicName,
}
for _, opt := range opts {
opt(request)
for _, o := range opts {
o(request)
}

if data != nil {
Expand Down
4 changes: 2 additions & 2 deletions examples/actor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ dapr stop --app-id actor-serving
== APP == get user = {Name: Age:1}
== APP == get user = {Name: Age:2}
✅ Exited App successfully
```

- server side
```

```
== APP == call get user req = &{abc 123}
== APP == get req = laurence
== APP == get post request = laurence
Expand Down
Loading

0 comments on commit 6f2ae7d

Please sign in to comment.