Skip to content

Commit 95b3440

Browse files
author
Marc-Andre Giroux
committed
poc: PooledPlanner
1 parent 35052d8 commit 95b3440

File tree

4 files changed

+353
-0
lines changed

4 files changed

+353
-0
lines changed

federation-2/router-bridge/src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,9 @@ pub enum Error {
3636
/// The deno response id we tried to deserialize.
3737
id: String,
3838
},
39+
/// An uncaught error was raised when invoking a custom script.
40+
///
41+
/// This contains the script invocation error message.
42+
#[error("internal error: `{0}`")]
43+
Internal(&'static str),
3944
}

federation-2/router-bridge/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ pub mod introspect;
1111
mod js;
1212
pub mod planner;
1313
mod worker;
14+
mod pool;

federation-2/router-bridge/src/planner.rs

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::fmt::Debug;
66
use std::fmt::Display;
77
use std::fmt::Formatter;
88
use std::marker::PhantomData;
9+
use std::num::NonZeroUsize;
910
use std::sync::Arc;
1011

1112
use serde::de::DeserializeOwned;
@@ -14,6 +15,7 @@ use serde::Serialize;
1415
use thiserror::Error;
1516

1617
use crate::introspect::IntrospectionResponse;
18+
use crate::pool::JsWorkerPool;
1719
use crate::worker::JsWorker;
1820

1921
// ------------------------------------
@@ -398,6 +400,226 @@ where
398400
}
399401
}
400402

