Skip to content

Commit 575aafc

Browse files
authored
refactor: use tube term instead of queue (#134)
Signed-off-by: Zike Yang <[email protected]>
1 parent 14ca732 commit 575aafc

File tree

11 files changed

+69
-71
lines changed

11 files changed

+69
-71
lines changed

benchmark/bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
105105

106106
svrConf := &fs.Config{
107107
ListenAddr: common.DefaultAddr,
108-
QueueBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) {
108+
TubeBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) {
109109
return memoryQueueFactory, nil
110110
},
111111
}

common/constants.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
package common
1818

1919
const (
20-
PulsarQueueType = "pulsar"
20+
PulsarTubeType = "pulsar"
2121

2222
DefaultAddr = "localhost:7300"
2323
DefaultPulsarURL = "pulsar://localhost:6650"
24-
DefaultQueueType = PulsarQueueType
24+
DefaultTubeType = PulsarTubeType
2525
)

fs/config.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ import (
2121
"github.com/functionstream/functionstream/fs/contube"
2222
)
2323

24-
type QueueBuilder func(ctx context.Context, config *Config) (contube.TubeFactory, error)
24+
type TubeBuilder func(ctx context.Context, config *Config) (contube.TubeFactory, error)
2525

26+
// Config is a struct that holds the configuration for a function stream.
2627
type Config struct {
27-
ListenAddr string
28-
PulsarURL string
29-
QueueBuilder QueueBuilder
28+
ListenAddr string // ListenAddr is the address that the function stream REST service will listen on.
29+
PulsarURL string // PulsarURL is the URL of the Pulsar service. It's used for the pulsar_tube
30+
TubeBuilder TubeBuilder // TubeBuilder is a function that will be used to build the tube.
3031
}

fs/instance.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ import (
3333
)
3434

3535
type FunctionInstance struct {
36-
ctx context.Context
37-
cancelFunc context.CancelFunc
38-
definition *model.Function
39-
queueFactory contube.TubeFactory
40-
readyCh chan error
41-
index int32
36+
ctx context.Context
37+
cancelFunc context.CancelFunc
38+
definition *model.Function
39+
tubeFactory contube.TubeFactory
40+
readyCh chan error
41+
index int32
4242
}
4343

4444
func NewFunctionInstance(definition *model.Function, queueFactory contube.TubeFactory, index int32) *FunctionInstance {
@@ -48,12 +48,12 @@ func NewFunctionInstance(definition *model.Function, queueFactory contube.TubeFa
4848
"function-index": index,
4949
})
5050
return &FunctionInstance{
51-
ctx: ctx,
52-
cancelFunc: cancelFunc,
53-
definition: definition,
54-
queueFactory: queueFactory,
55-
readyCh: make(chan error),
56-
index: index,
51+
ctx: ctx,
52+
cancelFunc: cancelFunc,
53+
definition: definition,
54+
tubeFactory: queueFactory,
55+
readyCh: make(chan error),
56+
index: index,
5757
}
5858
}
5959

@@ -113,12 +113,12 @@ func (instance *FunctionInstance) Run() {
113113
return
114114
}
115115

