Skip to content

Commit 481c27d

Browse files
committed
feat: encoded secret key
Signed-off-by: Ning Sun <[email protected]>
1 parent 941918b commit 481c27d

File tree

6 files changed

+47
-7
lines changed

6 files changed

+47
-7
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/catalog/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ moka = { workspace = true, features = ["future", "sync"] }
4343
partition.workspace = true
4444
paste.workspace = true
4545
prometheus.workspace = true
46+
rand.workspace = true
4647
rustc-hash.workspace = true
4748
serde_json.workspace = true
4849
session.workspace = true

src/catalog/src/process_manager.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::collections::hash_map::Entry;
1616
use std::collections::HashMap;
1717
use std::fmt::{Debug, Formatter};
18+
use std::hash::{DefaultHasher, Hash, Hasher};
1819
use std::sync::atomic::{AtomicU32, Ordering};
1920
use std::sync::{Arc, RwLock};
2021

@@ -101,6 +102,27 @@ impl ProcessManager {
101102
self.next_id.fetch_add(1, Ordering::Relaxed)
102103
}
103104

105+
/// Generate a Postgres specific secret key
106+
///
107+
/// According to Postgres this secret key is a 32-bit long integer for
108+
/// identify a connection. This implementation uses first 16bits hash value
109+
/// of server address and 16bit random data for this key.
110+
pub fn generate_secret_key(&self) -> i32 {
111+
let mut hasher = DefaultHasher::new();
112+
self.server_addr.hash(&mut hasher);
113+
let hostname_hash = hasher.finish();
114+
115+
// Take lower 16 bits of the hash for first part
116+
let first_part = (hostname_hash & 0xFFFF) as u16;
117+
118+
// Generate random 16 bits for second part
119+
let second_part = rand::random::<u16>();
120+
121+
// Combine both parts into i32
122+
let result = ((first_part as u32) << 16) | (second_part as u32);
123+
result as i32
124+
}
125+
104126
/// De-register a query from process list.
105127
pub fn deregister_query(&self, catalog: String, id: ProcessId) {
106128
if let Entry::Occupied(mut o) = self.catalogs.write().unwrap().entry(catalog) {

src/servers/src/postgres.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,13 @@ impl PgWireServerHandlers for PostgresServerHandler {
119119
impl MakePostgresServerHandler {
120120
fn make(&self, addr: Option<SocketAddr>) -> PostgresServerHandler {
121121
let process_id = self.process_manager.next_id();
122-
let session = Arc::new(Session::new(
123-
addr,
124-
Channel::Postgres,
125-
Default::default(),
126-
process_id,
127-
));
122+
// generate cluster unique secret key
123+
let secret_key = self.process_manager.generate_secret_key();
124+
125+
let session = Arc::new(
126+
Session::new(addr, Channel::Postgres, Default::default(), process_id)
127+
.with_secret_key(secret_key),
128+
);
128129
let handler = PostgresServerHandlerInner {
129130
query_handler: self.query_handler.clone(),
130131
process_manager: self.process_manager.clone(),

src/servers/src/postgres/auth_handler.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,10 @@ where
126126

127127
// pass generated process id and secret key to client, this information will
128128
// be sent to postgres client for query cancellation.
129-
client.set_pid_and_secret_key(session.process_id() as i32, rand::random::<i32>());
129+
client.set_pid_and_secret_key(
130+
session.process_id() as i32,
131+
session.secret_key().unwrap_or_default(),
132+
);
130133
// set userinfo outside
131134
}
132135

src/session/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ pub struct Session {
4343
configuration_variables: Arc<ConfigurationVariables>,
4444
// the process id to use when killing the query
4545
process_id: u32,
46+
// a postgres specific key for cancel request
47+
secret_key: Option<i32>,
4648
}
4749

4850
pub type SessionRef = Arc<Session>;
@@ -85,9 +87,15 @@ impl Session {
8587
configuration_variables: Arc::new(configuration_variables),
8688
mutable_inner: Arc::new(RwLock::new(MutableInner::default())),
8789
process_id,
90+
secret_key: None,
8891
}
8992
}
9093

94+
pub fn with_secret_key(mut self, secret_key: i32) -> Self {
95+
self.secret_key = Some(secret_key);
96+
self
97+
}
98+
9199
pub fn new_query_context(&self) -> QueryContextRef {
92100
QueryContextBuilder::default()
93101
// catalog is not allowed for update in query context so we use
@@ -155,4 +163,8 @@ impl Session {
155163
pub fn process_id(&self) -> u32 {
156164
self.process_id
157165
}
166+
167+
pub fn secret_key(&self) -> Option<i32> {
168+
self.secret_key
169+
}
158170
}

0 commit comments

Comments
 (0)