Skip to content

feat(memory): cherry-pick #19372 and #19059 to branch release-2.1 #19973

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,15 +408,15 @@ mod test {
let raw_opts = "
--compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/
--meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/
--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/
--frontend-opts=--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368
--prometheus-listener-addr=127.0.0.1:1234
--config-path=src/config/test.toml
";
let actual = StandaloneOpts::parse_from(raw_opts.lines());
let opts = StandaloneOpts {
compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 --temp-secret-file-dir ./compute/secrets/".into()),
meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --temp-secret-file-dir ./meta/secrets/".into()),
frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/".into()),
frontend_opts: Some("--config-path=src/config/original.toml --temp-secret-file-dir ./frontend/secrets/ --frontend-total-memory-bytes=34359738368".into() ),
compactor_opts: None,
prometheus_listener_addr: Some("127.0.0.1:1234".into()),
config_path: Some("src/config/test.toml".into()),
Expand Down Expand Up @@ -508,6 +508,7 @@ mod test {
metrics_level: None,
enable_barrier_read: None,
temp_secret_file_dir: "./frontend/secrets/",
frontend_total_memory_bytes: 34359738368,
},
),
compactor_opts: None,
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ pub async fn compute_node_serve(
compactor_context,
hummock_meta_client.clone(),
storage.sstable_object_id_manager().clone(),
storage.filter_key_extractor_manager().clone(),
storage.compaction_catalog_manager_ref().clone(),
);
sub_tasks.push((handle, shutdown_sender));
}
Expand Down
9 changes: 9 additions & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub mod session;
mod stream_fragmenter;
use risingwave_common::config::{MetricLevel, OverrideConfig};
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;
pub use stream_fragmenter::build_graph;
mod utils;
Expand Down Expand Up @@ -160,6 +161,10 @@ pub struct FrontendOpts {
default_value = "./secrets"
)]
pub temp_secret_file_dir: String,

/// Total available memory for the frontend node in bytes. Used by both computing and storage.
#[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())]
pub frontend_total_memory_bytes: usize,
}

impl risingwave_common::opts::Opts for FrontendOpts {
Expand Down Expand Up @@ -221,3 +226,7 @@ pub fn start(
.unwrap()
})
}

pub fn default_frontend_total_memory_bytes() -> usize {
system_memory_available_bytes()
}
4 changes: 2 additions & 2 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::types::DataType;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::cluster_limit;
use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::{cluster_limit, resource_util};
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager};
Expand Down Expand Up @@ -444,7 +444,7 @@ impl FrontendEnv {
.map_err(|err| anyhow!(err))?;
}

let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
let total_memory_bytes = opts.frontend_total_memory_bytes;
let heap_profiler =
HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
// Run a background heap profiler
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ pub async fn setup_compute_env_with_metric(
compactor_streams_change_tx,
)
.await;

let fake_host_address = HostAddress {
host: "127.0.0.1".to_string(),
port,
Expand Down
28 changes: 22 additions & 6 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -32,6 +33,7 @@ use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{InMemObjectStore, ObjectStore, ObjectStoreImpl};
use risingwave_pb::hummock::compact_task::PbTaskType;
use risingwave_pb::hummock::PbTableSchema;
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::compactor::compactor_runner::compact_and_build_sst;
use risingwave_storage::hummock::compactor::{
ConcatSstableIterator, DummyCompactionFilter, TaskConfig, TaskProgress,
Expand Down Expand Up @@ -133,8 +135,13 @@ async fn build_table(
policy: CachePolicy::Fill(CacheContext::Default),
},
);
let mut builder =
SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt);
let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
);
let value = b"1234567890123456789";
let mut full_key = test_key_of(0, epoch, TableId::new(0));
let table_key_len = full_key.user_key.table_key.len();
Expand Down Expand Up @@ -177,8 +184,14 @@ async fn build_table_2(
policy: CachePolicy::Fill(CacheContext::Default),
},
);
let mut builder =
SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt);

