Skip to content

Commit

Permalink
rsc: Implement basic TTL eviction for jobs (#1478)
Browse files Browse the repository at this point in the history
* Implement basic TTL eviction for jobs

* fix duration

* Add ttl test
  • Loading branch information
V-FEXrt authored Dec 4, 2023
1 parent 9cd92bf commit 9238ebf
Showing 1 changed file with 111 additions and 7 deletions.
118 changes: 111 additions & 7 deletions rust/rsc/src/rsc/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use std::io::{Error, ErrorKind};
use std::sync::Arc;
use tracing;

use sea_orm::{ColumnTrait, DeleteResult, EntityTrait, QueryFilter};

use chrono::{Duration, Utc};

mod add_job;
mod api_key_check;
mod read_job;
Expand Down Expand Up @@ -60,16 +64,35 @@ fn make_app(state: Arc<sea_orm::DatabaseConnection>) -> Router {
)
}

fn launch_eviction(state: Arc<sea_orm::DatabaseConnection>, tick_interval: u64, deadline: i64) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(tick_interval));
loop {
interval.tick().await;

let deadline = (Utc::now() - Duration::seconds(deadline)).naive_utc();

let res: DeleteResult = entity::job::Entity::delete_many()
.filter(entity::job::Column::CreatedAt.lte(deadline))
.exec(state.as_ref())
.await
.unwrap();

tracing::info!(%res.rows_affected, "Performed TTL eviction tick.");
}
});
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// setup a subscriber so that we always have logging
// setup a subscriber for logging
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber)?;

// Parse our arguments
// Parse the arguments
let args = ServerOptions::parse();

// Get our configuration
// Get the configuration
let config = config::GSCConfig::new(config::GSCConfigOverride {
config_override: args.config_override,
server_addr: args.server_addr,
Expand All @@ -81,7 +104,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
return Ok(());
}

// connect to our db
// connect to the db
let connection = sea_orm::Database::connect(&config.database_url).await?;
let pending_migrations = Migrator::get_pending_migrations(&connection).await?;
if pending_migrations.len() != 0 {
Expand All @@ -95,10 +118,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::error! {%err, "unperformed migrations, please apply these migrations before starting gsc"};
Err(err)?;
}
let state = Arc::new(connection);

let app = make_app(Arc::new(connection));
// Launch the eviction thread
launch_eviction(state.clone(), 60 * 10, 60 * 60 * 24 * 7);

// run it with hyper on localhost:3000
// Launch the server
let app = make_app(state.clone());
axum::Server::bind(&config.server_addr.parse()?)
.serve(app.into_make_service())
.await?;
Expand All @@ -111,7 +137,7 @@ mod tests {
use super::*;
use data_encoding::BASE64;
use migration::{Migrator, MigratorTrait};
use sea_orm::{ActiveModelTrait, ActiveValue::*, DatabaseConnection, DbErr};
use sea_orm::{ActiveModelTrait, ActiveValue::*, DatabaseConnection, DbErr, PaginatorTrait};
use std::sync::Arc;

use axum::{
Expand Down Expand Up @@ -302,4 +328,82 @@ mod tests {
})
);
}

#[tokio::test]
async fn ttl_eviction() {
let db = make_db().await.unwrap();
let state = Arc::new(db);

let hash: [u8; 32] = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,
];

// Create a job that is 5 days old
let insert_job = entity::job::ActiveModel {
id: NotSet,
created_at: Set((Utc::now() - Duration::days(5)).naive_utc()),
hash: Set(hash.into()),
cmd: Set("blarg".into()),
env: Set("PATH=/usr/bin".as_bytes().into()),
cwd: Set("/workspace".into()),
stdin: Set("".into()),
is_atty: Set(false),
hidden_info: Set("".into()),
stdout: Set("This is a test".into()),
stderr: Set("This is a very long string for a test".into()),
status: Set(0),
runtime: Set(1.0),
cputime: Set(1.0),
memory: Set(1000),
i_bytes: Set(100000),
o_bytes: Set(1000),
};

insert_job.save(state.clone().as_ref()).await.unwrap();

let hash: [u8; 32] = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 1,
];

// Create a job that is 1 day old
let insert_job = entity::job::ActiveModel {
id: NotSet,
created_at: Set((Utc::now() - Duration::days(1)).naive_utc()),
hash: Set(hash.into()),
cmd: Set("blarg2".into()),
env: Set("PATH=/usr/bin".as_bytes().into()),
cwd: Set("/workspace".into()),
stdin: Set("".into()),
is_atty: Set(false),
hidden_info: Set("".into()),
stdout: Set("This is a test".into()),
stderr: Set("This is a very long string for a test".into()),
status: Set(0),
runtime: Set(1.0),
cputime: Set(1.0),
memory: Set(1000),
i_bytes: Set(100000),
o_bytes: Set(1000),
};

insert_job.save(state.clone().as_ref()).await.unwrap();

let count = entity::job::Entity::find()
.count(state.clone().as_ref())
.await
.unwrap();
assert_eq!(count, 2);

// Setup eviction for jobs older than 3 days ticking every second
launch_eviction(state.clone(), 1, 60 * 60 * 24 * 3);
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;

let count = entity::job::Entity::find()
.count(state.clone().as_ref())
.await
.unwrap();
assert_eq!(count, 1);
}
}

0 comments on commit 9238ebf

Please sign in to comment.