This repository has been archived by the owner on Jan 11, 2023. It is now read-only.
forked from saymedia/journald-cloudwatch-logs
-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
143 lines (114 loc) · 3.24 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package main
import (
"flag"
"fmt"
"log"
"os"
"github.com/coreos/go-systemd/sdjournal"
)
// help is the -help command-line flag. When it is set, main prints the usage
// text and exits with status 0.
var help = flag.Bool("help", false, "set to true to show this help")
// main is the program entry point. It parses the command line and passes the
// first positional argument (the config file path) to run.
//
// Exit status:
//
//	0  -help was given (usage is printed), or run returned without error
//	1  no config file argument was supplied (usage is printed)
//	2  run returned an error (the error text is written to stderr)
func main() {
	flag.Parse()

	// Both "print usage and leave" cases are handled before any real work.
	switch {
	case *help:
		usage()
		os.Exit(0)
	case flag.Arg(0) == "":
		usage()
		os.Exit(1)
	}

	if runErr := run(flag.Arg(0)); runErr != nil {
		os.Stderr.WriteString(runErr.Error())
		os.Stderr.WriteString("\n")
		os.Exit(2)
	}
}
// usage writes the command synopsis to standard error, followed by the
// defaults of all registered flags (as printed by flag.PrintDefaults) and a
// trailing blank line.
func usage() {
	const synopsis = "Usage: journald-cloudwatch-logs <config-file>\n\n"

	fmt.Fprint(os.Stderr, synopsis)
	flag.PrintDefaults()
	fmt.Fprintln(os.Stderr)
}
// run loads the configuration from configFilename and then streams journal
// records to CloudWatch Logs until the batch channel is closed or a step fails.
//
// It performs, in order:
//
//  1. load the config and open the journal (the default journal, or the
//     directory named by config.JournalDir when that is non-empty);
//  2. install the configured log filters and open the persistent state file;
//  3. create the CloudWatch writer, resuming from the stored sequence token;
//  4. decide where to start reading, based on whether the current boot id
//     matches the one recorded in the state file;
//  5. start the reader and batcher goroutines and write every batch,
//     persisting the new sequence token after each successful write.
//
// It returns nil when the batch channel is closed and the final state has
// been written, and a descriptive error for the first step that fails.
func run(configFilename string) error {
	config, err := LoadConfig(configFilename)
	if err != nil {
		return fmt.Errorf("error reading config: %s", err)
	}

	var journal *sdjournal.Journal
	if config.JournalDir == "" {
		journal, err = sdjournal.NewJournal()
	} else {
		log.Printf("using journal dir: %s", config.JournalDir)
		journal, err = sdjournal.NewJournalFromDir(config.JournalDir)
	}
	if err != nil {
		return fmt.Errorf("error opening journal: %s", err)
	}
	defer journal.Close()

	AddLogFilters(journal, config)

	state, err := OpenState(config.StateFilename)
	if err != nil {
		return fmt.Errorf("Failed to open %s: %s", config.StateFilename, err)
	}

	lastBootID, nextSeq := state.LastState()

	awsSession := config.NewAWSSession()

	writer, err := NewWriter(
		awsSession,
		config.LogGroupName,
		config.LogStreamName,
		nextSeq,
	)
	if err != nil {
		return fmt.Errorf("error initializing writer: %s", err)
	}

	// Position the journal on its first entry so the boot id can be read.
	// A real failure is reported with its cause; an empty journal (nothing
	// to advance to) keeps the original message.
	seeked, err := journal.Next()
	if err != nil {
		return fmt.Errorf("unable to seek to first item in journal: %s", err)
	}
	if seeked == 0 {
		return fmt.Errorf("unable to seek to first item in journal")
	}

	// GetData returns the field as "_BOOT_ID=<value>". Check the error and
	// strip the prefix only when it is actually present, so a failed or
	// short read can no longer cause a slice-bounds panic.
	const bootIDPrefix = "_BOOT_ID="
	bootID, err := journal.GetData("_BOOT_ID")
	if err != nil {
		return fmt.Errorf("error reading boot id from journal: %s", err)
	}
	if len(bootID) >= len(bootIDPrefix) && bootID[:len(bootIDPrefix)] == bootIDPrefix {
		bootID = bootID[len(bootIDPrefix):]
	}

	// If the boot id has changed since our last run then we'll start from
	// the beginning of the stream, but if we're starting up with the same
	// boot id then we'll seek to the end of the stream to avoid repeating
	// anything. However, we will miss any items that were added while we
	// weren't running.
	skip := uint64(0)
	if bootID == lastBootID {
		// If we're still in the same "boot" as we were last time then
		// we were stopped and started again, so we'll seek to the last
		// item in the log as an approximation of resuming streaming,
		// though we will miss any logs that were added while we weren't
		// running.
		//
		// A failed seek must not be ignored: reading would otherwise
		// continue from the first entry and re-send the whole journal.
		if err := journal.SeekTail(); err != nil {
			return fmt.Errorf("error seeking to end of journal: %s", err)
		}

		// Skip the last item so our log will resume only when we get
		// the next item.
		skip = 1
	}

	// Record the current boot id right away so a restart within this boot
	// is recognised even if no batch is ever written.
	err = state.SetState(bootID, nextSeq)
	if err != nil {
		return fmt.Errorf("Failed to write state: %s", err)
	}

	bufSize := config.BufferSize

	records := make(chan Record)
	batches := make(chan []Record)

	go ReadRecords(config.EC2InstanceId, journal, records, skip)
	go BatchRecords(records, batches, bufSize)

	for batch := range batches {
		nextSeq, err = writer.WriteBatch(batch)
		if err != nil {
			return fmt.Errorf("Failed to write to cloudwatch: %s", err)
		}

		// Persist the token returned by the write so the next run can
		// resume with it.
		err = state.SetState(bootID, nextSeq)
		if err != nil {
			return fmt.Errorf("Failed to write state: %s", err)
		}
	}

	// The loop ends once the batches channel is closed; per the original
	// author's note this happens when the process is interrupted by a
	// signal. Last chance to write the state.
	err = state.SetState(bootID, nextSeq)
	if err != nil {
		return fmt.Errorf("Failed to write state on exit: %s", err)
	}

	return nil
}