Skip to content
2 changes: 1 addition & 1 deletion ci/rust-toolchain
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# 3. (optional) **follow the instructions in lints/README.md** to update the toolchain and dependencies for lints

[toolchain]
channel = "nightly-2024-06-06"
channel = "nightly-2024-07-19"
2 changes: 1 addition & 1 deletion lints/rust-toolchain
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# See `README.md` before bumping the version.

[toolchain]
channel = "nightly-2024-06-06"
channel = "nightly-2024-07-19"
components = ["llvm-tools-preview", "rustc-dev"]
1 change: 0 additions & 1 deletion src/batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#![feature(coroutines)]
#![feature(proc_macro_hygiene, stmt_expr_attributes)]
#![feature(iterator_try_collect)]
#![feature(lint_reasons)]
#![feature(is_sorted)]
#![recursion_limit = "256"]
#![feature(let_chains)]
Expand Down
2 changes: 0 additions & 2 deletions src/common/benches/bench_sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lint_reasons)]

use std::cell::RefCell;
use std::hint::black_box;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down
1 change: 0 additions & 1 deletion src/common/common_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

// This is a stub lib.rs.

#![feature(lint_reasons)]
#![feature(impl_trait_in_assoc_type)]
#![feature(error_generic_member_access)]

Expand Down
1 change: 0 additions & 1 deletion src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#![feature(test)]
#![feature(trusted_len)]
#![feature(allocator_api)]
#![feature(lint_reasons)]
#![feature(coroutines)]
#![feature(map_try_insert)]
#![feature(error_generic_member_access)]
Expand Down
1 change: 0 additions & 1 deletion src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#![feature(coroutines)]
#![feature(type_alias_impl_trait)]
#![feature(let_chains)]
#![feature(lint_reasons)]
#![feature(impl_trait_in_assoc_type)]
#![cfg_attr(coverage, feature(coverage_attribute))]

Expand Down
1 change: 0 additions & 1 deletion src/connector/codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#![feature(stmt_expr_attributes)]
#![feature(box_patterns)]
#![feature(trait_alias)]
#![feature(lint_reasons)]
#![feature(let_chains)]
#![feature(box_into_inner)]
#![feature(type_alias_impl_trait)]
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#![feature(stmt_expr_attributes)]
#![feature(box_patterns)]
#![feature(trait_alias)]
#![feature(lint_reasons)]
#![feature(let_chains)]
#![feature(box_into_inner)]
#![feature(type_alias_impl_trait)]
Expand Down
44 changes: 26 additions & 18 deletions src/connector/src/sink/google_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
use std::collections::BTreeMap;

use anyhow::{anyhow, Context};
use futures::future::try_join_all;
use futures::prelude::future::FutureExt;
use futures::prelude::TryFuture;
use futures::TryFutureExt;
use google_cloud_gax::conn::Environment;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
Expand All @@ -26,7 +23,7 @@ use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile
use google_cloud_pubsub::client::google_cloud_auth::project;
use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::publisher::{Awaiter, Publisher};
use google_cloud_pubsub::publisher::Publisher;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
Expand All @@ -46,19 +43,33 @@ use crate::dispatch_sink_formatter_str_key_impl;
pub const PUBSUB_SINK: &str = "google_pubsub";
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;

fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
try_join_all(awaiter.into_iter().map(|awaiter| {
awaiter.get().map(|result| {
result
.context("Google Pub/Sub sink error")
.map_err(SinkError::GooglePubSub)
.map(|_| ())
})
}))
.map_ok(|_: Vec<()>| ())
.boxed()
mod delivery_future {
use anyhow::Context;
use futures::future::try_join_all;
use futures::{FutureExt, TryFuture, TryFutureExt};
use google_cloud_pubsub::publisher::Awaiter;

use crate::sink::SinkError;

pub type GooglePubSubSinkDeliveryFuture =
impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
try_join_all(awaiter.into_iter().map(|awaiter| {
awaiter.get().map(|result| {
result
.context("Google Pub/Sub sink error")
.map_err(SinkError::GooglePubSub)
.map(|_| ())
})
}))
.map_ok(|_: Vec<()>| ())
.boxed()
}
}

use delivery_future::*;

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GooglePubSubConfig {
Expand Down Expand Up @@ -172,9 +183,6 @@ struct GooglePubSubPayloadWriter<'w> {
add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}

