Skip to content

Commit

Permalink
Fixing broken CI signals
Browse files Browse the repository at this point in the history
Includes updating the generated test certificates for TLS integration tests and a minor addition for stats
  • Loading branch information
slawlor committed Dec 5, 2024
1 parent 480a326 commit b9cf456
Show file tree
Hide file tree
Showing 89 changed files with 547 additions and 1,278 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ jobs:
- name: Authentication Handshake
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/auth-handshake.env up --exit-code-from node-b
FEATURES=${{matrix.features}} docker compose --env-file ./ractor_cluster_integration_tests/envs/auth-handshake.env up --exit-code-from node-b
- name: Process Groups
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/pg-groups.env up --exit-code-from node-b
FEATURES=${{matrix.features}} docker compose --env-file ./ractor_cluster_integration_tests/envs/pg-groups.env up --exit-code-from node-b
- name: Encrypted communications
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/encryption.env up --exit-code-from node-b
FEATURES=${{matrix.features}} docker compose --env-file ./ractor_cluster_integration_tests/envs/encryption.env up --exit-code-from node-b
- name: Transitive connections
working-directory: .
run: |
docker compose --env-file ./ractor_cluster_integration_tests/envs/dist-connect.env up --exit-code-from node-c
FEATURES=${{matrix.features}} docker compose --env-file ./ractor_cluster_integration_tests/envs/dist-connect.env up --exit-code-from node-c
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: '3.3'

services:
node-a:
container_name: "node-a"
Expand Down
2 changes: 2 additions & 0 deletions ractor/src/common_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::concurrency::sleep;
use crate::concurrency::Duration;
use crate::concurrency::Instant;

/// Periodic check for condition
pub async fn periodic_check<F>(check: F, timeout: Duration)
where
F: Fn() -> bool,
Expand All @@ -28,6 +29,7 @@ where
assert!(check(), "Periodic check failed.\n{:?}", backtrace);
}

/// Periodic check of Future for condition
pub async fn periodic_async_check<F, Fut>(check: F, timeout: Duration)
where
F: Fn() -> Fut,
Expand Down
23 changes: 18 additions & 5 deletions ractor/src/factory/factoryimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ where
dead_mans_switch: Option<DeadMansSwitchConfiguration>,
capacity_controller: Option<Box<dyn WorkerCapacityController>>,
lifecycle_hooks: Option<Box<dyn FactoryLifecycleHooks<TKey, TMsg>>>,
// Local counter to avoid having to sum over the worker states for more performant metrics capturing
// in large worker-count factories
processing_messages: usize,
}

impl<TKey, TMsg, TWorkerStart, TWorker, TRouter, TQueue> Debug
Expand Down Expand Up @@ -301,6 +304,7 @@ where
if let Some(worker) = self.pool.get_mut(&worker_hint).filter(|f| f.is_available()) {
if let Some(mut job) = self.queue.pop_front() {
job.accept();
self.processing_messages += 1;
worker.enqueue_job(job)?;
}
} else {
Expand All @@ -315,6 +319,7 @@ where
.and_then(|wid| self.pool.get_mut(&wid));
if let (Some(mut job), Some(worker)) = (self.queue.pop_front(), target_worker) {
job.accept();
self.processing_messages += 1;

Check warning on line 322 in ractor/src/factory/factoryimpl.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/factoryimpl.rs#L322

Added line #L322 was not covered by tests
worker.enqueue_job(job)?;
}
}
Expand Down Expand Up @@ -505,6 +510,9 @@ where
{
// workers are busy, we need to queue a job
self.maybe_enqueue(busy_job);
} else {
// message was routed
self.processing_messages += 1;
}
} else {
tracing::debug!("Factory is draining but a job was received");
Expand All @@ -517,6 +525,10 @@ where
}

