From 44898553a69284f306c511798640968a53e6d6ea Mon Sep 17 00:00:00 2001 From: Dan Date: Fri, 13 Sep 2024 05:22:55 -0700 Subject: [PATCH] dev leader deprecation (#2132) ## Description Initial PR the remove leader/follower related code so we can build upon implementing new `rover dev` command while keeping changes atomic. --- src/command/dev/do_dev.rs | 149 +--- src/command/dev/introspect.rs | 8 +- src/command/dev/mod.rs | 11 +- src/command/dev/netstat.rs | 62 -- src/command/dev/protocol/follower/message.rs | 211 ------ .../dev/protocol/follower/messenger.rs | 231 ------- src/command/dev/protocol/follower/mod.rs | 21 - src/command/dev/protocol/leader.rs | 647 ------------------ src/command/dev/protocol/mod.rs | 15 - src/command/dev/protocol/socket.rs | 64 -- src/command/dev/protocol/types.rs | 41 -- src/command/dev/schema.rs | 195 ------ src/command/dev/types.rs | 11 + src/command/dev/watcher.rs | 342 --------- 14 files changed, 18 insertions(+), 1990 deletions(-) delete mode 100644 src/command/dev/netstat.rs delete mode 100644 src/command/dev/protocol/follower/message.rs delete mode 100644 src/command/dev/protocol/follower/messenger.rs delete mode 100644 src/command/dev/protocol/follower/mod.rs delete mode 100644 src/command/dev/protocol/leader.rs delete mode 100644 src/command/dev/protocol/mod.rs delete mode 100644 src/command/dev/protocol/socket.rs delete mode 100644 src/command/dev/protocol/types.rs delete mode 100644 src/command/dev/schema.rs create mode 100644 src/command/dev/types.rs delete mode 100644 src/command/dev/watcher.rs diff --git a/src/command/dev/do_dev.rs b/src/command/dev/do_dev.rs index dc07577cc..2ffa214b4 100644 --- a/src/command/dev/do_dev.rs +++ b/src/command/dev/do_dev.rs @@ -1,18 +1,8 @@ -use anyhow::{anyhow, Context}; use camino::Utf8PathBuf; -use futures::channel::mpsc::channel; -use futures::future::join_all; -use futures::stream::StreamExt; -use futures::FutureExt; -use rover_std::warnln; -use crate::command::dev::protocol::FollowerMessage; use crate::utils::client::StudioClientConfig; -use crate::utils::supergraph_config::get_supergraph_config; use crate::{RoverError, RoverOutput, RoverResult}; -use super::protocol::{FollowerChannel, FollowerMessenger, LeaderChannel, LeaderSession}; -use super::router::RouterConfigHandler; use super::Dev; pub fn log_err_and_continue(err: RoverError) -> RoverError { @@ -26,143 +16,6 @@ impl Dev { override_install_path: Option, client_config: StudioClientConfig, ) -> RoverResult { - self.opts - .plugin_opts - .prompt_for_license_accept(&client_config)?; - - let router_config_handler = RouterConfigHandler::try_from(&self.opts.supergraph_opts)?; - let router_address = router_config_handler.get_router_address(); - let raw_socket_name = router_config_handler.get_raw_socket_name(); - let leader_channel = LeaderChannel::new(); - let follower_channel = FollowerChannel::new(); - - let supergraph_config = get_supergraph_config( - &self.opts.supergraph_opts.graph_ref, - &self.opts.supergraph_opts.supergraph_config_path, - self.opts.supergraph_opts.federation_version.as_ref(), - client_config.clone(), - &self.opts.plugin_opts.profile, - false, - ) - .await?; - - if let Some(mut leader_session) = LeaderSession::new( - override_install_path, - &client_config, - leader_channel.clone(), - follower_channel.clone(), - self.opts.plugin_opts.clone(), - &supergraph_config, - router_config_handler, - self.opts.supergraph_opts.license.clone(), - ) - .await? - { - warnln!( - "Do not run this command in production! It is intended for local development only." - ); - let (ready_sender, mut ready_receiver) = channel(1); - let follower_messenger = FollowerMessenger::from_main_session( - follower_channel.clone().sender, - leader_channel.receiver, - ); - - tokio::task::spawn_blocking(move || { - ctrlc::set_handler(move || { - eprintln!( - "\nshutting down the `rover dev` session and all attached processes..." - ); - let _ = follower_channel - .sender - .send(FollowerMessage::shutdown(true)) - .map_err(|e| { - let e = - RoverError::new(anyhow!("could not shut down router").context(e)); - log_err_and_continue(e) - }); - }) - .context("could not set ctrl-c handler for main `rover dev` process") - .unwrap(); - }); - - let subgraph_watcher_handle = tokio::task::spawn(async move { - let _ = leader_session - .listen_for_all_subgraph_updates(ready_sender) - .await - .map_err(log_err_and_continue); - }); - - ready_receiver.next().await.unwrap(); - - let subgraph_watchers = self - .opts - .supergraph_opts - .get_subgraph_watchers( - &client_config, - supergraph_config, - follower_messenger.clone(), - self.opts.subgraph_opts.subgraph_polling_interval, - &self.opts.plugin_opts.profile, - self.opts.subgraph_opts.subgraph_retries, - ) - .await - .transpose() - .unwrap_or_else(|| { - self.opts - .subgraph_opts - .get_subgraph_watcher( - router_address, - &client_config, - follower_messenger.clone(), - ) - .map(|watcher| vec![watcher]) - })?; - - let futs = subgraph_watchers.into_iter().map(|mut watcher| async move { - let _ = watcher - .watch_subgraph_for_changes(client_config.retry_period) - .await - .map_err(log_err_and_continue); - }); - tokio::join!(join_all(futs), subgraph_watcher_handle.map(|_| ())); - } else { - let follower_messenger = FollowerMessenger::from_attached_session(&raw_socket_name); - let mut subgraph_refresher = self.opts.subgraph_opts.get_subgraph_watcher( - router_address, - &client_config, - follower_messenger.clone(), - )?; - tracing::info!( - "connecting to existing `rover dev` process by communicating via the interprocess socket located at {raw_socket_name}", - ); - - // start the interprocess socket health check in the background - let health_messenger = follower_messenger.clone(); - tokio::task::spawn_blocking(move || { - let _ = health_messenger.health_check().map_err(|_| { - eprintln!("shutting down..."); - std::process::exit(1); - }); - }); - - // set up the ctrl+c handler to notify the main session to remove the killed subgraph - let kill_name = subgraph_refresher.get_name(); - ctrlc::set_handler(move || { - eprintln!("\nshutting down..."); - let _ = follower_messenger - .remove_subgraph(&kill_name) - .map_err(log_err_and_continue); - std::process::exit(1); - }) - .context("could not set ctrl-c handler")?; - - // watch for subgraph changes on the main thread - // it will take care of updating the main `rover dev` session - subgraph_refresher - .watch_subgraph_for_changes(client_config.retry_period) - .await?; - } - - unreachable!("watch_subgraph_for_changes never returns") + todo!() } } diff --git a/src/command/dev/introspect.rs b/src/command/dev/introspect.rs index 9d6586002..033836105 100644 --- a/src/command/dev/introspect.rs +++ b/src/command/dev/introspect.rs @@ -4,9 +4,11 @@ use anyhow::anyhow; use reqwest::Client; use rover_std::Style; -use crate::command::dev::protocol::{SubgraphSdl, SubgraphUrl}; -use crate::command::graph::Introspect as GraphIntrospect; -use crate::command::subgraph::Introspect as SubgraphIntrospect; +use crate::command::{ + dev::types::{SubgraphSdl, SubgraphUrl}, + graph::Introspect as GraphIntrospect, + subgraph::Introspect as SubgraphIntrospect, +}; use crate::options::IntrospectOpts; use crate::{RoverError, RoverErrorSuggestion, RoverResult}; diff --git a/src/command/dev/mod.rs b/src/command/dev/mod.rs index 5fbfaef86..10154e4cb 100644 --- a/src/command/dev/mod.rs +++ b/src/command/dev/mod.rs @@ -19,23 +19,14 @@ mod do_dev; #[cfg(feature = "composition-js")] mod introspect; -#[cfg(feature = "composition-js")] -mod protocol; - #[cfg(feature = "composition-js")] mod router; -#[cfg(feature = "composition-js")] -mod schema; - -#[cfg(feature = "composition-js")] -mod netstat; - #[cfg(not(feature = "composition-js"))] mod no_dev; #[cfg(feature = "composition-js")] -mod watcher; +mod types; #[derive(Debug, Serialize, Parser)] pub struct Dev { diff --git a/src/command/dev/netstat.rs b/src/command/dev/netstat.rs deleted file mode 100644 index ba86dd572..000000000 --- a/src/command/dev/netstat.rs +++ /dev/null @@ -1,62 +0,0 @@ -use reqwest::Url; -use std::{ - collections::HashSet, - net::{IpAddr, Ipv4Addr, Ipv6Addr}, -}; -use url::Host; - -use crate::command::dev::protocol::SubgraphUrl; - -pub fn normalize_loopback_urls(url: &SubgraphUrl) -> Vec { - let hosts = match url.host() { - Some(host) => match host { - Host::Ipv4(ip) => { - if &ip.to_string() == "::" { - vec![ - IpAddr::V4(ip).to_string(), - IpAddr::V4(Ipv4Addr::LOCALHOST).to_string(), - ] - } else { - vec![IpAddr::V4(ip).to_string()] - } - } - Host::Ipv6(ip) => { - if &ip.to_string() == "::" || &ip.to_string() == "::1" { - vec![ - IpAddr::V6(ip).to_string(), - IpAddr::V6(Ipv6Addr::LOCALHOST).to_string(), - ] - } else { - vec![IpAddr::V6(ip).to_string()] - } - } - Host::Domain(domain) => { - if domain == "localhost" { - vec![ - IpAddr::V4(Ipv4Addr::LOCALHOST).to_string(), - IpAddr::V6(Ipv6Addr::LOCALHOST).to_string(), - "[::]".to_string(), - "0.0.0.0".to_string(), - ] - } else { - vec![domain.to_string()] - } - } - }, - None => Vec::new(), - }; - if hosts.is_empty() { - vec![url.clone()] - } else { - Vec::from_iter( - hosts - .iter() - .map(|host| { - let mut url = url.clone(); - let _ = url.set_host(Some(host)); - url - }) - .collect::>(), - ) - } -} diff --git a/src/command/dev/protocol/follower/message.rs b/src/command/dev/protocol/follower/message.rs deleted file mode 100644 index 11d951bca..000000000 --- a/src/command/dev/protocol/follower/message.rs +++ /dev/null @@ -1,211 +0,0 @@ -use anyhow::anyhow; -use apollo_federation_types::build::SubgraphDefinition; -use serde::{Deserialize, Serialize}; -use std::fmt::Debug; - -use crate::command::dev::protocol::{entry_from_definition, SubgraphEntry, SubgraphName}; -use crate::{RoverError, RoverResult, PKG_VERSION}; - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct FollowerMessage { - kind: FollowerMessageKind, - is_from_main_session: bool, -} - -impl FollowerMessage { - pub fn get_version(is_from_main_session: bool) -> Self { - Self { - kind: FollowerMessageKind::get_version(), - is_from_main_session, - } - } - - pub fn get_subgraphs(is_from_main_session: bool) -> Self { - Self { - kind: FollowerMessageKind::get_subgraphs(), - is_from_main_session, - } - } - - pub fn health_check(is_from_main_session: bool) -> RoverResult { - if is_from_main_session { - Err(RoverError::new(anyhow!( - "You cannot send a health check from the main `rover dev` process" - ))) - } else { - Ok(Self { - kind: FollowerMessageKind::health_check(), - is_from_main_session, - }) - } - } - - pub fn add_subgraph( - is_from_main_session: bool, - subgraph: &SubgraphDefinition, - ) -> RoverResult { - Ok(Self { - kind: FollowerMessageKind::add_subgraph(subgraph)?, - is_from_main_session, - }) - } - - pub fn update_subgraph( - is_from_main_session: bool, - subgraph: &SubgraphDefinition, - ) -> RoverResult { - Ok(Self { - kind: FollowerMessageKind::update_subgraph(subgraph)?, - is_from_main_session, - }) - } - - pub fn remove_subgraph( - is_from_main_session: bool, - subgraph_name: &SubgraphName, - ) -> RoverResult { - Ok(Self { - kind: FollowerMessageKind::remove_subgraph(subgraph_name), - is_from_main_session, - }) - } - - pub fn shutdown(is_from_main_session: bool) -> Self { - Self { - kind: FollowerMessageKind::shutdown(), - is_from_main_session, - } - } - - pub fn is_from_main_session(&self) -> bool { - self.is_from_main_session - } - - pub fn kind(&self) -> &FollowerMessageKind { - &self.kind - } - - pub fn print(&self) { - if self.is_from_main_session() { - tracing::debug!("sending message to self: {:?}", &self); - } else { - tracing::debug!( - "sending message to the main `rover dev` process: {:?}", - &self - ); - } - match self.kind() { - FollowerMessageKind::AddSubgraph { subgraph_entry } => { - if self.is_from_main_session() { - eprintln!( - "starting a session with the '{}' subgraph", - &subgraph_entry.0 .0 - ); - } else { - eprintln!( - "adding the '{}' subgraph to the session", - &subgraph_entry.0 .0 - ); - } - } - FollowerMessageKind::UpdateSubgraph { subgraph_entry } => { - eprintln!( - "updating the schema for the '{}' subgraph in the session", - &subgraph_entry.0 .0 - ); - } - FollowerMessageKind::RemoveSubgraph { subgraph_name } => { - if self.is_from_main_session() { - eprintln!( - "removing the '{}' subgraph from this session", - &subgraph_name - ); - } else { - tracing::debug!( - "removing the '{}' subgraph from the session", - &subgraph_name - ); - } - } - FollowerMessageKind::Shutdown => { - tracing::debug!("shutting down the router for this session"); - } - FollowerMessageKind::HealthCheck => { - tracing::debug!("sending health check ping to the main process"); - } - FollowerMessageKind::GetVersion { - follower_version: _, - } => { - tracing::debug!("requesting the version of the main process"); - } - FollowerMessageKind::GetSubgraphs => { - tracing::debug!("asking the main process about existing subgraphs"); - } - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum FollowerMessageKind { - GetVersion { follower_version: String }, - GetSubgraphs, - HealthCheck, - Shutdown, - AddSubgraph { subgraph_entry: SubgraphEntry }, - UpdateSubgraph { subgraph_entry: SubgraphEntry }, - RemoveSubgraph { subgraph_name: SubgraphName }, -} - -impl FollowerMessageKind { - fn get_version() -> Self { - Self::GetVersion { - follower_version: PKG_VERSION.to_string(), - } - } - - fn get_subgraphs() -> Self { - Self::GetSubgraphs - } - - fn health_check() -> Self { - Self::HealthCheck - } - - fn shutdown() -> Self { - Self::Shutdown - } - - fn add_subgraph(subgraph: &SubgraphDefinition) -> RoverResult { - Ok(Self::AddSubgraph { - subgraph_entry: entry_from_definition(subgraph)?, - }) - } - - fn update_subgraph(subgraph: &SubgraphDefinition) -> RoverResult { - Ok(Self::UpdateSubgraph { - subgraph_entry: entry_from_definition(subgraph)?, - }) - } - - fn remove_subgraph(subgraph_name: &SubgraphName) -> Self { - Self::RemoveSubgraph { - subgraph_name: subgraph_name.to_string(), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn follower_message_can_request_version() { - let message = FollowerMessageKind::get_version(); - let expected_message_json = serde_json::to_string(&message).unwrap(); - assert_eq!( - expected_message_json, - serde_json::json!({"GetVersion": {"follower_version": PKG_VERSION.to_string()}}) - .to_string() - ) - } -} diff --git a/src/command/dev/protocol/follower/messenger.rs b/src/command/dev/protocol/follower/messenger.rs deleted file mode 100644 index 140fb7a01..000000000 --- a/src/command/dev/protocol/follower/messenger.rs +++ /dev/null @@ -1,231 +0,0 @@ -use std::{fmt::Debug, io::BufReader, time::Duration}; - -use anyhow::anyhow; -use apollo_federation_types::build::SubgraphDefinition; -use crossbeam_channel::{Receiver, Sender}; -use interprocess::local_socket::traits::Stream; - -use crate::command::dev::protocol::{ - create_socket_name, socket_read, socket_write, FollowerMessage, LeaderMessageKind, - SubgraphKeys, SubgraphName, -}; -use crate::{RoverError, RoverErrorSuggestion, RoverResult, PKG_VERSION}; - -#[derive(Clone, Debug)] -pub struct FollowerMessenger { - kind: FollowerMessengerKind, -} - -impl FollowerMessenger { - /// Create a [`FollowerMessenger`] for the main session that can talk to itself via a channel. - pub fn from_main_session( - follower_message_sender: Sender, - leader_message_receiver: Receiver, - ) -> Self { - Self { - kind: FollowerMessengerKind::from_main_session( - follower_message_sender, - leader_message_receiver, - ), - } - } - - /// Create a [`FollowerMessenger`] for an attached session that can talk to the main session via a socket. - pub fn from_attached_session(raw_socket_name: &str) -> Self { - Self { - kind: FollowerMessengerKind::from_attached_session(raw_socket_name.to_string()), - } - } - - /// Send a health check to the main session once every second to make sure it is alive. - /// - /// This is function will block indefinitely and should be run from a separate thread. - pub fn health_check(&self) -> RoverResult<()> { - loop { - if let Err(e) = - self.message_leader(FollowerMessage::health_check(self.is_from_main_session())?) - { - break Err(e); - } - std::thread::sleep(Duration::from_secs(1)); - } - } - - /// Send a version check to the main session - pub fn version_check(&self) -> RoverResult<()> { - self.message_leader(FollowerMessage::get_version(self.is_from_main_session()))?; - Ok(()) - } - - /// Request information about the current subgraphs in a session - pub fn session_subgraphs(&self) -> RoverResult> { - self.message_leader(FollowerMessage::get_subgraphs(self.is_from_main_session())) - } - - /// Add a subgraph to the main session - pub fn add_subgraph(&self, subgraph: &SubgraphDefinition) -> RoverResult<()> { - self.message_leader(FollowerMessage::add_subgraph( - self.is_from_main_session(), - subgraph, - )?)?; - Ok(()) - } - - /// Update a subgraph in the main session - pub fn update_subgraph(&self, subgraph: &SubgraphDefinition) -> RoverResult<()> { - self.message_leader(FollowerMessage::update_subgraph( - self.is_from_main_session(), - subgraph, - )?)?; - Ok(()) - } - - /// Remove a subgraph from the main session - pub fn remove_subgraph(&self, subgraph: &SubgraphName) -> RoverResult<()> { - self.message_leader(FollowerMessage::remove_subgraph( - self.is_from_main_session(), - subgraph, - )?)?; - Ok(()) - } - - /// Send a message to the leader - fn message_leader( - &self, - follower_message: FollowerMessage, - ) -> RoverResult> { - self.kind.message_leader(follower_message) - } - - fn is_from_main_session(&self) -> bool { - self.kind.is_from_main_session() - } -} - -#[derive(Clone, Debug)] -enum FollowerMessengerKind { - FromMainSession { - follower_message_sender: Sender, - leader_message_receiver: Receiver, - }, - FromAttachedSession { - raw_socket_name: String, - }, -} - -impl FollowerMessengerKind { - fn from_main_session( - follower_message_sender: Sender, - leader_message_receiver: Receiver, - ) -> Self { - Self::FromMainSession { - follower_message_sender, - leader_message_receiver, - } - } - - fn from_attached_session(raw_socket_name: String) -> Self { - Self::FromAttachedSession { raw_socket_name } - } - - fn message_leader( - &self, - follower_message: FollowerMessage, - ) -> RoverResult> { - use FollowerMessengerKind::*; - follower_message.print(); - let leader_message = match self { - FromMainSession { - follower_message_sender, - leader_message_receiver, - } => { - tracing::trace!("main session sending follower message on channel"); - follower_message_sender.send(follower_message)?; - tracing::trace!("main session reading leader message from channel"); - let leader_message = leader_message_receiver.recv().map_err(|e| { - RoverError::new(anyhow!("the main process failed to update itself").context(e)) - }); - - tracing::trace!("main session received leader message from channel"); - - leader_message - } - FromAttachedSession { raw_socket_name } => { - let socket_name = create_socket_name(raw_socket_name)?; - let stream = Stream::connect(socket_name).map_err(|_| { - let mut err = RoverError::new(anyhow!( - "there is not a main `rover dev` process to report updates to" - )); - err.set_suggestion(RoverErrorSuggestion::SubmitIssue); - err - })?; - - let mut stream = BufReader::new(stream); - - tracing::trace!("attached session sending follower message on socket"); - // send our message over the socket - socket_write(&follower_message, &mut stream)?; - - tracing::trace!("attached session reading leader message from socket"); - // wait for our message to be read by the other socket handler - // then read the response that was written back to the socket - socket_read(&mut stream).map_err(|e| { - RoverError::new( - anyhow!( - "this process did not receive a message from the main process after sending {:?}", - &follower_message - ) - .context(e), - ) - }) - } - }?; - - self.handle_leader_message(&leader_message) - } - - fn handle_leader_message( - &self, - leader_message: &LeaderMessageKind, - ) -> RoverResult> { - leader_message.print(); - match leader_message { - LeaderMessageKind::GetVersion { - leader_version, - follower_version: _, - } => { - self.require_same_version(leader_version)?; - Ok(None) - } - LeaderMessageKind::LeaderSessionInfo { subgraphs } => Ok(Some(subgraphs.to_vec())), - _ => Ok(None), - } - } - - fn require_same_version(&self, leader_version: &str) -> RoverResult<()> { - if leader_version != PKG_VERSION { - let mut err = RoverError::new(anyhow!( - "The main process is running version {}, and this process is running version {}.", - &leader_version, - PKG_VERSION - )); - err.set_suggestion(RoverErrorSuggestion::Adhoc( - "You should use the same version of `rover` to run `rover dev` sessions" - .to_string(), - )); - Err(err) - } else { - Ok(()) - } - } - - fn is_from_main_session(&self) -> bool { - matches!( - self, - Self::FromMainSession { - follower_message_sender: _, - leader_message_receiver: _ - } - ) - } -} diff --git a/src/command/dev/protocol/follower/mod.rs b/src/command/dev/protocol/follower/mod.rs deleted file mode 100644 index d2b3685cd..000000000 --- a/src/command/dev/protocol/follower/mod.rs +++ /dev/null @@ -1,21 +0,0 @@ -mod message; -mod messenger; - -pub use message::*; -pub use messenger::*; - -use crossbeam_channel::{bounded, Receiver, Sender}; - -#[derive(Debug, Clone)] -pub struct FollowerChannel { - pub sender: Sender, - pub receiver: Receiver, -} - -impl FollowerChannel { - pub fn new() -> Self { - let (sender, receiver) = bounded(0); - - Self { sender, receiver } - } -} diff --git a/src/command/dev/protocol/leader.rs b/src/command/dev/protocol/leader.rs deleted file mode 100644 index d4e2ae6e3..000000000 --- a/src/command/dev/protocol/leader.rs +++ /dev/null @@ -1,647 +0,0 @@ -use std::str::FromStr; -use std::{ - collections::{hash_map::Entry::Vacant, HashMap}, - fmt::Debug, - io::BufReader, - net::TcpListener, -}; - -use anyhow::{anyhow, Context}; -use apollo_federation_types::{ - build::SubgraphDefinition, - config::{FederationVersion, SupergraphConfig}, -}; -use camino::Utf8PathBuf; -use crossbeam_channel::{bounded, Receiver, Sender}; -use futures::TryFutureExt; -use interprocess::local_socket::traits::{ListenerExt, Stream}; -use interprocess::local_socket::ListenerOptions; -use serde::{Deserialize, Serialize}; -use tracing::{info, warn}; - -use crate::{ - command::dev::{ - compose::ComposeRunner, - do_dev::log_err_and_continue, - router::{RouterConfigHandler, RouterRunner}, - OVERRIDE_DEV_COMPOSITION_VERSION, - }, - options::PluginOpts, - utils::client::StudioClientConfig, - RoverError, RoverErrorSuggestion, RoverResult, PKG_VERSION, -}; - -use super::{ - create_socket_name, - socket::{handle_socket_error, socket_read, socket_write}, - types::{ - CompositionResult, SubgraphEntry, SubgraphKey, SubgraphKeys, SubgraphName, SubgraphSdl, - }, - FollowerChannel, FollowerMessage, FollowerMessageKind, -}; - -#[derive(Debug)] -pub struct LeaderSession { - subgraphs: HashMap, - raw_socket_name: String, - compose_runner: ComposeRunner, - router_runner: Option, - follower_channel: FollowerChannel, - leader_channel: LeaderChannel, - federation_version: FederationVersion, - supergraph_config: Option, -} - -impl LeaderSession { - /// Create a new [`LeaderSession`] that is responsible for running composition and the router - /// It listens on a socket for incoming messages for subgraph changes, in addition to watching - /// its own subgraph - /// Returns: - /// Ok(Some(Self)) when successfully initiated - /// Ok(None) when a LeaderSession already exists for that address - /// Err(RoverError) when something went wrong. - #[allow(clippy::too_many_arguments)] - pub async fn new( - override_install_path: Option, - client_config: &StudioClientConfig, - leader_channel: LeaderChannel, - follower_channel: FollowerChannel, - plugin_opts: PluginOpts, - supergraph_config: &Option, - router_config_handler: RouterConfigHandler, - license: Option, - ) -> RoverResult> { - let raw_socket_name = router_config_handler.get_raw_socket_name(); - let router_socket_addr = router_config_handler.get_router_address(); - let socket_name = create_socket_name(&raw_socket_name)?; - - if let Ok(stream) = Stream::connect(socket_name.clone()) { - // write to the socket, so we don't make the other session deadlock waiting on a message - let mut stream = BufReader::new(stream); - socket_write(&FollowerMessage::health_check(false)?, &mut stream)?; - let _ = LeaderSession::socket_read(&mut stream); - // return early so an attached session can be created instead - return Ok(None); - } - - tracing::info!("initializing main `rover dev process`"); - // if we can't connect to the socket, we should start it and listen for incoming - // subgraph events - // - // remove the socket file before starting in case it was here from last time - // if we can't connect to it, it's safe to remove - let _ = std::fs::remove_file(&raw_socket_name); - - if TcpListener::bind(router_socket_addr).is_err() { - let mut err = - RoverError::new(anyhow!("You cannot bind the router to '{}' because that address is already in use by another process on this machine.", &router_socket_addr)); - err.set_suggestion(RoverErrorSuggestion::Adhoc( - format!("Try setting a different port for the router to bind to with the `--supergraph-port` argument, or shut down the process bound to '{}'.", &router_socket_addr) - )); - return Err(err); - } - - // create a [`ComposeRunner`] that will be in charge of composing our supergraph - let mut compose_runner = ComposeRunner::new( - plugin_opts.clone(), - override_install_path.clone(), - client_config.clone(), - router_config_handler.get_supergraph_schema_path(), - ); - - // create a [`RouterRunner`] that we will use to spawn the router when we have a successful composition - let mut router_runner = RouterRunner::new( - router_config_handler.get_supergraph_schema_path(), - router_config_handler.get_router_config_path(), - plugin_opts.clone(), - router_socket_addr, - router_config_handler.get_router_listen_path(), - override_install_path, - client_config.clone(), - license, - ); - - let config_fed_version = supergraph_config - .clone() - .and_then(|sc| sc.get_federation_version()); - - let federation_version = Self::get_federation_version( - config_fed_version, - OVERRIDE_DEV_COMPOSITION_VERSION.clone(), - )?; - - // install plugins before proceeding - router_runner.maybe_install_router().await?; - compose_runner - .maybe_install_supergraph(federation_version.clone()) - .await?; - - router_config_handler.start()?; - - Ok(Some(Self { - subgraphs: HashMap::new(), - raw_socket_name, - compose_runner, - router_runner: Some(router_runner), - follower_channel, - leader_channel, - federation_version, - supergraph_config: supergraph_config.clone(), - })) - } - - /// Calculates what the correct version of Federation should be, based on the - /// value of the given environment variable and the supergraph_schema - /// - /// The order of precedence is: - /// Environment Variable -> Schema -> Default (Latest) - fn get_federation_version( - sc_config_version: Option, - env_var: Option, - ) -> RoverResult { - let env_var_version = if let Some(version) = env_var { - match FederationVersion::from_str(&format!("={}", version)) { - Ok(v) => Some(v), - Err(e) => { - warn!("could not parse version from environment variable '{:}'", e); - info!("will check supergraph schema next..."); - None - } - } - } else { - None - }; - - env_var_version.map(Ok).unwrap_or_else(|| { - Ok(sc_config_version.unwrap_or_else(|| { - warn!("federation version not found in supergraph schema"); - info!("using latest version instead"); - FederationVersion::LatestFedTwo - })) - }) - } - - /// Start the session by watching for incoming subgraph updates and re-composing when needed - pub async fn listen_for_all_subgraph_updates( - &mut self, - ready_sender: futures::channel::mpsc::Sender<()>, - ) -> RoverResult<()> { - self.receive_messages_from_attached_sessions()?; - self.receive_all_subgraph_updates(ready_sender).await; - Ok(()) - } - - /// Listen for incoming subgraph updates and re-compose the supergraph - async fn receive_all_subgraph_updates( - &mut self, - mut ready_sender: futures::channel::mpsc::Sender<()>, - ) -> ! { - ready_sender.try_send(()).unwrap(); - loop { - tracing::trace!("main session waiting for follower message"); - let follower_message = self.follower_channel.receiver.recv().unwrap(); - let leader_message = self - .handle_follower_message_kind(follower_message.kind()) - .await; - - if !follower_message.is_from_main_session() { - leader_message.print(); - } - let debug_message = format!("could not send message {:?}", &leader_message); - tracing::trace!("main session sending leader message"); - - self.leader_channel - .sender - .send(leader_message) - .expect(&debug_message); - tracing::trace!("main session sent leader message"); - } - } - - /// Listen on the socket for incoming [`FollowerMessageKind`] messages. - fn receive_messages_from_attached_sessions(&self) -> RoverResult<()> { - let socket_name = create_socket_name(&self.raw_socket_name)?; - let listener = ListenerOptions::new() - .name(socket_name) - .create_sync() - .with_context(|| { - format!( - "could not start local socket server at {:?}", - &self.raw_socket_name - ) - })?; - tracing::info!( - "connected to socket {}, waiting for messages", - &self.raw_socket_name - ); - - let follower_message_sender = self.follower_channel.sender.clone(); - let leader_message_receiver = self.leader_channel.receiver.clone(); - tokio::task::spawn_blocking(move || { - listener - .incoming() - .filter_map(handle_socket_error) - .for_each(|stream| { - let mut stream = BufReader::new(stream); - let follower_message = Self::socket_read(&mut stream); - let _ = match follower_message { - Ok(message) => { - let debug_message = format!("{:?}", &message); - tracing::debug!("the main `rover dev` process read a message from the socket, sending an update message on the channel"); - follower_message_sender.send(message).unwrap_or_else(|_| { - panic!("failed to send message on channel: {}", &debug_message) - }); - tracing::debug!("the main `rover dev` process is processing the message from the socket"); - let leader_message = leader_message_receiver.recv().expect("failed to receive message on the channel"); - tracing::debug!("the main `rover dev` process is sending the result on the socket"); - Self::socket_write(leader_message, &mut stream) - } - Err(e) => { - tracing::debug!("the main `rover dev` process could not read incoming socket message, skipping channel update"); - Err(e) - } - }.map_err(log_err_and_continue); - }); - }); - - Ok(()) - } - - /// Adds a subgraph to the internal supergraph representation. - async fn add_subgraph(&mut self, subgraph_entry: &SubgraphEntry) -> LeaderMessageKind { - let is_first_subgraph = self.subgraphs.is_empty(); - let ((name, url), sdl) = subgraph_entry; - - if let Vacant(e) = self.subgraphs.entry((name.to_string(), url.clone())) { - e.insert(sdl.to_string()); - - // Followers add subgraphs, but sometimes those subgraphs depend on each other - // (e.g., through extending a type in another subgraph). When that happens, - // composition fails until _all_ subgraphs are loaded in. This acknowledges the - // follower's message when we haven't loaded in all the subgraphs, deferring - // composition until we have at least the number of subgraphs represented in the - // supergraph.yaml file - // - // This applies only when the supergraph.yaml file is present. Without it, we will - // try composition each time we add a subgraph - if let Some(supergraph_config) = self.supergraph_config.clone() { - let subgraphs_from_config = supergraph_config.into_iter(); - if self.subgraphs.len() < subgraphs_from_config.len() { - return LeaderMessageKind::MessageReceived; - } - } - - let composition_result = self.compose().await; - if let Err(composition_err) = composition_result { - LeaderMessageKind::error(composition_err) - } else if composition_result.transpose().is_some() && !is_first_subgraph { - LeaderMessageKind::add_subgraph_composition_success(name) - } else { - LeaderMessageKind::MessageReceived - } - } else { - LeaderMessageKind::error( - RoverError::new(anyhow!( - "subgraph with name '{}' and url '{}' already exists", - &name, - &url - )) - .to_string(), - ) - } - } - - /// Updates a subgraph in the internal supergraph representation. - async fn update_subgraph(&mut self, subgraph_entry: &SubgraphEntry) -> LeaderMessageKind { - let ((name, url), sdl) = &subgraph_entry; - if let Some(prev_sdl) = self.subgraphs.get_mut(&(name.to_string(), url.clone())) { - if prev_sdl != sdl { - *prev_sdl = sdl.to_string(); - let composition_result = self.compose().await; - if let Err(composition_err) = composition_result { - LeaderMessageKind::error(composition_err) - } else if composition_result.transpose().is_some() { - LeaderMessageKind::update_subgraph_composition_success(name) - } else { - LeaderMessageKind::message_received() - } - } else { - LeaderMessageKind::message_received() - } - } else { - self.add_subgraph(subgraph_entry).await - } - } - - /// Removes a subgraph from the internal subgraph representation. - async fn remove_subgraph(&mut self, subgraph_name: &SubgraphName) -> LeaderMessageKind { - let found = self - .subgraphs - .keys() - .find(|(name, _)| name == subgraph_name) - .cloned(); - - if let Some((name, url)) = found { - self.subgraphs.remove(&(name.to_string(), url)); - let composition_result = self.compose().await; - if let Err(composition_err) = composition_result { - LeaderMessageKind::error(composition_err) - } else if composition_result.transpose().is_some() { - LeaderMessageKind::remove_subgraph_composition_success(&name) - } else { - LeaderMessageKind::message_received() - } - } else { - LeaderMessageKind::message_received() - } - } - - /// Reruns composition, which triggers the router to reload. - async fn compose(&mut self) -> CompositionResult { - match self - .compose_runner - .run(&mut self.supergraph_config_internal_representation()) - .and_then(|maybe_new_schema| async { - if maybe_new_schema.is_some() { - if let Some(runner) = self.router_runner.as_mut() { - if let Err(err) = runner.spawn().await { - return Err(err.to_string()); - } - } - } - Ok(maybe_new_schema) - }) - .await - { - Ok(res) => Ok(res), - Err(e) => { - if let Some(runner) = self.router_runner.as_mut() { - let _ = runner.kill().await.map_err(log_err_and_continue); - } - Err(e) - } - } - } - - /// Reads a [`FollowerMessage`] from an open socket connection. - fn socket_read( - stream: &mut BufReader, - ) -> RoverResult { - socket_read(stream) - .inspect(|message| { - tracing::debug!("leader received message {:?}", &message); - }) - .map_err(|e| { - e.context("the main `rover dev` process did not receive a valid incoming message") - .into() - }) - } - - /// Writes a [`LeaderMessageKind`] to an open socket connection. - fn socket_write( - message: LeaderMessageKind, - stream: &mut BufReader, - ) -> RoverResult<()> { - tracing::debug!("leader sending message {:?}", message); - socket_write(&message, stream) - } - - /// Gets the supergraph configuration from the internal state. This can different from the - /// supergraph.yaml file as it represents intermediate states of composition while adding - /// subgraphs to the internal representation of that file - fn supergraph_config_internal_representation(&self) -> SupergraphConfig { - let mut supergraph_config: SupergraphConfig = self - .subgraphs - .iter() - .map(|((name, url), sdl)| SubgraphDefinition::new(name, url.to_string(), sdl)) - .collect::>() - .into(); - - supergraph_config.set_federation_version(self.federation_version.clone()); - supergraph_config - } - - /// Gets the list of subgraphs running in this session - fn get_subgraphs(&self) -> SubgraphKeys { - tracing::debug!("notifying new `rover dev` process about existing subgraphs"); - self.subgraphs.keys().cloned().collect() - } - - pub async fn shutdown(&mut self) { - let router_runner = self.router_runner.take(); - let raw_socket_name = self.raw_socket_name.clone(); - if let Some(mut runner) = router_runner { - let _ = runner.kill().await.map_err(log_err_and_continue); - } - let _ = std::fs::remove_file(&raw_socket_name); - std::process::exit(1) - } - - /// Handles a follower message by updating the internal subgraph representation if needed, - /// and returns a [`LeaderMessageKind`] that can be sent over a socket or printed by the main session - async fn handle_follower_message_kind( - &mut self, - follower_message: &FollowerMessageKind, - ) -> LeaderMessageKind { - use FollowerMessageKind::*; - match follower_message { - AddSubgraph { subgraph_entry } => self.add_subgraph(subgraph_entry).await, - - UpdateSubgraph { subgraph_entry } => self.update_subgraph(subgraph_entry).await, - - RemoveSubgraph { subgraph_name } => self.remove_subgraph(subgraph_name).await, - - GetSubgraphs => LeaderMessageKind::current_subgraphs(self.get_subgraphs()), - - Shutdown => { - self.shutdown().await; - LeaderMessageKind::message_received() - } - - HealthCheck => LeaderMessageKind::message_received(), - - GetVersion { follower_version } => LeaderMessageKind::get_version(follower_version), - } - } -} - -impl Drop for LeaderSession { - fn drop(&mut self) { - let router_runner = self.router_runner.take(); - let socket_addr = self.raw_socket_name.clone(); - tokio::task::spawn(async move { - if let Some(mut runner) = router_runner { - let _ = runner.kill().await.map_err(log_err_and_continue); - } - let _ = std::fs::remove_file(&socket_addr); - std::process::exit(1) - }); - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum LeaderMessageKind { - GetVersion { - follower_version: String, - leader_version: String, - }, - LeaderSessionInfo { - subgraphs: SubgraphKeys, - }, - CompositionSuccess { - action: String, - }, - ErrorNotification { - error: String, - }, - MessageReceived, -} - -impl LeaderMessageKind { - pub fn get_version(follower_version: &str) -> Self { - Self::GetVersion { - follower_version: follower_version.to_string(), - leader_version: PKG_VERSION.to_string(), - } - } - - pub fn current_subgraphs(subgraphs: SubgraphKeys) -> Self { - Self::LeaderSessionInfo { subgraphs } - } - - pub fn error(error: String) -> Self { - Self::ErrorNotification { error } - } - - pub fn add_subgraph_composition_success(subgraph_name: &SubgraphName) -> Self { - Self::CompositionSuccess { - action: format!("adding the '{}' subgraph", subgraph_name), - } - } - - pub fn update_subgraph_composition_success(subgraph_name: &SubgraphName) -> Self { - Self::CompositionSuccess { - action: format!("updating the '{}' subgraph", subgraph_name), - } - } - - pub fn remove_subgraph_composition_success(subgraph_name: &SubgraphName) -> Self { - Self::CompositionSuccess { - action: format!("removing the '{}' subgraph", subgraph_name), - } - } - - pub fn message_received() -> Self { - Self::MessageReceived - } - - pub fn print(&self) { - match self { - LeaderMessageKind::ErrorNotification { error } => { - eprintln!("{}", error); - } - LeaderMessageKind::CompositionSuccess { action } => { - eprintln!("successfully composed after {}", &action); - } - LeaderMessageKind::LeaderSessionInfo { subgraphs } => { - let subgraphs = match subgraphs.len() { - 0 => "no subgraphs".to_string(), - 1 => "1 subgraph".to_string(), - l => format!("{} subgraphs", l), - }; - tracing::info!("the main `rover dev` process currently has {}", subgraphs); - } - LeaderMessageKind::GetVersion { - leader_version, - follower_version: _, - } => { - tracing::debug!( - "the main `rover dev` process is running version {}", - &leader_version - ); - } - LeaderMessageKind::MessageReceived => { - tracing::debug!( - "the main `rover dev` process acknowledged the message, but did not take an action" - ) - } - } - } -} - -#[derive(Debug, Clone)] -pub struct LeaderChannel { - pub sender: Sender, - pub receiver: Receiver, -} - -impl LeaderChannel { - pub fn new() -> Self { - let (sender, receiver) = bounded(0); - - Self { sender, receiver } - } -} - -#[cfg(test)] -mod tests { - use apollo_federation_types::config::FederationVersion::{ExactFedOne, ExactFedTwo}; - use rstest::rstest; - use semver::Version; - use speculoos::assert_that; - use speculoos::prelude::ResultAssertions; - - use super::*; - - #[rstest] - fn leader_message_can_get_version() { - let follower_version = PKG_VERSION.to_string(); - let message = LeaderMessageKind::get_version(&follower_version); - let expected_message_json = serde_json::to_string(&message).unwrap(); - assert_eq!( - expected_message_json, - serde_json::json!({ - "GetVersion": { - "follower_version": follower_version, - "leader_version": follower_version, - } - }) - .to_string() - ) - } - - #[rstest] - #[case::env_var_no_yaml_fed_two(Some(String::from("2.3.4")), None, ExactFedTwo(Version::parse("2.3.4").unwrap()), false)] - #[case::env_var_no_yaml_fed_one(Some(String::from("0.40.0")), None, ExactFedOne(Version::parse("0.40.0").unwrap()), false)] - #[case::env_var_no_yaml_unsupported_fed_version( - Some(String::from("1.0.1")), - None, - FederationVersion::LatestFedTwo, - false - )] - #[case::nonsense_env_var_no_yaml( - Some(String::from("crackers")), - None, - FederationVersion::LatestFedTwo, - false - )] - #[case::env_var_with_yaml_fed_two(Some(String::from("2.3.4")), Some(ExactFedTwo(Version::parse("2.3.4").unwrap())), ExactFedTwo(Version::parse("2.3.4").unwrap()), false)] - #[case::env_var_with_yaml_fed_one(Some(String::from("0.50.0")), Some(ExactFedTwo(Version::parse("2.3.5").unwrap())), ExactFedOne(Version::parse("0.50.0").unwrap()), false)] - #[case::nonsense_env_var_with_yaml(Some(String::from("cheese")), Some(ExactFedTwo(Version::parse("2.3.5").unwrap())), ExactFedTwo(Version::parse("2.3.5").unwrap()), false)] - #[case::yaml_no_env_var_fed_two(None, Some(ExactFedTwo(Version::parse("2.3.5").unwrap())), ExactFedTwo(Version::parse("2.3.5").unwrap()), false)] - #[case::yaml_no_env_var_fed_one(None, Some(ExactFedOne(Version::parse("0.69.0").unwrap())), ExactFedOne(Version::parse("0.69.0").unwrap()), false)] - #[case::nothing_grabs_latest(None, None, FederationVersion::LatestFedTwo, false)] - fn federation_version_respects_precedence_order( - #[case] env_var_value: Option, - #[case] config_value: Option, - #[case] expected_value: FederationVersion, - #[case] error_expected: bool, - ) { - let res = LeaderSession::get_federation_version(config_value, env_var_value); - if error_expected { - assert_that(&res).is_err(); - } else { - assert_that(&res.unwrap()).is_equal_to(expected_value); - } - } -} diff --git a/src/command/dev/protocol/mod.rs b/src/command/dev/protocol/mod.rs deleted file mode 100644 index 72eb1572c..000000000 --- a/src/command/dev/protocol/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -use interprocess::local_socket::{GenericFilePath, Name, ToFsName}; - -pub use follower::*; -pub use leader::*; -pub(crate) use socket::*; -pub use types::*; - -mod follower; -mod leader; -mod socket; -mod types; - -pub(crate) fn create_socket_name(raw_socket_name: &str) -> std::io::Result { - raw_socket_name.to_fs_name::() -} diff --git a/src/command/dev/protocol/socket.rs b/src/command/dev/protocol/socket.rs deleted file mode 100644 index ccb1ba2eb..000000000 --- a/src/command/dev/protocol/socket.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::{ - fmt::Debug, - io::{self, BufRead, BufReader, Write}, -}; - -use anyhow::{anyhow, Context, Error}; -use interprocess::local_socket::Stream; -use serde::{de::DeserializeOwned, Serialize}; - -use crate::RoverResult; - -pub(crate) fn handle_socket_error(conn: io::Result) -> Option { - match conn { - Ok(val) => Some(val), - Err(error) => { - eprintln!("incoming connection failed: {}", error); - None - } - } -} - -pub(crate) fn socket_read(stream: &mut BufReader) -> std::result::Result -where - B: Serialize + DeserializeOwned + Debug, -{ - let mut incoming_message = String::new(); - - match stream.read_line(&mut incoming_message) { - Ok(_) => { - if incoming_message.is_empty() { - Err(anyhow!("incoming message was empty")) - } else { - let incoming_message: B = - serde_json::from_str(&incoming_message).with_context(|| { - format!( - "incoming message '{}' was not valid JSON", - &incoming_message - ) - })?; - Ok(incoming_message) - } - } - Err(e) => Err(Error::new(e).context("could not read incoming message")), - } -} - -pub(crate) fn socket_write(message: &A, stream: &mut BufReader) -> RoverResult<()> -where - A: Serialize + DeserializeOwned + Debug, -{ - let outgoing_json = serde_json::to_string(message) - .with_context(|| format!("could not convert outgoing message {:?} to json", &message))?; - let outgoing_string = format!("{}\n", &outgoing_json); - stream - .get_mut() - .write_all(outgoing_string.as_bytes()) - .with_context(|| { - format!( - "could not write outgoing message {:?} to socket", - &outgoing_json - ) - })?; - Ok(()) -} diff --git a/src/command/dev/protocol/types.rs b/src/command/dev/protocol/types.rs deleted file mode 100644 index 8bb2b52d6..000000000 --- a/src/command/dev/protocol/types.rs +++ /dev/null @@ -1,41 +0,0 @@ -use crate::command::supergraph::compose::CompositionOutput; - -use anyhow::Result; -use apollo_federation_types::build::SubgraphDefinition; -use reqwest::Url; - -pub type SubgraphName = String; -pub type SubgraphUrl = Url; -pub type SubgraphSdl = String; -pub type SubgraphKey = (SubgraphName, SubgraphUrl); -pub type SubgraphKeys = Vec; -pub type SubgraphEntry = (SubgraphKey, SubgraphSdl); -pub type CompositionResult = std::result::Result, String>; - -pub(crate) fn sdl_from_definition(subgraph_definition: &SubgraphDefinition) -> SubgraphSdl { - subgraph_definition.sdl.to_string() -} - -pub(crate) fn name_from_definition(subgraph_definition: &SubgraphDefinition) -> SubgraphName { - subgraph_definition.name.to_string() -} - -pub(crate) fn url_from_definition(subgraph_definition: &SubgraphDefinition) -> Result { - Ok(subgraph_definition.url.parse()?) -} - -pub(crate) fn key_from_definition(subgraph_definition: &SubgraphDefinition) -> Result { - Ok(( - name_from_definition(subgraph_definition), - url_from_definition(subgraph_definition)?, - )) -} - -pub(crate) fn entry_from_definition( - subgraph_definition: &SubgraphDefinition, -) -> Result { - Ok(( - key_from_definition(subgraph_definition)?, - sdl_from_definition(subgraph_definition), - )) -} diff --git a/src/command/dev/schema.rs b/src/command/dev/schema.rs deleted file mode 100644 index 999791268..000000000 --- a/src/command/dev/schema.rs +++ /dev/null @@ -1,195 +0,0 @@ -use std::{net::SocketAddr, time::Duration}; - -use anyhow::anyhow; -use apollo_federation_types::config::{SchemaSource, SupergraphConfig}; -use reqwest::Url; - -use rover_client::blocking::StudioClient; - -use crate::options::ProfileOpt; -use crate::{ - command::dev::{ - netstat::normalize_loopback_urls, protocol::FollowerMessenger, - watcher::SubgraphSchemaWatcher, SupergraphOpts, - }, - options::OptionalSubgraphOpts, - utils::client::StudioClientConfig, - RoverError, RoverErrorSuggestion, RoverResult, -}; - -impl OptionalSubgraphOpts { - pub fn get_subgraph_watcher( - &self, - router_socket_addr: SocketAddr, - client_config: &StudioClientConfig, - follower_messenger: FollowerMessenger, - ) -> RoverResult { - tracing::info!("checking version"); - follower_messenger.version_check()?; - tracing::info!("checking for existing subgraphs"); - let session_subgraphs = follower_messenger.session_subgraphs()?; - let url = self.prompt_for_url()?; - let normalized_user_urls = normalize_loopback_urls(&url); - let normalized_supergraph_urls = normalize_loopback_urls( - &Url::parse(&format!("http://{}", router_socket_addr)).unwrap(), - ); - - for normalized_user_url in &normalized_user_urls { - for normalized_supergraph_url in &normalized_supergraph_urls { - if normalized_supergraph_url == normalized_user_url { - let mut err = RoverError::new(anyhow!("The subgraph argument `--url {}` conflicts with the supergraph argument `--supergraph-port {}`", &url, normalized_supergraph_url.port().unwrap())); - if session_subgraphs.is_none() { - err.set_suggestion(RoverErrorSuggestion::Adhoc("Set the `--supergraph-port` flag to a different port to start the local supergraph.".to_string())) - } else { - err.set_suggestion(RoverErrorSuggestion::Adhoc("Start your subgraph on a different port and re-run this command with the new `--url`.".to_string())) - } - return Err(err); - } - } - } - - let name = self.prompt_for_name()?; - let schema = self.prompt_for_schema()?; - - if let Some(session_subgraphs) = session_subgraphs { - for (session_subgraph_name, session_subgraph_url) in session_subgraphs { - if session_subgraph_name == name { - return Err(RoverError::new(anyhow!( - "subgraph with name '{}' is already running in this `rover dev` session", - &name - ))); - } - let normalized_session_urls = normalize_loopback_urls(&session_subgraph_url); - for normalized_user_url in &normalized_user_urls { - for normalized_session_url in &normalized_session_urls { - if normalized_session_url == normalized_user_url { - return Err(RoverError::new(anyhow!( - "subgraph with url '{}' is already running in this `rover dev` session", - &url - ))); - } - } - } - } - } - - if let Some(schema) = schema { - SubgraphSchemaWatcher::new_from_file_path( - (name, url), - schema, - follower_messenger, - self.subgraph_retries, - ) - } else { - let client = client_config - .get_builder() - .with_timeout(Duration::from_secs(5)) - .build()?; - SubgraphSchemaWatcher::new_from_url( - (name, url.clone()), - client, - follower_messenger, - self.subgraph_polling_interval, - None, - self.subgraph_retries, - url, - ) - } - } -} - -impl SupergraphOpts { - pub async fn get_subgraph_watchers( - &self, - client_config: &StudioClientConfig, - supergraph_config: Option, - follower_messenger: FollowerMessenger, - polling_interval: u64, - profile_opt: &ProfileOpt, - subgraph_retries: u64, - ) -> RoverResult>> { - if supergraph_config.is_none() { - return Ok(None); - } - - tracing::info!("checking version"); - follower_messenger.version_check()?; - - let client = client_config - .get_builder() - .with_timeout(Duration::from_secs(5)) - .build()?; - let mut studio_client: Option = None; - - // WARNING: from here on I took the asynch branch's code; should be validated against main - let mut res = Vec::new(); - for (yaml_subgraph_name, subgraph_config) in supergraph_config.unwrap().into_iter() { - let routing_url = subgraph_config - .routing_url - .map(|url_str| Url::parse(&url_str).map_err(RoverError::from)) - .transpose()?; - let elem = match subgraph_config.schema { - SchemaSource::File { file } => { - let routing_url = routing_url.ok_or_else(|| { - anyhow!("`routing_url` must be set when using a local schema file") - })?; - - SubgraphSchemaWatcher::new_from_file_path( - (yaml_subgraph_name, routing_url), - file, - follower_messenger.clone(), - subgraph_retries, - ) - } - SchemaSource::SubgraphIntrospection { - subgraph_url, - introspection_headers, - } => SubgraphSchemaWatcher::new_from_url( - (yaml_subgraph_name, subgraph_url.clone()), - client.clone(), - follower_messenger.clone(), - polling_interval, - introspection_headers, - subgraph_retries, - subgraph_url, - ), - SchemaSource::Sdl { sdl } => { - let routing_url = routing_url.ok_or_else(|| { - anyhow!("`routing_url` must be set when providing SDL directly") - })?; - SubgraphSchemaWatcher::new_from_sdl( - (yaml_subgraph_name, routing_url), - sdl, - follower_messenger.clone(), - subgraph_retries, - ) - } - SchemaSource::Subgraph { - graphref, - subgraph: graphos_subgraph_name, - } => { - let studio_client = if let Some(studio_client) = studio_client.as_ref() { - studio_client - } else { - let client = client_config.get_authenticated_client(profile_opt)?; - studio_client = Some(client); - studio_client.as_ref().unwrap() - }; - - SubgraphSchemaWatcher::new_from_graph_ref( - &graphref, - graphos_subgraph_name, - routing_url, - yaml_subgraph_name, - follower_messenger.clone(), - studio_client, - subgraph_retries, - ) - .await - } - }; - res.push(elem?); - } - Ok(Some(res)) - } -} diff --git a/src/command/dev/types.rs b/src/command/dev/types.rs new file mode 100644 index 000000000..89668912e --- /dev/null +++ b/src/command/dev/types.rs @@ -0,0 +1,11 @@ +use reqwest::Url; + +use crate::command::supergraph::compose::CompositionOutput; + +pub type SubgraphName = String; +pub type SubgraphUrl = Url; +pub type SubgraphSdl = String; +pub type SubgraphKey = (SubgraphName, SubgraphUrl); +pub type SubgraphKeys = Vec; +pub type SubgraphEntry = (SubgraphKey, SubgraphSdl); +pub type CompositionResult = std::result::Result, String>; diff --git a/src/command/dev/watcher.rs b/src/command/dev/watcher.rs deleted file mode 100644 index 623dc6d31..000000000 --- a/src/command/dev/watcher.rs +++ /dev/null @@ -1,342 +0,0 @@ -use std::str::FromStr; -use std::{collections::HashMap, time::Duration}; - -use anyhow::{anyhow, Context}; -use apollo_federation_types::build::SubgraphDefinition; -use camino::{Utf8Path, Utf8PathBuf}; -use reqwest::Client; -use tokio::time::MissedTickBehavior::Delay; -use url::Url; - -use rover_client::blocking::StudioClient; -use rover_client::operations::subgraph::fetch; -use rover_client::operations::subgraph::fetch::SubgraphFetchInput; -use rover_client::shared::GraphRef; -use rover_std::{errln, Fs}; - -use crate::{ - command::dev::{ - introspect::{IntrospectRunnerKind, UnknownIntrospectRunner}, - protocol::{FollowerMessenger, SubgraphKey}, - }, - RoverError, RoverErrorSuggestion, RoverResult, -}; - -#[derive(Debug)] -pub struct SubgraphSchemaWatcher { - schema_watcher_kind: SubgraphSchemaWatcherKind, - subgraph_key: SubgraphKey, - message_sender: FollowerMessenger, - subgraph_retries: u64, - subgraph_retry_countdown: u64, -} - -impl SubgraphSchemaWatcher { - pub fn new_from_file_path

( - subgraph_key: SubgraphKey, - path: P, - message_sender: FollowerMessenger, - subgraph_retries: u64, - ) -> RoverResult - where - P: AsRef, - { - Ok(Self { - schema_watcher_kind: SubgraphSchemaWatcherKind::File(path.as_ref().to_path_buf()), - subgraph_key, - message_sender, - subgraph_retries, - subgraph_retry_countdown: 0, - }) - } - - pub fn new_from_url( - subgraph_key: SubgraphKey, - client: Client, - message_sender: FollowerMessenger, - polling_interval: u64, - headers: Option>, - subgraph_retries: u64, - subgraph_url: Url, - ) -> RoverResult { - let headers = headers.map(|header_map| header_map.into_iter().collect()); - let introspect_runner = IntrospectRunnerKind::Unknown(UnknownIntrospectRunner::new( - subgraph_url, - client, - headers, - )); - Self::new_from_introspect_runner( - subgraph_key, - introspect_runner, - message_sender, - polling_interval, - subgraph_retries, - ) - } - - pub fn new_from_sdl( - subgraph_key: SubgraphKey, - sdl: String, - message_sender: FollowerMessenger, - subgraph_retries: u64, - ) -> RoverResult { - Ok(Self { - schema_watcher_kind: SubgraphSchemaWatcherKind::Once(sdl), - subgraph_key, - message_sender, - subgraph_retries, - subgraph_retry_countdown: 0, - }) - } - - pub async fn new_from_graph_ref( - graph_ref: &str, - graphos_subgraph_name: String, - routing_url: Option, - yaml_subgraph_name: String, - message_sender: FollowerMessenger, - client: &StudioClient, - subgraph_retries: u64, - ) -> RoverResult { - // given a graph_ref and subgraph, run subgraph fetch to - // obtain SDL and add it to subgraph_definition. - let response = fetch::run( - SubgraphFetchInput { - graph_ref: GraphRef::from_str(graph_ref)?, - subgraph_name: graphos_subgraph_name.clone(), - }, - client, - ) - .await - .map_err(RoverError::from)?; - let routing_url = match (routing_url, response.sdl.r#type) { - (Some(routing_url), _) => routing_url, - ( - None, - rover_client::shared::SdlType::Subgraph { - routing_url: Some(graph_registry_routing_url), - }, - ) => graph_registry_routing_url.parse().context(format!( - "Could not parse graph registry routing url {}", - graph_registry_routing_url - ))?, - (None, _) => { - return Err(RoverError::new(anyhow!( - "Could not find routing URL in GraphOS for subgraph {graphos_subgraph_name}" - )) - .with_suggestion(RoverErrorSuggestion::AddRoutingUrlToSupergraphYaml) - .with_suggestion( - RoverErrorSuggestion::PublishSubgraphWithRoutingUrl { - subgraph_name: yaml_subgraph_name, - graph_ref: graph_ref.to_string(), - }, - )); - } - }; - Self::new_from_sdl( - (yaml_subgraph_name, routing_url), - response.sdl.contents, - message_sender, - subgraph_retries, - ) - } - - pub fn new_from_introspect_runner( - subgraph_key: SubgraphKey, - introspect_runner: IntrospectRunnerKind, - message_sender: FollowerMessenger, - polling_interval: u64, - subgraph_retries: u64, - ) -> RoverResult { - Ok(Self { - schema_watcher_kind: SubgraphSchemaWatcherKind::Introspect( - introspect_runner, - polling_interval, - ), - subgraph_key, - message_sender, - subgraph_retries, - subgraph_retry_countdown: 0, - }) - } - - pub async fn get_subgraph_definition_and_maybe_new_runner( - &self, - retry_period: Option, - ) -> RoverResult<(SubgraphDefinition, Option)> { - let (name, url) = self.subgraph_key.clone(); - let (sdl, refresher) = match &self.schema_watcher_kind { - SubgraphSchemaWatcherKind::Introspect(introspect_runner_kind, polling_interval) => { - match introspect_runner_kind { - IntrospectRunnerKind::Graph(graph_runner) => { - let sdl = graph_runner.run().await?; - (sdl, None) - } - IntrospectRunnerKind::Subgraph(subgraph_runner) => { - let sdl = subgraph_runner.run().await?; - (sdl, None) - } - IntrospectRunnerKind::Unknown(unknown_runner) => { - let (sdl, specific_runner) = unknown_runner.run(retry_period).await?; - ( - sdl, - Some(SubgraphSchemaWatcherKind::Introspect( - specific_runner, - *polling_interval, - )), - ) - } - } - } - SubgraphSchemaWatcherKind::File(file_path) => { - let sdl = Fs::read_file(file_path)?; - (sdl, None) - } - SubgraphSchemaWatcherKind::Once(sdl) => (sdl.clone(), None), - }; - - let subgraph_definition = SubgraphDefinition::new(name, url, sdl); - - Ok((subgraph_definition, refresher)) - } - - async fn update_subgraph( - &mut self, - last_message: Option<&String>, - retry_period: Option, - ) -> RoverResult> { - let maybe_update_message = match self - .get_subgraph_definition_and_maybe_new_runner(retry_period) - .await - { - Ok((subgraph_definition, maybe_new_refresher)) => { - if let Some(new_refresher) = maybe_new_refresher { - self.set_schema_refresher(new_refresher); - } - match last_message { - Some(last_message) => { - if &subgraph_definition.sdl != last_message { - if self.subgraph_retry_countdown < self.subgraph_retries { - eprintln!( - "subgraph connectivity restored for {}", - self.subgraph_key.0 - ) - } - self.message_sender.update_subgraph(&subgraph_definition)?; - } - } - None => { - self.message_sender.add_subgraph(&subgraph_definition)?; - } - } - self.subgraph_retry_countdown = self.subgraph_retries; - Some(subgraph_definition.sdl) - } - Err(e) => { - // `subgraph-retries` can be set by the user away from the default value of 0, - // this defaults to Rover's current behaviour. - // - // If a user does set this value to a non-zero one, and we get a non-retryable error - // from one of our subgraphs, we'll retain the old schema we had and continue - // operation. This will happen until the countdown hits 0 at which point the - // subgraph will be disconnected from the supergraph. - // - // Every time we successfully communicate with the subgraph we set the countdown - // back to the maximum value. - // - if self.subgraph_retry_countdown > 0 { - self.subgraph_retry_countdown -= 1; - errln!("error detected communicating with subgraph '{}', schema changes will not be reflected.\nWill retry but subgraph logs should be inspected", &self.subgraph_key.0); - errln!("{:}", e); - Some(e.to_string()) - } else { - eprintln!( - "retries exhausted for subgraph {}. To add more run `rover dev` with the --subgraph-retries flag.", - &self.subgraph_key.0, - ); - self.message_sender.remove_subgraph(&self.subgraph_key.0)?; - None - } - } - }; - - Ok(maybe_update_message) - } - - /// Start checking for subgraph updates and sending them to the main process. - /// - /// This function will block forever for `SubgraphSchemaWatcherKind` that poll for changes—so it - /// should be started in a separate thread. - pub async fn watch_subgraph_for_changes( - &mut self, - retry_period: Option, - ) -> RoverResult<()> { - let mut last_message = None; - match self.schema_watcher_kind.clone() { - SubgraphSchemaWatcherKind::Introspect(introspect_runner_kind, polling_interval) => { - let endpoint = introspect_runner_kind.endpoint(); - eprintln!( - "polling {} every {} {}", - &endpoint, - polling_interval, - match polling_interval { - 1 => "second", - _ => "seconds", - } - ); - let mut interval = tokio::time::interval(Duration::from_secs(polling_interval)); - interval.set_missed_tick_behavior(Delay); - loop { - last_message = self - .update_subgraph(last_message.as_ref(), retry_period) - .await?; - interval.tick().await; - } - } - SubgraphSchemaWatcherKind::File(path) => { - // populate the schema for the first time (last_message is always None to start) - last_message = self - .update_subgraph(last_message.as_ref(), retry_period) - .await?; - - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - - let watch_path = path.clone(); - - Fs::watch_file(watch_path, tx); - - while let Some(res) = rx.recv().await { - match res { - Ok(()) => (), - Err(err) => return Err(anyhow::Error::from(err).into()), - } - last_message = self - .update_subgraph(last_message.as_ref(), retry_period) - .await?; - } - } - SubgraphSchemaWatcherKind::Once(_) => { - self.update_subgraph(None, retry_period).await?; - } - } - Ok(()) - } - - pub fn set_schema_refresher(&mut self, new_refresher: SubgraphSchemaWatcherKind) { - self.schema_watcher_kind = new_refresher; - } - - pub fn get_name(&self) -> String { - self.subgraph_key.0.to_string() - } -} - -#[derive(Debug, Clone)] -pub enum SubgraphSchemaWatcherKind { - /// Poll an endpoint via introspection - Introspect(IntrospectRunnerKind, u64), - /// Watch a file on disk - File(Utf8PathBuf), - /// Don't ever update, schema is only pulled once - Once(String), -}