Skip to content

Commit

Permalink
compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Aug 1, 2024
1 parent 52df2e7 commit 907abe3
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 34 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ reqwest = "0.12.4"
serde = { version = "1.0.114", features = ["derive"] }
sled = "0.34.2"
structopt = "0.3.15"
tokio = "1.39.2"
tokio = { version = "1.39.2", features = ["full"] }
yup-oauth2 = "9.0.0"

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn main() -> Result<()> {

if let Some(metrics_address) = opt.metrics.clone() {
let state = state.clone();
async_std::task::spawn(async move { metrics::start(state, metrics_address).await });
tokio::task::spawn(async move { metrics::start(state, metrics_address).await });
}

// Setup mulitple parallel notifiers.
Expand All @@ -72,7 +72,7 @@ async fn main() -> Result<()> {
// and use the same HTTP/2 clients, one for production and one for sandbox server.
for _ in 0..50 {
let state = state.clone();
async_std::task::spawn(async move { notifier::start(state, interval).await });
tokio::task::spawn(async move { notifier::start(state, interval).await });
}

server::start(state, host, port).await?;
Expand Down
8 changes: 5 additions & 3 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ pub async fn start(state: State, server: String) -> Result<()> {
Ok(())
}

async fn metrics(req: tide::Request<State>) -> impl IntoResponse {
async fn metrics(axum::extract::State(state): axum::extract::State<State>) -> impl IntoResponse {
let mut encoded = String::new();
encode(&mut encoded, &req.state().metrics().registry).unwrap();
encode(&mut encoded, &state.metrics().registry).unwrap();
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
"application/openmetrics-text; version=1.0.0; charset=utf-8",
"application/openmetrics-text; version=1.0.0; charset=utf-8"
.parse()
.unwrap(),
);
(headers, encoded)
}
77 changes: 49 additions & 28 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use a2::{
};
use anyhow::{bail, Error, Result};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use log::*;
use serde::Deserialize;
Expand All @@ -14,7 +15,7 @@ use crate::state::State;

pub async fn start(state: State, server: String, port: u16) -> Result<()> {
let app = axum::Router::new()
.route("/", get(|_| async { "Hello, world!" }))
.route("/", get(|| async { "Hello, world!" }))
.route("/register", post(register_device))
.route("/notify", post(notify_device))
.with_state(state);
Expand All @@ -28,11 +29,32 @@ struct DeviceQuery {
token: String,
}

struct AppError(anyhow::Error);

impl<E> From<E> for AppError
where
E: Into<anyhow::Error>,
{
fn from(err: E) -> Self {
Self(err.into())
}
}

impl IntoResponse for AppError {
fn into_response(self) -> Response {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Something went wrong: {}", self.0),
)
.into_response()
}
}

/// Registers a device for heartbeat notifications.
async fn register_device(
axum::extract::State(state): axum::extract::State(State),
axum::extract::State(state): axum::extract::State<State>,
axum::extract::Json(query): axum::extract::Json<DeviceQuery>,
) -> Result<StatusCode> {
) -> Result<(), AppError> {
info!("register_device {}", query.token);

let schedule = state.schedule();
Expand All @@ -43,7 +65,7 @@ async fn register_device(

state.metrics().heartbeat_registrations_total.inc();

Ok(StatusCode::OK)
Ok(())
}

enum NotificationToken {
Expand Down Expand Up @@ -94,7 +116,7 @@ async fn notify_fcm(
_package_name: &str,
token: &str,
metrics: &Metrics,
) -> tide::Result<tide::Response> {
) -> Result<StatusCode> {
let Some(fcm_api_key) = fcm_api_key else {
warn!("Cannot notify FCM because key is not set");
return Ok(StatusCode::INTERNAL_SERVER_ERROR);
Expand Down Expand Up @@ -133,12 +155,8 @@ async fn notify_fcm(
Ok(StatusCode::OK)
}

async fn notify_apns(
req: tide::Request<State>,
client: a2::Client,
device_token: String,
) -> tide::Result<tide::Response> {
let schedule = req.state().schedule();
async fn notify_apns(state: State, client: a2::Client, device_token: String) -> Result<StatusCode> {
let schedule = state.schedule();
let payload = DefaultNotificationBuilder::new()
.set_title("New messages")
.set_title_loc_key("new_messages") // Localization key for the title.
Expand All @@ -152,7 +170,7 @@ async fn notify_apns(
// High priority (10).
// <https://developer.apple.com/documentation/usernotifications/sending-notification-requests-to-apns>
apns_priority: Some(Priority::High),
apns_topic: req.state().topic(),
apns_topic: state.topic(),
apns_push_type: Some(PushType::Alert),
..Default::default()
},
Expand All @@ -163,14 +181,14 @@ async fn notify_apns(
match res.code {
200 => {
info!("delivered notification for {}", device_token);
req.state().metrics().direct_notifications_total.inc();
state.metrics().direct_notifications_total.inc();
}
_ => {
warn!("unexpected status: {:?}", res);
}
}

Ok(tide::Response::new(StatusCode::OK))
Ok(StatusCode::OK)
}
Err(ResponseError(res)) => {
info!("Removing token {} due to error {:?}.", &device_token, res);
Expand All @@ -183,21 +201,23 @@ async fn notify_apns(
error!("failed to remove {}: {:?}", &device_token, err);
}
// Return 410 Gone response so email server can remove the token.
Ok(tide::Response::new(StatusCode::GONE))
Ok(StatusCode::GONE)
} else {
Ok(tide::Response::new(StatusCode::INTERNAL_SERVER_ERROR))
Ok(StatusCode::INTERNAL_SERVER_ERROR)
}
}
Err(err) => {
error!("failed to send notification: {}, {:?}", device_token, err);
Ok(tide::Response::new(StatusCode::INTERNAL_SERVER_ERROR))
Ok(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}

/// Notifies a single device with a visible notification.
async fn notify_device(mut req: tide::Request<State>) -> Result<tide::Response> {
let device_token = req.body_string().await?;
async fn notify_device(
axum::extract::State(state): axum::extract::State<State>,
device_token: String,
) -> Result<StatusCode, AppError> {
info!("Got direct notification for {device_token}.");

let device_token: NotificationToken = device_token.as_str().parse()?;
Expand All @@ -207,27 +227,28 @@ async fn notify_device(mut req: tide::Request<State>) -> Result<tide::Response>
package_name,
token,
} => {
let client = req.state().fcm_client().clone();
let Ok(fcm_token) = req.state().fcm_token().await else {
return Ok(tide::Response::new(StatusCode::INTERNAL_SERVER_ERROR));
let client = state.fcm_client().clone();
let Ok(fcm_token) = state.fcm_token().await else {
return Ok(StatusCode::INTERNAL_SERVER_ERROR);
};
let metrics = req.state().metrics();
let metrics = state.metrics();
notify_fcm(
&client,
fcm_token.as_deref(),
&package_name,
&token,
metrics,
)
.await
.await?;
}
NotificationToken::ApnsSandbox(token) => {
let client = req.state().sandbox_client().clone();
notify_apns(req, client, token).await
let client = state.sandbox_client().clone();
notify_apns(state, client, token).await?;
}
NotificationToken::ApnsProduction(token) => {
let client = req.state().production_client().clone();
notify_apns(req, client, token).await
let client = state.production_client().clone();
notify_apns(state, client, token).await?;
}
}
Ok(StatusCode::OK)
}

0 comments on commit 907abe3

Please sign in to comment.