fn worker_finished_job(&mut self, who: WorkerId, key: TKey) -> Result<(), ActorProcessingErr> {
if self.processing_messages > 0 {
self.processing_messages -= 1;
}

let (is_worker_draining, should_drop_worker) = if let Some(worker) = self.pool.get_mut(&who)
{
if let Some(job_options) = worker.worker_complete(key)? {
Expand Down Expand Up @@ -573,12 +585,12 @@ where
}
}

let qlen = self.queue.len();
self.stats.record_queue_depth(&self.factory_name, qlen);
self.stats
.record_processing_messages_count(&self.factory_name, self.processing_messages);
self.stats
.record_queue_depth(&self.factory_name, self.queue.len());
self.stats.record_processing_messages_count(
&self.factory_name,
self.pool.values().filter(|f| f.is_working()).count(),
);
.record_in_flight_messages_count(&self.factory_name, self.processing_messages + qlen);
self.stats
.record_worker_count(&self.factory_name, self.pool_size);

Expand Down Expand Up @@ -778,6 +790,7 @@ where
queue,
router,
stats,
processing_messages: 0,
})
}

Expand Down
13 changes: 13 additions & 0 deletions ractor/src/factory/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ pub trait FactoryStatsLayer: Send + Sync + 'static {
/// Fixed-period recording of the factory's number of processed messages
fn record_processing_messages_count(&self, factory: &str, count: usize);

/// Fixed-period recording of the factory's in-flight message count (processing + queued)
///
/// Default empty implemention for backwards compatibility
#[allow(unused_variables)]
fn record_in_flight_messages_count(&self, factory: &str, count: usize) {}

Check warning on line 58 in ractor/src/factory/stats.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/stats.rs#L58

Added line #L58 was not covered by tests

/// Fixed-period recording of the factory's number of workers
fn record_worker_count(&self, factory: &str, count: usize);

Expand Down Expand Up @@ -129,6 +135,13 @@ impl FactoryStatsLayer for Option<Arc<dyn FactoryStatsLayer>> {
}
}

/// Fixed-period recording of the factory's in-flight message count (processing + queued)
fn record_in_flight_messages_count(&self, factory: &str, count: usize) {
if let Some(s) = self {
s.record_in_flight_messages_count(factory, count);

Check warning on line 141 in ractor/src/factory/stats.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/stats.rs#L141

Added line #L141 was not covered by tests
}
}

