Commit c9def83

feat(connectors): Implement influxdb v2 and v3 connector with separate source and sink crates (#3140)
1 parent 65020fc commit c9def83

35 files changed

Lines changed: 11074 additions & 2091 deletions

Cargo.lock

Lines changed: 25 additions & 22 deletions

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -171,7 +171,7 @@ governor = "0.10.4"
 harness_derive = { path = "core/harness_derive" }
 hash32 = "1.0.0"
 hostname = "0.4.2"
-http = "1"
+http = "1.4.0"
 human-repr = "1.1.0"
 humantime = "2.3.0"
 hwlocality = "1.0.0-alpha.12"

core/connectors/sinks/README.md

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ Sink connectors are responsible for writing data from Iggy streams to external s
 | **doris_sink** | Loads JSON messages into Apache Doris tables via the Stream Load HTTP API |
 | **elasticsearch_sink** | Sends messages to Elasticsearch indices for full-text search and analytics |
 | **iceberg_sink** | Writes data to Apache Iceberg tables via REST catalog with S3/GCS/Azure storage |
+| **influxdb_sink** | Writes messages to InfluxDB as line-protocol points; supports both V2 (org/bucket, Flux) and V3 (db, SQL) |
 | **postgres_sink** | Stores messages in PostgreSQL database tables with configurable schemas |
 | **quickwit_sink** | Indexes messages in Quickwit search engine for log analytics |
 | **stdout_sink** | Prints messages to standard output (useful for debugging and development) |

core/connectors/sinks/influxdb_sink/Cargo.toml

Lines changed: 5 additions & 7 deletions
@@ -29,26 +29,24 @@ repository = "https://github.com/apache/iggy"
 readme = "../../README.md"
 publish = false
 
-[package.metadata.cargo-machete]
-ignored = ["dashmap", "once_cell", "futures"]
-
 [lib]
 crate-type = ["cdylib", "lib"]
 
 [dependencies]
 async-trait = { workspace = true }
 base64 = { workspace = true }
 bytes = { workspace = true }
-dashmap = { workspace = true }
-futures = { workspace = true }
 iggy_common = { workspace = true }
 iggy_connector_sdk = { workspace = true }
-once_cell = { workspace = true }
 reqwest = { workspace = true }
 reqwest-middleware = { workspace = true }
 secrecy = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
-simd-json = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
+
+[dev-dependencies]
+axum = { workspace = true }
+simd-json = { workspace = true }
+toml = { workspace = true }
Lines changed: 124 additions & 0 deletions
@@ -0,0 +1,124 @@
# InfluxDB Sink Connector

A sink connector that consumes messages from Iggy streams and writes them to InfluxDB as line-protocol points. Supports both InfluxDB V2 (OSS 2.x / Cloud 2.x) and InfluxDB V3 (Core / Enterprise).

## V2 vs V3 Differences

| Aspect | InfluxDB V2 | InfluxDB V3 |
| --- | --- | --- |
| Data organisation | `org` + `bucket` | `db` |
| Write endpoint | `POST /api/v2/write` | `POST /api/v3/write_lp` |
| Auth header | `Authorization: Token {t}` | `Authorization: Bearer {t}` |
| Precision values | `ns`, `us`, `ms`, `s` | same short forms accepted |
| Config `version` key | `"v2"` (default if omitted) | `"v3"` |

The write body (InfluxDB line protocol), retry/circuit-breaker behaviour, batch accumulation, and payload format handling are identical between versions.
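
The version-dependent rows of the table can be sketched as a few small functions. This is an illustration only: `InfluxVersion`, `parse_version`, `write_path`, `auth_header`, and `precision_param` are hypothetical names, not the connector's actual API. The long precision words for V3 follow the precision-mapping note in the Architecture Notes section.

```rust
/// InfluxDB API generation, mirroring the connector's `version` config key.
/// (Hypothetical type, introduced only for this sketch.)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InfluxVersion {
    V2,
    V3,
}

/// Parses the optional `version` value; a missing key falls back to V2.
pub fn parse_version(raw: Option<&str>) -> Result<InfluxVersion, String> {
    match raw {
        None | Some("v2") => Ok(InfluxVersion::V2),
        Some("v3") => Ok(InfluxVersion::V3),
        Some(other) => Err(format!("unsupported InfluxDB version: {other}")),
    }
}

/// Write endpoint path for each version, as listed in the table.
pub fn write_path(version: InfluxVersion) -> &'static str {
    match version {
        InfluxVersion::V2 => "/api/v2/write",
        InfluxVersion::V3 => "/api/v3/write_lp",
    }
}

/// Value of the `Authorization` header for each version.
pub fn auth_header(version: InfluxVersion, token: &str) -> String {
    match version {
        InfluxVersion::V2 => format!("Token {token}"),
        InfluxVersion::V3 => format!("Bearer {token}"),
    }
}

/// Precision value sent on the wire for a configured short form.
/// V2 keeps the short form; V3 gets the full English word.
/// Returns `None` for an unknown precision.
pub fn precision_param(version: InfluxVersion, short: &str) -> Option<&'static str> {
    let (short_form, long_form) = match short {
        "ns" => ("ns", "nanosecond"),
        "us" => ("us", "microsecond"),
        "ms" => ("ms", "millisecond"),
        "s" => ("s", "second"),
        _ => return None,
    };
    Some(match version {
        InfluxVersion::V2 => short_form,
        InfluxVersion::V3 => long_form,
    })
}
```

Under these assumptions, a config with `version = "v3"` and `precision = "us"` would post to `/api/v3/write_lp` with a `Bearer` token and the precision `microsecond`.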

## Configuration

Select the version with `version = "v2"` or `version = "v3"`. Omitting `version` defaults to `"v2"` for backward compatibility with existing deployments.

### V2: InfluxDB OSS 2.x / Cloud

```toml
version = "v2"
url = "http://localhost:8086"
org = "my-org"
bucket = "my-bucket"
token = "my-token"

# Optional
measurement = "iggy_events" # line-protocol measurement name (default: iggy_messages)
precision = "us" # ns | us | ms | s (default: "us")
batch_size = 500 # messages per write request (default: 500)
payload_format = "json" # json | text | base64 (default: "json")

include_metadata = true # inject offset/stream/topic fields (default: true)
include_checksum = false
include_origin_timestamp = false
include_stream_tag = false # add stream name as a line-protocol tag
include_topic_tag = false # add topic name as a line-protocol tag
include_partition_tag = false # add partition id as a line-protocol tag
verbose_logging = false
```

### V3: InfluxDB 3.x Core / Enterprise

```toml
version = "v3"
url = "http://localhost:8181"
db = "my-db"
token = "my-token"

# Optional: same fields as V2 except org/bucket are replaced by db
measurement = "iggy_events"
precision = "us"
batch_size = 500
payload_format = "json"

include_metadata = true
include_checksum = false
include_origin_timestamp = false
include_stream_tag = false
include_topic_tag = false
include_partition_tag = false
verbose_logging = false
```

### Resilience Fields (both versions)

```toml
timeout = "30s" # per-request timeout
max_retries = 3 # retries per write on transient errors (429/5xx)
retry_delay = "1s" # initial backoff between retries
retry_max_delay = "5s" # backoff cap
max_open_retries = 10 # retries during open() health check
open_retry_max_delay = "60s" # backoff cap for open() retries
circuit_breaker_threshold = 5 # consecutive failures before circuit trips
circuit_breaker_cool_down = "30s" # how long circuit stays open before half-open probe
```

## Payload Formats

- **`json`** (default): Each message payload is parsed as JSON and its fields become line-protocol field set entries.
- **`text`**: Payload treated as a plain string; written as a single `value` field.
- **`base64`**: Payload decoded from base64; written as raw bytes (stored as a string field).

## Full Configuration Example

```toml
[[sinks]]
key = "influxdb-sink"
enabled = true
path = "target/release/libiggy_connector_influxdb_sink"

[[sinks.streams]]
stream = "metrics"
topic = "cpu"
schema = "json"
batch_length = 100
linger_time = "10ms"

[sinks.plugin_config]
version = "v2"
url = "http://localhost:8086"
org = "acme"
bucket = "telemetry"
token = "my-secret-token"
measurement = "cpu_metrics"
batch_size = 200
precision = "ms"
include_stream_tag = true
include_topic_tag = true
circuit_breaker_threshold = 3
circuit_breaker_cool_down = "15s"
```

## Architecture Notes

The sink uses a layered design:

- **Batch accumulator**: messages are serialised to line protocol and buffered until `batch_size` is reached, then flushed in a single HTTP POST.
- **Retry middleware**: `reqwest-retry` with exponential backoff handles 429 and 5xx responses automatically before the connector-level retry logic runs.
- **Circuit breaker**: after `circuit_breaker_threshold` consecutive failures the connector stops issuing writes and waits for the cool-down window before probing again.
- **Precision mapping**: V3's `/api/v3/write_lp` endpoint requires full English words (`nanosecond`, `microsecond`, `millisecond`, `second`); the connector maps the short forms automatically.

core/connectors/sinks/influxdb_sink/config.toml

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ poll_interval = "5ms"
 consumer_group = "influxdb_sink"
 
 [plugin_config]
+version = "v2"
 url = "http://localhost:8086"
 org = "iggy"
 bucket = "events"