403+
/// A Deno worker backed query Planner,
404+
/// using a pool of JsRuntimes load balanced
405+
/// using Power of Two Choices.
406+
pub struct PooledPlanner<T>
407+
where
408+
T: DeserializeOwned + Send + Debug + 'static,
409+
{
410+
pool: Arc<JsWorkerPool>,
411+
schema_id: u64,
412+
t: PhantomData<T>,
413+
}
414+
415+
impl<T> Debug for PooledPlanner<T>
416+
where
417+
T: DeserializeOwned + Send + Debug + 'static,
418+
{
419+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
420+
f.debug_struct("PooledPlanner")
421+
.field("schema_id", &self.schema_id)
422+
.finish()
423+
}
424+
}
425+
426+
impl<T> PooledPlanner<T>
427+
where
428+
T: DeserializeOwned + Send + Debug + 'static,
429+
{
430+
/// Instantiate a `Planner` from a schema string
431+
pub async fn new(
432+
schema: String,
433+
config: QueryPlannerConfig,
434+
pool_size: NonZeroUsize,
435+
) -> Result<Self, Vec<PlannerError>> {
436+
let schema_id: u64 = rand::random();
437+
438+
let pool = JsWorkerPool::new(include_str!("../bundled/plan_worker.js"), pool_size);
439+
440+
let workers_are_setup = pool
441+
.broadcast_request::<PlanCmd, BridgeSetupResult<serde_json::Value>>(PlanCmd::UpdateSchema {
442+
schema,
443+
config,
444+
schema_id,
445+
})
446+
.await
447+
.map_err(|e| {
448+
vec![WorkerError {
449+
name: Some("planner setup error".to_string()),
450+
message: Some(e.to_string()),
451+
stack: None,
452+
extensions: None,
453+
locations: Default::default(),
454+
}
455+
.into()]
456+
});
457+
458+
// Both cases below the mean schema update failed.
459+
// We need to pay attention here.
460+
// returning early will drop the worker, which will join the jsruntime thread.
461+
// however the event loop will run for ever. We need to let the worker know it needs to exit,
462+
// before we drop the worker
463+
match workers_are_setup {
464+
Err(setup_error) => {
465+
let _ = pool
466+
.broadcast_request::<PlanCmd, serde_json::Value>(PlanCmd::Exit { schema_id })
467+
.await;
468+
return Err(setup_error);
469+
}
470+
Ok(responses) => {
471+
for r in responses {
472+
if let Some(error) = r.errors {
473+
let _ = pool.broadcast_send(None, PlanCmd::Exit { schema_id }).await;
474+
return Err(error);
475+
}
476+
}
477+
}
478+
}
479+
480+
let pool = Arc::new(pool);
481+
482+
Ok(Self {
483+
pool,
484+
schema_id,
485+
t: PhantomData,
486+
})
487+
}
488+
489+
/// Update `Planner` from a schema string
490+
pub async fn update(
491+
&self,
492+
schema: String,
493+
config: QueryPlannerConfig,
494+
) -> Result<Self, Vec<PlannerError>> {
495+
let schema_id: u64 = rand::random();
496+
497+
let workers_are_setup = self
498+
.pool
499+
.broadcast_request::<PlanCmd, BridgeSetupResult<serde_json::Value>>(PlanCmd::UpdateSchema {
500+
schema,
501+
config,
502+
schema_id,
503+
})
504+
.await
505+
.map_err(|e| {
506+
vec![WorkerError {
507+
name: Some("planner setup error".to_string()),
508+
message: Some(e.to_string()),
509+
stack: None,
510+
extensions: None,
511+
locations: Default::default(),
512+
}
513+
.into()]
514+
});
515+
516+
// If the update failed, we keep the existing schema in place
517+
match workers_are_setup {
518+
Err(setup_error) => {
519+
return Err(setup_error);
520+
}
521+
Ok(responses) => {
522+
for r in responses {
523+
if let Some(error) = r.errors {
524+
let _ = self.pool.broadcast_send(None, PlanCmd::Exit { schema_id }).await;
525+
return Err(error);
526+
}
527+
}
528+
}
529+
}
530+
531+
Ok(Self {
532+
pool: self.pool.clone(),
533+
schema_id,
534+
t: PhantomData,
535+
})
536+
}
537+
538+
/// Plan a query against an instantiated query planner
539+
pub async fn plan(
540+
&self,
541+
query: String,
542+
operation_name: Option<String>,
543+
options: PlanOptions,
544+
) -> Result<PlanResult<T>, crate::error::Error> {
545+
self.pool
546+
.request(PlanCmd::Plan {
547+
query,
548+
operation_name,
549+
schema_id: self.schema_id,
550+
options,
551+
})
552+
.await
553+
}
554+
555+
/// Generate the API schema from the current schema
556+
pub async fn api_schema(&self) -> Result<ApiSchema, crate::error::Error> {
557+
self.pool
558+
.request(PlanCmd::ApiSchema {
559+
schema_id: self.schema_id,
560+
})
561+
.await
562+
}
563+
564+
/// Generate the introspection response for this query
565+
pub async fn introspect(
566+
&self,
567+
query: String,
568+
) -> Result<IntrospectionResponse, crate::error::Error> {
569+
self.pool
570+
.request(PlanCmd::Introspect {
571+
query,
572+
schema_id: self.schema_id,
573+
})
574+
.await
575+
}
576+
577+
/// Get the operation signature for a query
578+
pub async fn operation_signature(
579+
&self,
580+
query: String,
581+
operation_name: Option<String>,
582+
) -> Result<String, crate::error::Error> {
583+
self.pool
584+
.request(PlanCmd::Signature {
585+
query,
586+
operation_name,
587+
schema_id: self.schema_id,
588+
})
589+
.await
590+
}
591+
592+
/// Extract the subgraph schemas from the supergraph schema
593+
pub async fn subgraphs(&self) -> Result<HashMap<String, String>, crate::error::Error> {
594+
self.pool
595+
.request(PlanCmd::Subgraphs {
596+
schema_id: self.schema_id,
597+
})
598+
.await
599+
}
600+
}
601+
602+
impl<T> Drop for PooledPlanner<T>
603+
where
604+
T: DeserializeOwned + Send + Debug + 'static,
605+
{
606+
fn drop(&mut self) {
607+
// Send a PlanCmd::Exit signal
608+
let pool_clone = self.pool.clone();
609+
let schema_id = self.schema_id;
610+
let _ = std::thread::spawn(move || {
611+
let runtime = tokio::runtime::Builder::new_current_thread()
612+
.build()
613+
.unwrap();
614+
615+
let _ = runtime.block_on(async move {
616+
pool_clone.broadcast_send(None, PlanCmd::Exit { schema_id }).await
617+
});
618+
})
619+
.join();
620+
}
621+
}
622+
401623
/// A Deno worker backed query Planner.
402624
403625
pub struct Planner<T>
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use std::sync::atomic::Ordering;
2+
use std::{num::NonZeroUsize, sync::atomic::AtomicUsize};
3+
use std::fmt::Debug;
4+
use rand::Rng;
5+
use serde::de::DeserializeOwned;
6+
use serde::Serialize;
7+
8+
use tokio::task::JoinSet;
9+
use std::sync::Arc;
10+
11+
use crate::{error::Error, worker::JsWorker};
12+
13+
pub(crate) struct JsWorkerPool {
14+
workers: Vec<Arc<JsWorker>>,
15+
pending_requests: Vec<AtomicUsize>
16+
}
17+
18+
impl JsWorkerPool {
19+
pub(crate) fn new(worker_source_code: &'static str, size: NonZeroUsize) -> Self {
20+
let workers: Vec<Arc<JsWorker>> = (0..size.into())
21+
.map(|_| Arc::new(JsWorker::new(worker_source_code)))
22+
.collect();
23+
24+
let pending_requests: Vec<AtomicUsize> = (0..size.into()).map(|_| AtomicUsize::new(0)).collect();
25+
26+
Self { workers, pending_requests }
27+
}
28+
29+
pub(crate) async fn request<Request, Response>(
30+
&self,
31+
command: Request,
32+
) -> Result<Response, Error>
33+
where
34+
Request: std::hash::Hash + Serialize + Send + Debug + 'static,
35+
Response: DeserializeOwned + Send + Debug + 'static,
36+
{
37+
let (i, worker) = self.choice_of_two();
38+
39+
self.pending_requests[i].fetch_add(1, Ordering::SeqCst);
40+
let result = worker.request(command).await;
41+
self.pending_requests[i].fetch_add(1, Ordering::SeqCst);
42+
43+
result
44+
}
45+
46+
pub(crate) async fn broadcast_request<Request, Response>(
47+
&self,
48+
command: Request
49+
) -> Result<Vec<Response>, Error>
50+
where
51+
Request: std::hash::Hash + Serialize + Send + Debug + Clone + 'static,
52+
Response: DeserializeOwned + Send + Debug + 'static,
53+
{
54+
let mut join_set = JoinSet::new();
55+
56+
for worker in self.workers.iter().cloned() {
57+
let command_clone = command.clone();
58+
59+
join_set.spawn(async move {
60+
worker.request(command_clone).await
61+
});
62+
}
63+
64+
let mut responses = Vec::new();
65+
66+
while let Some(result) = join_set.join_next().await {
67+
let response = result.map_err(|_e| Error::Internal("could not join spawned task"))?;
68+
responses.push(response?);
69+
}
70+
71+
Ok(responses)
72+
}
73+
74+
pub(crate) async fn broadcast_send<Request>(
75+
&self,
76+
id_opt: Option<String>,
77+
request: Request,
78+
) -> Result<(), Error>
79+
where
80+
Request: std::hash::Hash + Serialize + Send + Debug + Clone + 'static,
81+
{
82+
let mut join_set = JoinSet::new();
83+
84+
for worker in self.workers.iter().cloned() {
85+
let request_clone = request.clone();
86+
let id_opt_clone = id_opt.clone();
87+
88+
join_set.spawn(async move {
89+
worker.send(id_opt_clone, request_clone).await
90+
});
91+
}
92+
93+
let mut results = Vec::new();
94+
95+
while let Some(result) = join_set.join_next().await {
96+
let result = result.map_err(|_e| Error::Internal("could not join spawned task"))?;
97+
results.push(result?);
98+
}
99+
100+
Ok(())
101+
}
102+
103+
fn choice_of_two(&self) -> (usize, &JsWorker) {
104+
let mut rng = rand::thread_rng();
105+
106+
let len = self.workers.len();
107+
108+
let index1 = rng.gen_range(0..len);
109+
let mut index2 = rng.gen_range(0..len);
110+
while index2 == index1 {
111+
index2 = rng.gen_range(0..len);
112+
}
113+
114+
let index1_load = &self.pending_requests[index1].load(Ordering::SeqCst);
115+
let index2_load = &self.pending_requests[index2].load(Ordering::SeqCst);
116+
117+
let choice = if index1_load < index2_load {
118+
index1
119+
} else {
120+
index2
121+
};
122+
123+
(choice, &self.workers[choice])
124+
}
125+
}

0 commit comments

Comments
 (0)