Skip to content

Commit 65677bb

Browse files
committed
Roughing in some transfer tests.
1 parent 6534977 commit 65677bb

File tree

13 files changed

+533
-27
lines changed

13 files changed

+533
-27
lines changed

databases/databases.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,11 @@ const (
132132
// registers a database creation function under the given database name
133133
// to allow for e.g. test database implementations
134134
func RegisterDatabase(dbName string, createDb func() (Database, error)) error {
135-
if firstTime {
135+
if firstTime_ {
136136
// register types that appear in Frictionless Descriptors (for manifests)
137137
gob.Register(credit.CreditMetadata{})
138138

139-
firstTime = false
139+
firstTime_ = false
140140
}
141141

142142
// make one to check the configuration
@@ -155,6 +155,15 @@ func RegisterDatabase(dbName string, createDb func() (Database, error)) error {
155155
}
156156
}
157157

158+
// returns a list of names of registered databases
159+
func RegisteredDatabases() []string {
160+
dbs := make([]string, 0)
161+
for name, _ := range createDatabaseFuncs_ {
162+
dbs = append(dbs, name)
163+
}
164+
return dbs
165+
}
166+
158167
// returns true if a database has been registered with the given name, false if not
159168
func HaveDatabase(dbName string) bool {
160169
_, found := createDatabaseFuncs_[dbName]
@@ -222,7 +231,7 @@ func Load(states DatabaseSaveStates) error {
222231
//-----------
223232

224233
// set to false after the first database is registered
225-
var firstTime = true
234+
var firstTime_ = true
226235

227236
// we maintain a table of database instances, identified by their names
228237
var allDatabases_ = make(map[string]Database)

transfers/dispatcher.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,16 @@ type dispatcherChannels struct {
5454
Stop chan struct{} // used by client to stop task management
5555
}
5656

57+
func (channels *dispatcherChannels) close() {
58+
close(channels.RequestTransfer)
59+
close(channels.ReturnTransferId)
60+
close(channels.CancelTransfer)
61+
close(channels.RequestStatus)
62+
close(channels.ReturnStatus)
63+
close(channels.Error)
64+
close(channels.Stop)
65+
}
66+
5767
func (d *dispatcherState) Start() error {
5868
d.Channels = dispatcherChannels{
5969
RequestTransfer: make(chan Specification, 32),
@@ -71,7 +81,9 @@ func (d *dispatcherState) Start() error {
7181

7282
func (d *dispatcherState) Stop() error {
7383
d.Channels.Stop <- struct{}{}
74-
return <-d.Channels.Error
84+
err := <-d.Channels.Error
85+
d.Channels.close()
86+
return err
7587
}
7688

7789
func (d *dispatcherState) CreateTransfer(spec Specification) (uuid.UUID, error) {
@@ -99,9 +111,9 @@ func (d *dispatcherState) CancelTransfer(id uuid.UUID) error {
99111
return <-d.Channels.Error
100112
}
101113

102-
//----------------------------------------------------
114+
//---------------------------------------------------------
103115
// everything past here runs in the dispatcher's goroutine
104-
//----------------------------------------------------
116+
//---------------------------------------------------------
105117

106118
// the goroutine itself
107119
func (d *dispatcherState) process() {
@@ -125,6 +137,7 @@ func (d *dispatcherState) process() {
125137
transferId, numFiles, err := d.create(spec)
126138
if err != nil {
127139
returnError <- err
140+
break
128141
}
129142
returnTransferId <- transferId
130143
slog.Info(fmt.Sprintf("Created new transfer %s (%d file(s) requested)", transferId.String(),
@@ -144,6 +157,7 @@ func (d *dispatcherState) process() {
144157
returnStatus <- status
145158
case <-stopRequested:
146159
running = false
160+
returnError <- nil
147161
}
148162
}
149163
}

transfers/dispatcher_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) 2023 The KBase Project and its Contributors
2+
// Copyright (c) 2023 Cohere Consulting, LLC
3+
//
4+
// Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
// this software and associated documentation files (the "Software"), to deal in
6+
// the Software without restriction, including without limitation the rights to
7+
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
8+
// of the Software, and to permit persons to whom the Software is furnished to do
9+
// so, subject to the following conditions:
10+
//
11+
// The above copyright notice and this permission notice shall be included in all
12+
// copies or substantial portions of the Software.
13+
//
14+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
// SOFTWARE.
21+
22+
package transfers
23+
24+
import (
25+
"testing"
26+
27+
"github.com/stretchr/testify/assert"
28+
)
29+
30+
func TestStartAndStopDispatcher(t *testing.T) {
31+
assert := assert.New(t)
32+
err := dispatcher.Start()
33+
assert.Nil(err)
34+
err = dispatcher.Stop()
35+
assert.Nil(err)
36+
}

transfers/manifestor.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ type manifestorChannels struct {
5757
Stop chan struct{}
5858
}
5959

60+
func (channels *manifestorChannels) close() {
61+
close(channels.RequestGeneration)
62+
close(channels.RequestCancellation)
63+
close(channels.Error)
64+
close(channels.Stop)
65+
}
66+
6067
// starts the mover
6168
func (m *manifestorState) Start() error {
6269
m.Channels = manifestorChannels{
@@ -73,7 +80,9 @@ func (m *manifestorState) Start() error {
7380
// stops the manifestor goroutine
7481
func (m *manifestorState) Stop() error {
7582
m.Channels.Stop <- struct{}{}
76-
return <-m.Channels.Error
83+
err := <-m.Channels.Error
84+
m.Channels.close()
85+
return err
7786
}
7887

7988
// starts generating a manifest for the given transfer, moving it subsequently to that transfer's
@@ -117,8 +126,9 @@ func (m *manifestorState) process() {
117126
} else {
118127
manifestor.Channels.Error <- NotFoundError{Id: transferId}
119128
}
120-
case <-mover.Channels.Stop:
129+
case <-manifestor.Channels.Stop:
121130
running = false
131+
manifestor.Channels.Error <- nil
122132
}
123133

124134
time.Sleep(pollInterval)

transfers/manifestor_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) 2023 The KBase Project and its Contributors
2+
// Copyright (c) 2023 Cohere Consulting, LLC
3+
//
4+
// Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
// this software and associated documentation files (the "Software"), to deal in
6+
// the Software without restriction, including without limitation the rights to
7+
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
8+
// of the Software, and to permit persons to whom the Software is furnished to do
9+
// so, subject to the following conditions:
10+
//
11+
// The above copyright notice and this permission notice shall be included in all
12+
// copies or substantial portions of the Software.
13+
//
14+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
// SOFTWARE.
21+
22+
package transfers
23+
24+
import (
25+
"testing"
26+
27+
"github.com/stretchr/testify/assert"
28+
)
29+
30+
func TestStartAndStopManifestor(t *testing.T) {
31+
assert := assert.New(t)
32+
err := manifestor.Start()
33+
assert.Nil(err)
34+
err = manifestor.Stop()
35+
assert.Nil(err)
36+
}

transfers/mover.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,18 @@ type moverChannels struct {
5252
Stop chan struct{}
5353
}
5454

55+
func (channels *moverChannels) close() {
56+
close(channels.RequestMove)
57+
close(channels.RequestCancellation)
58+
close(channels.Error)
59+
close(channels.Stop)
60+
}
61+
5562
// starts the mover
5663
func (m *moverState) Start() error {
5764
m.Channels = moverChannels{
5865
RequestMove: make(chan uuid.UUID, 32),
66+
RequestCancellation: make(chan uuid.UUID, 32),
5967
Error: make(chan error, 32),
6068
Stop: make(chan struct{}),
6169
}
@@ -67,7 +75,9 @@ func (m *moverState) Start() error {
6775
// stops the mover goroutine
6876
func (m *moverState) Stop() error {
6977
m.Channels.Stop <- struct{}{}
70-
return <-m.Channels.Error
78+
err := <-m.Channels.Error
79+
m.Channels.close()
80+
return err
7181
}
7282

7383
// starts moving files associated with the given transfer ID
@@ -111,6 +121,7 @@ func (m *moverState) process() {
111121
}
112122
case <-mover.Channels.Stop:
113123
running = false
124+
mover.Channels.Error <- nil
114125
}
115126

116127
time.Sleep(pollInterval)

transfers/mover_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) 2023 The KBase Project and its Contributors
2+
// Copyright (c) 2023 Cohere Consulting, LLC
3+
//
4+
// Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
// this software and associated documentation files (the "Software"), to deal in
6+
// the Software without restriction, including without limitation the rights to
7+
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
8+
// of the Software, and to permit persons to whom the Software is furnished to do
9+
// so, subject to the following conditions:
10+
//
11+
// The above copyright notice and this permission notice shall be included in all
12+
// copies or substantial portions of the Software.
13+
//
14+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
// SOFTWARE.
21+
22+
package transfers
23+
24+
import (
25+
"testing"
26+
27+
"github.com/stretchr/testify/assert"
28+
)
29+
30+
func TestStartAndStopMover(t *testing.T) {
31+
assert := assert.New(t)
32+
err := mover.Start()
33+
assert.Nil(err)
34+
err = mover.Stop()
35+
assert.Nil(err)
36+
}

transfers/stager.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,18 @@ type stagerChannels struct {
5151
Stop chan struct{}
5252
}
5353

54+
func (channels *stagerChannels) close() {
55+
close(channels.RequestStaging)
56+
close(channels.RequestCancellation)
57+
close(channels.Error)
58+
close(channels.Stop)
59+
}
60+
5461
// starts the stager
5562
func (s *stagerState) Start() error {
5663
s.Channels = stagerChannels{
5764
RequestStaging: make(chan uuid.UUID, 32),
65+
RequestCancellation: make(chan uuid.UUID, 32),
5866
Error: make(chan error, 32),
5967
Stop: make(chan struct{}),
6068
}
@@ -65,7 +73,9 @@ func (s *stagerState) Start() error {
6573
// stops the stager goroutine
6674
func (s *stagerState) Stop() error {
6775
s.Channels.Stop <- struct{}{}
68-
return <-s.Channels.Error
76+
err := <-s.Channels.Error
77+
s.Channels.close()
78+
return err
6979
}
7080

7181
// requests that files be staged for the transfer with the given ID
@@ -106,6 +116,7 @@ func (s *stagerState) process() {
106116
}
107117
case <-stager.Channels.Stop:
108118
running = false
119+
stager.Channels.Error <- nil
109120
}
110121

111122
time.Sleep(pollInterval)

transfers/stager_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) 2023 The KBase Project and its Contributors
2+
// Copyright (c) 2023 Cohere Consulting, LLC
3+
//
4+
// Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
// this software and associated documentation files (the "Software"), to deal in
6+
// the Software without restriction, including without limitation the rights to
7+
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
8+
// of the Software, and to permit persons to whom the Software is furnished to do
9+
// so, subject to the following conditions:
10+
//
11+
// The above copyright notice and this permission notice shall be included in all
12+
// copies or substantial portions of the Software.
13+
//
14+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
// SOFTWARE.
21+
22+
package transfers
23+
24+
import (
25+
"testing"
26+
27+
"github.com/stretchr/testify/assert"
28+
)
29+
30+
func TestStartAndStopStager(t *testing.T) {
31+
assert := assert.New(t)
32+
err := stager.Start()
33+
assert.Nil(err)
34+
err = stager.Stop()
35+
assert.Nil(err)
36+
}

transfers/store.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,21 @@ type storeChannels struct {
6161
Stop chan struct{}
6262
}
6363

64+
func (channels *storeChannels) close() {
65+
close(channels.RequestNewTransfer)
66+
close(channels.ReturnNewTransfer)
67+
close(channels.RequestSpec)
68+
close(channels.ReturnSpec)
69+
close(channels.RequestDescriptors)
70+
close(channels.ReturnDescriptors)
71+
close(channels.SetStatus)
72+
close(channels.RequestStatus)
73+
close(channels.ReturnStatus)
74+
close(channels.RequestRemoval)
75+
close(channels.Error)
76+
close(channels.Stop)
77+
}
78+
6479
type transferIdAndStatus struct {
6580
Id uuid.UUID
6681
Status TransferStatus
@@ -88,7 +103,10 @@ func (s *storeState) Start() error {
88103

89104
// stops the store goroutine
90105
func (s *storeState) Stop() error {
91-
return nil
106+
s.Channels.Stop <- struct{}{}
107+
err := <-s.Channels.Error
108+
s.Channels.close()
109+
return err
92110
}
93111

94112
// creates a new entry for a transfer within the store, populating it with relevant metadata and
@@ -205,6 +223,7 @@ func (s *storeState) process() {
205223
store.Channels.Error <- TransferNotFoundError{Id: id}
206224
}
207225
case <-store.Channels.Stop:
226+
store.Channels.Error <- nil
208227
running = false
209228
}
210229
}

0 commit comments

Comments
 (0)