Skip to content

Commit

Permalink
[rust]sync topic route every 30 seconds (#719)
Browse files Browse the repository at this point in the history
* [rust]sync topic route every 30 seconds

* fix lifetime issue

* fix lifetime issue

* remove extra clones

* fix indent
  • Loading branch information
glcrazier committed Apr 29, 2024
1 parent 8f16d83 commit 78a347e
Showing 1 changed file with 136 additions and 28 deletions.
164 changes: 136 additions & 28 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub(crate) struct Client<S> {
logger: Logger,
option: ClientOption,
session_manager: Arc<SessionManager>,
route_table: Mutex<HashMap<String /* topic */, RouteStatus>>,
route_table: Arc<Mutex<HashMap<String /* topic */, RouteStatus>>>,
id: String,
access_endpoints: Endpoints,
settings: Arc<RwLock<S>>,
Expand Down Expand Up @@ -100,7 +100,7 @@ where
logger: logger.new(o!("component" => "client")),
option,
session_manager: Arc::new(session_manager),
route_table: Mutex::new(HashMap::new()),
route_table: Arc::new(Mutex::new(HashMap::new())),
id,
access_endpoints: endpoints,
settings,
Expand Down Expand Up @@ -138,13 +138,17 @@ where
.await
.map_err(|error| error.with_operation(OPERATION_CLIENT_START))?;

let route_table = Arc::clone(&self.route_table);
let endpoints = self.access_endpoints.clone();

let settings = Arc::clone(&self.settings);
tokio::spawn(async move {
rpc_client.is_started();
let seconds_30 = std::time::Duration::from_secs(30);
let mut heartbeat_interval = tokio::time::interval(seconds_30);
let mut sync_settings_interval =
tokio::time::interval_at(Instant::now() + seconds_30, seconds_30);
let mut sync_route_timer = tokio::time::interval(seconds_30);
loop {
select! {
_ = heartbeat_interval.tick() => {
Expand Down Expand Up @@ -203,6 +207,32 @@ where
}

},
_ = sync_route_timer.tick() => {
let topics: Vec<String>;
{
topics = route_table.lock().keys().cloned().collect();
}
debug!(logger, "update topic route of topics {:?}", topics);
for topic in topics {
let result = Self::topic_route_inner(
logger.clone(),
rpc_client.shadow_session(),
Arc::clone(&route_table),
namespace.clone(),
endpoints.clone(),
topic.as_str(),
)
.await;
if result.is_err() {
warn!(
logger,
"sync route of topic = {:?} failed, reason = {:?}",
topic,
result.err()
);
}
}
},
_ = &mut shutdown_rx => {
info!(logger, "receive shutdown signal, stop heartbeat and telemetry tasks.");
break;
Expand Down Expand Up @@ -320,20 +350,31 @@ where
}
}
let rpc_client = self.get_session().await?;
self.topic_route_inner(rpc_client, topic).await
let logger = self.logger.clone();
let route_table = Arc::clone(&self.route_table);
Self::topic_route_inner(
logger,
rpc_client,
route_table,
self.option.namespace.clone(),
self.access_endpoints.clone(),
topic,
)
.await
}

async fn query_topic_route<T: RPCClient + 'static>(
&self,
mut rpc_client: T,
namespace: String,
access_endpoints: Endpoints,
topic: &str,
) -> Result<Route, ClientError> {
let request = QueryRouteRequest {
topic: Some(Resource {
name: topic.to_owned(),
resource_namespace: self.option.namespace.to_string(),
resource_namespace: namespace,
}),
endpoints: Some(self.access_endpoints.inner().clone()),
endpoints: Some(access_endpoints.into_inner()),
};

let response = rpc_client.query_route(request).await?;
Expand All @@ -347,13 +388,15 @@ where
}

async fn topic_route_inner<T: RPCClient + 'static>(
&self,
logger: Logger,
rpc_client: T,
route_table: Arc<parking_lot::Mutex<HashMap<String, RouteStatus>>>,
namespace: String,
endpoints: Endpoints,
topic: &str,
) -> Result<Arc<Route>, ClientError> {
debug!(self.logger, "query route for topic={}", topic);
let rx = match self
.route_table
debug!(logger, "query route for topic={}", topic);
let rx = match route_table
.lock()
.entry(topic.to_owned())
.or_insert_with(|| RouteStatus::Querying(None))
Expand Down Expand Up @@ -388,16 +431,16 @@ where
};
}

let result = self.query_topic_route(rpc_client, topic).await;
let result = Self::query_topic_route(rpc_client, namespace, endpoints, topic).await;

