-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjob_runner.go
54 lines (46 loc) · 1.5 KB
/
job_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package main
import (
"context"
"database/sql"
"github.com/chararch/gobatch"
"github.com/chararch/gobatch/util"
_ "github.com/go-sql-driver/mysql"
"log"
"time"
)
func openDB() *sql.DB {
var sqlDb *sql.DB
var err error
sqlDb, err = sql.Open("mysql", "root:root123@tcp(127.0.0.1:3306)/example?charset=utf8&parseTime=true")
if err != nil {
log.Fatal(err)
}
return sqlDb
}
func removeJobData() {
sqlDb := openDB()
_, err := sqlDb.Exec("DELETE FROM t_trade")
if err != nil {
log.Fatal(err)
}
_, err = sqlDb.Exec("DELETE FROM t_repay_plan")
if err != nil {
log.Fatal(err)
}
}
func buildAndRunJob() {
sqlDb := openDB()
gobatch.SetDB(sqlDb)
gobatch.SetTransactionManager(gobatch.NewTransactionManager(sqlDb))
step1 := gobatch.NewStep("import_trade").ReadFile(tradeFile).Writer(&tradeImporter{sqlDb}).Partitions(10).Build()
step2 := gobatch.NewStep("gen_repay_plan").Reader(&tradeReader{sqlDb}).Handler(&repayPlanHandler{sqlDb}).Partitions(10).Build()
step3 := gobatch.NewStep("stats").Reader(&statsHandler{sqlDb}).Processor(&statsHandler{sqlDb}).WriteFile(statsFileExport).Partitions(10).Build()
step4 := gobatch.NewStep("upload_file_to_ftp").CopyFile(copyFileToFtp, copyChecksumFileToFtp).Build()
job := gobatch.NewJob("accounting_job").Step(step1, step2, step3, step4).Build()
gobatch.Register(job)
params, _ := util.JsonString(map[string]interface{}{
"date": time.Now().Format("2006-01-02"),
"rand": time.Now().Nanosecond(),
})
gobatch.Start(context.Background(), job.Name(), params)
}