Skip to content

Commit

Permalink
Merge pull request #153 from liftbridge-io/update_api
Browse files Browse the repository at this point in the history
Update api
  • Loading branch information
tylertreat authored Feb 22, 2020
2 parents 0fa66f5 + c1c3a27 commit 37038c2
Show file tree
Hide file tree
Showing 27 changed files with 960 additions and 529 deletions.
5 changes: 3 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ jobs:
- run:
name: Test
command: |
go test -v .
go test -v -coverprofile=coverage.txt ./server/...
go test -v . # Run tests in root but exclude from coverage
go test -v -coverprofile=coverage.txt.tmp ./server/...
cat coverage.txt.tmp | grep -v ".pb.go" > coverage.txt # Exclude protobuf from coverage
goveralls -coverprofile=coverage.txt -service=circle-ci -repotoken=$COVERALLS_REPO_TOKEN
build-and-push-standalone-dev-image:
Expand Down
370 changes: 244 additions & 126 deletions documentation/client_implementation.md

Large diffs are not rendered by default.

67 changes: 67 additions & 0 deletions documentation/envelope_protocol.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
---
id: envelope-protocol
title: Envelope Protocol
---

Liftbridge works by wrapping NATS with higher-level stream APIs. It is also
possible for a client to publish messages to Liftbridge via NATS directly.
Liftbridge accepts plain NATS messages, allowing it to make existing subjects
durable without any publisher changes. However, these messages will not have
features such as acks.

