Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ sources-logs = [
"sources-exec",
"sources-file",
"sources-fluent",
"sources-gcp_cloud_storage",
"sources-gcp_pubsub",
"sources-heroku_logs",
"sources-http_server",
Expand Down Expand Up @@ -608,6 +609,7 @@ sources-exec = []
sources-file = ["vector-lib/file-source"]
sources-file_descriptor = ["tokio-util/io"]
sources-fluent = ["dep:base64", "sources-utils-net-tcp", "tokio-util/net", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
sources-gcp_cloud_storage = ["gcp", "dep:h2", "dep:prost", "dep:prost-types", "protobuf-build", "dep:tonic"]
sources-gcp_pubsub = ["gcp", "dep:h2", "dep:prost", "dep:prost-types", "protobuf-build", "dep:tonic"]
sources-heroku_logs = ["sources-utils-http", "sources-utils-http-query", "sources-http_server"]
sources-host_metrics = ["heim/cpu", "heim/host", "heim/memory", "heim/net"]
Expand Down Expand Up @@ -904,7 +906,7 @@ docker-logs-integration-tests = ["sources-docker_logs", "unix"]
es-integration-tests = ["sinks-elasticsearch", "aws-core"]
eventstoredb_metrics-integration-tests = ["sources-eventstoredb_metrics"]
fluent-integration-tests = ["docker", "sources-fluent"]
gcp-cloud-storage-integration-tests = ["sinks-gcp"]
gcp-cloud-storage-integration-tests = ["sinks-gcp", "sources-gcp_cloud_storage"]
gcp-integration-tests = ["sinks-gcp"]
gcp-pubsub-integration-tests = ["sinks-gcp", "sources-gcp_pubsub"]
greptimedb-integration-tests = ["sinks-greptimedb_metrics", "sinks-greptimedb_logs"]
Expand Down
197 changes: 197 additions & 0 deletions src/internal_events/gcp_cloud_storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
use metrics::counter;
use vector_lib::internal_event::InternalEvent;
use vector_lib::internal_event::{error_stage, error_type};

#[derive(Debug)]
pub struct GcsObjectDownloadSucceeded<'a> {
pub bucket: &'a str,
pub object: &'a str,
pub byte_size: usize,
}

impl<'a> InternalEvent for GcsObjectDownloadSucceeded<'a> {
fn emit(self) {
debug!(
message = "Successfully downloaded GCS object.",
bucket = %self.bucket,
object = %self.object,
byte_size = %self.byte_size,
);
counter!("component_received_bytes_total")
.increment(self.byte_size as u64);
}
}

#[derive(Debug)]
pub struct GcsObjectDownloadError<'a> {
pub bucket: &'a str,
pub object: &'a str,
pub error: &'a dyn std::error::Error,
}

impl<'a> InternalEvent for GcsObjectDownloadError<'a> {
fn emit(self) {
error!(
message = "Failed to download GCS object.",
bucket = %self.bucket,
object = %self.object,
error = %self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::RECEIVING,
);
counter!(
"component_errors_total",
"error_type" => error_type::REQUEST_FAILED,
"stage" => error_stage::RECEIVING,
)
.increment(1);
}
}

#[derive(Debug)]
pub struct GcsObjectProcessingSucceeded<'a> {
pub bucket: &'a str,
pub object: &'a str,
pub events_count: usize,
}

impl<'a> InternalEvent for GcsObjectProcessingSucceeded<'a> {
fn emit(self) {
debug!(
message = "Successfully processed GCS object.",
bucket = %self.bucket,
object = %self.object,
events_count = %self.events_count,
);
counter!("component_received_events_total")
.increment(self.events_count as u64);
}
}

#[derive(Debug)]
pub struct GcsObjectProcessingError<'a> {
pub bucket: &'a str,
pub object: &'a str,
pub error: &'a dyn std::error::Error,
}