let table_id_to_vnode = HashMap::from_iter(vec![(table_id, VirtualNode::COUNT_FOR_TEST)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
);
let mut full_key = test_key_of(0, epoch, TableId::new(table_id));
let table_key_len = full_key.user_key.table_key.len();

Expand Down Expand Up @@ -273,8 +286,11 @@ async fn compact<I: HummockIterator<Direction = Forward>>(
bloom_false_positive: 0.001,
..Default::default()
};
let mut builder =
CapacitySplitTableBuilder::for_test(LocalTableBuilderFactory::new(32, sstable_store, opt));
let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
let mut builder = CapacitySplitTableBuilder::for_test(
LocalTableBuilderFactory::new(32, sstable_store, opt),
compaction_catalog_agent_ref,
);

let task_config = task_config.unwrap_or_else(|| TaskConfig {
key_range: KeyRange::inf(),
Expand Down
16 changes: 15 additions & 1 deletion src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::env;
use std::ops::Range;
use std::sync::atomic::AtomicU64;
Expand All @@ -24,11 +25,13 @@ use foyer::{Engine, HybridCacheBuilder};
use rand::random;
use risingwave_common::catalog::TableId;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_object_store::object::{
InMemObjectStore, ObjectStore, ObjectStoreImpl, S3ObjectStore,
};
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::iterator::{ConcatIterator, ConcatIteratorInner, HummockIterator};
use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use risingwave_storage::hummock::value::HummockValue;
Expand Down Expand Up @@ -83,7 +86,11 @@ impl<F: SstableWriterFactory> TableBuilderFactory for LocalTableBuilderFactory<F
.create_sst_writer(id, writer_options)
.await
.unwrap();
let builder = SstableBuilder::for_test(id, writer, self.options.clone());
let table_id_to_vnode = HashMap::from_iter(vec![(
TableId::default().into(),
VirtualNode::COUNT_FOR_TEST,
)]);
let builder = SstableBuilder::for_test(id, writer, self.options.clone(), table_id_to_vnode);

Ok(builder)
}
Expand Down Expand Up @@ -192,6 +199,8 @@ fn bench_builder(

let sstable_store = runtime.block_on(async { generate_sstable_store(object_store).await });

let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);

let mut group = c.benchmark_group("bench_multi_builder");
group
.sample_size(SAMPLE_COUNT)
Expand All @@ -205,6 +214,7 @@ fn bench_builder(
StreamingSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
})
});
Expand All @@ -217,6 +227,7 @@ fn bench_builder(
BatchSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
})
});
Expand Down Expand Up @@ -249,13 +260,16 @@ fn bench_table_scan(c: &mut Criterion) {
let object_store = Arc::new(ObjectStoreImpl::InMem(store));
let sstable_store = runtime.block_on(async { generate_sstable_store(object_store).await });

let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);

let ssts = runtime.block_on(async {
build_tables(CapacitySplitTableBuilder::for_test(
LocalTableBuilderFactory::new(
1,
BatchSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
.await
});
Expand Down
29 changes: 9 additions & 20 deletions src/storage/compactor/src/compactor_observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::ObserverState;
use risingwave_pb::catalog::Table;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::SubscribeResponse;
use risingwave_storage::filter_key_extractor::{
FilterKeyExtractorImpl, FilterKeyExtractorManagerRef,
};
use risingwave_storage::compaction_catalog_manager::CompactionCatalogManagerRef;

pub struct CompactorObserverNode {
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
compaction_catalog_manager: CompactionCatalogManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
version: u64,
}
Expand Down Expand Up @@ -83,36 +78,30 @@ impl ObserverState for CompactorObserverNode {

impl CompactorObserverNode {
pub fn new(
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
compaction_catalog_manager: CompactionCatalogManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
) -> Self {
Self {
filter_key_extractor_manager,
compaction_catalog_manager,
system_params_manager,
version: 0,
}
}

fn handle_catalog_snapshot(&mut self, tables: Vec<Table>) {
let all_filter_key_extractors: HashMap<u32, Arc<FilterKeyExtractorImpl>> = tables
.iter()
.map(|t| (t.id, Arc::new(FilterKeyExtractorImpl::from_table(t))))
.collect();
self.filter_key_extractor_manager
.sync(all_filter_key_extractors);
self.compaction_catalog_manager
.sync(tables.into_iter().map(|t| (t.id, t)).collect());
}

fn handle_catalog_notification(&mut self, operation: Operation, table_catalog: Table) {
match operation {
Operation::Add | Operation::Update => {
self.filter_key_extractor_manager.update(
table_catalog.id,
Arc::new(FilterKeyExtractorImpl::from_table(&table_catalog)),
);
self.compaction_catalog_manager
.update(table_catalog.id, table_catalog);
}

Operation::Delete => {
self.filter_key_extractor_manager.remove(table_catalog.id);
self.compaction_catalog_manager.remove(table_catalog.id);
}

_ => panic!("receive an unsupported notify {:?}", operation),
Expand Down
9 changes: 9 additions & 0 deletions src/storage/compactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_common::config::{
AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig,
};
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;

use crate::server::{compactor_serve, shared_compactor_serve};
Expand Down Expand Up @@ -92,6 +93,10 @@ pub struct CompactorOpts {

#[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
pub proxy_rpc_endpoint: String,

/// Total available memory for the frontend node in bytes. Used by compactor.
#[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())]
pub compactor_total_memory_bytes: usize,
}

impl risingwave_common::opts::Opts for CompactorOpts {
Expand Down Expand Up @@ -143,3 +148,7 @@ pub fn start(
}),
}
}

pub fn default_compactor_total_memory_bytes() -> usize {
system_memory_available_bytes()
}
Loading
Loading