Skip to content

Commit 5f357d9

Browse files
authored
Merge pull request #447 from MG-RAST/develop
version 0.9.27
2 parents 2e30198 + a1d87cc commit 5f357d9

File tree

12 files changed

+154
-73
lines changed

12 files changed

+154
-73
lines changed

RELEASE_NOTES.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
# v0.9.27
2+
3+
- improved log / error messages
4+
- added 'report' option to job GET, returns a summary of the job with stdout, stderr, and worknotes for each workunit that ran
5+
- changed max jid to be stored in mongodb and not on filesystem
6+
- this is backwards compatible, will use jid on filesystem for first time if no value is in mongodb
7+
18
# v0.9.26
29

310
- added 'full' option to job DELETE, actually deletes job from mongoDB and filesystem

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.9.26
1+
0.9.27

awe-server/awe-server.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,10 +239,7 @@ func main() {
239239
fmt.Println("Done")
240240
}
241241

242-
if conf.DEBUG_LEVEL > 0 {
243-
fmt.Println("init max job number (jid)...")
244-
}
245-
//init max job number (jid)
242+
// init max job number (jid), backwards compatible with jobid file
246243
if err := core.QMgr.InitMaxJid(); err != nil {
247244
fmt.Fprintf(os.Stderr, "ERROR from InitMaxJid : %v\n", err)
248245
os.Exit(1)

lib/conf/conf.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ var GIT_COMMIT_HASH string // use -ldflags "-X github.com/MG-RAST/AWE/lib/conf.G
1919
const BasePriority int = 1
2020

2121
const DB_COLL_JOBS string = "Jobs"
22+
const DB_COLL_JID string = "MaxID"
2223
const DB_COLL_PERF string = "Perf"
2324
const DB_COLL_CGS string = "ClientGroups"
2425
const DB_COLL_USERS string = "Users"
26+
const JOB_ID_START int = 10000
2527

2628
//default index type used for intermediate data
2729
const DEFAULT_INDEX string = "chunkrecord"

lib/controller/jobController.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,16 @@ func (cr *JobController) Read(id string, cx *goweb.Context) {
205205
return
206206
}
207207

208+
if query.Has("report") {
209+
jobLogs, err := job.GetJobLogs()
210+
if err != nil {
211+
logger.Error("Err@GetJobLogs: " + id + ":" + err.Error())
212+
cx.RespondWithErrorMessage("job logs not found: "+id, http.StatusBadRequest)
213+
}
214+
cx.RespondWithData(jobLogs)
215+
return
216+
}
217+
208218
if core.QMgr.IsJobRegistered(id) {
209219
job.Registered = true
210220
} else {
@@ -677,7 +687,7 @@ func (cr *JobController) Update(id string, cx *goweb.Context) {
677687
cx.RespondWithErrorMessage("fail to recompute job: "+id+" "+err.Error(), http.StatusBadRequest)
678688
return
679689
}
680-
cx.RespondWithData("job recompute started: " + id)
690+
cx.RespondWithData("job recompute started at task " + stage + ": " + id)
681691
return
682692
}
683693
if query.Has("resubmit") { // to recompute a job from the beginning, all tasks will be computed

lib/core/core.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,6 @@ func getParentTask(taskid string, origin int) string {
652652
return taskid
653653
}
654654

655-
//
656655
func contains(list []string, elem string) bool {
657656
for _, t := range list {
658657
if t == elem {
@@ -662,14 +661,6 @@ func contains(list []string, elem string) bool {
662661
return false
663662
}
664663

665-
func jidIncr(jid string) (newjid string) {
666-
if jidint, err := strconv.Atoi(jid); err == nil {
667-
jidint += 1
668-
return strconv.Itoa(jidint)
669-
}
670-
return jid
671-
}
672-
673664
//functions for REST API communication (=deprecated=)
674665
//notify AWE server a workunit is finished with status either "failed" or "done", and with perf statistics if "done"
675666
func NotifyWorkunitProcessed(work *Workunit, perf *WorkPerf) (err error) {

lib/core/db.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,21 @@ func InitClientGroupDB() {
5353
cc.EnsureIndex(mgo.Index{Key: []string{"token"}, Unique: true})
5454
}
5555

56+
// if max job id does not exist, set to start value
57+
func initMaxJidDB(startjid int) (err error) {
58+
session := db.Connection.Session.Copy()
59+
defer session.Close()
60+
cc := session.DB(conf.MONGODB_DATABASE).C(conf.DB_COLL_JID)
61+
cc.EnsureIndex(mgo.Index{Key: []string{"name"}, Unique: true})
62+
if _, jerr := dbFindMaxJobID(); jerr != nil {
63+
initjid := &JobID{"jid", startjid}
64+
if err := dbUpsert(initjid); err != nil {
65+
return err
66+
}
67+
}
68+
return
69+
}
70+
5671
func dbDelete(q bson.M, coll string) (err error) {
5772
session := db.Connection.Session.Copy()
5873
defer session.Close()
@@ -80,6 +95,9 @@ func dbUpsert(t interface{}) (err error) {
8095
case *ClientGroup:
8196
c := session.DB(conf.MONGODB_DATABASE).C(conf.DB_COLL_CGS)
8297
_, err = c.Upsert(bson.M{"id": t.Id}, &t)
98+
case *JobID:
99+
c := session.DB(conf.MONGODB_DATABASE).C(conf.DB_COLL_JID)
100+
_, err = c.Upsert(bson.M{"name": t.Name}, &t)
83101
default:
84102
fmt.Printf("invalid database entry type\n")
85103
}
@@ -181,6 +199,16 @@ func dbFindSortClientGroups(q bson.M, results *ClientGroups, options map[string]
181199
return
182200
}
183201

202+
func dbFindMaxJobID() (jid *JobID, err error) {
203+
session := db.Connection.Session.Copy()
204+
defer session.Close()
205+
c := session.DB(conf.MONGODB_DATABASE).C(conf.DB_COLL_JID)
206+
if err = c.Find(bson.M{}).One(&jid); err == nil {
207+
return jid, nil
208+
}
209+
return nil, err
210+
}
211+
184212
func LoadJob(id string) (job *Job, err error) {
185213
job = new(Job)
186214
session := db.Connection.Session.Copy()

lib/core/io.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type IO struct {
3232
ShockIndex string `bson:"shockindex" json:"shockindex"` // on input it indicates that Shock node has to be indexed by AWE server
3333
AttrFile string `bson:"attrfile" json:"attrfile"`
3434
NoFile bool `bson:"nofile" json:"nofile"`
35-
Delete bool `bson:"delete" json:"delete"`
35+
Delete bool `bson:"delete" json:"delete"` // specifies that this is a temporary node, to be deleted from shock on job completion
3636
Type string `bson:"type" json:"type"`
3737
NodeAttr map[string]interface{} `bson:"nodeattr" json:"nodeattr"` // specifies attribute data to be stored in shock node (output only)
3838
FormOptions map[string]string `bson:"formoptions" json:"formoptions"`

lib/core/job.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,21 @@ type JobMin struct {
7878
UserAttr map[string]string `bson:"userattr" json:"userattr"`
7979
}
8080

81+
type JobLog struct {
82+
Id string `bson:"id" json:"id"`
83+
State string `bson:"state" json:"state"`
84+
UpdateTime time.Time `bson:"updatetime" json:"updatetime"`
85+
Notes string `bson:"notes" json:"notes"`
86+
LastFailed string `bson:"lastfailed" json:"lastfailed"`
87+
Resumed int `bson:"resumed" json:"resumed"`
88+
Tasks []*TaskLog `bson:"tasks" json:"tasks"`
89+
}
90+
91+
type JobID struct {
92+
Name string `bson:"name" json:"name"`
93+
Max int `bson:"max" json:"max"`
94+
}
95+
8196
//set job's uuid
8297
func (job *Job) setId() {
8398
job.Id = uuid.New()
@@ -302,6 +317,20 @@ func (job *Job) GetPrivateEnv(taskid string) (env map[string]string) {
302317
return
303318
}
304319

320+
func (job *Job) GetJobLogs() (jlog *JobLog, err error) {
321+
jlog = new(JobLog)
322+
jlog.Id = job.Id
323+
jlog.State = job.State
324+
jlog.UpdateTime = job.UpdateTime
325+
jlog.Notes = job.Notes
326+
jlog.LastFailed = job.LastFailed
327+
jlog.Resumed = job.Resumed
328+
for _, task := range job.Tasks {
329+
jlog.Tasks = append(jlog.Tasks, task.GetTaskLogs())
330+
}
331+
return
332+
}
333+
305334
func ReloadFromDisk(path string) (err error) {
306335
id := filepath.Base(path)
307336
jobbson, err := ioutil.ReadFile(path + "/" + id + ".bson")

lib/core/servermgr.go

Lines changed: 26 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ type ServerMgr struct {
3333
taskMap map[string]*Task
3434
actJobs map[string]*JobPerf
3535
susJobs map[string]bool
36-
jsReq chan bool //channel for job submission request (JobController -> qmgr.Handler)
37-
jsAck chan string //channel for return an assigned job number in response to jsReq (qmgr.Handler -> JobController)
38-
taskIn chan *Task //channel for receiving Task (JobController -> qmgr.Handler)
39-
coSem chan int //semaphore for checkout (mutual exclusion between different clients)
40-
nextJid string //next jid that will be assigned to newly submitted job
36+
jsReq chan bool //channel for job submission request (JobController -> qmgr.Handler)
37+
jsAck chan int //channel for return an assigned job number in response to jsReq (qmgr.Handler -> JobController)
38+
taskIn chan *Task //channel for receiving Task (JobController -> qmgr.Handler)
39+
coSem chan int //semaphore for checkout (mutual exclusion between different clients)
40+
nextJid int //next jid that will be assigned to newly submitted job
4141
}
4242

4343
func NewServerMgr() *ServerMgr {
@@ -53,11 +53,11 @@ func NewServerMgr() *ServerMgr {
5353
},
5454
taskMap: map[string]*Task{},
5555
jsReq: make(chan bool),
56-
jsAck: make(chan string),
56+
jsAck: make(chan int),
5757
taskIn: make(chan *Task, 1024),
5858
actJobs: map[string]*JobPerf{},
5959
susJobs: map[string]bool{},
60-
nextJid: "",
60+
nextJid: 0,
6161
}
6262
}
6363

@@ -68,7 +68,7 @@ func (qm *ServerMgr) JidHandle() {
6868
<-qm.jsReq
6969
jid := qm.getNextJid()
7070
qm.jsAck <- jid
71-
logger.Debug(2, fmt.Sprintf("qmgr:receive a job submission request, assigned jid=%s", jid))
71+
logger.Debug(2, fmt.Sprintf("qmgr:receive a job submission request, assigned jid=%d", jid))
7272
}
7373
}
7474

@@ -336,47 +336,22 @@ func (qm *ServerMgr) listTasks() (ids []string) {
336336
//--------server methods-------
337337

338338
func (qm *ServerMgr) InitMaxJid() (err error) {
339+
// this is for backwards compatibility with jid on filesystem
340+
startjid := conf.JOB_ID_START
339341
jidfile := conf.DATA_PATH + "/maxjid"
340-
341-
if _, err := os.Stat(jidfile); err != nil {
342-
343-
f, err := os.Create(jidfile)
344-
if err != nil {
345-
fmt.Fprintf(os.Stderr, fmt.Sprintf("error creating jidfile ", err.Error())) // logger does not work
346-
logger.Error(fmt.Sprintf("error creating jidfile ", err.Error()))
347-
return err
348-
}
349-
f.WriteString("10000")
350-
qm.nextJid = "10001"
351-
f.Close()
352-
} else {
353-
354-
buf, err := ioutil.ReadFile(jidfile)
355-
if err != nil {
356-
if conf.DEBUG_LEVEL > 0 {
357-
fmt.Println("error ioutil.ReadFile(jidfile)")
358-
}
359-
return err
360-
}
342+
if buf, ferr := ioutil.ReadFile(jidfile); ferr == nil {
361343
bufstr := strings.TrimSpace(string(buf))
362-
363-
maxjid, err := strconv.Atoi(bufstr)
364-
if err != nil {
365-
if conf.DEBUG_LEVEL > 0 {
366-
fmt.Println(fmt.Sprintf("error strconv.Atoi(bufstr), bufstr=\"%s\"", bufstr))
367-
}
368-
fmt.Fprintf(os.Stderr, fmt.Sprintf("Could not convert \"%s\" into int", bufstr)) // logger does not work
369-
logger.Error(fmt.Sprintf("Could not convert \"%s\" into int", bufstr))
370-
return err
344+
if jid, serr := strconv.Atoi(bufstr); serr == nil {
345+
startjid = jid
371346
}
372-
373-
qm.nextJid = strconv.Itoa(maxjid + 1)
374-
375347
}
376-
if conf.DEBUG_LEVEL > 0 {
377-
fmt.Println("in InitMaxJid C")
348+
// set in mongoDB
349+
if err := initMaxJidDB(startjid); err != nil {
350+
return err
378351
}
379-
logger.Debug(2, fmt.Sprintf("qmgr:jid initialized, next jid=%s", qm.nextJid))
352+
qm.nextJid = startjid + 1
353+
354+
logger.Debug(2, fmt.Sprintf("qmgr:jid initialized, next jid=%d", qm.nextJid))
380355
return
381356
}
382357

@@ -1012,19 +987,18 @@ func (qm *ServerMgr) ShowTasks() {
1012987
//---job methods---
1013988
func (qm *ServerMgr) JobRegister() (jid string, err error) {
1014989
qm.jsReq <- true
1015-
jid = <-qm.jsAck
1016-
1017-
if jid == "" {
990+
id := <-qm.jsAck
991+
if id == 0 {
1018992
return "", errors.New("failed to assign a job number for the newly submitted job")
1019993
}
1020-
return jid, nil
994+
return strconv.Itoa(id), nil
1021995
}
1022996

1023-
func (qm *ServerMgr) getNextJid() (jid string) {
997+
func (qm *ServerMgr) getNextJid() (jid int) {
1024998
jid = qm.nextJid
1025-
jidfile := conf.DATA_PATH + "/maxjid"
1026-
ioutil.WriteFile(jidfile, []byte(jid), 0644)
1027-
qm.nextJid = jidIncr(qm.nextJid)
999+
nextjid := &JobID{"jid", jid}
1000+
dbUpsert(nextjid)
1001+
qm.nextJid += 1
10281002
return jid
10291003
}
10301004

0 commit comments

Comments
 (0)