116-
sourceChan, err := instance.queueFactory.NewSourceTube(instance.ctx, (&contube.SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}).ToConfigMap())
116+
sourceChan, err := instance.tubeFactory.NewSourceTube(instance.ctx, (&contube.SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}).ToConfigMap())
117117
if err != nil {
118118
instance.readyCh <- errors.Wrap(err, "Error creating source event queue")
119119
return
120120
}
121-
sinkChan, err := instance.queueFactory.NewSinkTube(instance.ctx, (&contube.SinkQueueConfig{Topic: instance.definition.Output}).ToConfigMap())
121+
sinkChan, err := instance.tubeFactory.NewSinkTube(instance.ctx, (&contube.SinkQueueConfig{Topic: instance.definition.Output}).ToConfigMap())
122122
if err != nil {
123123
instance.readyCh <- errors.Wrap(err, "Error creating sink event queue")
124124
return

fs/manager.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,19 @@ import (
2828
)
2929

3030
type FunctionManager struct {
31-
functions map[string][]*FunctionInstance
32-
functionsLock sync.Mutex
33-
eventQueueFactory contube.TubeFactory
31+
functions map[string][]*FunctionInstance
32+
functionsLock sync.Mutex
33+
tubeFactory contube.TubeFactory
3434
}
3535

3636
func NewFunctionManager(config *Config) (*FunctionManager, error) {
37-
eventQueueFactory, err := config.QueueBuilder(context.Background(), config)
37+
tubeFactory, err := config.TubeBuilder(context.Background(), config)
3838
if err != nil {
3939
return nil, err
4040
}
4141
return &FunctionManager{
42-
functions: make(map[string][]*FunctionInstance),
43-
eventQueueFactory: eventQueueFactory,
42+
functions: make(map[string][]*FunctionInstance),
43+
tubeFactory: tubeFactory,
4444
}, nil
4545
}
4646

@@ -52,7 +52,7 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error {
5252
}
5353
fm.functions[f.Name] = make([]*FunctionInstance, f.Replicas)
5454
for i := int32(0); i < f.Replicas; i++ {
55-
instance := NewFunctionInstance(f, fm.eventQueueFactory, i)
55+
instance := NewFunctionInstance(f, fm.tubeFactory, i)
5656
fm.functions[f.Name][i] = instance
5757
go instance.Run()
5858
if err := <-instance.WaitForReady(); err != nil {
@@ -95,7 +95,7 @@ func (fm *FunctionManager) ListFunctions() (result []string) {
9595
func (fm *FunctionManager) ProduceEvent(name string, event contube.Record) error {
9696
ctx, cancel := context.WithCancel(context.Background())
9797
defer cancel()
98-
c, err := fm.eventQueueFactory.NewSinkTube(ctx, (&contube.SinkQueueConfig{Topic: name}).ToConfigMap())
98+
c, err := fm.tubeFactory.NewSinkTube(ctx, (&contube.SinkQueueConfig{Topic: name}).ToConfigMap())
9999
if err != nil {
100100
return err
101101
}
@@ -106,7 +106,7 @@ func (fm *FunctionManager) ProduceEvent(name string, event contube.Record) error
106106
func (fm *FunctionManager) ConsumeEvent(name string) (contube.Record, error) {
107107
ctx, cancel := context.WithCancel(context.Background())
108108
defer cancel()
109-
c, err := fm.eventQueueFactory.NewSourceTube(ctx, (&contube.SourceQueueConfig{Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap())
109+
c, err := fm.tubeFactory.NewSourceTube(ctx, (&contube.SourceQueueConfig{Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap())
110110
if err != nil {
111111
return nil, err
112112
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e
88
github.com/gorilla/mux v1.8.1
99
github.com/pkg/errors v0.9.1
10+
github.com/sirupsen/logrus v1.9.3
1011
github.com/spf13/cobra v1.8.0
1112
github.com/tetratelabs/wazero v1.6.0
1213
golang.org/x/time v0.5.0
@@ -48,7 +49,6 @@ require (
4849
github.com/prometheus/common v0.46.0 // indirect
4950
github.com/prometheus/procfs v0.12.0 // indirect
5051
github.com/rogpeppe/go-internal v1.11.0 // indirect
51-
github.com/sirupsen/logrus v1.9.3 // indirect
5252
github.com/spaolacci/murmur3 v1.1.0 // indirect
5353
github.com/spf13/pflag v1.0.5 // indirect
5454
go.uber.org/atomic v1.11.0 // indirect

perf/perf.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,32 +38,32 @@ type Config struct {
3838
PulsarURL string
3939
RequestRate float64
4040
Func *restclient.Function
41-
QueueBuilder fs.QueueBuilder
41+
QueueBuilder fs.TubeBuilder
4242
}
4343

4444
type Perf interface {
4545
Run(context.Context)
4646
}
4747

4848
type perf struct {
49-
config *Config
50-
input chan<- contube.Record
51-
output <-chan contube.Record
52-
queueBuilder fs.QueueBuilder
49+
config *Config
50+
input chan<- contube.Record
51+
output <-chan contube.Record
52+
tubeBuilder fs.TubeBuilder
5353
}
5454

5555
func New(config *Config) Perf {
5656
p := &perf{
5757
config: config,
5858
}
5959
if config.QueueBuilder == nil {
60-
p.queueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
60+
p.tubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
6161
return contube.NewPulsarEventQueueFactory(ctx, (&contube.PulsarTubeFactoryConfig{
6262
PulsarURL: config.PulsarURL,
6363
}).ToConfigMap())
6464
}
6565
} else {
66-
p.queueBuilder = config.QueueBuilder
66+
p.tubeBuilder = config.QueueBuilder
6767
}
6868
return p
6969
}
@@ -96,7 +96,7 @@ func (p *perf) Run(ctx context.Context) {
9696
PulsarURL: p.config.PulsarURL,
9797
}
9898

99-
queueFactory, err := p.queueBuilder(ctx, config)
99+
queueFactory, err := p.tubeBuilder(ctx, config)
100100
if err != nil {
101101
slog.Error(
102102
"Failed to create Record Queue Factory",

restclient/README.md

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
44

55
## Overview
6-
This API client was generated by the [OpenAPI Generator](https://openapi-generator.tech) project. By using the [OpenAPI-spec](https://www.openapis.org/) from a remote server, you can easily generate an API client.
6+
7+
This API client was generated by the [OpenAPI Generator](https://openapi-generator.tech) project. By using
8+
the [OpenAPI-spec](https://www.openapis.org/) from a remote server, you can easily generate an API client.
79

810
- API version: 0.1.0
911
- Package version: 1.0.0
@@ -44,7 +46,8 @@ ctx := context.WithValue(context.Background(), restclient.ContextServerIndex, 1)
4446

4547
### Templated Server URL
4648

47-
Templated server URL is formatted using default variables from configuration or from context value `restclient.ContextServerVariables` of type `map[string]string`.
49+
Templated server URL is formatted using default variables from configuration or from context
50+
value `restclient.ContextServerVariables` of type `map[string]string`.
4851

4952
```go
5053
ctx := context.WithValue(context.Background(), restclient.ContextServerVariables, map[string]string{
@@ -58,7 +61,8 @@ Note, enum values are always validated and all unused variables are silently ign
5861

5962
Each operation can use different server URL defined using `OperationServers` map in the `Configuration`.
6063
An operation is uniquely identified by `"{classname}Service.{nickname}"` string.
61-
Similar rules for overriding default operation server index and variables applies by using `restclient.ContextOperationServerIndices` and `restclient.ContextOperationServerVariables` context maps.
64+
Similar rules for overriding default operation server index and variables applies by
65+
using `restclient.ContextOperationServerIndices` and `restclient.ContextOperationServerVariables` context maps.
6266

6367
```go
6468
ctx := context.WithValue(context.Background(), restclient.ContextOperationServerIndices, map[string]int{
@@ -75,25 +79,22 @@ ctx = context.WithValue(context.Background(), restclient.ContextOperationServerV
7579

7680
All URIs are relative to *http://localhost:7300*
7781

78-
Class | Method | HTTP request | Description
79-
------------ | ------------- | ------------- | -------------
80-
*DefaultAPI* | [**ApiV1ConsumeQueueNameGet**](docs/DefaultAPI.md#apiv1consumequeuenameget) | **Get** /api/v1/consume/{queue_name} | Consumes an event from a queue
81-
*DefaultAPI* | [**ApiV1FunctionFunctionNameDelete**](docs/DefaultAPI.md#apiv1functionfunctionnamedelete) | **Delete** /api/v1/function/{function_name} | Deletes a function
82-
*DefaultAPI* | [**ApiV1FunctionFunctionNamePost**](docs/DefaultAPI.md#apiv1functionfunctionnamepost) | **Post** /api/v1/function/{function_name} | Starts a function
83-
*DefaultAPI* | [**ApiV1FunctionsGet**](docs/DefaultAPI.md#apiv1functionsget) | **Get** /api/v1/functions | Returns a list of functions
84-
*DefaultAPI* | [**ApiV1ProduceQueueNamePut**](docs/DefaultAPI.md#apiv1producequeuenameput) | **Put** /api/v1/produce/{queue_name} | Produces an event to a queue
85-
82+
Class | Method | HTTP request | Description
83+
--------------|-------------------------------------------------------------------------------------------|---------------------------------------------|--------------------------------
84+
*DefaultAPI* | [**ApiV1ConsumeQueueNameGet**](docs/DefaultAPI.md#apiv1consumequeuenameget) | **Get** /api/v1/consume/{queue_name} | Consumes an event from a queue
85+
*DefaultAPI* | [**ApiV1FunctionFunctionNameDelete**](docs/DefaultAPI.md#apiv1functionfunctionnamedelete) | **Delete** /api/v1/function/{function_name} | Deletes a function
86+
*DefaultAPI* | [**ApiV1FunctionFunctionNamePost**](docs/DefaultAPI.md#apiv1functionfunctionnamepost) | **Post** /api/v1/function/{function_name} | Starts a function
87+
*DefaultAPI* | [**ApiV1FunctionsGet**](docs/DefaultAPI.md#apiv1functionsget) | **Get** /api/v1/functions | Returns a list of functions
88+
*DefaultAPI* | [**ApiV1ProduceQueueNamePut**](docs/DefaultAPI.md#apiv1producequeuenameput) | **Put** /api/v1/produce/{queue_name} | Produces an event to a queue
8689

8790
## Documentation For Models
8891

89-
- [Function](docs/Function.md)
90-
92+
- [Function](docs/Function.md)
9193

9294
## Documentation For Authorization
9395

9496
Endpoints do not require authorization.
9597

96-
9798
## Documentation for Utility Methods
9899

99100
Due to the fact that model structure members are all pointers, this package contains

restclient/docs/Function.md

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
## Properties
44

5-
Name | Type | Description | Notes
6-
------------ | ------------- | ------------- | -------------
7-
**Name** | Pointer to **string** | | [optional]
8-
**Archive** | **string** | |
9-
**Inputs** | **[]string** | |
10-
**Output** | **string** | |
11-
**Replicas** | Pointer to **int32** | | [optional]
12-
**Config** | Pointer to **map[string]string** | | [optional]
5+
Name | Type | Description | Notes
6+
--------------|----------------------------------|-------------|------------
7+
**Name** | Pointer to **string** | | [optional]
8+
**Archive** | **string** | |
9+
**Inputs** | **[]string** | |
10+
**Output** | **string** | |
11+
**Replicas** | Pointer to **int32** | | [optional]
12+
**Config** | Pointer to **map[string]string** | | [optional]
1313

1414
## Methods
1515

@@ -74,7 +74,6 @@ and a boolean to check if the value has been set.
7474

7575
SetArchive sets Archive field to given value.
7676

77-
7877
### GetInputs
7978

8079
`func (o *Function) GetInputs() []string`
@@ -94,7 +93,6 @@ and a boolean to check if the value has been set.
9493

9594
SetInputs sets Inputs field to given value.
9695

97-
9896
### GetOutput
9997

10098
`func (o *Function) GetOutput() string`
@@ -114,7 +112,6 @@ and a boolean to check if the value has been set.
114112

115113
SetOutput sets Output field to given value.
116114

117-
118115
### GetReplicas
119116

120117
`func (o *Function) GetReplicas() int32`
@@ -165,7 +162,6 @@ SetConfig sets Config field to given value.
165162

166163
HasConfig returns a boolean if a field has been set.
167164

168-
169165
[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
170166

171167

server/config_loader.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ func LoadConfigFromEnv() *fs.Config {
3636
ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr),
3737
PulsarURL: getEnvWithDefault("PULSAR_URL", common.DefaultPulsarURL),
3838
}
39-
queueType := getEnvWithDefault("QUEUE_TYPE", common.DefaultQueueType)
40-
switch queueType {
41-
case common.PulsarQueueType:
42-
loadedConfig.QueueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
39+
tubeType := getEnvWithDefault("TUBE_TYPE", common.DefaultTubeType)
40+
switch tubeType {
41+
case common.PulsarTubeType:
42+
loadedConfig.TubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
4343
return contube.NewPulsarEventQueueFactory(ctx, (&contube.PulsarTubeFactoryConfig{
4444
PulsarURL: c.PulsarURL,
4545
}).ToConfigMap())
@@ -54,7 +54,7 @@ func LoadStandaloneConfigFromEnv() *fs.Config {
5454
loadedConfig = &fs.Config{
5555
ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr),
5656
}
57-
loadedConfig.QueueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
57+
loadedConfig.TubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) {
5858
return contube.NewMemoryQueueFactory(ctx), nil
5959
}
6060
})

0 commit comments

Comments
 (0)