Merged

Commits (28)
c0067c1  Starting transfer orchestrator refactor in earnest. (jeff-cohere, Oct 9, 2025)
c189154  Breaking tasks logic up into concurrent sequential programs. (jeff-cohere, Oct 11, 2025)
552dcd0  Replacing tasks package with transfers package. (jeff-cohere, Oct 14, 2025)
88e529a  Fixed some test failures. (jeff-cohere, Oct 14, 2025)
e835116  Roughing in some transfer tests. (jeff-cohere, Oct 15, 2025)
183c163  Getting tests organized. (jeff-cohere, Oct 16, 2025)
2475179  Adding a clock for polling other routines. (jeff-cohere, Oct 20, 2025)
4287c9c  Debugging tests. (jeff-cohere, Oct 20, 2025)
64701f0  Fixed various issues with transfer tests. (jeff-cohere, Oct 21, 2025)
831f634  Fixed an issue with JSON descriptors. (jeff-cohere, Oct 22, 2025)
6cc3d8d  Added transfer status callbacks useful for testing. (jeff-cohere, Oct 22, 2025)
ebc9d08  Replaced callbacks with a simple publish/subscribe system that delive… (jeff-cohere, Oct 24, 2025)
8376abe  Implemented saving and loading. (jeff-cohere, Oct 27, 2025)
4d43a18  Fixed transfer tests. (jeff-cohere, Oct 28, 2025)
a0e1935  Test fixtures now mock file staging properly. (jeff-cohere, Oct 28, 2025)
9d23b7f  Switched prototype from tasks to transfers package. (jeff-cohere, Oct 28, 2025)
775d53c  Removing unused code. (jeff-cohere, Oct 28, 2025)
8025c64  Partially propagated config instance into orchestration logic. (jeff-cohere, Oct 28, 2025)
8eda76b  Placating static analyzer. (jeff-cohere, Oct 28, 2025)
b136f15  Addressed PR comments aside from channel buffer sizes. (jeff-cohere, Oct 29, 2025)
9ed81ef  Added orchestration README and diagram. (jeff-cohere, Oct 29, 2025)
a82d1da  Reduced channel buffering. (jeff-cohere, Oct 30, 2025)
18a6882  Rebased and patched up nmdc database constructor. (jeff-cohere, Oct 30, 2025)
2453c9c  Changes from testing DTS in Spin. (jeff-cohere, Nov 5, 2025)
cf1217c  Fixed a staging issue. (jeff-cohere, Nov 6, 2025)
0ff128a  Fixing last batch of defects. (jeff-cohere, Nov 6, 2025)
a86719d  Removed debugging statements. (jeff-cohere, Nov 6, 2025)
4b0e2dd  Eradicated old "task" language from transfers package, and removed ol… (jeff-cohere, Nov 7, 2025)
9 changes: 9 additions & 0 deletions databases/databases.go
@@ -155,6 +155,15 @@ func RegisterDatabase(dbName string, createDb func() (Database, error)) error {
}
}

// returns a list of names of registered databases
func RegisteredDatabases() []string {
dbs := make([]string, 0)
for name := range createDatabaseFuncs_ {
dbs = append(dbs, name)
}
return dbs
}

// returns true if a database has been registered with the given name, false if not
func HaveDatabase(dbName string) bool {
_, found := createDatabaseFuncs_[dbName]
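The new RegisteredDatabases helper pairs naturally with the existing HaveDatabase check. A minimal usage sketch (not part of this PR; it assumes databases have already been registered elsewhere via RegisterDatabase):

```go
package main

import (
	"fmt"

	"github.com/kbase/dts/databases"
)

func main() {
	// list whatever databases have been registered so far (map iteration
	// order, and hence the listing order, is unspecified)
	for _, name := range databases.RegisteredDatabases() {
		fmt.Printf("registered database: %s (available: %v)\n",
			name, databases.HaveDatabase(name))
	}
}
```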
48 changes: 35 additions & 13 deletions dtstest/dtstest.go
@@ -103,6 +103,8 @@ type Endpoint struct {
Xfers map[uuid.UUID]transferInfo
// root path
RootPath string
// a set of files on this endpoint that have been staged
StagedFiles map[string]bool
}

// Registers an endpoint test fixture with the given name in the configuration,
@@ -112,9 +114,10 @@ func RegisterEndpoint(endpointName string, options EndpointOptions) error {
slog.Debug(fmt.Sprintf("Registering test endpoint %s...", endpointName))
newEndpointFunc := func(name string) (endpoints.Endpoint, error) {
return &Endpoint{
Options: options,
Xfers: make(map[uuid.UUID]transferInfo),
RootPath: config.Endpoints[endpointName].Root,
Options: options,
Xfers: make(map[uuid.UUID]transferInfo),
RootPath: config.Endpoints[endpointName].Root,
StagedFiles: make(map[string]bool),
}, nil
}
provider := config.Endpoints[endpointName].Provider
@@ -129,21 +132,29 @@ func (ep *Endpoint) Root() string {
return ep.RootPath
}

func (ep *Endpoint) FilesStaged(files []any) (bool, error) {
func (ep *Endpoint) FilesStaged(files []map[string]any) (bool, error) {
if ep.Database != nil {
// are there any unrecognized files?
for _, file := range files {
descriptor := file.(map[string]any)
for _, descriptor := range files {
fileId := descriptor["id"].(string)
if _, found := ep.Database.descriptors[fileId]; !found {
return false, fmt.Errorf("unrecognized file: %s", fileId)
}
}
// the source endpoint should report true for the staged files as long
// as the source database has had time to stage them
for _, req := range ep.Database.Staging {
if time.Since(req.Time) < ep.Options.StagingDuration {
return false, nil
if ep.Options.StagingDuration > 0 {
for _, descriptor := range files {
fileId := descriptor["id"].(string)
if _, staged := ep.StagedFiles[fileId]; !staged {
return false, nil
}
}
// the source endpoint should report true for the staged files as long
// as the source database has had time to stage them
for _, req := range ep.Database.Staging {
slog.Debug(fmt.Sprintf("Request time: %s", req.Time.String()))
if time.Since(req.Time) < ep.Options.StagingDuration {
return false, nil
}
}
}
}
@@ -180,7 +191,7 @@ func (ep *Endpoint) Status(id uuid.UUID) (endpoints.TransferStatus, error) {
}
return info.Status, nil
}
return endpoints.TransferStatus{}, fmt.Errorf("invalid transfer ID: %s", id.String())
return endpoints.TransferStatus{}, fmt.Errorf("invalid endpoint transfer ID: %s", id.String())
}

func (ep *Endpoint) Cancel(id uuid.UUID) error {
@@ -198,6 +209,7 @@ type stagingRequest struct {

// This type implements a databases.Database test fixture
type Database struct {
Name string
Endpt endpoints.Endpoint
descriptors map[string]map[string]any
Staging map[uuid.UUID]stagingRequest
@@ -212,6 +224,7 @@ func RegisterDatabase(databaseName string, descriptors map[string]map[string]any
return nil, err
}
db := Database{
Name: databaseName,
Endpt: endpoint,
descriptors: descriptors,
Staging: make(map[uuid.UUID]stagingRequest),
@@ -267,6 +280,12 @@ func (db *Database) StagingStatus(id uuid.UUID) (databases.StagingStatus, error)
if info, found := db.Staging[id]; found {
endpoint := db.Endpt.(*Endpoint)
if time.Since(info.Time) >= endpoint.Options.StagingDuration { // FIXME: not always so!
// update the staged status on the test endpoint
stagingRequest := db.Staging[id]
for _, fileId := range stagingRequest.FileIds {
endpoint.StagedFiles[fileId] = true
}

return databases.StagingStatusSucceeded, nil
}
return databases.StagingStatusActive, nil
@@ -287,9 +306,12 @@ func (db *Database) LocalUser(orcid string) (string, error) {
}

func (db *Database) Save() (databases.DatabaseSaveState, error) {
return databases.DatabaseSaveState{}, nil
return databases.DatabaseSaveState{
Name: db.Name,
}, nil
}

func (db *Database) Load(state databases.DatabaseSaveState) error {
db.Name = state.Name
return nil
}
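The Name field added above is what gives the fixture's Save/Load pair something meaningful to round-trip. A rough sketch of that round trip, constructing the fixture directly for illustration (in practice it is created through dtstest.RegisterDatabase):

```go
package example

import (
	"log"

	"github.com/kbase/dts/dtstest"
)

func saveLoadRoundTrip() {
	// illustrative only: tests normally obtain the fixture via
	// dtstest.RegisterDatabase rather than constructing it directly
	db := &dtstest.Database{Name: "test-db"}

	state, err := db.Save() // state.Name now carries "test-db"
	if err != nil {
		log.Fatal(err)
	}

	restored := &dtstest.Database{}
	if err := restored.Load(state); err != nil {
		log.Fatal(err)
	}
	// restored.Name == "test-db"
}
```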
2 changes: 1 addition & 1 deletion endpoints/endpoints.go
@@ -71,7 +71,7 @@ type Endpoint interface {
Root() string
// Returns true if the files associated with the given Frictionless
// descriptors are staged at this endpoint AND are valid, false otherwise.
FilesStaged(files []any) (bool, error)
FilesStaged(descriptors []map[string]any) (bool, error)
// Returns a list of UUIDs for all transfers associated with this endpoint.
Transfers() ([]uuid.UUID, error)
// Begins a transfer task that moves the files identified by the FileTransfer
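With the interface change above, callers pass Frictionless descriptors as maps directly instead of []any, so implementations no longer need a type assertion per element. A hedged sketch of the calling side; the polling helper and its poll interval are illustrative, not from this PR:

```go
package example

import (
	"time"

	"github.com/kbase/dts/endpoints"
)

// waitUntilStaged polls an endpoint until the given Frictionless
// descriptors report as staged (hypothetical helper, not part of the PR)
func waitUntilStaged(ep endpoints.Endpoint, descriptors []map[string]any) error {
	for {
		staged, err := ep.FilesStaged(descriptors)
		if err != nil {
			return err
		}
		if staged {
			return nil
		}
		time.Sleep(time.Second) // arbitrary poll interval
	}
}
```

The descriptor maps carry at least "id" and "path" string fields, matching the maps built in the endpoint tests below.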
5 changes: 2 additions & 3 deletions endpoints/globus/endpoint.go
@@ -142,11 +142,10 @@ func (ep *Endpoint) Root() string {
return ep.RootDir
}

func (ep *Endpoint) FilesStaged(files []any) (bool, error) {
func (ep *Endpoint) FilesStaged(descriptors []map[string]any) (bool, error) {
// find all the directories in which these files reside
filesInDir := make(map[string][]string)
for _, resource := range files {
descriptor := resource.(map[string]any)
for _, descriptor := range descriptors {
dir, file := filepath.Split(descriptor["path"].(string))
dir = filepath.Join(ep.RootDir, dir)
if _, found := filesInDir[dir]; !found {
6 changes: 3 additions & 3 deletions endpoints/globus/endpoint_test.go
@@ -122,13 +122,13 @@ func TestGlobusFilesStaged(t *testing.T) {
endpoint, _ := NewEndpointFromConfig("source")

// provide an empty slice of filenames, which should return true
staged, err := endpoint.FilesStaged([]any{})
staged, err := endpoint.FilesStaged([]map[string]any{})
assert.True(staged)
assert.Nil(err)

// provide a file that's known to be on the source endpoint, which
// should return true
descriptors := make([]any, 0)
descriptors := make([]map[string]any, 0)
for i := 1; i <= 3; i++ {
id := fmt.Sprintf("%d", i)
d := map[string]any{ // descriptor
@@ -147,7 +147,7 @@ func TestGlobusFilesStaged(t *testing.T) {
"path": "yaddayadda/yadda/yaddayadda/yaddayaddayadda.xml",
}
assert.Nil(err)
descriptors = []any{nonexistent}
descriptors = []map[string]any{nonexistent}
staged, err = endpoint.FilesStaged(descriptors)
assert.False(staged)
assert.Nil(err)
7 changes: 3 additions & 4 deletions endpoints/local/endpoint.go
@@ -93,9 +93,8 @@ func (ep *Endpoint) Root() string {
return ep.root
}

func (ep *Endpoint) FilesStaged(files []any) (bool, error) {
for _, resource := range files {
descriptor := resource.(map[string]any)
func (ep *Endpoint) FilesStaged(descriptors []map[string]any) (bool, error) {
for _, descriptor := range descriptors {
absPath := filepath.Join(ep.root, descriptor["path"].(string))
_, err := os.Stat(absPath)
if err != nil {
@@ -191,7 +190,7 @@ func (ep *Endpoint) Transfer(dst endpoints.Endpoint, files []endpoints.FileTrans
}

// first, we check that all requested files are staged on this endpoint
requestedFiles := make([]any, len(files))
requestedFiles := make([]map[string]any, len(files))
for i, file := range files {
requestedFiles[i] = map[string]any{
"path": file.SourcePath, // only the Path field is required
6 changes: 3 additions & 3 deletions endpoints/local/endpoint_test.go
@@ -144,12 +144,12 @@ func TestGlobusFilesStaged(t *testing.T) {
endpoint, _ := NewEndpoint("source")

// provide an empty slice of filenames, which should return true
staged, err := endpoint.FilesStaged([]any{})
staged, err := endpoint.FilesStaged([]map[string]any{})
assert.True(staged)
assert.Nil(err)

// provide files that are known to be on the source endpoint
descriptors := make([]any, 0)
descriptors := make([]map[string]any, 0)
for i := 1; i <= 3; i++ {
id := fmt.Sprintf("%d", i)
d := map[string]any{
@@ -167,7 +167,7 @@ func TestGlobusFilesStaged(t *testing.T) {
"id": "yadda",
"path": "yaddayadda/yadda/yaddayadda/yaddayaddayadda.xml",
}
descriptors = []any{nonexistent}
descriptors = []map[string]any{nonexistent}
staged, err = endpoint.FilesStaged(descriptors)
assert.False(staged)
assert.Nil(err)
2 changes: 1 addition & 1 deletion journal/journal.go
@@ -54,7 +54,7 @@ type Record struct {
// status of the transfer ("succeeded", "failed", or "canceled")
Status string `json:"status"`
// size of the transfer's payload in bytes
PayloadSize int64 `json:"payload_size"`
PayloadSize uint64 `json:"payload_size"`
// number of files in the transfer's payload
NumFiles int `json:"num_files"`
// manifest containing metadata for the transfer's payload (stored separate from record)
4 changes: 2 additions & 2 deletions journal/journal_test.go
@@ -129,7 +129,7 @@ func (t *SerialTests) TestRecordSuccessfulTransfer() {
Status: "succeeded",
StartTime: time.Now().Add(-time.Hour),
StopTime: time.Now(),
PayloadSize: int64(12853294),
PayloadSize: uint64(12853294),
NumFiles: 12,
Manifest: manifest,
}
@@ -170,7 +170,7 @@ func (t *SerialTests) TestRecordFailedTransfer() {
Status: "failed",
StartTime: time.Now(),
StopTime: time.Now(),
PayloadSize: int64(12853294),
PayloadSize: uint64(12853294),
NumFiles: 12,
}
err = RecordTransfer(record)
22 changes: 11 additions & 11 deletions services/prototype.go
@@ -28,7 +28,7 @@ import (
"github.com/kbase/dts/databases"
"github.com/kbase/dts/endpoints"
"github.com/kbase/dts/journal"
"github.com/kbase/dts/tasks"
"github.com/kbase/dts/transfers"
)

// This type implements the TransferService interface, allowing file transfers
@@ -95,8 +95,8 @@ func (service *prototype) Start(conf config.Config) error {
defer listener.Close()
listener = netutil.LimitListener(listener, conf.Service.MaxConnections)

// start tasks processing
err = tasks.Start(conf)
// start processing transfers
err = transfers.Start(conf)
if err != nil {
return err
}
@@ -115,7 +115,7 @@

// gracefully shuts down the service without interrupting active connections
func (service *prototype) Shutdown(ctx context.Context) error {
tasks.Stop()
transfers.Stop()
if service.Server != nil {
return service.Server.Shutdown(ctx)
}
@@ -124,7 +124,7 @@ func (service *prototype) Shutdown(ctx context.Context) error {

// closes down the service abruptly, freeing all resources
func (service *prototype) Close() {
tasks.Stop()
transfers.Stop()
if service.Server != nil {
service.Server.Close()
}
@@ -136,8 +136,8 @@ func (service *prototype) Close() {

// Version numbers
var majorVersion = 0
var minorVersion = 9
var patchVersion = 7
var minorVersion = 10
var patchVersion = 0

// Version string
var version = fmt.Sprintf("%d.%d.%d", majorVersion, minorVersion, patchVersion)
@@ -684,7 +684,7 @@ func (service *prototype) createTransfer(ctx context.Context,
}
}

taskId, err := tasks.Create(tasks.Specification{
taskId, err := transfers.Create(transfers.Specification{
User: user,
Source: input.Body.Source,
Destination: input.Body.Destination,
@@ -695,7 +695,7 @@
if err != nil {
slog.Error(err.Error())
switch err.(type) {
case *tasks.NoFilesRequestedError:
case *transfers.NoFilesRequestedError:
return nil, huma.Error400BadRequest(err.Error())
case *databases.NotFoundError:
return nil, huma.Error404NotFound(err.Error())
@@ -747,7 +747,7 @@ func (service *prototype) getTransferStatus(ctx context.Context,
}

// fetch the status for the job using the appropriate task data
status, err := tasks.Status(input.Id)
status, err := transfers.Status(input.Id)
if err != nil {
return nil, huma.Error404NotFound(err.Error())
}
@@ -774,7 +774,7 @@ func (service *prototype) deleteTransfer(ctx context.Context,
}) (*TaskDeletionOutput, error) {

// request that the task be canceled
err := tasks.Cancel(input.Id)
err := transfers.Cancel(input.Id)
if err != nil {
return nil, err
}
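The prototype's calls map one-to-one onto the renamed transfers package. A condensed lifecycle sketch; the Specification fields beyond Source and Destination, the string types assumed for them, and the config import path are assumptions not confirmed by this diff:

```go
package example

import (
	"github.com/kbase/dts/config" // import path assumed; only config.Config is visible in this diff
	"github.com/kbase/dts/transfers"
)

func runTransfer(conf config.Config) error {
	// start the orchestration machinery once; stop it on the way out
	if err := transfers.Start(conf); err != nil {
		return err
	}
	defer transfers.Stop()

	// submit a transfer request; User and the file-selection fields are
	// omitted because their types aren't visible in this diff
	xferId, err := transfers.Create(transfers.Specification{
		Source:      "source-db",      // assumed to be a database name string
		Destination: "destination-db", // assumed to be a database name string
	})
	if err != nil {
		return err
	}

	// poll the transfer's status, and cancel it (if needed) by the same ID
	if _, err := transfers.Status(xferId); err != nil {
		return err
	}
	return transfers.Cancel(xferId)
}
```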