Skip to content

Commit 3af6d1e

Browse files
committed
test: add a proxy example, work in progress
1 parent 8d91b27 commit 3af6d1e

File tree

2 files changed

+97
-0
lines changed

2 files changed

+97
-0
lines changed

Diff for: Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ gluesql = "0.13"
4949
## for datafusion example
5050
datafusion = "33"
5151
sqlparser = "0.40"
52+
## for proxy example
53+
tokio-postgres = "0.7"
5254

5355
[features]
5456
default = ["tokio", "time-format"]

Diff for: examples/proxy.rs

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use std::sync::Arc;
2+
3+
use async_trait::async_trait;
4+
use futures::{stream, StreamExt};
5+
use tokio::net::TcpListener;
6+
use tokio_postgres::{Client, NoTls, SimpleQueryMessage};
7+
8+
use pgwire::api::auth::noop::NoopStartupHandler;
9+
use pgwire::api::query::{PlaceholderExtendedQueryHandler, SimpleQueryHandler};
10+
use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response, Tag};
11+
use pgwire::api::{ClientInfo, MakeHandler, StatelessMakeHandler, Type};
12+
use pgwire::error::{PgWireError, PgWireResult};
13+
use pgwire::tokio::process_socket;
14+
15+
pub struct ProxyProcessor {
16+
upstream_client: Client,
17+
}
18+
19+
#[async_trait]
20+
impl SimpleQueryHandler for ProxyProcessor {
21+
async fn do_query<'a, C>(&self, _client: &C, query: &'a str) -> PgWireResult<Vec<Response<'a>>>
22+
where
23+
C: ClientInfo + Unpin + Send + Sync,
24+
{
25+
self.upstream_client
26+
.simple_query(query)
27+
.await
28+
.map_err(|e| PgWireError::ApiError(Box::new(e)))
29+
.map(|resp_msgs| {
30+
let mut downstream_response = Vec::new();
31+
let mut row_buf = Vec::new();
32+
for resp in resp_msgs {
33+
match resp {
34+
SimpleQueryMessage::CommandComplete(count) => {
35+
if row_buf.is_empty() {
36+
downstream_response.push(Response::Execution(
37+
Tag::new_for_execution("", Some(count as usize)),
38+
));
39+
} else {
40+
downstream_response.push(Response::Query(row_buf));
41+
}
42+
}
43+
SimpleQueryMessage::Row(row) => {
44+
// TODO: convert simple query row to Response::Query
45+
row_buf.push(row);
46+
}
47+
}
48+
}
49+
50+
downstream_response
51+
})
52+
}
53+
}
54+
55+
#[tokio::main]
56+
pub async fn main() {
57+
let (client, connection) = tokio_postgres::connect("host=127.0.0.1 user=postgres", NoTls)
58+
.await
59+
.expect("Cannot client upstream connection");
60+
tokio::spawn(async move {
61+
if let Err(e) = connection.await {
62+
eprintln!("Upstream connection error: {}", e);
63+
}
64+
});
65+
66+
let processor = Arc::new(StatelessMakeHandler::new(Arc::new(ProxyProcessor {
67+
upstream_client: client,
68+
})));
69+
70+
// We have not implemented extended query in this server, use placeholder instead
71+
let placeholder = Arc::new(StatelessMakeHandler::new(Arc::new(
72+
PlaceholderExtendedQueryHandler,
73+
)));
74+
let authenticator = Arc::new(StatelessMakeHandler::new(Arc::new(NoopStartupHandler)));
75+
76+
let server_addr = "127.0.0.1:5431";
77+
let listener = TcpListener::bind(server_addr).await.unwrap();
78+
println!("Listening to {}", server_addr);
79+
loop {
80+
let incoming_socket = listener.accept().await.unwrap();
81+
let authenticator_ref = authenticator.make();
82+
let processor_ref = processor.make();
83+
let placeholder_ref = placeholder.make();
84+
tokio::spawn(async move {
85+
process_socket(
86+
incoming_socket.0,
87+
None,
88+
authenticator_ref,
89+
processor_ref,
90+
placeholder_ref,
91+
)
92+
.await
93+
});
94+
}
95+
}

0 commit comments

Comments
 (0)