forked from Jonathan-Rosenberg/delta-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
checkpoint_writer.go
94 lines (78 loc) · 2.26 KB
/
checkpoint_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package deltago
import (
"log"
"github.com/csimplestring/delta-go/action"
"github.com/csimplestring/delta-go/errno"
"github.com/csimplestring/delta-go/internal/util/filenames"
"github.com/rotisserie/eris"
)
// checkpointWriter writes the state of a snapshot into a checkpoint file by
// streaming actions through a parquetActionWriter.
type checkpointWriter struct {
// schemaText is handed to pw.Open together with the checkpoint path;
// presumably the Parquet schema of the checkpoint file — TODO confirm.
schemaText string
// pw receives the Open / Write (one call per action) / Close sequence
// issued by write.
pw parquetActionWriter
}
// write serializes snapshot into a single-part checkpoint file located at
// filenames.CheckpointFileSingular(snapshot.path, snapshot.version) and
// returns the metadata (version and number of actions written) describing it.
//
// The actions written are those returned by extractActions (CommitInfo and
// CDC actions are excluded). The writer is always closed, including when a
// Write call fails part-way through.
//
// After the file is closed, the number of AddFile actions written is compared
// with snapshot.numOfFiles(); a mismatch yields an errno.IllegalStateError and
// a failure to compute that count is returned as an error. Writing an empty
// checkpoint is not an error; it is only logged.
//
// The Scala version passed the DeltaLog instance to infer whether 'rename'
// is needed; here the intent is to pass some Config struct in the future to
// instantiate the writer instead.
func (c *checkpointWriter) write(snapshot *snapshotImp) (*CheckpointMetaDataJSON, error) {
	path := filenames.CheckpointFileSingular(snapshot.path, snapshot.version)

	// exclude CommitInfo and CDC
	actions, err := c.extractActions(snapshot)
	if err != nil {
		return nil, err
	}

	if err := c.pw.Open(path, c.schemaText); err != nil {
		return nil, err
	}

	checkpointSize := int64(0)
	numOfFiles := int64(0)
	// act (rather than "action") avoids shadowing the imported action package.
	for _, act := range actions {
		if err := c.pw.Write(act); err != nil {
			// Release the writer before bailing out. The write failure is the
			// error worth reporting, so a secondary Close error is dropped.
			_ = c.pw.Close()
			return nil, err
		}
		checkpointSize++
		if act.Add != nil {
			numOfFiles++
		}
	}

	if err := c.pw.Close(); err != nil {
		return nil, err
	}

	// Sanity check: every live file of the snapshot must be in the checkpoint.
	// A failure to count the files is surfaced instead of silently skipping
	// the check, which would otherwise report an unvalidated checkpoint as OK.
	fileCount, countErr := snapshot.numOfFiles()
	if countErr != nil {
		return nil, eris.Wrap(countErr, "counting snapshot files to validate checkpoint")
	}
	if fileCount != numOfFiles {
		return nil, errno.IllegalStateError("State of the checkpoint doesn't match that of the snapshot.")
	}

	if checkpointSize == 0 {
		log.Println("Attempted to write an empty checkpoint without any actions. ")
	}
	return &CheckpointMetaDataJSON{Version: snapshot.version, Size: checkpointSize}, nil
}
// extractActions collects, in order, the actions that make up a checkpoint of
// snapshot:
//   - protocolAndMetadata.V2 then protocolAndMetadata.V1 (see note below),
//   - every entry of snapshot.setTransactions(),
//   - every file from snapshot.AllFiles(),
//   - every tombstone from snapshot.tombstones().
//
// Each item is converted with its Wrap method. An error from loading the
// protocol/metadata is wrapped with context. Errors from AllFiles and
// tombstones are returned unchanged so callers can still inspect them.
func (c *checkpointWriter) extractActions(snapshot *snapshotImp) ([]*action.SingleAction, error) {
	// protocol and metadata
	// NOTE(review): V1 is presumably the protocol and V2 the metadata, which
	// would match the Scala ordering Seq(metadata, protocol) — confirm.
	protocolAndMetadata, err := snapshot.protocolAndMetadata.Get()
	if err != nil {
		return nil, eris.Wrap(err, "loading protocol and metadata for checkpoint")
	}

	// transactions
	setTxns := snapshot.setTransactions()

	// addFile
	addFiles, err := snapshot.AllFiles()
	if err != nil {
		return nil, err
	}

	// removeFile
	removeFiles, err := snapshot.tombstones()
	if err != nil {
		return nil, err
	}

	// All sizes are known at this point, so allocate the result once:
	// 2 slots for protocol+metadata plus one per remaining entry.
	actions := make([]*action.SingleAction, 0, 2+len(setTxns)+len(addFiles)+len(removeFiles))
	actions = append(actions, protocolAndMetadata.V2.Wrap(), protocolAndMetadata.V1.Wrap())
	for _, trx := range setTxns {
		actions = append(actions, trx.Wrap())
	}
	for _, f := range addFiles {
		actions = append(actions, f.Wrap())
	}
	for _, f := range removeFiles {
		actions = append(actions, f.Wrap())
	}
	return actions, nil
}