Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,32 @@ exclude = ["test/*"]
edition = "2018"

[dependencies]
bytes = "0.5.2"
crossbeam = "0.7"
bytes = "1.0"
futures-core = "0.3"
futures-util = "0.3"
futures-sink = "0.3"
lazy_static = "1"
lru = "0.6.0"
mio-named-pipes = "0.1.6"
mysql_common = "0.24.1"
mio = "0.7.7"
mysql_common = "0.26.0"
native-tls = "0.2"
pem = "0.8.1"
percent-encoding = "2.1.0"
pin-project = "0.4.17"
pin-project = "1.0.2"
serde = "1"
serde_json = "1"
thiserror = "1.0.4"
tokio = { version = "0.2.17", features = ["io-util", "net", "sync", "fs", "rt-core", "time", "stream", "macros"] }
tokio-util = { version = "0.3.1", features = ["codec"] }
tokio-tls = "0.3"
tokio = { version = "1.0", features = ["io-util", "fs", "net", "time"] }
tokio-util = { version = "0.6.0", features = ["codec"] }
tokio-native-tls = { git = "https://github.com/tokio-rs/tls.git" }
twox-hash = "1"
url = "2.1"
uuid = { version = "0.8.1", features = ["v4"] }

[dev-dependencies]
tempfile = "3.1.0"
socket2 = "0.3.17"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
rand = "0.8.0"

[features]
Expand Down
67 changes: 33 additions & 34 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,40 +42,39 @@ jobs:
DATABASE_URL: mysql://root:root@127.0.0.1:3306/mysql
displayName: Run tests

- job: "TestBasicMacOs"
pool:
vmImage: "macOS-10.15"
strategy:
maxParallel: 10
matrix:
stable:
RUST_TOOLCHAIN: stable
steps:
- bash: |
brew update
brew install mysql
brew services start mysql
brew services stop mysql
sleep 3
echo 'local_infile=1' >> /usr/local/etc/my.cnf
echo 'socket=/tmp/mysql.sock' >> /usr/local/etc/my.cnf
brew services start mysql
/usr/local/Cellar/mysql/*/bin/mysqld --verbose --help
sleep 20
/usr/local/Cellar/mysql/*/bin/mysql -e "SET GLOBAL max_allowed_packet = 36700160;" -uroot
displayName: Install MySql
- bash: |
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $RUST_TOOLCHAIN
displayName: Install rust (MacOs)
- bash: |
SSL=false COMPRESS=false cargo test
SSL=true COMPRESS=false cargo test
SSL=false COMPRESS=true cargo test
SSL=true COMPRESS=true cargo test
env:
RUST_BACKTRACE: 1
DATABASE_URL: mysql://root@127.0.0.1/mysql
displayName: Run tests
# - job: "TestBasicMacOs"
# pool:
# vmImage: "macOS-10.15"
# strategy:
# maxParallel: 10
# matrix:
# stable:
# RUST_TOOLCHAIN: stable
# steps:
# - bash: |
# brew update
# brew install mysql
# brew services start mysql
# brew services stop mysql
# sleep 3
# echo 'local_infile=1' >> /usr/local/etc/my.cnf
# echo 'socket=/tmp/mysql.sock' >> /usr/local/etc/my.cnf
# brew services start mysql
# sleep 5
# /usr/local/Cellar/mysql/*/bin/mysql -e "SET GLOBAL max_allowed_packet = 36700160;" -uroot
# displayName: Install MySql
# - bash: |
# curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $RUST_TOOLCHAIN
# displayName: Install rust (MacOs)
# - bash: |
# SSL=false COMPRESS=false cargo test
# SSL=true COMPRESS=false cargo test
# SSL=false COMPRESS=true cargo test
# SSL=true COMPRESS=true cargo test
# env:
# RUST_BACKTRACE: 1
# DATABASE_URL: mysql://root@127.0.0.1/mysql
# displayName: Run tests

- job: "TestBasicWindows"
pool:
Expand Down
1 change: 0 additions & 1 deletion rustfmt.toml

This file was deleted.

53 changes: 27 additions & 26 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl ConnInner {
fn stream_mut(&mut self) -> Result<&mut Stream> {
self.stream
.as_mut()
.ok_or(DriverError::ConnectionClosed.into())
.ok_or_else(|| DriverError::ConnectionClosed.into())
}
}