In order to opt into Liftbridge-specific features, the message must be prefixed
with an envelope header and be encoded as a `Message` (defined in
[api.proto](https://github.com/liftbridge-io/liftbridge-api/blob/master/api.proto)).
Stream acks sent over NATS use the same envelope protocol. This envelope
protocol is described below. Refer
[here](https://github.com/liftbridge-io/liftbridge-api) for more information on
the Liftbridge API.

## Liftbridge Envelope Header

```plaintext
0 8 16 24 32
├───────────────┴───────────────┴───────────────┴───────────────┤
│ Magic Number │
├───────────────┬───────────────┬───────────────┬───────────────┤
│ Version │ HeaderLen │ Flags │ Reserved │
├───────────────┴───────────────┴───────────────┴───────────────┤
│ CRC-32C (optional) │
└───────────────────────────────────────────────────────────────┘
```

### Magic Number [4 bytes]

The Liftbridge magic number is `B9 0E 43 B4`. This was chosen by random but
deliberately restricted to invalid UTF-8 to reduce the chance of a collision.
This was also verified to not match known file signatures.

### Version [1 byte]

The version byte allows for future protocol upgrades. This should only be
bumped if the envelope format changes or if the message encoding changes in a
non-backwards-compatible way. Adding fields to the messages should not require
a version bump.

### HeaderLen [1 byte]

The header length is the offset of the payload. This is included primarily for
safety.

### Flags [1 byte]

The flag bits are defined as follows:

| Bit | Description |
| --- | --------------- |
| 0 | CRC-32C enabled |

### Reserved [1 byte]

Reserved for future use.

### CRC-32C [4 bytes, optional]

The CRC-32C (Castagnoli) is the checksum of the payload (i.e. from HeaderLen to
the end). This is optional but should significantly reduce the chance that a
random NATS message is interpreted as a Liftbridge message.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ go 1.13
require (
github.com/Workiva/go-datastructures v1.0.50
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.2
github.com/golang/protobuf v1.3.3
github.com/hako/durafmt v0.0.0-20191009132224-3f39dc1ed9f4
github.com/hashicorp/raft v1.1.1
github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617
github.com/liftbridge-io/go-liftbridge v0.0.0-20191106180712-4f7e4b9d8611
github.com/liftbridge-io/liftbridge-api v0.0.0-20190910222614-5694b15f251d
github.com/liftbridge-io/go-liftbridge v1.0.0-alpha
github.com/liftbridge-io/liftbridge-api v1.0.0-alpha
github.com/liftbridge-io/nats-on-a-log v0.0.0-20190703144237-760cefbfc85e
github.com/natefinch/atomic v0.0.0-20150920032501-a62ce929ffcc
github.com/nats-io/nats-server/v2 v2.1.0
Expand All @@ -21,5 +21,5 @@ require (
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.4.0
github.com/urfave/cli v1.22.1
google.golang.org/grpc v1.25.0
google.golang.org/grpc v1.27.1
)
37 changes: 37 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
Expand All @@ -50,6 +51,8 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down Expand Up @@ -112,13 +115,24 @@ github.com/liftbridge-io/go-liftbridge v0.0.0-20190831233708-6fbf530bb220/go.mod
github.com/liftbridge-io/go-liftbridge v0.0.0-20191106171334-84163faf9fdd/go.mod h1:Q1PGCu9GUniz/0iHucxMwg2tiHwqhg8JrmIFOaPD88s=
github.com/liftbridge-io/go-liftbridge v0.0.0-20191106180712-4f7e4b9d8611 h1:VB8/4FRlmEaZ1toJC854gS+klexnEpFVghHAUN227Tk=
github.com/liftbridge-io/go-liftbridge v0.0.0-20191106180712-4f7e4b9d8611/go.mod h1:E+5gzmn2VHTfwQ21r0bujaQrUe5o1UtHzCAr+Xpo47A=
github.com/liftbridge-io/go-liftbridge v0.0.1-alpha.0.20200120163431-bfb99c041e6f h1:p8tufXYhYE0KLL5/ePtGCr05bUWxnfzb7Jl2bclGc4M=
github.com/liftbridge-io/go-liftbridge v0.0.1-alpha.0.20200120163431-bfb99c041e6f/go.mod h1:UO2x6evHiWpw37/jEvVbKr4dzg17JHJB8f3dzBtMhow=
github.com/liftbridge-io/go-liftbridge v1.0.0-alpha h1:Mgzb8ACTNDXAfSMGxbsU7Mg+maAZIDUgB1NE5rtGdPc=
github.com/liftbridge-io/go-liftbridge v1.0.0-alpha/go.mod h1:2H2qD8RQzUwR6yOSM77OLsMAwn9XDNIlkqHWtJP6jnk=
github.com/liftbridge-io/liftbridge v0.0.0-20190628061900-5f565727d49f/go.mod h1:DgUnOkzaVLgIOQ3W7ztMm0+xQmDaEM+tcgGCAdlZj8s=
github.com/liftbridge-io/liftbridge v0.0.0-20190704001405-928a9d17c609/go.mod h1:d9iQ/6VLp7paFMnNukU2nV1PnNjkz2vpNQzYS4zIy8Y=
github.com/liftbridge-io/liftbridge v0.0.0-20190831212313-ad1b5f9c2b17/go.mod h1:hnwhdV70dIFgIKvzBJt9rOfce19kE2a3ZGaRgSQmNOs=
github.com/liftbridge-io/liftbridge v0.0.0-20191101170542-6684dd2aa80f/go.mod h1:l/Y7oCEKS+evzr19nCjeatDJU7SHwtjh6uIYGULUJfE=
github.com/liftbridge-io/liftbridge v0.0.0-20191106173932-5d89c897d5b0/go.mod h1:Ysh4YXyf7Fvdeid4QJNmyYqDRWwwMAXNmhV3IPb1l3M=
github.com/liftbridge-io/liftbridge v0.0.0-20200118200014-2e716da78b8a/go.mod h1:j3eqt63LZefajwMwcLeYtS/BEFHqPXA7ogvjJCsB1h8=
github.com/liftbridge-io/liftbridge-api v0.0.0-20190910222614-5694b15f251d h1:yuAMJgvh7C/QPVAui2i+MF8X3wUzRPgdh/t0P9Gx/p4=
github.com/liftbridge-io/liftbridge-api v0.0.0-20190910222614-5694b15f251d/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v0.0.0-20200118013732-a474da01bc36 h1:O07A30xXqJdx6DKI+OQYB9OILob2nRfTde0Ok2VAzjs=
github.com/liftbridge-io/liftbridge-api v0.0.0-20200118013732-a474da01bc36/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v0.0.0-20200118015119-3db283f59b10 h1:k10Bg5EQTc/B5dTSU8cPW4Y0OkBlnMFuFGZwRmrPvJ4=
github.com/liftbridge-io/liftbridge-api v0.0.0-20200118015119-3db283f59b10/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v1.0.0-alpha h1:NwtHTgIdYNQXkLdmLHm9rguNvbtT4OHAs6FSSrHTq1Q=
github.com/liftbridge-io/liftbridge-api v1.0.0-alpha/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-grpc v0.0.0-20190829220806-66e3ee4b7943/go.mod h1:ObGO38WdO4ldLsa2oUFcultUk0rggc+yZWcBb7qjnDI=
github.com/liftbridge-io/liftbridge-grpc v0.0.0-20190910222614-5694b15f251d/go.mod h1:ObGO38WdO4ldLsa2oUFcultUk0rggc+yZWcBb7qjnDI=
github.com/liftbridge-io/nats-on-a-log v0.0.0-20180718011723-80d0727461af/go.mod h1:4tC6R+N3facyfCwDuuuLkFF/25ceiZEwoQUzIez2dVo=
Expand All @@ -145,6 +159,8 @@ github.com/nats-io/jwt v0.2.6/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrW
github.com/nats-io/jwt v0.2.8/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrWvvY=
github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server v1.4.1 h1:Ul1oSOGNV/L8kjr4v6l2f9Yet6WY+LevH1/7cRZ/qyA=
github.com/nats-io/nats-server v1.4.1/go.mod h1:c8f/fHd2B6Hgms3LtCaI7y6pC4WD1f4SUxcCud5vhBc=
github.com/nats-io/nats-server/v2 v2.0.0/go.mod h1:RyVdsHHvY4B6c9pWG+uRLpZ0h0XsqiuKp2XCTurP5LI=
Expand All @@ -157,6 +173,8 @@ github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.2 h1:kWUa9fUdAOW426qaAD1V4+suF4On2eTr2n/FE/XSGpg=
github.com/nats-io/nkeys v0.1.2/go.mod h1:oyD2oRRVULOkPJxjJ8mX1QY0ev0FNiCi7EoKNN09TYU=
github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
Expand Down Expand Up @@ -210,6 +228,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191105034135-c7e5f84aec59 h1:PyXRxSVbvzDGuqYXjHndV7xDzJ7w2K8KD9Ef8GB7KOE=
golang.org/x/crypto v0.0.0-20191105034135-c7e5f84aec59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200214034016-1d94cc7ab1c6 h1:Sy5bstxEqwwbYs6n0/pBuxKENqOeZUgD45Gp3Q3pqLg=
golang.org/x/crypto v0.0.0-20200214034016-1d94cc7ab1c6/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -242,6 +262,10 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191105084925-a882066a44e0 h1:QPlSTtPE2k6PZPasQUbzuK3p9JbS+vMXYVto8g/yrsg=
golang.org/x/net v0.0.0-20191105084925-a882066a44e0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa h1:F+8P+gmewFQYRk6JoLQLwjBCTu3mcIURZfNkVweuRKA=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200219183655-46282727080f h1:dB42wwhNuwPvh8f+5zZWNcU+F2Xs/B9wXXwvUCOH7r8=
golang.org/x/net v0.0.0-20200219183655-46282727080f/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -270,6 +294,10 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd h1:3x5uuvBgE6oaXJjCOvpCC1IpgJogqQ+PqGGU3ZxAgII=
golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200117145432-59e60aa80a0c h1:gUYreENmqtjZb2brVfUas1sC6UivSY8XwKwPo8tloLs=
golang.org/x/sys v0.0.0-20200117145432-59e60aa80a0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200219091948-cb0a6d8edb6c h1:jceGD5YNJGgGMkJz79agzOln1K9TaZUjv5ird16qniQ=
golang.org/x/sys v0.0.0-20200219091948-cb0a6d8edb6c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
Expand Down Expand Up @@ -306,6 +334,10 @@ google.golang.org/genproto v0.0.0-20190701230453-710ae3a149df/go.mod h1:z3L6/3dT
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20191028173616-919d9bdd9fe6 h1:UXl+Zk3jqqcbEVV7ace5lrt4YdA4tXiz3f/KbmD29Vo=
google.golang.org/genproto v0.0.0-20191028173616-919d9bdd9fe6/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/genproto v0.0.0-20200117163144-32f20d992d24 h1:wDju+RU97qa0FZT0QnZDg9Uc2dH0Ql513kFvHocz+WM=
google.golang.org/genproto v0.0.0-20200117163144-32f20d992d24/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/genproto v0.0.0-20200218151345-dad8c97a84f5 h1:jB9+PJSvu5tBfmJHy/OVapFdjDF3WvpkqRhxqrmzoEU=
google.golang.org/genproto v0.0.0-20200218151345-dad8c97a84f5/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand All @@ -316,6 +348,11 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac
google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
google.golang.org/grpc v1.25.0 h1:ItERT+UbGdX+s4u+nQNlVM/Q7cbmf7icKfvzbWqVtq0=
google.golang.org/grpc v1.25.0/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
74 changes: 51 additions & 23 deletions server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,45 +143,71 @@ func (a *apiServer) FetchMetadata(ctx context.Context, req *client.FetchMetadata
// is returned.
func (a *apiServer) Publish(ctx context.Context, req *client.PublishRequest) (
*client.PublishResponse, error) {
if req.Message == nil {
a.logger.Errorf("api: Failed to publish message: message is nil")
return nil, status.Error(codes.InvalidArgument, "Message is nil")
subject, err := a.getPublishSubject(req)
if err != nil {
return nil, err
}
a.logger.Debugf("api: Publish [subject=%s]", subject)

if req.AckInbox == "" {
req.AckInbox = nuid.Next()
}
a.logger.Debugf("api: Publish [subject=%s]", req.Message.Subject)

if req.Message.AckInbox == "" {
req.Message.AckInbox = nuid.Next()
msg := &client.Message{
Key: req.Key,
Value: req.Value,
Stream: req.Stream,
Subject: subject,
ReplySubject: req.ReplySubject,
Headers: req.Headers,
AckInbox: req.AckInbox,
CorrelationId: req.CorrelationId,
AckPolicy: req.AckPolicy,
}

msg, err := req.Message.Marshal()
buf, err := proto.MarshalEnvelope(msg)
if err != nil {
a.logger.Errorf("api: Failed to publish message: %v", err.Error())
a.logger.Errorf("api: Failed to marshal message: %v", err.Error())
return nil, err
}

buf := make([]byte, envelopeCookieLen+len(msg))
copy(buf[0:], envelopeCookie)
copy(buf[envelopeCookieLen:], msg)

// If AckPolicy is NONE or a timeout isn't specified, then we will fire and
// forget.
var (
resp = new(client.PublishResponse)
_, hasDeadline = ctx.Deadline()
)
if req.Message.AckPolicy == client.AckPolicy_NONE || !hasDeadline {
if err := a.ncPublishes.Publish(req.Message.Subject, buf); err != nil {
if req.AckPolicy == client.AckPolicy_NONE || !hasDeadline {
if err := a.ncPublishes.Publish(subject, buf); err != nil {
a.logger.Errorf("api: Failed to publish message: %v", err)
return nil, err
}
return resp, nil
}

// Otherwise we need to publish and wait for the ack.
resp.Ack, err = a.publishSync(ctx, req.Message.Subject, req.Message.AckInbox, buf)
resp.Ack, err = a.publishSync(ctx, subject, req.AckInbox, buf)
return resp, err
}

func (a *apiServer) getPublishSubject(req *client.PublishRequest) (string, error) {
if req.Subject != "" {
return req.Subject, nil
}
if req.Stream == "" {
return "", status.Error(codes.InvalidArgument, "No stream or subject provided")
}
stream := a.metadata.GetStream(req.Stream)
if stream == nil {
return "", status.Error(codes.NotFound, fmt.Sprintf("No such stream: %s", req.Stream))
}
subject := stream.subject
if req.Partition > 0 {
subject = fmt.Sprintf("%s.%d", subject, req.Partition)
}
return subject, nil
}

func (a *apiServer) publishSync(ctx context.Context, subject,
ackInbox string, msg []byte) (*client.Ack, error) {

Expand Down Expand Up @@ -212,7 +238,7 @@ func (a *apiServer) publishSync(ctx context.Context, subject,
}

ack := new(client.Ack)
if err := ack.Unmarshal(ackMsg.Data); err != nil {
if err := proto.UnmarshalEnvelope(ackMsg.Data, ack); err != nil {
a.logger.Errorf("api: Invalid ack for publish: %v", err)
return nil, err
}
Expand Down Expand Up @@ -257,13 +283,15 @@ func (a *apiServer) subscribe(ctx context.Context, partition *partition,
headers := m.Headers()
var (
msg = &client.Message{
Offset: offset,
Key: m.Key(),
Value: m.Value(),
Timestamp: timestamp,
Headers: headers,
Subject: string(headers["subject"]),
Reply: string(headers["reply"]),
Stream: partition.Stream,
Partition: partition.Id,
Offset: offset,
Key: m.Key(),
Value: m.Value(),
Timestamp: timestamp,
Headers: headers,
Subject: string(headers["subject"]),
ReplySubject: string(headers["reply"]),
}
)
select {
Expand Down
Loading

0 comments on commit 37038c2

Please sign in to comment.