-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconsume_command.go
90 lines (79 loc) · 2.29 KB
/
consume_command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main
import (
"fmt"
"io/fs"
"os"
"path/filepath"
"time"
"github.com/ccremer/paperless-cli/pkg/consumer"
"github.com/ccremer/paperless-cli/pkg/paperless"
"github.com/go-logr/logr"
"github.com/urfave/cli/v2"
)
type ConsumeCommand struct {
cli.Command
PaperlessURL string
PaperlessToken string
PaperlessUser string
ConsumeDirName string
ConsumeDelay time.Duration
}
func newConsumeCommand() *ConsumeCommand {
c := &ConsumeCommand{}
c.Command = cli.Command{
Name: "consume",
Usage: "Consumes a local directory and uploads each file to Paperless instance. The files will be deleted once uploaded.",
Before: loadConfigFileFn,
Action: actions(LogMetadata, c.Action),
Flags: []cli.Flag{
newURLFlag(&c.PaperlessURL),
newUsernameFlag(&c.PaperlessUser),
newTokenFlag(&c.PaperlessToken),
newConsumeDirFlag(&c.ConsumeDirName),
newConsumeDelayFlag(&c.ConsumeDelay),
},
}
return c
}
func (c *ConsumeCommand) Action(ctx *cli.Context) error {
log := logr.FromContextOrDiscard(ctx.Context)
log.Info("Start consuming directory", "dir", c.ConsumeDirName)
clt := paperless.NewClient(c.PaperlessURL, c.PaperlessUser, c.PaperlessToken)
q := consumer.NewQueue[string]()
q.Subscribe(ctx.Context, func(fileName string) {
log.V(1).Info("Uploading file...", "file", fileName)
err := clt.Upload(ctx.Context, fileName, paperless.UploadParams{})
if err != nil {
log.Error(err, "Could not upload file")
return
}
if deleteErr := os.Remove(fileName); deleteErr != nil {
log.Error(err, "Could not delete file, this might be re-uploaded later again", "file", fileName)
}
log.Info("File uploaded", "file", fileName)
})
walkErr := filepath.WalkDir(c.ConsumeDirName, func(path string, entry fs.DirEntry, err error) error {
if path == c.ConsumeDirName {
return nil // same directory, not interesting
}
if entry.IsDir() {
return fs.SkipDir
}
if err != nil {
return fs.SkipDir
}
q.Put(path)
return nil
})
if walkErr != nil {
return fmt.Errorf("cannot walk consumption dir: %w", walkErr)
}
watchErr := consumer.StartWatchingDir(ctx.Context, c.ConsumeDirName, c.ConsumeDelay, func(filePath string) {
q.Put(filePath)
})
if watchErr != nil {
return fmt.Errorf("cannot watch consumption dir: %w", watchErr)
}
<-make(chan struct{})
return nil
}