Expand Down Expand Up @@ -281,7 +281,9 @@ impl Conn {
result
}
Err(err) => {
self.take_stream().close().await?;
if self.inner.stream.is_some() {
self.take_stream().close().await?;
}
Err(err)
}
}
Expand Down Expand Up @@ -344,7 +346,6 @@ impl Conn {
fn setup_stream(&mut self) -> Result<()> {
debug_assert!(self.inner.stream.is_some());
if let Some(stream) = self.inner.stream.as_mut() {
stream.set_keepalive_ms(self.inner.opts.tcp_keepalive())?;
stream.set_tcp_nodelay(self.inner.opts.tcp_nodelay())?;
}
Ok(())
Expand Down Expand Up @@ -450,7 +451,8 @@ impl Conn {
}
AuthPlugin::Other(ref name) => Err(DriverError::UnknownAuthPlugin {
name: String::from_utf8_lossy(name.as_ref()).to_string(),
})?,
}
.into()),
}
})
}
Expand Down Expand Up @@ -500,20 +502,14 @@ impl Conn {
self.drop_packet().await?;
Ok(())
}
_ => Err(DriverError::UnexpectedPacket {
payload: packet.into(),
}
.into()),
_ => Err(DriverError::UnexpectedPacket { payload: packet }.into()),
},
Some(0xfe) if !self.inner.auth_switched => {
let auth_switch_request = parse_auth_switch_request(&*packet)?.into_owned();
self.perform_auth_switch(auth_switch_request).await?;
Ok(())
}
_ => Err(DriverError::UnexpectedPacket {
payload: packet.into(),
}
.into()),
_ => Err(DriverError::UnexpectedPacket { payload: packet }.into()),
}
}

Expand Down Expand Up @@ -541,7 +537,7 @@ impl Conn {
self.handle_ok(ok_packet.into_owned());
} else if let Ok(err_packet) = parse_err_packet(&*packet, self.capabilities()) {
self.handle_err(err_packet.clone().into_owned());
return Err(err_packet.into()).into();
return Err(err_packet.into());
}

Ok(())
Expand Down Expand Up @@ -583,7 +579,7 @@ impl Conn {

/// Returns future that sends full command body to a server.
pub(crate) async fn write_command_raw(&mut self, body: Vec<u8>) -> Result<()> {
debug_assert!(body.len() > 0);
debug_assert!(!body.is_empty());
self.clean_dirty().await?;
self.reset_seq_id();
self.write_packet(body).await
Expand All @@ -607,7 +603,7 @@ impl Conn {
}

async fn run_init_commands(&mut self) -> Result<()> {
let mut init: Vec<_> = self.inner.opts.init().iter().cloned().collect();
let mut init = self.inner.opts.init().to_vec();

while let Some(query) = init.pop() {
self.query_drop(query).await?;
Expand All @@ -622,10 +618,18 @@ impl Conn {
let fut = Box::pin(async move {
let mut conn = Conn::empty(opts.clone());

let stream = if let Some(path) = opts.socket() {
Stream::connect_socket(path.to_owned()).await?
let stream = if let Some(_path) = opts.socket() {
#[cfg(unix)]
{
Stream::connect_socket(_path.to_owned()).await?
}
#[cfg(target_os = "windows")]
return Err(crate::DriverError::NamedPipesDisabled.into());
} else {
Stream::connect_tcp(opts.hostport_or_url()).await?
let keepalive = opts
.tcp_keepalive()
.map(|x| std::time::Duration::from_millis(x.into()));
Stream::connect_tcp(opts.hostport_or_url(), keepalive).await?
};

conn.inner.stream = Some(stream);
Expand Down Expand Up @@ -659,13 +663,10 @@ impl Conn {
let opts = self.inner.opts.clone();
if opts.socket().is_none() {
let opts = OptsBuilder::from_opts(opts).socket(Some(&**socket));
match Conn::new(opts).await {
Ok(conn) => {
let old_conn = std::mem::replace(self, conn);
// tidy up the old connection
old_conn.close_conn().await?;
}
Err(_) => (),
if let Ok(conn) = Conn::new(opts).await {
let old_conn = std::mem::replace(self, conn);
// tidy up the old connection
old_conn.close_conn().await?;
}
}
}
Expand Down Expand Up @@ -925,7 +926,7 @@ mod test {
#[test]
fn should_not_panic_if_dropped_without_tokio_runtime() {
let fut = Conn::new(get_opts());
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async {
fut.await.unwrap();
});
Expand Down
Loading