Skip to content

Commit 6052bd1

Browse files
authored
Merge pull request #540 from wgerlach/master
various AWE updates
2 parents 195d210 + c4b96e0 commit 6052bd1

File tree

91 files changed

+5757
-3540
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+5757
-3540
lines changed

Dockerfile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
# docker build --force-rm --no-cache --rm -t mgrast/awe .
33
# docker tag mgrast/awe mgrast/awe:${TAG}
44

5+
# skycore push mgrast/awe:${TAG}
6+
# docker create --name awe-temporary mgrast/awe:${TAG}
7+
# docker cp awe-temporary:/go/bin/awe-worker .
8+
# docker cp awe-temporary:/go/bin/awe-submitter .
9+
# docker rm awe-temporary
10+
511

612
FROM golang:1.7.6-alpine
713

awe-submitter/awe-submitter.go

Lines changed: 155 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,15 @@ import (
77
"github.com/MG-RAST/AWE/lib/core/cwl"
88
cwl_types "github.com/MG-RAST/AWE/lib/core/cwl/types"
99
"github.com/MG-RAST/AWE/lib/logger"
10+
//"github.com/MG-RAST/AWE/lib/logger/event"
11+
"bytes"
1012
"github.com/MG-RAST/AWE/lib/shock"
1113
"github.com/davecgh/go-spew/spew"
1214
"gopkg.in/yaml.v2"
15+
"io"
16+
"io/ioutil"
17+
"mime/multipart"
18+
"net/http"
1319
"net/url"
1420
"os"
1521
"path"
@@ -54,6 +60,8 @@ func uploadFile(file *cwl_types.File, inputfile_path string) (err error) {
5460

5561
file_path := file.Path
5662

63+
basename := path.Base(file_path)
64+
5765
if file_path == "" {
5866
return
5967
}
@@ -75,35 +83,39 @@ func uploadFile(file *cwl_types.File, inputfile_path string) (err error) {
7583
}
7684
spew.Dump(node)
7785

78-
file.Location_url, err = url.Parse(conf.SERVER_URL + "/node/" + node.Id + "?download")
86+
file.Location_url, err = url.Parse(conf.SHOCK_URL + "/node/" + node.Id + "?download")
7987
if err != nil {
8088
return
8189
}
8290

8391
file.Location = file.Location_url.String()
8492
file.Path = ""
93+
file.Basename = basename
8594

8695
return
8796
}
8897

89-
func processInputData(native interface{}, inputfile_path string) (err error) {
98+
func processInputData(native interface{}, inputfile_path string) (count int, err error) {
99+
90100
fmt.Printf("(processInputData) start\n")
91101
defer fmt.Printf("(processInputData) end\n")
92102
switch native.(type) {
93103
case *cwl.Job_document:
94-
104+
fmt.Printf("found Job_document\n")
95105
job_doc_ptr := native.(*cwl.Job_document)
96106

97107
job_doc := *job_doc_ptr
98108

99-
for key, value := range job_doc {
109+
for _, value := range job_doc {
100110

101-
fmt.Printf("recurse into key: %s\n", key)
102-
err = processInputData(value, inputfile_path)
111+
id := value.GetId()
112+
fmt.Printf("recurse into key: %s\n", id)
113+
var sub_count int
114+
sub_count, err = processInputData(value, inputfile_path)
103115
if err != nil {
104116
return
105117
}
106-
118+
count += sub_count
107119
}
108120

109121
return
@@ -123,7 +135,7 @@ func processInputData(native interface{}, inputfile_path string) (err error) {
123135
if err != nil {
124136
return
125137
}
126-
138+
count += 1
127139
return
128140
default:
129141
spew.Dump(native)
@@ -135,10 +147,19 @@ func processInputData(native interface{}, inputfile_path string) (err error) {
135147
}
136148

137149
func main() {
150+
err := main_wrapper()
151+
if err != nil {
152+
println(err.Error())
153+
os.Exit(1)
154+
}
155+
os.Exit(0)
156+
}
157+
158+
func main_wrapper() (err error) {
138159

139160
conf.LOG_OUTPUT = "console"
140161

141-
err := conf.Init_conf("submitter")
162+
err = conf.Init_conf("submitter")
142163

143164
if err != nil {
144165
fmt.Fprintf(os.Stderr, "ERROR: error reading conf file: "+err.Error())
@@ -147,23 +168,24 @@ func main() {
147168

148169
logger.Initialize("client")
149170

150-
if conf.CWL_JOB == "" {
151-
logger.Error("cwl job file missing")
152-
time.Sleep(time.Second)
153-
os.Exit(1)
171+
for _, value := range conf.ARGS {
172+
println(value)
154173
}
155174

156-
inputfile_path := path.Dir(conf.CWL_JOB)
175+
job_file := conf.ARGS[0]
176+
workflow_file := conf.ARGS[1]
177+
178+
inputfile_path := path.Dir(job_file)
157179
fmt.Printf("job path: %s\n", inputfile_path) // needed to resolve relative paths
158180

159-
job_doc, err := cwl.ParseJob(conf.CWL_JOB)
181+
job_doc, err := cwl.ParseJobFile(job_file)
160182
if err != nil {
161183
logger.Error("error parsing cwl job: %v", err)
162184
time.Sleep(time.Second)
163185
os.Exit(1)
164186
}
165187

166-
fmt.Println("Job input fter reading from file:")
188+
fmt.Println("Job input after reading from file:")
167189
spew.Dump(*job_doc)
168190

169191
data, err := yaml.Marshal(*job_doc)
@@ -172,15 +194,23 @@ func main() {
172194
os.Exit(1)
173195
}
174196

175-
fmt.Printf("yaml:\n%s\n", string(data[:]))
197+
job_doc_string := string(data[:])
198+
fmt.Printf("job_doc_string: \"%s\"\n", job_doc_string)
199+
if job_doc_string == "" {
200+
fmt.Println("job_doc_string is empty")
201+
os.Exit(1)
202+
}
203+
204+
fmt.Printf("yaml:\n%s\n", job_doc_string)
176205

177206
// process input files
178207

179-
err = processInputData(job_doc, inputfile_path)
208+
upload_count, err := processInputData(job_doc, inputfile_path)
180209
if err != nil {
181210
fmt.Printf("error: %s", err.Error())
182211
os.Exit(1)
183212
}
213+
fmt.Printf("%d files have been uploaded\n", upload_count)
184214
time.Sleep(2)
185215

186216
fmt.Println("------------Job input after parsing:")
@@ -192,4 +222,111 @@ func main() {
192222

193223
fmt.Printf("yaml:\n%s\n", string(data[:]))
194224

225+
// job submission example:
226+
// curl -X POST -F [email protected] -F cwl=@/Users/wolfganggerlach/awe_data/pipeline/CWL/PackedWorkflow/preprocess-fasta.workflow.cwl http://localhost:8001/job
227+
228+
//var b bytes.Buffer
229+
//w := multipart.NewWriter(&b)
230+
err = SubmitCWLJobToAWE(workflow_file, job_file, &data)
231+
if err != nil {
232+
return
233+
}
234+
235+
return
236+
}
237+
238+
func SubmitCWLJobToAWE(workflow_file string, job_file string, data *[]byte) (err error) {
239+
multipart := NewMultipartWriter()
240+
err = multipart.AddFile("cwl", workflow_file)
241+
if err != nil {
242+
return
243+
}
244+
err = multipart.AddDataAsFile("job", job_file, data)
245+
if err != nil {
246+
return
247+
}
248+
response, err := multipart.Send("POST", conf.SERVER_URL+"/job")
249+
if err != nil {
250+
return
251+
}
252+
responseData, err := ioutil.ReadAll(response.Body)
253+
if err != nil {
254+
return
255+
}
256+
responseString := string(responseData)
257+
258+
fmt.Println(responseString)
259+
return
260+
261+
}
262+
263+
type MultipartWriter struct {
264+
b bytes.Buffer
265+
w *multipart.Writer
266+
}
267+
268+
func NewMultipartWriter() *MultipartWriter {
269+
m := &MultipartWriter{}
270+
m.w = multipart.NewWriter(&m.b)
271+
return m
272+
}
273+
274+
func (m *MultipartWriter) Send(method string, url string) (response *http.Response, err error) {
275+
m.w.Close()
276+
fmt.Println("------------")
277+
spew.Dump(m.w)
278+
fmt.Println("------------")
279+
280+
req, err := http.NewRequest(method, url, &m.b)
281+
if err != nil {
282+
return
283+
}
284+
// Don't forget to set the content type, this will contain the boundary.
285+
req.Header.Set("Content-Type", m.w.FormDataContentType())
286+
287+
// Submit the request
288+
client := &http.Client{}
289+
fmt.Printf("%s %s\n\n", method, url)
290+
response, err = client.Do(req)
291+
if err != nil {
292+
return
293+
}
294+
295+
// Check the response
296+
//if response.StatusCode != http.StatusOK {
297+
// err = fmt.Errorf("bad status: %s", response.Status)
298+
//}
299+
return
300+
301+
}
302+
303+
func (m *MultipartWriter) AddDataAsFile(fieldname string, filepath string, data *[]byte) (err error) {
304+
305+
fw, err := m.w.CreateFormFile(fieldname, filepath)
306+
if err != nil {
307+
return
308+
}
309+
_, err = fw.Write(*data)
310+
if err != nil {
311+
return
312+
}
313+
return
314+
}
315+
316+
func (m *MultipartWriter) AddFile(fieldname string, filepath string) (err error) {
317+
318+
f, err := os.Open(filepath)
319+
if err != nil {
320+
return
321+
}
322+
defer f.Close()
323+
fw, err := m.w.CreateFormFile(fieldname, filepath)
324+
if err != nil {
325+
return
326+
}
327+
if _, err = io.Copy(fw, f); err != nil {
328+
return
329+
}
330+
331+
return
195332
}

awe-worker/awe-worker.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ func main() {
7878
os.Exit(1)
7979
}
8080

81-
var self *core.Client
81+
core.SetClientProfile(profile)
82+
self := core.Self
83+
//var self *core.Client
8284
if worker.Client_mode == "online" {
8385
if conf.SERVER_URL == "" {
8486
fmt.Fprintf(os.Stderr, "AWE server url not configured or is empty. Please check the [Client]serverurl field in the configuration file.\n")
@@ -89,20 +91,17 @@ func main() {
8991
os.Exit(1)
9092
}
9193

92-
self, err = worker.RegisterWithAuth(conf.SERVER_URL, profile)
94+
err = worker.RegisterWithAuth(conf.SERVER_URL, profile)
9395
if err != nil {
9496
fmt.Fprintf(os.Stderr, "fail to register: %s\n", err.Error())
9597
logger.Error("fail to register: %s\n", err.Error())
9698
os.Exit(1)
9799
}
98100

99-
} else {
100-
self = core.NewClient()
101101
}
102102

103-
core.SetClientProfile(self)
104103
if worker.Client_mode == "online" {
105-
fmt.Printf("Client registered, name=%s, id=%s\n", self.Name, self.Id)
104+
fmt.Printf("Client registered, name=%s, id=%s\n", self.WorkerRuntime.Name, self.Id)
106105
logger.Event(event.CLIENT_REGISTRATION, "clientid="+self.Id)
107106
}
108107

@@ -114,7 +113,7 @@ func main() {
114113
time.Sleep(time.Second)
115114
os.Exit(1)
116115
}
117-
job_doc, err := cwl.ParseJob(conf.CWL_JOB)
116+
job_doc, err := cwl.ParseJobFile(conf.CWL_JOB)
118117
if err != nil {
119118
logger.Error("error parsing cwl job: %v", err)
120119
time.Sleep(time.Second)
@@ -126,13 +125,13 @@ func main() {
126125

127126
os.Getwd() //https://golang.org/pkg/os/#Getwd
128127

129-
workunit := &core.Workunit{Id: "00000000-0000-0000-0000-000000000000_0_0", CWL: core.NewCWL_workunit()}
128+
workunit := &core.Workunit{Id: "00000000-0000-0000-0000-000000000000_0_0", CWL_workunit: core.NewCWL_workunit()}
130129

131-
workunit.CWL.Job_input = job_doc
132-
workunit.CWL.Job_input_filename = conf.CWL_JOB
130+
workunit.CWL_workunit.Job_input = job_doc
131+
workunit.CWL_workunit.Job_input_filename = conf.CWL_JOB
133132

134-
workunit.CWL.CWL_tool_filename = conf.CWL_TOOL
135-
workunit.CWL.CWL_tool = &cwl.CommandLineTool{} // TODO parsing and testing ?
133+
workunit.CWL_workunit.CWL_tool_filename = conf.CWL_TOOL
134+
workunit.CWL_workunit.CWL_tool = &cwl.CommandLineTool{} // TODO parsing and testing ?
136135

137136
current_working_directory, err := os.Getwd()
138137
if err != nil {
@@ -142,13 +141,15 @@ func main() {
142141
}
143142
workunit.WorkPath = current_working_directory
144143

145-
workunit.Cmd = &core.Command{}
146-
workunit.Cmd.Local = true // this makes sure the working directory is not deleted
147-
workunit.Cmd.Name = "/usr/bin/cwl-runner"
144+
cmd := &core.Command{}
145+
cmd.Local = true // this makes sure the working directory is not deleted
146+
cmd.Name = "/usr/bin/cwl-runner"
148147

149-
workunit.Cmd.ArgsArray = []string{"--leave-outputs", "--leave-tmpdir", "--tmp-outdir-prefix", "./tmp/", "--tmpdir-prefix", "./tmp/", "--disable-pull", "--rm-container", "--on-error", "stop", workunit.CWL.CWL_tool_filename, workunit.CWL.Job_input_filename}
148+
cmd.ArgsArray = []string{"--leave-outputs", "--leave-tmpdir", "--tmp-outdir-prefix", "./tmp/", "--tmpdir-prefix", "./tmp/", "--disable-pull", "--rm-container", "--on-error", "stop", workunit.CWL_workunit.CWL_tool_filename, workunit.CWL_workunit.Job_input_filename}
150149

151-
workunit.WorkPerf = core.NewWorkPerf(workunit.Id)
150+
workunit.Cmd = cmd
151+
152+
workunit.WorkPerf = core.NewWorkPerf()
152153
workunit.WorkPerf.Checkout = time.Now().Unix()
153154

154155
logger.Debug(1, "injecting cwl job into worker...")

0 commit comments

Comments
 (0)