-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathoffset_commit_response.go
86 lines (72 loc) · 1.93 KB
/
offset_commit_response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package healer
import (
"encoding/binary"
"fmt"
)
/*
OffsetCommitResponse => [TopicName [Partition ErrorCode]]]
TopicName => string
Partition => int32
ErrorCode => int16
*/
type OffsetCommitResponsePartition struct {
PartitionID uint32
ErrorCode int16
}
type OffsetCommitResponseTopic struct {
Topic string
Partitions []*OffsetCommitResponsePartition
}
type OffsetCommitResponse struct {
CorrelationID uint32
Topics []*OffsetCommitResponseTopic
}
func (r OffsetCommitResponse) Error() error {
for _, topic := range r.Topics {
for _, partition := range topic.Partitions {
if partition.ErrorCode != 0 {
return KafkaError(partition.ErrorCode)
}
}
}
return nil
}
func NewOffsetCommitResponse(payload []byte) (r OffsetCommitResponse, err error) {
var (
offset int = 0
l int = 0
)
responseLength := int(binary.BigEndian.Uint32(payload))
if responseLength+4 != len(payload) {
return r, fmt.Errorf("offsetcommit response length did not match: %d!=%d", responseLength+4, len(payload))
}
offset += 4
r.CorrelationID = uint32(binary.BigEndian.Uint32(payload[offset:]))
offset += 4
l = int(binary.BigEndian.Uint32(payload[offset:]))
offset += 4
r.Topics = make([]*OffsetCommitResponseTopic, l)
for i := range r.Topics {
topic := &OffsetCommitResponseTopic{}
r.Topics[i] = topic
l = int(binary.BigEndian.Uint16(payload[offset:]))
offset += 2
topic.Topic = string(payload[offset : offset+l])
offset += l
l = int(binary.BigEndian.Uint32(payload[offset:]))
offset += 4
topic.Partitions = make([]*OffsetCommitResponsePartition, l)
for j := range topic.Partitions {
p := &OffsetCommitResponsePartition{}
topic.Partitions[j] = p
p.PartitionID = binary.BigEndian.Uint32(payload[offset:])
offset += 4
p.ErrorCode = int16(binary.BigEndian.Uint16(payload[offset:]))
offset += 2
if err == nil && p.ErrorCode != 0 {
err = KafkaError(p.ErrorCode)
}
}
}
return r, err
}