impl<'a> InternalEvent for GcsObjectProcessingError<'a> {
fn emit(self) {
error!(
message = "Failed to process GCS object.",
bucket = %self.bucket,
object = %self.object,
error = %self.error,
error_type = error_type::PARSER_FAILED,
stage = error_stage::PROCESSING,
);
counter!(
"component_errors_total",
"error_type" => error_type::PARSER_FAILED,
"stage" => error_stage::PROCESSING,
)
.increment(1);
}
}

#[derive(Debug)]
pub struct GcsPubsubMessageReceived<'a> {
pub subscription: &'a str,
pub message_count: usize,
}

impl<'a> InternalEvent for GcsPubsubMessageReceived<'a> {
fn emit(self) {
debug!(
message = "Received messages from GCS Pub/Sub subscription.",
subscription = %self.subscription,
message_count = %self.message_count,
);
counter!("component_received_messages_total")
.increment(self.message_count as u64);
}
}

#[derive(Debug)]
pub struct GcsPubsubMessageError<'a> {
pub subscription: &'a str,
pub error: &'a dyn std::error::Error,
}

impl<'a> InternalEvent for GcsPubsubMessageError<'a> {
fn emit(self) {
error!(
message = "Error receiving messages from GCS Pub/Sub subscription.",
subscription = %self.subscription,
error = %self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::RECEIVING,
);
counter!(
"component_errors_total",
"error_type" => error_type::REQUEST_FAILED,
"stage" => error_stage::RECEIVING,
)
.increment(1);
}
}

#[derive(Debug)]
pub struct GcsNotificationReceived<'a> {
pub bucket: &'a str,
pub object: &'a str,
pub event_type: &'a str,
}

impl<'a> InternalEvent for GcsNotificationReceived<'a> {
fn emit(self) {
debug!(
message = "Received GCS bucket notification.",
bucket = %self.bucket,
object = %self.object,
event_type = %self.event_type,
);
counter!("gcp_cloud_storage_notifications_received_total")
.increment(1);
}
}

#[derive(Debug)]
pub struct GcsNotificationInvalidEventIgnored<'a> {
pub bucket: &'a str,
pub object: &'a str,
pub event_type: &'a str,
}

impl<'a> InternalEvent for GcsNotificationInvalidEventIgnored<'a> {
fn emit(self) {
debug!(
message = "Ignored GCS notification for unsupported event type.",
bucket = %self.bucket,
object = %self.object,
event_type = %self.event_type,
);
counter!("gcp_cloud_storage_notifications_ignored_total")
.increment(1);
}
}

#[derive(Debug)]
pub struct GcsObjectFilteredByBucket<'a> {
pub bucket: &'a str,
pub object: &'a str,
pub configured_bucket: &'a str,
}

impl<'a> InternalEvent for GcsObjectFilteredByBucket<'a> {
fn emit(self) {
debug!(
message = "Filtered out GCS object due to bucket mismatch.",
bucket = %self.bucket,
object = %self.object,
configured_bucket = %self.configured_bucket,
);
counter!("gcp_cloud_storage_objects_filtered_total")
.increment(1);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add newline

4 changes: 4 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ mod file_descriptor;
mod filter;
#[cfg(feature = "sources-fluent")]
mod fluent;
#[cfg(feature = "sources-gcp_cloud_storage")]
mod gcp_cloud_storage;
#[cfg(feature = "sources-gcp_pubsub")]
mod gcp_pubsub;
#[cfg(any(feature = "sources-vector", feature = "sources-opentelemetry"))]
Expand Down Expand Up @@ -197,6 +199,8 @@ pub(crate) use self::file_descriptor::*;
pub(crate) use self::filter::*;
#[cfg(feature = "sources-fluent")]
pub(crate) use self::fluent::*;
// #[cfg(feature = "sources-gcp_cloud_storage")]
// pub(crate) use self::gcp_cloud_storage::*;
Comment on lines +202 to +203
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// #[cfg(feature = "sources-gcp_cloud_storage")]
// pub(crate) use self::gcp_cloud_storage::*;

#[cfg(feature = "sources-gcp_pubsub")]
pub(crate) use self::gcp_pubsub::*;
#[cfg(any(feature = "sources-vector", feature = "sources-opentelemetry"))]
Expand Down
Loading
Loading