Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing broken CI signals #296

Merged
merged 1 commit into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ jobs:
- name: Authentication Handshake
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/auth-handshake.env up --exit-code-from node-b
FEATURES=${{matrix.features}} docker compose --env-file ./ractor_cluster_integration_tests/envs/auth-handshake.env up --exit-code-from node-b

- name: Process Groups
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/pg-groups.env up --exit-code-from node-b
FEATURES=${{matrix.features}} docker compose --env-file ./ractor_cluster_integration_tests/envs/pg-groups.env up --exit-code-from node-b

- name: Encrypted communications
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/encryption.env up --exit-code-from node-b
FEATURES=${{matrix.features}} docker compose --env-file ./ractor_cluster_integration_tests/envs/encryption.env up --exit-code-from node-b

- name: Transitive connections
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/dist-connect.env up --exit-code-from node-c
FEATURES=${{matrix.features}} docker compose --env-file ./ractor_cluster_integration_tests/envs/dist-connect.env up --exit-code-from node-c

2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: '3.3'

services:
node-a:
container_name: "node-a"
Expand Down
2 changes: 2 additions & 0 deletions ractor/src/common_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::concurrency::sleep;
use crate::concurrency::Duration;
use crate::concurrency::Instant;

/// Periodic check for condition
pub async fn periodic_check<F>(check: F, timeout: Duration)
where
F: Fn() -> bool,
Expand All @@ -28,6 +29,7 @@ where
assert!(check(), "Periodic check failed.\n{:?}", backtrace);
}

/// Periodic check of Future for condition
pub async fn periodic_async_check<F, Fut>(check: F, timeout: Duration)
where
F: Fn() -> Fut,
Expand Down
23 changes: 18 additions & 5 deletions ractor/src/factory/factoryimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@
dead_mans_switch: Option<DeadMansSwitchConfiguration>,
capacity_controller: Option<Box<dyn WorkerCapacityController>>,
lifecycle_hooks: Option<Box<dyn FactoryLifecycleHooks<TKey, TMsg>>>,
// Local counter to avoid having to sum over the worker states for more performant metrics capturing
// in large worker-count factories
processing_messages: usize,
}

impl<TKey, TMsg, TWorkerStart, TWorker, TRouter, TQueue> Debug
Expand Down Expand Up @@ -301,6 +304,7 @@
if let Some(worker) = self.pool.get_mut(&worker_hint).filter(|f| f.is_available()) {
if let Some(mut job) = self.queue.pop_front() {
job.accept();
self.processing_messages += 1;
worker.enqueue_job(job)?;
}
} else {
Expand All @@ -315,6 +319,7 @@
.and_then(|wid| self.pool.get_mut(&wid));
if let (Some(mut job), Some(worker)) = (self.queue.pop_front(), target_worker) {
job.accept();
self.processing_messages += 1;

Check warning on line 322 in ractor/src/factory/factoryimpl.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/factoryimpl.rs#L322

Added line #L322 was not covered by tests
worker.enqueue_job(job)?;
}
}
Expand Down Expand Up @@ -505,6 +510,9 @@
{
// workers are busy, we need to queue a job
self.maybe_enqueue(busy_job);
} else {
// message was routed
self.processing_messages += 1;
}
} else {
tracing::debug!("Factory is draining but a job was received");
Expand All @@ -517,6 +525,10 @@
}

fn worker_finished_job(&mut self, who: WorkerId, key: TKey) -> Result<(), ActorProcessingErr> {
if self.processing_messages > 0 {
self.processing_messages -= 1;
}

let (is_worker_draining, should_drop_worker) = if let Some(worker) = self.pool.get_mut(&who)
{
if let Some(job_options) = worker.worker_complete(key)? {
Expand Down Expand Up @@ -573,12 +585,12 @@
}
}

let qlen = self.queue.len();
self.stats.record_queue_depth(&self.factory_name, qlen);
self.stats
.record_processing_messages_count(&self.factory_name, self.processing_messages);
self.stats
.record_queue_depth(&self.factory_name, self.queue.len());
self.stats.record_processing_messages_count(
&self.factory_name,
self.pool.values().filter(|f| f.is_working()).count(),
);
.record_in_flight_messages_count(&self.factory_name, self.processing_messages + qlen);
self.stats
.record_worker_count(&self.factory_name, self.pool_size);

Expand Down Expand Up @@ -778,6 +790,7 @@
queue,
router,
stats,
processing_messages: 0,
})
}