pub type GooglePubSubSinkDeliveryFuture =
impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

impl GooglePubSubSinkWriter {
pub async fn new(
config: GooglePubSubConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub struct MonitoredFanoutPartitionedWriterBuilder<B: IcebergWriterBuilder> {
}

impl<B: IcebergWriterBuilder> MonitoredFanoutPartitionedWriterBuilder<B> {
#[expect(dead_code)]
pub fn new(
inner: FanoutPartitionedWriterBuilder<B>,
partition_num: LabelGuardedIntGauge<2>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub struct MonitoredWriteWriterBuilder<B: IcebergWriterBuilder> {

impl<B: IcebergWriterBuilder> MonitoredWriteWriterBuilder<B> {
/// Create writer context.
#[expect(dead_code)]
pub fn new(
inner: B,
write_qps: LabelGuardedIntCounter<2>,
Expand Down
1 change: 0 additions & 1 deletion src/dml/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#![allow(clippy::derive_partial_eq_without_eq)]
#![feature(trait_alias)]
#![feature(lint_reasons)]
#![feature(coroutines)]
#![feature(hash_extract_if)]
#![feature(type_alias_impl_trait)]
Expand Down
1 change: 0 additions & 1 deletion src/error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
//! access if `risingwave_common` is already a dependency.

#![feature(error_generic_member_access)]
#![feature(lint_reasons)]
#![feature(register_tool)]
#![register_tool(rw)]
#![feature(trait_alias)]
Expand Down
1 change: 0 additions & 1 deletion src/expr/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

#![feature(let_chains)]
#![feature(lint_reasons)]
#![feature(iterator_try_collect)]
#![feature(coroutines)]
#![feature(never_type)]
Expand Down
1 change: 0 additions & 1 deletion src/expr/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#![allow(non_snake_case)] // for `ctor` generated code
#![feature(let_chains)]
#![feature(assert_matches)]
#![feature(lint_reasons)]
#![feature(iterator_try_collect)]
#![feature(coroutines)]
#![feature(test)]
Expand Down
1 change: 0 additions & 1 deletion src/expr/macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lint_reasons)]
#![feature(let_chains)]

use std::vec;
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#![feature(if_let_guard)]
#![feature(let_chains)]
#![feature(assert_matches)]
#![feature(lint_reasons)]
#![feature(box_patterns)]
#![feature(macro_metavar_expr)]
#![feature(min_specialization)]
Expand Down
1 change: 0 additions & 1 deletion src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lint_reasons)]
#![feature(let_chains)]
#![cfg_attr(coverage, feature(coverage_attribute))]

Expand Down
1 change: 0 additions & 1 deletion src/meta/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lint_reasons)]
#![feature(let_chains)]
#![feature(impl_trait_in_assoc_type)]
#![cfg_attr(coverage, feature(coverage_attribute))]
Expand Down
64 changes: 39 additions & 25 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,47 @@ struct ControlStreamNode {
sender: UnboundedSender<StreamingControlStreamRequest>,
}

fn into_future(
worker_id: WorkerId,
stream: BoxStream<
'static,
risingwave_rpc_client::error::Result<StreamingControlStreamResponse>,
>,
) -> ResponseStreamFuture {
stream.into_future().map(move |(opt, stream)| {
(
worker_id,
stream,
opt.ok_or_else(|| anyhow!("end of stream").into())
.and_then(|result| result.map_err(|e| e.into())),
)
})
mod response_stream_future {
use std::future::Future;

use anyhow::anyhow;
use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt};
use risingwave_pb::stream_service::StreamingControlStreamResponse;

use crate::manager::WorkerId;
use crate::MetaResult;

pub(super) fn into_future(
worker_id: WorkerId,
stream: BoxStream<
'static,
risingwave_rpc_client::error::Result<StreamingControlStreamResponse>,
>,
) -> ResponseStreamFuture {
stream.into_future().map(move |(opt, stream)| {
(
worker_id,
stream,
opt.ok_or_else(|| anyhow!("end of stream").into())
.and_then(|result| result.map_err(|e| e.into())),
)
})
}

pub(super) type ResponseStreamFuture = impl Future<
Output = (
WorkerId,
BoxStream<
'static,
risingwave_rpc_client::error::Result<StreamingControlStreamResponse>,
>,
MetaResult<StreamingControlStreamResponse>,
),
> + 'static;
}

type ResponseStreamFuture = impl Future<
Output = (
WorkerId,
BoxStream<
'static,
risingwave_rpc_client::error::Result<StreamingControlStreamResponse>,
>,
MetaResult<StreamingControlStreamResponse>,
),
> + 'static;
use response_stream_future::*;

pub(super) struct ControlStreamManager {
context: GlobalBarrierManagerContext,
Expand Down
1 change: 0 additions & 1 deletion src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#![allow(clippy::derive_partial_eq_without_eq)]
#![feature(trait_alias)]
#![feature(type_alias_impl_trait)]
#![feature(lint_reasons)]
#![feature(map_try_insert)]
#![feature(extract_if)]
#![feature(hash_extract_if)]
Expand Down
1 change: 0 additions & 1 deletion src/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#![feature(trait_alias)]
#![feature(type_alias_impl_trait)]
#![feature(lint_reasons)]
#![feature(error_generic_member_access)]
#![feature(let_chains)]

Expand Down
1 change: 0 additions & 1 deletion src/prost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// for derived code of `Message`
#![expect(clippy::all)]
#![expect(clippy::doc_markdown)]
#![feature(lint_reasons)]

use std::str::FromStr;

Expand Down
1 change: 0 additions & 1 deletion src/risedevtool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#![allow(clippy::derive_partial_eq_without_eq)]
#![feature(exit_status_error)]
#![feature(let_chains)]
#![feature(lint_reasons)]

mod config;
pub use config::*;
Expand Down
2 changes: 1 addition & 1 deletion src/risedevtool/src/task/task_kafka_ready_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl Task for KafkaReadyCheckTask {
let mut config = ClientConfig::new();
config.set(
"bootstrap.servers",
&format!("{}:{}", self.config.address, self.config.port),
format!("{}:{}", self.config.address, self.config.port),
);

let rt = tokio::runtime::Builder::new_current_thread()
Expand Down
1 change: 0 additions & 1 deletion src/sqlparser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
//! ```

#![cfg_attr(not(feature = "std"), no_std)]
#![feature(lint_reasons)]
#![feature(let_chains)]
#![expect(clippy::doc_markdown)]
#![expect(clippy::upper_case_acronyms)]
Expand Down
1 change: 0 additions & 1 deletion src/storage/backup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#![feature(type_alias_impl_trait)]
#![feature(extract_if)]
#![feature(custom_test_frameworks)]
#![feature(lint_reasons)]
#![feature(map_try_insert)]
#![feature(hash_extract_if)]
#![feature(btree_extract_if)]
Expand Down
2 changes: 0 additions & 2 deletions src/storage/compactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lint_reasons)]

mod compactor_observer;
mod rpc;
pub mod server;
Expand Down
1 change: 0 additions & 1 deletion src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#![feature(async_closure)]
#![feature(extract_if)]
#![feature(hash_extract_if)]
#![feature(lint_reasons)]
#![feature(map_many_mut)]
#![feature(type_alias_impl_trait)]
#![feature(impl_trait_in_assoc_type)]
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/hummock/sstable/bloom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl BloomFilterReader {
true
} else {
let nbits = self.data.bit_len();
let delta = (h >> 17) | (h << 15);
let delta = h.rotate_left(15);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we forget the rotate_right(17)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, with u32 and 17 + 15 = 32, this operation equals to directly rotate_left(15). (clippy also recommends it.

for _ in 0..self.k {
let bit_pos = h % (nbits as u32);
if !self.data.get_bit(bit_pos as usize) {
Expand Down Expand Up @@ -171,7 +171,7 @@ impl FilterBuilder for BloomFilterBuilder {
filter.resize(nbytes, 0);
for h in &self.key_hash_entries {
let mut h = *h;
let delta = (h >> 17) | (h << 15);
let delta = h.rotate_left(15);
for _ in 0..k {
let bit_pos = (h as usize) % nbits;
filter.set_bit(bit_pos, true);
Expand Down
1 change: 0 additions & 1 deletion src/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#![feature(extract_if)]
#![feature(coroutines)]
#![feature(hash_extract_if)]
#![feature(lint_reasons)]
#![feature(proc_macro_hygiene)]
#![feature(stmt_expr_attributes)]
#![feature(strict_provenance)]
Expand Down
Loading