Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions rust/otap-dataflow/configs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ Demonstrates the filter processor:

- Generates fake data -> filter processor -> debug processor -> noop exporter

### `fake-metric-filter-debug-noop.yaml`

Demonstrates metric-name filtering:

- Generates fake metrics -> filter processor by metric name -> debug processor
-> noop exporter

### `fake-transform-debug-noop.yaml`

Demonstrate using the transform processor to transform data
Expand Down Expand Up @@ -80,11 +87,10 @@ Generates mixed-tenant traffic using weighted resource attribute rotation:

The `resource_attributes` field accepts three forms:

| Form | Description |
| ---- | ----------- |
| Single map | All batches carry the same attributes (weight 1) |
| List of maps | Equal round-robin rotation across entries (weight 1 each) |
| List of weighted entries (`attrs` + `weight`) | Each entry receives batches proportional to its weight |
- Single map: all batches carry the same attributes (weight 1).
- List of maps: equal round-robin rotation across entries (weight 1 each).
- List of weighted entries (`attrs` + `weight`): each entry receives batches
proportional to its weight.

> **Note:** `resource_attributes` only applies to `data_source: static`.
> With `generation_strategy: pre_generated`, only the first attribute set is used.
Expand Down Expand Up @@ -163,14 +169,21 @@ Syslog/CEF receiver with performance metrics:
To send a quick test message (UDP):

```bash
echo "<134>$(date '+%b %d %H:%M:%S') testhost testtag: Test message" | nc -u -w1 127.0.0.1 5140
echo "<134>$(date '+%b %d %H:%M:%S') testhost testtag: Test message" \
| nc -u -w1 127.0.0.1 5140
```

For sustained load testing, see the [load generator](../../tools/pipeline_perf_test/load_generator/readme.md):
For sustained load testing, see the
[load generator](../../tools/pipeline_perf_test/load_generator/readme.md):

```bash
cd tools/pipeline_perf_test/load_generator
python loadgen.py --load-type syslog --syslog-server 127.0.0.1 --syslog-port 5140 --syslog-transport udp --duration 15
python loadgen.py \
--load-type syslog \
--syslog-server 127.0.0.1 \
--syslog-port 5140 \
--syslog-transport udp \
--duration 15
```

> **Note:** The default `syslog-perf.yaml` config only enables UDP.
Expand Down
50 changes: 50 additions & 0 deletions rust/otap-dataflow/configs/fake-metric-filter-debug-noop.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
version: otel_dataflow/v1
engine: {}
groups:
default:
pipelines:
main:
policies:
channel_capacity:
control:
node: 100
pipeline: 100
pdata: 128

nodes:
receiver:
type: receiver:traffic_generator
config:
data_source: static
traffic_config:
max_signal_count: 1000
max_batch_size: 100
signals_per_second: 100
metric_weight: 100
trace_weight: 0
log_weight: 0
filter:
type: processor:filter
config:
metrics:
include:
match_type: strict
metric_names:
- http.server.request.count
- process.cpu.utilization
debug:
type: processor:debug
config:
verbosity: detailed
mode: signal
noop:
type: exporter:noop
config:

connections:
- from: receiver
to: filter
- from: filter
to: debug
- from: debug
to: noop
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ For reference please the golang version of the filter processor.

```yaml
config:
metrics:
include:
match_type: strict
metric_names:
- http.server.request.count
- process.cpu.utilization
exclude:
match_type: regexp
metric_names:
- ^internal\..*$
logs:
include:
match_type: strict
Expand Down Expand Up @@ -77,7 +87,15 @@ config:
value: false
```

Currently we don't support metric filtering
For a runnable metric-name filter pipeline, see
[`configs/fake-metric-filter-debug-noop.yaml`](../../../../configs/fake-metric-filter-debug-noop.yaml).

### Metrics

To filter metrics, define `metrics.include` or `metrics.exclude` with a
`match_type` and `metric_names`. Supported `match_type` values are `strict` and
`regexp`. When both `include` and `exclude` are defined, include filtering runs
first, and exclude filtering is applied to that result.

### Logs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
//! Implementation of the configuration of the filter processor
//!

use otap_df_pdata::otap::filter::{logs::LogFilter, traces::TraceFilter};
use otap_df_pdata::otap::filter::{logs::LogFilter, metrics::MetricFilter, traces::TraceFilter};

use serde::Deserialize;

#[derive(Debug, Clone, Deserialize)]
pub struct Config {
// ToDo: add metrics
#[serde(default = "default_metric_filter")]
metrics: MetricFilter,
#[serde(default = "default_log_filter")]
logs: LogFilter,
#[serde(default = "default_trace_filter")]
Expand All @@ -22,14 +23,40 @@ const fn default_log_filter() -> LogFilter {
LogFilter::new(None, None, vec![])
}

/// create empty metric filter as default value
const fn default_metric_filter() -> MetricFilter {
MetricFilter::new(None, None)
}

/// create empty trace filter as default value
const fn default_trace_filter() -> TraceFilter {
TraceFilter::new(None, None)
}

impl Config {
pub const fn new(logs: LogFilter, traces: TraceFilter) -> Self {
Self { logs, traces }
Self {
metrics: MetricFilter::new(None, None),
logs,
traces,
}
}

pub const fn new_with_metrics(
metrics: MetricFilter,
logs: LogFilter,
traces: TraceFilter,
) -> Self {
Self {
metrics,
logs,
traces,
}
}

#[must_use]
pub const fn metric_filters(&self) -> &MetricFilter {
&self.metrics
}

#[must_use]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ pub struct FilterPdataMetrics {
/// Number of log signals consumed
#[metric(unit = "{log}")]
pub log_signals_consumed: Counter<u64>,
/// Number of metric signals consumed
#[metric(unit = "{metric}")]
pub metric_signals_consumed: Counter<u64>,
/// Number of span signals consumed
#[metric(unit = "{span}")]
pub span_signals_consumed: Counter<u64>,

/// Number of log signals filtered
#[metric(unit = "{log}")]
pub log_signals_filtered: Counter<u64>,
/// Number of metric signals filtered
#[metric(unit = "{metric}")]
pub metric_signals_filtered: Counter<u64>,
/// Number of span signals filtered
#[metric(unit = "{span}")]
pub span_signals_filtered: Counter<u64>,
Expand Down
Loading
Loading