// send result to all waiters
if let Ok(route) = result {
debug!(
self.logger,
logger,
"query route for topic={} success: route={:?}", topic, route
);
let route = Arc::new(route);
let mut route_table_lock = self.route_table.lock();
let mut route_table_lock = route_table.lock();

// if message queues in previous and new route are the same, just keep the previous.
if let Some(RouteStatus::Found(prev)) = route_table_lock.get(topic) {
Expand All @@ -408,7 +451,7 @@ where

let prev =
route_table_lock.insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
info!(self.logger, "update route for topic={}", topic);
info!(logger, "update route for topic={}", topic);

if let Some(RouteStatus::Querying(Some(mut v))) = prev {
for item in v.drain(..) {
Expand All @@ -419,10 +462,10 @@ where
} else {
let err = result.unwrap_err();
warn!(
self.logger,
logger,
"query route for topic={} failed: error={}", topic, err
);
let mut route_table_lock = self.route_table.lock();
let mut route_table_lock = route_table.lock();
// keep the existing route if error occurs.
if let Some(RouteStatus::Found(prev)) = route_table_lock.get(topic) {
return Ok(Arc::clone(prev));
Expand Down Expand Up @@ -697,7 +740,7 @@ pub(crate) mod tests {
..Default::default()
},
session_manager: Arc::new(SessionManager::default()),
route_table: Mutex::new(HashMap::new()),
route_table: Arc::new(Mutex::new(HashMap::new())),
id: Client::<MockSettings>::generate_client_id(),
access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
settings: Arc::new(RwLock::new(MockSettings::default())),
Expand All @@ -712,7 +755,7 @@ pub(crate) mod tests {
logger: terminal_logger(),
option: ClientOption::default(),
session_manager: Arc::new(session_manager),
route_table: Mutex::new(HashMap::new()),
route_table: Arc::new(Mutex::new(HashMap::new())),
id: Client::<MockSettings>::generate_client_id(),
access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
settings: Arc::new(RwLock::new(MockSettings::default())),
Expand Down Expand Up @@ -865,7 +908,16 @@ pub(crate) mod tests {
mock.expect_query_route()
.return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));

let result = client.topic_route_inner(mock, "DefaultCluster").await;
let logger = client.logger.clone();
let result = Client::<MockSettings>::topic_route_inner(
logger,
mock,
Arc::clone(&client.route_table),
client.option.namespace.clone(),
client.access_endpoints.clone(),
"DefaultCluster",
)
.await;
assert!(result.is_ok());

let route = result.unwrap();
Expand All @@ -892,14 +944,30 @@ pub(crate) mod tests {
Box::pin(futures::future::ready(new_topic_route_response()))
});

let result = client_clone.topic_route_inner(mock, "DefaultCluster").await;
let result = Client::<MockSettings>::topic_route_inner(
client_clone.logger.clone(),
mock,
Arc::clone(&client_clone.route_table),
client_clone.option.namespace.clone(),
client_clone.access_endpoints.clone(),
"DefaultCluster",
)
.await;
assert!(result.is_ok());
});

let handle = tokio::spawn(async move {
sleep(Duration::from_millis(100));
let mock = session::MockRPCClient::new();
let result = client.topic_route_inner(mock, "DefaultCluster").await;
let mut mock = session::MockRPCClient::new();
let result = Client::<MockSettings>::topic_route_inner(
client.logger.clone(),
mock,
Arc::clone(&client.route_table),
client.option.namespace.clone(),
client.access_endpoints.clone(),
"DefaultCluster",
)
.await;
assert!(result.is_ok());
});

Expand All @@ -923,14 +991,30 @@ pub(crate) mod tests {
))))
});

let result = client_clone.topic_route_inner(mock, "DefaultCluster").await;
let result = Client::<MockSettings>::topic_route_inner(
client_clone.logger.clone(),
mock,
Arc::clone(&client_clone.route_table),
client_clone.option.namespace.clone(),
client_clone.access_endpoints.clone(),
"DefaultCluster",
)
.await;
assert!(result.is_err());
});

let handle = tokio::spawn(async move {
sleep(Duration::from_millis(100));
let mock = session::MockRPCClient::new();
let result = client.topic_route_inner(mock, "DefaultCluster").await;
let mut mock = session::MockRPCClient::new();
let result = Client::<MockSettings>::topic_route_inner(
client.logger.clone(),
mock,
Arc::clone(&client.route_table),
client.option.namespace.clone(),
client.access_endpoints.clone(),
"DefaultCluster",
)
.await;
assert!(result.is_err());
});

Expand Down Expand Up @@ -967,7 +1051,15 @@ pub(crate) mod tests {
))))
});

let result = client.topic_route_inner(mock, "DefaultCluster").await;
let result = Client::<MockSettings>::topic_route_inner(
client.logger.clone(),
mock,
Arc::clone(&client.route_table),
client.option.namespace.clone(),
client.access_endpoints.clone(),
"DefaultCluster",
)
.await;
assert!(result.is_ok());
}

Expand All @@ -979,7 +1071,15 @@ pub(crate) mod tests {
mock.expect_query_route()
.return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));

let result = client.topic_route_inner(mock, "DefaultCluster").await;
let result = Client::<MockSettings>::topic_route_inner(
client.logger.clone(),
mock,
Arc::clone(&client.route_table),
client.option.namespace.clone(),
client.access_endpoints.clone(),
"DefaultCluster",
)
.await;
assert!(result.is_ok());

let route = result.unwrap();
Expand All @@ -997,7 +1097,15 @@ pub(crate) mod tests {
mock.expect_query_route()
.return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));

let result2 = client.topic_route_inner(mock, "DefaultCluster").await;
let result2 = Client::<MockSettings>::topic_route_inner(
client.logger.clone(),
mock,
Arc::clone(&client.route_table),
client.option.namespace.clone(),
client.access_endpoints.clone(),
"DefaultCluster",
)
.await;
assert!(result2.is_ok());

let route2 = result2.unwrap();
Expand Down

0 comments on commit 78a347e

Please sign in to comment.