Skip to content

Commit a95c50a

Browse files
authored
feat: state store (#153)
Signed-off-by: Zike Yang <[email protected]>
1 parent eeae101 commit a95c50a

29 files changed

+1042
-302
lines changed

.github/workflows/ci.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ jobs:
5757
- run: make build_all
5858
- name: Wait for Pulsar service
5959
run: until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
60-
- run: nohup bin/function-stream server > nohup.out &
6160
- run: make test
6261
- name: Collect Docker Compose logs
6362
if: failure()

benchmark/bench_test.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
)
3636

3737
func BenchmarkStressForBasicFunc(b *testing.B) {
38-
s, err := server.NewServer(server.LoadConfigFromEnv())
38+
s, err := server.NewDefaultServer()
3939
if err != nil {
4040
b.Fatal(err)
4141
}
@@ -110,15 +110,7 @@ func BenchmarkStressForBasicFunc(b *testing.B) {
110110
func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
111111
memoryQueueFactory := contube.NewMemoryQueueFactory(context.Background())
112112

113-
svrConf := &common.Config{
114-
ListenAddr: common.DefaultAddr,
115-
}
116-
117-
fm, err := fs.NewFunctionManager(fs.WithDefaultTubeFactory(memoryQueueFactory))
118-
if err != nil {
119-
b.Fatal(err)
120-
}
121-
s, err := server.NewServer(svrConf, server.WithFunctionManager(fm))
113+
s, err := server.NewServer(server.WithFunctionManager(fs.WithDefaultTubeFactory(memoryQueueFactory)))
122114
if err != nil {
123115
b.Fatal(err)
124116
}

cmd/server/cmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ var (
3535

3636
func exec(*cobra.Command, []string) {
3737
common.RunProcess(func() (io.Closer, error) {
38-
s, err := server.NewServer(server.LoadConfigFromEnv())
38+
s, err := server.NewServer()
3939
if err != nil {
4040
return nil, err
4141
}

cmd/standalone/cmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ var (
3535

3636
func exec(*cobra.Command, []string) {
3737
common.RunProcess(func() (io.Closer, error) {
38-
s, err := server.NewServer(server.LoadStandaloneConfigFromEnv())
38+
s, err := server.NewServerWithConfig(server.LoadStandaloneConfigFromEnv())
3939
if err != nil {
4040
return nil, err
4141
}

fs/api/func_ctx.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package api
18+
19+
type FunctionContext interface {
20+
PutState(key string, value []byte) error
21+
GetState(key string) ([]byte, error)
22+
}

fs/api/instance.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
type FunctionInstance interface {
2727
Context() context.Context
28+
FunctionContext() FunctionContext
2829
Definition() *model.Function
2930
Index() int32
3031
Stop()
@@ -34,5 +35,5 @@ type FunctionInstance interface {
3435
}
3536

3637
type FunctionInstanceFactory interface {
37-
NewFunctionInstance(f *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, i int32, logger *slog.Logger) FunctionInstance
38+
NewFunctionInstance(f *model.Function, funcCtx FunctionContext, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, i int32, logger *slog.Logger) FunctionInstance
3839
}

fs/api/statestore.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import "github.com/pkg/errors"
20+
21+
var ErrNotFound = errors.New("key not found")
22+
23+
type StateStore interface {
24+
PutState(key string, value []byte) error
25+
GetState(key string) ([]byte, error)
26+
Close() error
27+
}

fs/contube/http.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ type HttpTubeFactory struct {
5656
endpoints map[string]*endpointHandler
5757
}
5858

59-
func NewHttpTubeFactory(ctx context.Context) TubeFactory {
59+
func NewHttpTubeFactory(ctx context.Context) *HttpTubeFactory {
6060
return &HttpTubeFactory{
6161
ctx: ctx,
6262
endpoints: make(map[string]*endpointHandler),

fs/contube/http_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
func TestHttpTubeHandleRecord(t *testing.T) {
2626
ctx, cancel := context.WithCancel(context.Background())
27-
f := NewHttpTubeFactory(ctx).(*HttpTubeFactory)
27+
f := NewHttpTubeFactory(ctx)
2828

2929
endpoint := "test"
3030
err := f.Handle(ctx, "test", []byte("test"))
@@ -51,7 +51,7 @@ func TestHttpTubeHandleRecord(t *testing.T) {
5151
}
5252

5353
func TestHttpTubeSinkTubeNotImplement(t *testing.T) {
54-
f := NewHttpTubeFactory(context.Background()).(*HttpTubeFactory)
54+
f := NewHttpTubeFactory(context.Background())
5555
_, err := f.NewSinkTube(context.Background(), make(ConfigMap))
5656
assert.ErrorIs(t, err, ErrSinkTubeNotImplemented)
5757
}

fs/func_ctx_impl.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fs
18+
19+
import (
20+
"github.com/functionstream/function-stream/fs/api"
21+
"github.com/pkg/errors"
22+
)
23+
24+
var ErrStateStoreNotLoaded = errors.New("state store not loaded")
25+
26+
type FuncCtxImpl struct {
27+
api.FunctionContext
28+
store api.StateStore
29+
putStateFunc func(key string, value []byte) error
30+
getStateFunc func(key string) ([]byte, error)
31+
}
32+
33+
func NewFuncCtxImpl(store api.StateStore) *FuncCtxImpl {
34+
putStateFunc := func(key string, value []byte) error {
35+
return ErrStateStoreNotLoaded
36+
}
37+
getStateFunc := func(key string) ([]byte, error) {
38+
return nil, ErrStateStoreNotLoaded
39+
}
40+
if store != nil {
41+
putStateFunc = store.PutState
42+
getStateFunc = store.GetState
43+
}
44+
return &FuncCtxImpl{store: store,
45+
putStateFunc: putStateFunc,
46+
getStateFunc: getStateFunc}
47+
}
48+
49+
func (f *FuncCtxImpl) PutState(key string, value []byte) error {
50+
return f.putStateFunc(key, value)
51+
}
52+
53+
func (f *FuncCtxImpl) GetState(key string) ([]byte, error) {
54+
return f.getStateFunc(key)
55+
}

0 commit comments

Comments
 (0)