Expand Down
13 changes: 13 additions & 0 deletions ractor/src/factory/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
/// Fixed-period recording of the factory's number of processed messages
fn record_processing_messages_count(&self, factory: &str, count: usize);

/// Fixed-period recording of the factory's in-flight message count (processing + queued)
///
/// Default empty implemention for backwards compatibility
#[allow(unused_variables)]
fn record_in_flight_messages_count(&self, factory: &str, count: usize) {}

Check warning on line 58 in ractor/src/factory/stats.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/stats.rs#L58

Added line #L58 was not covered by tests

/// Fixed-period recording of the factory's number of workers
fn record_worker_count(&self, factory: &str, count: usize);

Expand Down Expand Up @@ -129,6 +135,13 @@
}
}

/// Fixed-period recording of the factory's in-flight message count (processing + queued)
fn record_in_flight_messages_count(&self, factory: &str, count: usize) {
if let Some(s) = self {
s.record_in_flight_messages_count(factory, count);

Check warning on line 141 in ractor/src/factory/stats.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/stats.rs#L141

Added line #L141 was not covered by tests
}
}

/// Fixed-period recording of the factory's number of workers
fn record_worker_count(&self, factory: &str, count: usize) {
if let Some(s) = self {
Expand Down
1 change: 0 additions & 1 deletion ractor/src/port/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ impl OutputPortSubscription {
/// subscriber_actor_handle.await.unwrap();
/// }
/// ```
pub type OutputPortSubscriber<InputMessage> = Box<dyn OutputPortSubscriberTrait<InputMessage>>;
/// A trait for subscribing to an [OutputPort]
pub trait OutputPortSubscriberTrait<I>: Send
Expand Down
3 changes: 2 additions & 1 deletion ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ build = "src/build.rs"
rust-version = "1.64"

[build-dependencies]
protobuf-src = "2"
# protobuf-src = "2"
protoc-bin-vendored = "3"
prost-build = { version = "0.13" }

[dependencies]
Expand Down
3 changes: 2 additions & 1 deletion ractor_cluster/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ const PROTOBUF_BASE_DIRECTORY: &str = "src/protocol";
const PROTOBUF_FILES: [&str; 4] = ["meta", "node", "auth", "control"];

fn build_protobufs() {
std::env::set_var("PROTOC", protobuf_src::protoc());
let path = protoc_bin_vendored::protoc_bin_path().expect("Failed to find protoc installation");
std::env::set_var("PROTOC", path);

let mut protobuf_files = Vec::with_capacity(PROTOBUF_FILES.len());

Expand Down
1 change: 1 addition & 0 deletions ractor_cluster/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#[macro_export]
macro_rules! derive_serialization_for_prost_type {
{$ty:ty} => {
#[allow(non_local_definitions)]
impl $crate::BytesConvertable for $ty {
fn into_bytes(self) -> Vec<u8> {
<Self as prost::Message>::encode_length_delimited_to_vec(&self)
Expand Down
49 changes: 14 additions & 35 deletions ractor_cluster_integration_tests/src/tests/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
use std::convert::TryFrom;
use std::fs::File;
use std::io::{self, BufReader};
use std::io::BufReader;
use std::path::PathBuf;
use std::sync::Arc;

use clap::Args;
use ractor::concurrency::{sleep, Duration, Instant};
use ractor::Actor;
use rustls_pemfile::{certs, rsa_private_keys};
use ractor::{Actor, ActorProcessingErr};
use tokio_rustls::rustls::pki_types::pem::PemObject;
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName, TrustAnchor};
use tokio_rustls::{TlsAcceptor, TlsConnector};

Expand All @@ -33,35 +33,14 @@ pub struct EncryptionConfig {
client_host: Option<String>,
}

fn load_certs(path_str: &'static str) -> io::Result<Vec<CertificateDer>> {
let path = PathBuf::from(path_str);
let certs: Vec<_> = certs(&mut BufReader::new(File::open(path)?))
.filter_map(|cert| if let Ok(c) = cert { Some(c) } else { None })
.collect();

if certs.is_empty() {
Err(io::Error::new(io::ErrorKind::InvalidData, "invalid cert"))
} else {
Ok(certs)
}
#[allow(elided_named_lifetimes)]
fn load_certs(path_str: &'static str) -> Result<Vec<CertificateDer>, ActorProcessingErr> {
Ok(CertificateDer::pem_file_iter(path_str)?.collect::<Result<Vec<_>, _>>()?)
}

fn load_keys(path_str: &'static str) -> io::Result<Vec<PrivateKeyDer>> {
let path = PathBuf::from(path_str);
let keys: Vec<PrivateKeyDer> = rsa_private_keys(&mut BufReader::new(File::open(path)?))
.filter_map(|key| {
if let Ok(k) = key {
Some(k.into())
} else {
None
}
})
.collect();
if keys.is_empty() {
Err(io::Error::new(io::ErrorKind::InvalidData, "invalid key"))
} else {
Ok(keys)
}
#[allow(elided_named_lifetimes)]
fn load_key(path_str: &'static str) -> Result<PrivateKeyDer, ActorProcessingErr> {
Ok(PrivateKeyDer::from_pem_file(path_str)?)
}

pub async fn test(config: EncryptionConfig) -> i32 {
Expand All @@ -72,20 +51,20 @@ pub async fn test(config: EncryptionConfig) -> i32 {
// Example `rustls` command: cargo run --bin tlsserver-mio -- --certs test-ca/rsa/end.fullchain --key test-ca/rsa/end.rsa -p 8443 echo
//
// combined with source code: https://github.com/tokio-rs/tls/blob/357bc562483dcf04c1f8d08bd1a831b144bf7d4c/tokio-rustls/examples/server/src/main.rs
let cert_path = "test-ca/rsa/end.fullchain";
let key_path = "test-ca/rsa/end.rsa";
let cert_path = "test-ca/rsa-2048/end.fullchain";
let key_path = "test-ca/rsa-2048/end.key";
let certs = load_certs(cert_path).expect("Failed to load encryption certificates");
let mut keys = load_keys(key_path).expect("Failed to load encryption keys");
let key = load_key(key_path).expect("Failed to load encryption keys");

let server_config = tokio_rustls::rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, keys.remove(0))
.with_single_cert(certs, key)
.expect("Failed to build server configuration");
let acceptor = TlsAcceptor::from(Arc::new(server_config));

// ================== Client TLS Configuration ================== //

let ca_path = PathBuf::from("test-ca/rsa/ca.cert");
let ca_path = PathBuf::from("test-ca/rsa-2048/ca.cert");
let mut ca_pem = BufReader::new(File::open(ca_path).expect("Failed to load CA certificate"));
let ca_certs = rustls_pemfile::certs(&mut ca_pem).filter_map(|cert| {
if let Ok(c) = cert {
Expand Down
13 changes: 9 additions & 4 deletions ractor_cluster_integration_tests/test-ca/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Encryption Certificates
# Rustls Test CA

This folder is copied as-is from `rustls`'s public github repository. Full attribution is to their
codebase and we give no guarantee on these certificates. They're for testing utilization only
This directory contains various test certificate authorities, intermediates,
end-entity, and client certificates that are used by Rustls integration tests.

https://github.com/rustls/rustls
You can regenerate the data in this directory by running the
`rustls/examples/internal/test_ca.rs` tool:

```bash
cargo run -p rustls --example test_ca
```
Loading
Loading