Skip to content

Commit 2ff2f8f

Browse files
fix: address synchronization issues in async deployment (#337)
1 parent 5c1ff07 commit 2ff2f8f

File tree

1 file changed

+16
-46
lines changed

1 file changed

+16
-46
lines changed

job/deploy_manager.go

Lines changed: 16 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -3,8 +3,6 @@ package job
33
import (
44
"context"
55
"errors"
6-
"fmt"
7-
"sync"
86
"sync/atomic"
97
"time"
108

@@ -40,8 +38,6 @@ type deployManager struct {
4038
requestQ chan models.JobDeployment
4139

4240
assignerScheduler *cron.Cron
43-
44-
wg sync.WaitGroup
4541
}
4642

4743
func (m *deployManager) Deploy(ctx context.Context, projectSpec models.ProjectSpec) (models.DeploymentID, error) {
@@ -86,15 +82,6 @@ func (m *deployManager) GetStatus(ctx context.Context, deployID models.Deploymen
8682
}
8783

8884
func (m *deployManager) Init() {
89-
m.l.Info("starting deployers", "count", m.config.NumWorkers)
90-
for i := 0; i < m.config.NumWorkers; i++ {
91-
m.wg.Add(1)
92-
go m.spawnDeployer(m.deployer)
93-
}
94-
95-
// wait until all deployers are ready
96-
m.wg.Wait()
97-
9885
if m.assignerScheduler != nil {
9986
_, err := m.assignerScheduler.AddFunc(deployAssignInterval, m.Assign)
10087
if err != nil {
@@ -105,23 +92,15 @@ func (m *deployManager) Init() {
10592
}
10693

10794
// start a deployer goroutine that runs the deployment in background
108-
func (m *deployManager) spawnDeployer(deployer Deployer) {
95+
func (m *deployManager) spawnDeployer(deployer Deployer, deployRequest models.JobDeployment) {
10996
// deployer has spawned
110-
atomic.AddInt32(&m.deployerCapacity, 1)
111-
m.wg.Done()
112-
113-
// TODO: avoid having multiple query
114-
for deployRequest := range m.requestQ {
115-
atomic.AddInt32(&m.deployerCapacity, -1)
116-
117-
m.l.Info("deployer start processing a request", "request id", deployRequest.ID.UUID(), "project name", deployRequest.Project.Name)
118-
ctx, cancelCtx := context.WithTimeout(context.Background(), m.config.WorkerTimeout)
119-
if err := deployer.Deploy(ctx, deployRequest); err != nil {
120-
m.l.Error("deployer failed to process", deployRequest.ID.UUID(), "project name", deployRequest.Project.Name, "error", err.Error())
121-
}
122-
cancelCtx()
97+
defer atomic.AddInt32(&m.deployerCapacity, 1)
12398

124-
atomic.AddInt32(&m.deployerCapacity, 1)
99+
m.l.Info("deployer start processing a request", "request id", deployRequest.ID.UUID(), "project name", deployRequest.Project.Name)
100+
ctx, cancelCtx := context.WithTimeout(context.Background(), m.config.WorkerTimeout)
101+
defer cancelCtx()
102+
if err := deployer.Deploy(ctx, deployRequest); err != nil {
103+
m.l.Error("deployer failed to process", deployRequest.ID.UUID(), "project name", deployRequest.Project.Name, "error", err.Error())
125104
}
126105
}
127106

@@ -131,33 +110,24 @@ func (m *deployManager) Assign() {
131110
m.cancelTimedOutDeployments(ctx)
132111

133112
if int(atomic.LoadInt32(&m.deployerCapacity)) <= 0 {
134-
m.l.Debug("deployers are all occupied.")
113+
m.l.Info("deployers are all occupied.")
135114
return
136115
}
137116

138-
capacity := int(atomic.LoadInt32(&m.deployerCapacity))
139-
m.l.Debug("attempting to assign deployment request...", "capacity", capacity)
140-
for i := 0; i < capacity; i++ {
117+
for int(atomic.LoadInt32(&m.deployerCapacity)) > 0 {
118+
// TODO: avoid having multiple query
141119
jobDeployment, err := m.deployRepository.GetFirstExecutableRequest(ctx)
142120
if err != nil {
143121
if errors.Is(err, store.ErrResourceNotFound) {
144-
m.l.Debug(fmt.Sprintf("no deployment request found to assign deployer %d", i+1))
122+
m.l.Debug("no deployment request found to assign deployer")
145123
return
146124
}
147-
m.l.Error(fmt.Sprintf("failed to fetch executable deployment request to assign deployer %d", i+1), "error", err.Error())
125+
m.l.Error("failed to fetch executable deployment request to assign deployer", "error", err.Error())
148126
return
149127
}
150-
151-
select {
152-
case m.requestQ <- jobDeployment:
153-
m.l.Info(fmt.Sprintf("deployer %d taking a request", i+1), "request id", jobDeployment.ID.UUID(), "project name", jobDeployment.Project.Name)
154-
default:
155-
m.l.Error(fmt.Sprintf("unable to assign deployer %d to take the request", i+1), "request id", jobDeployment.ID.UUID(), "project name", jobDeployment.Project.Name)
156-
jobDeployment.Status = models.JobDeploymentStatusCancelled
157-
if err := m.deployRepository.Update(ctx, jobDeployment); err != nil {
158-
m.l.Error(fmt.Sprintf("unable to mark job deployment %s as cancelled", jobDeployment.ID.UUID()), "project name", jobDeployment.Project.Name, "error", err.Error())
159-
}
160-
}
128+
m.l.Info("deployer taking a request", "request id", jobDeployment.ID.UUID(), "project name", jobDeployment.Project.Name)
129+
atomic.AddInt32(&m.deployerCapacity, -1)
130+
go m.spawnDeployer(m.deployer, jobDeployment)
161131
}
162132
}
163133

@@ -186,7 +156,7 @@ func NewDeployManager(l log.Logger, deployerConfig config.Deployer, deployer Dep
186156
requestQ: make(chan models.JobDeployment),
187157
l: l,
188158
config: deployerConfig,
189-
deployerCapacity: 0,
159+
deployerCapacity: int32(deployerConfig.NumWorkers),
190160
deployer: deployer,
191161
uuidProvider: uuidProvider,
192162
deployRepository: deployRepository,

0 commit comments

Comments
 (0)