forked from alephzero/go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
161 lines (134 loc) · 3.82 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package alephzero
/*
#cgo pkg-config: alephzero
#include "pubsub_adapter.h"
#include <stdlib.h> // free
*/
import "C"
import (
"unsafe"
)
type PubSubTopic struct {
Name string
FileOptions *FileOptions
}
func (t *PubSubTopic) c() (cTopic C.a0_pubsub_topic_t) {
cTopic.name = C.CString(t.Name)
if t.FileOptions != nil {
localOpts := t.FileOptions.toC()
cTopic.file_opts = &localOpts
}
return
}
func freeCPubSubTopic(cTopic C.a0_pubsub_topic_t) {
C.free(unsafe.Pointer(cTopic.name))
}
type Publisher struct {
c C.a0_publisher_t
}
func NewPublisher(topic PubSubTopic) (p *Publisher, err error) {
p = &Publisher{}
cTopic := topic.c()
defer freeCPubSubTopic(cTopic)
err = errorFrom(C.a0_publisher_init(&p.c, cTopic))
return
}
func (p *Publisher) Close() error {
return errorFrom(C.a0_publisher_close(&p.c))
}
func (p *Publisher) Pub(pkt Packet) error {
cPkt := pkt.c()
defer freeCPacket(cPkt)
return errorFrom(C.a0_publisher_pub(&p.c, cPkt))
}
type SubscriberSync struct {
c C.a0_subscriber_sync_t
allocId uintptr
// Memory must survive between the alloc and Next.
activePktSpace []byte
}
func NewSubscriberSync(topic PubSubTopic, init ReaderInit, iter ReaderIter) (ss *SubscriberSync, err error) {
ss = &SubscriberSync{}
cTopic := topic.c()
defer freeCPubSubTopic(cTopic)
ss.allocId = registry.Register(func(size C.size_t, out *C.a0_buf_t) C.a0_err_t {
ss.activePktSpace = make([]byte, int(size))
out.size = size
if size > 0 {
out.data = (*C.uint8_t)(&ss.activePktSpace[0])
}
return A0_OK
})
err = errorFrom(C.a0go_subscriber_sync_init(&ss.c, cTopic, C.uintptr_t(ss.allocId), C.a0_reader_init_t(init), C.a0_reader_iter_t(iter)))
return
}
func (ss *SubscriberSync) Close() (err error) {
err = errorFrom(C.a0_subscriber_sync_close(&ss.c))
if ss.allocId > 0 {
registry.Unregister(ss.allocId)
}
return
}
func (ss *SubscriberSync) HasNext() (hasNext bool, err error) {
err = errorFrom(C.a0_subscriber_sync_has_next(&ss.c, (*C.bool)(&hasNext)))
return
}
func (ss *SubscriberSync) Next() (pkt Packet, err error) {
var cPkt C.a0_packet_t
err = errorFrom(C.a0_subscriber_sync_next(&ss.c, &cPkt))
if err == nil {
pkt = packetFromC(cPkt)
}
return
}
type Subscriber struct {
c C.a0_subscriber_t
allocId uintptr
packetCallbackId uintptr
}
func NewSubscriber(topic PubSubTopic, init ReaderInit, iter ReaderIter, callback func(Packet)) (s *Subscriber, err error) {
s = &Subscriber{}
cTopic := topic.c()
defer freeCPubSubTopic(cTopic)
var activePktSpace []byte
s.allocId = registry.Register(func(size C.size_t, out *C.a0_buf_t) C.a0_err_t {
activePktSpace = make([]byte, int(size))
out.size = size
if size > 0 {
out.data = (*C.uint8_t)(&activePktSpace[0])
}
return A0_OK
})
s.packetCallbackId = registry.Register(func(cPkt C.a0_packet_t) {
callback(packetFromC(cPkt))
})
err = errorFrom(C.a0go_subscriber_init(&s.c, cTopic, C.uintptr_t(s.allocId), C.a0_reader_init_t(init), C.a0_reader_iter_t(iter), C.uintptr_t(s.packetCallbackId)))
return
}
func (s *Subscriber) Close() (err error) {
err = errorFrom(C.a0_subscriber_close(&s.c))
registry.Unregister(s.packetCallbackId)
if s.allocId > 0 {
registry.Unregister(s.allocId)
}
return
}
func SubscriberReadOne(topic PubSubTopic, init ReaderInit, flags int) (pkt Packet, err error) {
cTopic := topic.c()
defer freeCPubSubTopic(cTopic)
var pktSpace []byte
allocId := registry.Register(func(size C.size_t, out *C.a0_buf_t) C.a0_err_t {
pktSpace = make([]byte, int(size))
out.size = size
if size > 0 {
out.data = (*C.uint8_t)(&pktSpace[0])
}
return A0_OK
})
defer registry.Unregister(allocId)
cPkt := C.a0_packet_t{}
err = errorFrom(C.a0go_subscriber_read_one(cTopic, C.uintptr_t(allocId), C.a0_reader_init_t(init), C.int(flags), &cPkt))
pkt = packetFromC(cPkt)
copy(pkt.Payload, pkt.Payload)
return
}