/// Fixed-period recording of the factory's number of workers
fn record_worker_count(&self, factory: &str, count: usize) {
if let Some(s) = self {
Expand Down
1 change: 0 additions & 1 deletion ractor/src/port/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ impl OutputPortSubscription {
/// subscriber_actor_handle.await.unwrap();
/// }
/// ```
pub type OutputPortSubscriber<InputMessage> = Box<dyn OutputPortSubscriberTrait<InputMessage>>;
/// A trait for subscribing to an [OutputPort]
pub trait OutputPortSubscriberTrait<I>: Send
Expand Down
3 changes: 2 additions & 1 deletion ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ build = "src/build.rs"
rust-version = "1.64"

[build-dependencies]
protobuf-src = "2"
# protobuf-src = "2"
protoc-bin-vendored = "3"
prost-build = { version = "0.13" }

[dependencies]
Expand Down
3 changes: 2 additions & 1 deletion ractor_cluster/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ const PROTOBUF_BASE_DIRECTORY: &str = "src/protocol";
const PROTOBUF_FILES: [&str; 4] = ["meta", "node", "auth", "control"];

fn build_protobufs() {
std::env::set_var("PROTOC", protobuf_src::protoc());
let path = protoc_bin_vendored::protoc_bin_path().expect("Failed to find protoc installation");
std::env::set_var("PROTOC", path);

let mut protobuf_files = Vec::with_capacity(PROTOBUF_FILES.len());

Expand Down
1 change: 1 addition & 0 deletions ractor_cluster/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#[macro_export]
macro_rules! derive_serialization_for_prost_type {
{$ty:ty} => {
#[allow(non_local_definitions)]
impl $crate::BytesConvertable for $ty {
fn into_bytes(self) -> Vec<u8> {
<Self as prost::Message>::encode_length_delimited_to_vec(&self)
Expand Down
49 changes: 14 additions & 35 deletions ractor_cluster_integration_tests/src/tests/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
use std::convert::TryFrom;
use std::fs::File;
use std::io::{self, BufReader};
use std::io::BufReader;
use std::path::PathBuf;
use std::sync::Arc;

use clap::Args;
use ractor::concurrency::{sleep, Duration, Instant};
use ractor::Actor;
use rustls_pemfile::{certs, rsa_private_keys};
use ractor::{Actor, ActorProcessingErr};
use tokio_rustls::rustls::pki_types::pem::PemObject;
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName, TrustAnchor};
use tokio_rustls::{TlsAcceptor, TlsConnector};

Expand All @@ -33,35 +33,14 @@ pub struct EncryptionConfig {
client_host: Option<String>,
}

fn load_certs(path_str: &'static str) -> io::Result<Vec<CertificateDer>> {
let path = PathBuf::from(path_str);
let certs: Vec<_> = certs(&mut BufReader::new(File::open(path)?))
.filter_map(|cert| if let Ok(c) = cert { Some(c) } else { None })
.collect();

if certs.is_empty() {
Err(io::Error::new(io::ErrorKind::InvalidData, "invalid cert"))
} else {
Ok(certs)
}
#[allow(elided_named_lifetimes)]
fn load_certs(path_str: &'static str) -> Result<Vec<CertificateDer>, ActorProcessingErr> {
Ok(CertificateDer::pem_file_iter(path_str)?.collect::<Result<Vec<_>, _>>()?)
}

fn load_keys(path_str: &'static str) -> io::Result<Vec<PrivateKeyDer>> {
let path = PathBuf::from(path_str);
let keys: Vec<PrivateKeyDer> = rsa_private_keys(&mut BufReader::new(File::open(path)?))
.filter_map(|key| {
if let Ok(k) = key {
Some(k.into())
} else {
None
}
})
.collect();
if keys.is_empty() {
Err(io::Error::new(io::ErrorKind::InvalidData, "invalid key"))
} else {
Ok(keys)
}
#[allow(elided_named_lifetimes)]
fn load_key(path_str: &'static str) -> Result<PrivateKeyDer, ActorProcessingErr> {
Ok(PrivateKeyDer::from_pem_file(path_str)?)
}

pub async fn test(config: EncryptionConfig) -> i32 {
Expand All @@ -72,20 +51,20 @@ pub async fn test(config: EncryptionConfig) -> i32 {
// Example `rustls` command: cargo run --bin tlsserver-mio -- --certs test-ca/rsa/end.fullchain --key test-ca/rsa/end.rsa -p 8443 echo
//
// combined with source code: https://github.com/tokio-rs/tls/blob/357bc562483dcf04c1f8d08bd1a831b144bf7d4c/tokio-rustls/examples/server/src/main.rs
let cert_path = "test-ca/rsa/end.fullchain";
let key_path = "test-ca/rsa/end.rsa";
let cert_path = "test-ca/rsa-2048/end.fullchain";
let key_path = "test-ca/rsa-2048/end.key";
let certs = load_certs(cert_path).expect("Failed to load encryption certificates");
let mut keys = load_keys(key_path).expect("Failed to load encryption keys");
let key = load_key(key_path).expect("Failed to load encryption keys");

let server_config = tokio_rustls::rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, keys.remove(0))
.with_single_cert(certs, key)
.expect("Failed to build server configuration");
let acceptor = TlsAcceptor::from(Arc::new(server_config));

// ================== Client TLS Configuration ================== //

let ca_path = PathBuf::from("test-ca/rsa/ca.cert");
let ca_path = PathBuf::from("test-ca/rsa-2048/ca.cert");
let mut ca_pem = BufReader::new(File::open(ca_path).expect("Failed to load CA certificate"));
let ca_certs = rustls_pemfile::certs(&mut ca_pem).filter_map(|cert| {
if let Ok(c) = cert {
Expand Down
13 changes: 9 additions & 4 deletions ractor_cluster_integration_tests/test-ca/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Encryption Certificates
# Rustls Test CA

This folder is copied as-is from `rustls`'s public github repository. Full attribution is to their
codebase and we give no guarantee on these certificates. They're for testing utilization only
This directory contains various test certificate authorities, intermediates,
end-entity, and client certificates that are used by Rustls integration tests.

https://github.com/rustls/rustls
You can regenerate the data in this directory by running the
`rustls/examples/internal/test_ca.rs` tool:

```bash
cargo run -p rustls --example test_ca
```
Loading

0 comments on commit b9cf456

Please sign in to comment.