diff --git a/.gitmodules b/.gitmodules
index d85acea564..7f8cb88da4 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -7,3 +7,6 @@
 [submodule "cpp/third-party/googletest"]
 	path = cpp/third-party/googletest
 	url = git@github.com:google/googletest.git
+[submodule "cpp/FlameGraph"]
+	path = cpp/FlameGraph
+	url = git@github.com:brendangregg/FlameGraph.git
diff --git a/cpp/.gitignore b/cpp/.gitignore
index 6021de86a8..eedd6d3627 100644
--- a/cpp/.gitignore
+++ b/cpp/.gitignore
@@ -5,3 +5,8 @@
 plot
 **/cmake-build-debug
 **/CMakeCache.txt
 **/CMakeFiles
+# ignore generated benchmark artifacts (perf flame graph SVGs, stats, result CSVs)
+testcase/perf-*/*.svg
+*.csv
+*.txt
+*.svg
diff --git a/cpp/FlameGraph b/cpp/FlameGraph
new file mode 160000
index 0000000000..41fee1f99f
--- /dev/null
+++ b/cpp/FlameGraph
@@ -0,0 +1 @@
+Subproject commit 41fee1f99f9276008b7cd112fca19dc3ea84ac32
diff --git a/cpp/pixels-common/include/physical/BufferPool.h b/cpp/pixels-common/include/physical/BufferPool.h
index ec810e29a9..85bd5673b0 100644
--- a/cpp/pixels-common/include/physical/BufferPool.h
+++ b/cpp/pixels-common/include/physical/BufferPool.h
@@ -65,7 +65,7 @@ class BufferPool
     static thread_local bool isInitialized;
     static thread_local std::map<uint32_t, std::shared_ptr<ByteBuffer>> buffers[2];
-    static std::shared_ptr<DirectIoLib> directIoLib;
+    static thread_local std::shared_ptr<DirectIoLib> directIoLib;
     static thread_local int currBufferIdx;
     static thread_local int nextBufferIdx;
     friend class DirectUringRandomAccessFile;
diff --git a/cpp/pixels-common/lib/physical/BufferPool.cpp b/cpp/pixels-common/lib/physical/BufferPool.cpp
index 4dcc7bdeab..d899e6ce1b 100644
--- a/cpp/pixels-common/lib/physical/BufferPool.cpp
+++ b/cpp/pixels-common/lib/physical/BufferPool.cpp
@@ -34,7 +34,7 @@ BufferPool::buffers[2];
 // since we call switch function first.
 thread_local int BufferPool::currBufferIdx = 1;
 thread_local int BufferPool::nextBufferIdx = 0;
-std::shared_ptr<DirectIoLib> BufferPool::directIoLib;
+thread_local std::shared_ptr<DirectIoLib> BufferPool::directIoLib;
 
 void BufferPool::Initialize(std::vector<uint32_t> colIds, std::vector<uint64_t> bytes,
                             std::vector<std::string> columnNames)
diff --git a/cpp/pixels-core/lib/PixelsFilter.cpp b/cpp/pixels-core/lib/PixelsFilter.cpp
index 90248a3937..643715eedc 100644
--- a/cpp/pixels-core/lib/PixelsFilter.cpp
+++ b/cpp/pixels-core/lib/PixelsFilter.cpp
@@ -298,6 +298,9 @@ void PixelsFilter::ApplyFilter(std::shared_ptr<ColumnVector> vector, duckdb::Ta
         case duckdb::TableFilterType::IS_NULL:
             // TODO: support is null
             break;
+        case duckdb::TableFilterType::OPTIONAL_FILTER:
+            // optional filters are advisory; skipping them does not affect correctness
+            return;
         default:
             D_ASSERT(0);
             break;
diff --git a/cpp/pixels-cpp.properties b/cpp/pixels-cpp.properties
index 0a65c450ac..82f6be2402 100644
--- a/cpp/pixels-cpp.properties
+++ b/cpp/pixels-cpp.properties
@@ -44,3 +44,7 @@
 column.chunk.alignment=32
 # for DuckDB, it is only effective when column.chunk.alignment also meets the alignment of the isNull bitmap
 isnull.bitmap.alignment=8
+
+
+# whether the scan prefetches the next row group into a second buffer (double buffering)
+pixels.doublebuffer=false
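The scan-function changes that follow gate `::BufferPool::Switch()` and the prefetching `read()` call on this new property. A minimal Python sketch of the two modes it toggles; `issue_read`, `wait_complete`, and `process` are hypothetical stand-ins for the reader API, and only the control flow mirrors the patch:

```python
import time

# Hypothetical stand-ins for the reader API; only the control flow mirrors the patch.
def issue_read(rg):
    return rg  # pretend an asynchronous read was submitted for this row group

def wait_complete(rg):
    time.sleep(0.01)  # pretend to wait for the submitted read to finish

def process(rg):
    print("processed row group", rg)

def scan(row_groups, double_buffer):
    nxt = issue_read(row_groups[0]) if double_buffer and row_groups else None
    for i, rg in enumerate(row_groups):
        if double_buffer:
            curr = nxt  # BufferPool::Switch(): the prefetched buffer becomes current
            if i + 1 < len(row_groups):
                nxt = issue_read(row_groups[i + 1])  # prefetch the next row group early
        else:
            curr = issue_read(rg)  # single buffer: issue the read on demand
        wait_complete(curr)  # asyncReadComplete() analogue
        process(curr)

scan([0, 1, 2, 3], double_buffer=True)
```

In double-buffer mode the read for row group i+1 overlaps with processing of row group i; in single-buffer mode every read is synchronous and nothing overlaps.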
diff --git a/cpp/pixels-duckdb/PixelsScanFunction.cpp b/cpp/pixels-duckdb/PixelsScanFunction.cpp
index c8aec8b7ef..e7e0a7ba16 100644
--- a/cpp/pixels-duckdb/PixelsScanFunction.cpp
+++ b/cpp/pixels-duckdb/PixelsScanFunction.cpp
@@ -29,7 +29,7 @@
 namespace duckdb
 {
-bool PixelsScanFunction::enable_filter_pushdown = false;
+bool PixelsScanFunction::enable_filter_pushdown = true;
 
 static idx_t PixelsScanGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p,
                                      LocalTableFunctionState *local_state,
@@ -63,8 +63,8 @@ TableFunctionSet PixelsScanFunction::GetFunctionSet()
 	TableFunction table_function("pixels_scan", {LogicalType::VARCHAR}, PixelsScanImplementation,
 	                             PixelsScanBind, PixelsScanInitGlobal, PixelsScanInitLocal);
 	table_function.projection_pushdown = true;
-//	table_function.filter_pushdown = true;
-	//table_function.filter_prune = true;
+	table_function.filter_pushdown = true;
+	// table_function.filter_prune = true;
 	enable_filter_pushdown = table_function.filter_pushdown;
 	MultiFileReader::AddParameters(table_function);
 	table_function.cardinality = PixelsCardinality;
@@ -501,7 +501,12 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P
         scan_data.currReader->close();
     }
-    ::BufferPool::Switch();
+    if (ConfigFactory::Instance().getProperty("pixels.doublebuffer") == "true")
+    {
+        // double buffering: flip current/next buffers before adopting the prefetched reader
+        ::BufferPool::Switch();
+    }
+
     scan_data.currReader = scan_data.nextReader;
     scan_data.currPixelsRecordReader = scan_data.nextPixelsRecordReader;
     // asyncReadComplete is not invoked in the first run (is_init_state = true)
@@ -509,6 +514,12 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P
     {
         auto currPixelsRecordReader = std::static_pointer_cast<PixelsRecordReaderImpl>(
                 scan_data.currPixelsRecordReader);
+        if (ConfigFactory::Instance().getProperty("pixels.doublebuffer") == "false")
+        {
+            // single buffering: the read was not prefetched, so issue it now
+            currPixelsRecordReader->read();
+        }
+
         currPixelsRecordReader->asyncReadComplete((int) scan_data.column_names.size());
     }
     if (scan_data.next_file_index < StorageInstance->getFileSum(scan_data.deviceID))
@@ -526,7 +537,13 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P
         scan_data.nextPixelsRecordReader = scan_data.nextReader->read(option);
         auto nextPixelsRecordReader = std::static_pointer_cast<PixelsRecordReaderImpl>(
                 scan_data.nextPixelsRecordReader);
-        nextPixelsRecordReader->read();
+
+        if (ConfigFactory::Instance().getProperty("pixels.doublebuffer") == "true")
+        {
+            // double buffering: issue the read for the next row group ahead of consumption
+            nextPixelsRecordReader->read();
+        }
+
     } else {
         scan_data.nextReader = nullptr;
diff --git a/cpp/pixels-duckdb/duckdb b/cpp/pixels-duckdb/duckdb
index c3dc6d34c9..e0af5da3aa 160000
--- a/cpp/pixels-duckdb/duckdb
+++ b/cpp/pixels-duckdb/duckdb
@@ -1 +1 @@
-Subproject commit c3dc6d34c905bc44f311bf670b1bbddef1c0c776
+Subproject commit e0af5da3aaddf16c7b54bc1acaba94d5c2bcdd73
diff --git a/cpp/testcase/README-zh.md b/cpp/testcase/README-zh.md
new file mode 100644
index 0000000000..0d5942b20c
--- /dev/null
+++ b/cpp/testcase/README-zh.md
@@ -0,0 +1,39 @@
+# Tests
+This directory contains all of the benchmark tests.
+
+## Run Script
+`process_sqls.py` runs the queries. Pass `--benchmark` to select the benchmark to run, and `--sql-dir` to select the queries to execute.
+```bash
+usage: process_sqls.py [-h] [--runs RUNS] [--duckdb-bin DUCKDB_BIN] [--sql-dir SQL_DIR]
+                       [--output-csv OUTPUT_CSV] [--wait-after-run WAIT_AFTER_RUN]
+                       [--threads THREADS] [--benchmark BENCHMARK] [--benchmark-json BENCHMARK_JSON]
+
+DuckDB ClickBench Batch Test Script (Multi-column CSV, ensures resource release)
+
+options:
+  -h, --help            show this help message and exit
+  --runs RUNS           Number of runs per SQL file (default: 3)
+  --duckdb-bin DUCKDB_BIN
+                        Path to duckdb executable
+  --sql-dir SQL_DIR     Directory containing SQL files (only processes .sql files starting with 'q')
+  --output-csv OUTPUT_CSV
+                        Path to output result CSV
+  --wait-after-run WAIT_AFTER_RUN
+                        Seconds to wait after each run (ensures resource release, default: 2s)
+  --threads THREADS     Number of threads to use in DuckDB (default: 96)
+  --benchmark BENCHMARK
+                        Name of benchmark to use (must exist in benchmark JSON, e.g. clickbench-
+                        pixels-e0)
+  --benchmark-json BENCHMARK_JSON
+                        Path to benchmark configuration JSON file (default: ./benchmark.json)
+
+```
+
+## I/O Granularity Test
+`blk_stat.py` runs `process_sqls.py` while invoking blktrace and blkparse to measure the I/O granularity of the underlying block device. Note that the queries to run are determined by the defaults built into `process_sqls.py`.
+
+## Single/Double Buffer Performance Test
+`single_doublebuffer_async_sync_test.py` sets the run parameters and executes the single/double buffer tests.
+
+## perf Experiments
+`run_perf.py` collects `perf stat` metrics and drives `generate_flamegraphs.sh` to produce flame graphs.
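The `--benchmark` name is resolved against `benchmark.json`, which follows. A minimal sketch of that lookup, mirroring `load_benchmark_create_view()` in `process_sqls.py` further down; the benchmark name used here is one of the entries in the file:

```python
import json

# Resolve a benchmark name to its CREATE VIEW prologue; this is the same
# lookup that process_sqls.py performs before running each query.
with open("./benchmark.json", encoding="utf-8") as f:
    config = json.load(f)

name = "clickbench-pixels-e0-1ssd"
create_view_sql = config[name].strip()
if not create_view_sql:
    raise ValueError(f"CREATE VIEW SQL for '{name}' is empty")
print(create_view_sql)
```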
"clickbench-pixels-e0-1ssd": "CREATE VIEW hits AS SELECT * FROM pixels_scan([\"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\"]);\n", + "clickbench-pixels-e0-6ssd": "CREATE VIEW hits AS SELECT * FROM pixels_scan([\n \"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-02/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-03/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-04/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-05/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-06/clickbench/pixels-e0/hits/v-0-ordered/*\"\n ]\n);", + "clickbench-pixels-e0-12ssd": "CREATE VIEW hits AS SELECT * FROM pixels_scan([\n \"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-02/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-03/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-04/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-05/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-06/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-07/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-08/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-09/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-10/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-11/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-12/clickbench/pixels-e0/hits/v-0-ordered/*\"\n ]\n);" +} \ No newline at end of file diff --git a/cpp/testcase/blk_stat.py b/cpp/testcase/blk_stat.py new file mode 100644 index 0000000000..cc53621d17 --- /dev/null +++ b/cpp/testcase/blk_stat.py @@ -0,0 +1,96 @@ +import subprocess +import time +import re +import csv +import argparse +from collections import Counter +import os # <-- 导入 os 模块 + + +def clear_page_cache(): + """Clear Linux page cache to ensure fair benchmarking""" + try: + print("🧹 Clearing Linux page cache...") + # Synchronize filesystem caches + subprocess.run(["sync"], check=True) + # Drop caches (3 clears pagecache, dentries, and inodes) + subprocess.run(["sudo", "bash", "-c", "echo 3 > /proc/sys/vm/drop_caches"], check=True) + print("✅ Page cache cleared successfully") + except subprocess.CalledProcessError as e: + print(f"⚠️ Failed to clear page cache: {e}") + + +# -------------------- 1️⃣ Parse Command Line Arguments -------------------- +parser = argparse.ArgumentParser(description="Monitor I/O granularity using blktrace and blkparse") +parser.add_argument("--benchmark", required=True, help="Benchmark name, used as output file prefix") +args = parser.parse_args() +benchmark_name = args.benchmark + +# -------------------- 2️⃣ Define Regex Pattern -------------------- +# Pattern for capturing I/O size (in sectors) and the process name +# The current pattern targets 'G' (Get request) operations. 
diff --git a/cpp/testcase/blk_stat.py b/cpp/testcase/blk_stat.py
new file mode 100644
index 0000000000..cc53621d17
--- /dev/null
+++ b/cpp/testcase/blk_stat.py
@@ -0,0 +1,96 @@
+import subprocess
+import time
+import re
+import csv
+import argparse
+from collections import Counter
+import os
+
+
+def clear_page_cache():
+    """Clear the Linux page cache to ensure fair benchmarking"""
+    try:
+        print("🧹 Clearing Linux page cache...")
+        # Synchronize filesystem caches
+        subprocess.run(["sync"], check=True)
+        # Drop caches (3 clears pagecache, dentries, and inodes)
+        subprocess.run(["sudo", "bash", "-c", "echo 3 > /proc/sys/vm/drop_caches"], check=True)
+        print("✅ Page cache cleared successfully")
+    except subprocess.CalledProcessError as e:
+        print(f"⚠️ Failed to clear page cache: {e}")
+
+
+# -------------------- 1️⃣ Parse Command Line Arguments --------------------
+parser = argparse.ArgumentParser(description="Monitor I/O granularity using blktrace and blkparse")
+parser.add_argument("--benchmark", required=True, help="Benchmark name, used as output file prefix")
+args = parser.parse_args()
+benchmark_name = args.benchmark
+
+# -------------------- 2️⃣ Define Regex Pattern --------------------
+# Captures the I/O size (in sectors) and the issuing process name.
+# The pattern targets 'G' (get request) events from duckdb or io_uring worker threads.
+pattern = re.compile(r"\sG\s+RA?\s+\d+\s+\+\s+(\d+)\s+\[(duckdb|iou-sqp-\d+)\]")
+
+# -------------------- 3️⃣ Start blktrace and blkparse Pipeline --------------------
+# blktrace monitors block device I/O on nvme0n1 and writes raw data to stdout
+blktrace_cmd = ["sudo", "blktrace", "-d", "/dev/nvme0n1", "-o", "-"]
+# blkparse reads raw data from stdin ('-')
+blkparse_cmd = ["blkparse", "-i", "-"]
+
+p1 = subprocess.Popen(blktrace_cmd, stdout=subprocess.PIPE)
+p2 = subprocess.Popen(blkparse_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, text=True)
+
+# -------------------- 4️⃣ Clear Page Cache --------------------
+clear_page_cache()
+
+# -------------------- 5️⃣ Start Benchmark Script (process_sqls.py) --------------------
+proc = subprocess.Popen(["python3", "process_sqls.py", "--runs", "1", "--benchmark", benchmark_name])
+
+# -------------------- 6️⃣ Real-time I/O Granularity Collection --------------------
+counter = Counter()
+print(f"📊 Collecting I/O traces while benchmark '{benchmark_name}' is running...")
+
+try:
+    # Read blkparse output line by line
+    for line in p2.stdout:
+        # Search for I/O size and process name using the defined pattern
+        match = pattern.search(line)
+
+        if match:
+            # Group 1 is the I/O size in sectors
+            size = int(match.group(1))
+            counter[size] += 1
+
+        # Stop once the benchmark process (process_sqls) has finished
+        if proc.poll() is not None:
+            break
+except KeyboardInterrupt:
+    print("⏹️ Stopped manually")
+
+# -------------------- 7️⃣ Terminate blktrace/blkparse --------------------
+p1.terminate()
+p2.terminate()
+
+# -------------------- 8️⃣ Create Output Directory and Save Results --------------------
+output_dir = "io_results"
+# Build the output path with os.path.join
+output_filename = os.path.join(output_dir, f"io_granularity_stats-{benchmark_name}.csv")
+
+# --- Create the output directory if it does not exist ---
+if not os.path.exists(output_dir):
+    print(f"📁 Output directory '{output_dir}' not found. Creating it...")
+    os.makedirs(output_dir)
+
+with open(output_filename, "w", newline="") as f:
+    writer = csv.writer(f)
+    # Header: I/O size in sectors, request count, and I/O size in bytes (512 bytes/sector)
+    writer.writerow(["IO_Size_Sectors", "Count", "IO_Size_Bytes"])
+    # Write sorted results
+    for s, c in sorted(counter.items()):
+        writer.writerow([s, c, s * 512])
+
+print(f"✅ Results saved to {output_filename}")
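The per-size counts written above reduce to a single weighted-average request size. A small sketch over the emitted CSV; the path assumes the script's defaults and a benchmark named `clickbench-pixels-e0-1ssd`:

```python
import csv

# Weighted-average I/O size from the CSV written by blk_stat.py.
path = "io_results/io_granularity_stats-clickbench-pixels-e0-1ssd.csv"
total_bytes = total_reqs = 0
with open(path, newline="") as f:
    for row in csv.DictReader(f):
        count = int(row["Count"])
        total_reqs += count
        total_bytes += count * int(row["IO_Size_Bytes"])
if total_reqs:
    print(f"average I/O size: {total_bytes / total_reqs / 1024:.1f} KiB over {total_reqs} requests")
```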
diff --git a/cpp/testcase/generate_flamegraphs.sh b/cpp/testcase/generate_flamegraphs.sh
new file mode 100755
index 0000000000..4676d81e4f
--- /dev/null
+++ b/cpp/testcase/generate_flamegraphs.sh
@@ -0,0 +1,135 @@
+#!/bin/bash
+
+# ====================================================
+# Script Function: batch generation of CPU/I-O/scheduling flame graphs (perf + FlameGraph)
+# Analysis Events: 1. cpu-clock, 2. page-faults, 3. branch-misses, 4. sched_switch/sched_stat_wait
+# Distinction: each event uses a different color palette (hot, mem, perf, chain)
+# Usage: $0 <sql_file> <duckdb_binary> <output_dir>
+# Example: $0 test_q01.sql ../build/release/duckdb ./results
+# ====================================================
+
+# --- Configure FlameGraph Path ---
+FLAMEGRAPH_DIR="$HOME/FlameGraph"
+STACKCOLLAPSE="${FLAMEGRAPH_DIR}/stackcollapse-perf.pl"
+FLAMEGRAPH_PL="${FLAMEGRAPH_DIR}/flamegraph.pl"
+
+# ----------------------------------------------------
+# 1. Check Environment and Arguments
+# ----------------------------------------------------
+
+# Check FlameGraph dependencies
+if [ ! -x "$STACKCOLLAPSE" ] || [ ! -x "$FLAMEGRAPH_PL" ]; then
+    echo "Error: Cannot find or execute FlameGraph tools (stackcollapse-perf.pl or flamegraph.pl)."
+    echo "Please ensure the FlameGraph repository is cloned to $HOME/FlameGraph."
+    exit 1
+fi
+
+# Check argument count
+if [ "$#" -ne 3 ]; then
+    echo "Usage: $0 <sql_file> <duckdb_binary> <output_dir>"
+    echo "Example: $0 test_q01.sql ../build/release/duckdb ./results"
+    exit 1
+fi
+
+# Receive arguments
+SQL_FILE=$1
+DUCKDB_BINARY=$2
+OUTPUT_DIR=$3
+
+# Check that the DuckDB executable exists
+if [ ! -x "$DUCKDB_BINARY" ]; then
+    echo "Error: Cannot find or execute ${DUCKDB_BINARY}"
+    exit 1
+fi
+
+# Create the output directory (if it doesn't exist)
+mkdir -p "$OUTPUT_DIR"
+if [ ! -d "$OUTPUT_DIR" ]; then
+    echo "Error: Cannot create output directory ${OUTPUT_DIR}"
+    exit 1
+fi
+
+# Extract the file name as prefix (e.g., test_q01)
+FILENAME_PREFIX=$(basename "$SQL_FILE" .sql)
+
+echo "--- Starting Query Analysis: ${FILENAME_PREFIX} ---"
+echo "Using executable: ${DUCKDB_BINARY}"
+echo "Output directory: ${OUTPUT_DIR}"
+
+# ----------------------------------------------------
+# 2. Core Function Definition
+# ----------------------------------------------------
+
+# Function: generate a flame graph for the specified event
+# $1: perf event name (e.g., cpu-clock, page-faults)
+# $2: friendly event name (e.g., CPU Time)
+# $3: output file suffix (e.g., cpu_time, page_faults)
+# $4: color palette name (e.g., hot, mem, perf, chain)
+function generate_flamegraph {
+    local EVENT_NAME="$1"
+    local FRIENDLY_NAME="$2"
+    local SUFFIX="$3"
+    local COLOR_PALETTE="$4"
+
+    echo ""
+    echo "----------------------------------------------------"
+    echo "Start recording event: ${FRIENDLY_NAME} (${EVENT_NAME}) - Color: ${COLOR_PALETTE}"
+    echo "----------------------------------------------------"
+
+    local DATA_FILE="${OUTPUT_DIR}/${FILENAME_PREFIX}_${SUFFIX}.data"
+    local PERF_TXT="${OUTPUT_DIR}/${FILENAME_PREFIX}_${SUFFIX}.perf.txt"
+    local FOLDED_FILE="${OUTPUT_DIR}/${FILENAME_PREFIX}_${SUFFIX}.folded"
+    local SVG_FILE="${OUTPUT_DIR}/${FILENAME_PREFIX}_${SUFFIX}.svg"
+
+    # --- Record Data ---
+    # -e selects the event; --call-graph=dwarf records DWARF call stacks; -F 10 samples at 10 Hz
+    sudo -E perf record --call-graph=dwarf -e "$EVENT_NAME" -g -o "$DATA_FILE" -F 10 \
+        -- "$DUCKDB_BINARY" < "$SQL_FILE"
+
+    if [ $? -ne 0 ]; then
+        echo "[ERROR] perf record failed. Check permissions or perf installation."
+        return 1
+    fi
+
+    # --- Convert Data and Generate Flame Graph ---
+    echo "Converting ${FRIENDLY_NAME} data and generating flame graph..."
+    sudo perf script -i "$DATA_FILE" > "$PERF_TXT"
+    "$STACKCOLLAPSE" "$PERF_TXT" > "$FOLDED_FILE"
+
+    # --color selects the palette
+    "$FLAMEGRAPH_PL" --title="${FILENAME_PREFIX} ${FRIENDLY_NAME} Hotspots" --countname="$FRIENDLY_NAME" \
+        --color="$COLOR_PALETTE" \
+        "$FOLDED_FILE" > "$SVG_FILE"
+    echo "✅ ${FRIENDLY_NAME} flame graph generated: ${SVG_FILE}"
+
+    # --- Clean Up Intermediate Files ---
+    echo "Cleaning up ${FRIENDLY_NAME} intermediate files..."
+    rm -f "$PERF_TXT" "$FOLDED_FILE"
+    sudo rm -f "$DATA_FILE"
+}
+
+# ----------------------------------------------------
+# 3. Run Analysis (4 flame graphs total)
+# ----------------------------------------------------
+
+# 1. CPU Time Analysis (standard CPU bottleneck analysis)
+generate_flamegraph "cpu-clock" "CPU Time" "cpu_time" "hot"
+
+# 2. Memory Pressure Analysis (page faults)
+generate_flamegraph "page-faults" "Page Faults" "page_faults" "mem"
+
+# 3. Computational Efficiency Analysis (branch prediction / pipeline)
+generate_flamegraph "branch-misses" "Branch Misses" "branch_misses" "hot"
+
+# 4. Scheduling/Wait Bottleneck Analysis (lock contention, context switching)
+# Note: this records two trace events in a single perf session
+generate_flamegraph "sched:sched_switch,sched:sched_stat_wait" "Thread Scheduling" "sched" "chain"
+
+
+# ----------------------------------------------------
+# 4. Task Summary
+# ----------------------------------------------------
+echo ""
+echo "--- Task Complete ---"
+echo "Final result files (SVG) are in the ${OUTPUT_DIR} directory:"
+find "$OUTPUT_DIR" -name "${FILENAME_PREFIX}_*.svg"
\ No newline at end of file
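`process_sqls.py`, which follows, times each query by scraping the duckdb CLI's `.timer on` output. A short demonstration of the regex it uses; the output line below is illustrative, not captured output:

```python
import re

# Same pattern as extract_real_time() in process_sqls.py below.
pattern = r"Run Time \(s\): real (\d+\.\d+)"

# Illustrative .timer output line from the duckdb CLI (not captured output):
output = "Run Time (s): real 1.234 user 10.100 sys 2.000"
m = re.search(pattern, output)
print(round(float(m.group(1)), 3))  # -> 1.234
```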
diff --git a/cpp/testcase/process_sqls.py b/cpp/testcase/process_sqls.py
new file mode 100755
index 0000000000..38ce63b9e6
--- /dev/null
+++ b/cpp/testcase/process_sqls.py
@@ -0,0 +1,261 @@
+import os
+import re
+import subprocess
+import csv
+import time
+import psutil
+import json  # for parsing the benchmark configuration file
+from typing import List
+import argparse
+
+# -------------------------- 1. Basic Configuration --------------------------
+# Default path to the benchmark configuration file (can be overridden via CLI)
+DEFAULT_BENCHMARK_JSON = "./benchmark.json"
+
+
+def clear_page_cache():
+    """Clear the Linux page cache to ensure fair benchmarking"""
+    try:
+        print("🧹 Clearing Linux page cache...")
+        subprocess.run(["sync"], check=True)
+        subprocess.run(["sudo", "bash", "-c", "echo 3 > /proc/sys/vm/drop_caches"], check=True)
+        print("✅ Page cache cleared successfully")
+    except subprocess.CalledProcessError as e:
+        print(f"⚠️ Failed to clear page cache: {e}")
+
+
+# -------------------------- 2. CLI Argument Parsing --------------------------
+def parse_args():
+    parser = argparse.ArgumentParser(description="DuckDB ClickBench Batch Test Script (Multi-column CSV, ensures resource release)")
+    parser.add_argument(
+        "--runs",
+        type=int,
+        default=3,
+        help="Number of runs per SQL file (default: 3)"
+    )
+    parser.add_argument(
+        "--duckdb-bin",
+        type=str,
+        default="/home/whz/test/pixels/cpp/build/release/duckdb",
+        help="Path to duckdb executable"
+    )
+    parser.add_argument(
+        "--sql-dir",
+        type=str,
+        default="/home/whz/test/pixels/cpp/pixels-duckdb/duckdb/benchmark/clickbench/queries-test",
+        help="Directory containing SQL files (only processes .sql files starting with 'q')"
+    )
+    parser.add_argument(
+        "--output-csv",
+        type=str,
+        default="/home/whz/test/pixels/cpp/duckdb_benchmark_result.csv",
+        help="Path to output result CSV"
+    )
+    parser.add_argument(
+        "--wait-after-run",
+        type=float,
+        default=2.0,
+        help="Seconds to wait after each run (ensures resource release, default: 2s)"
+    )
+    parser.add_argument(
+        "--threads",
+        type=int,
+        default=96,
+        help="Number of threads to use in DuckDB (default: 96)"
+    )
+    parser.add_argument(
+        "--benchmark",
+        type=str,
+        default="clickbench-pixels-e0-1ssd",
+        help="Name of benchmark to use (must exist in benchmark JSON, e.g. clickbench-pixels-e0)"
+    )
+    parser.add_argument(
+        "--benchmark-json",
+        type=str,
+        default=DEFAULT_BENCHMARK_JSON,
+        help=f"Path to benchmark configuration JSON file (default: {DEFAULT_BENCHMARK_JSON})"
+    )
+    return parser.parse_args()
+
+
+# -------------------------- 3. Core Utility Functions --------------------------
+def get_sql_files(sql_dir: str) -> List[str]:
+    sql_files = []
+    for filename in os.listdir(sql_dir):
+        if filename.endswith(".sql") and filename.startswith("q"):
+            sql_files.append(os.path.join(sql_dir, filename))
+    sql_files.sort()
+    if not sql_files:
+        raise ValueError(f"No .sql files starting with 'q' found in {sql_dir}!")
+    return sql_files
+
+
+def extract_real_time(duckdb_output: str) -> float:
+    pattern = r"Run Time \(s\): real (\d+\.\d+)"
+    match = re.search(pattern, duckdb_output, re.MULTILINE)
+    if not match:
+        raise ValueError(f"Failed to extract real time! Partial output:\n{duckdb_output[:500]}...")
+    return round(float(match.group(1)), 3)
+
+
+def kill_remaining_duckdb(duckdb_bin: str):
+    duckdb_name = os.path.basename(duckdb_bin)
+    for proc in psutil.process_iter(['name', 'cmdline']):
+        try:
+            if (proc.info['name'] == duckdb_name) or (duckdb_bin in ' '.join(proc.info['cmdline'] or [])):
+                print(f"⚠️ Found residual {duckdb_name} process (PID: {proc.pid}), killing...")
+                proc.terminate()
+                try:
+                    proc.wait(timeout=1)
+                except psutil.TimeoutExpired:
+                    proc.kill()
+        except (psutil.NoSuchProcess, psutil.AccessDenied):
+            continue
+
+
+def load_benchmark_create_view(benchmark_json_path: str, benchmark_name: str) -> str:
+    if not os.path.exists(benchmark_json_path):
+        raise FileNotFoundError(f"Benchmark JSON file not found: {benchmark_json_path}")
+
+    with open(benchmark_json_path, "r", encoding="utf-8") as f:
+        try:
+            benchmark_config = json.load(f)
+        except json.JSONDecodeError as e:
+            raise ValueError(f"Failed to parse benchmark JSON: {str(e)}")
+
+    if benchmark_name not in benchmark_config:
+        available_benchmarks = ", ".join(benchmark_config.keys())
+        raise KeyError(f"Benchmark '{benchmark_name}' not found. Available benchmarks: {available_benchmarks}")
+
+    create_view_sql = benchmark_config[benchmark_name].strip()
+    if not create_view_sql:
+        raise ValueError(f"CREATE VIEW SQL for benchmark '{benchmark_name}' is empty in JSON")
+
+    return create_view_sql
+
+
+def run_single_sql(duckdb_bin: str, create_view_sql: str, sql_content: str, wait_after_run: float, threads: int) -> float:
+    duckdb_commands = f"{create_view_sql}\nset threads={threads};\n\n.timer on\n{sql_content.strip()}\n.exit"
+    process = None
+
+    try:
+        process = subprocess.Popen(
+            [duckdb_bin],
+            stdin=subprocess.PIPE,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT
+        )
+
+        input_data = duckdb_commands.encode("utf-8")
+        stdout, _ = process.communicate(input=input_data, timeout=3600)
+
+        output = stdout.decode("utf-8", errors="ignore")
+
+        if process.returncode != 0:
+            raise RuntimeError(f"duckdb execution failed (code {process.returncode}):\n{output[:1000]}...")
+
+        real_time = extract_real_time(output)
+        time.sleep(wait_after_run)
+        kill_remaining_duckdb(duckdb_bin)
+        return real_time
+
+    except subprocess.TimeoutExpired:
+        if process:
+            process.kill()
+        raise RuntimeError("duckdb execution timed out (exceeded 1 hour)") from None
+    finally:
+        if process and process.poll() is None:
+            process.kill()
+            print("⚠️ Forcibly terminated non-exiting duckdb process")
+
+
+def init_csv(output_csv: str, runs: int):
+    headers = ["SQL File Name"] + [f"Run {idx} Time (s)" for idx in range(1, runs + 1)]
+    with open(output_csv, "w", newline="", encoding="utf-8") as f:
+        writer = csv.DictWriter(f, fieldnames=headers)
+        writer.writeheader()
+    print(f"✅ Initialized multi-column CSV with headers: {','.join(headers)}")
+
+
+def write_single_row(output_csv: str, sql_filename: str, run_times: List[float], runs: int):
+    row_data = {"SQL File Name": sql_filename}
+    for idx in range(1, runs + 1):
+        time_val = run_times[idx - 1] if (idx - 1) < len(run_times) else ""
+        row_data[f"Run {idx} Time (s)"] = time_val
+    with open(output_csv, "a", newline="", encoding="utf-8") as f:
+        writer = csv.DictWriter(f, fieldnames=row_data.keys())
+        writer.writerow(row_data)
+
+
+# -------------------------- 4. Main Logic --------------------------
+def main():
+    args = parse_args()
+    print("=" * 70)
+    print("DuckDB ClickBench Batch Test Script (Resource Release Ensured)")
+    print(f"Config: {args.runs} runs per SQL, {args.wait_after_run}s wait after each run")
+    print(f"Benchmark: {args.benchmark} (from {args.benchmark_json})")
+    print(f"DuckDB path: {args.duckdb_bin}")
+    print(f"Threads: {args.threads}")
+    print(f"SQL directory: {args.sql_dir}")
+    print(f"Output CSV: {args.output_csv}")
+    print("=" * 70)
+
+    # clear_page_cache()  # the cache is cleared before each individual run instead
+
+    kill_remaining_duckdb(args.duckdb_bin)
+    try:
+        create_view_sql = load_benchmark_create_view(args.benchmark_json, args.benchmark)
+        print(f"✅ Loaded CREATE VIEW SQL for benchmark '{args.benchmark}'")
+    except (FileNotFoundError, KeyError, ValueError) as e:
+        print(f"\n❌ Benchmark initialization failed: {str(e)}")
+        return
+
+    init_csv(args.output_csv, args.runs)
+    try:
+        sql_files = get_sql_files(args.sql_dir)
+        print(f"\n✅ Found {len(sql_files)} eligible SQL files:")
+        for i, f in enumerate(sql_files, 1):
+            print(f"  {i:2d}. {os.path.basename(f)}")
+    except ValueError as e:
+        print(f"\n❌ Error: {e}")
+        return
+
+    for sql_file in sql_files:
+        sql_filename = os.path.basename(sql_file).replace(".sql", "")
+        print(f"\n{'=' * 60}")
+        print(f"Processing: {sql_filename}.sql")
+        print(f"{'=' * 60}")
+
+        try:
+            with open(sql_file, "r", encoding="utf-8") as f:
+                sql_content = f.read()
+            print(f"✅ Successfully read SQL file (content length: {len(sql_content)} chars)")
+        except Exception as e:
+            print(f"❌ Failed to read SQL file: {e}")
+            write_single_row(args.output_csv, sql_filename, [], args.runs)
+            continue
+
+        run_times = []
+        for run_idx in range(1, args.runs + 1):
+            print(f"\n--- Run {run_idx:2d}/{args.runs} ---")
+            clear_page_cache()
+            try:
+                real_time = run_single_sql(args.duckdb_bin, create_view_sql, sql_content, args.wait_after_run, args.threads)
+                run_times.append(real_time)
+                print(f"✅ Run successful, time: {real_time}s")
+            except (RuntimeError, ValueError) as e:
+                print(f"❌ Run failed: {e}")
+                continue
+
+        write_single_row(args.output_csv, sql_filename, run_times, args.runs)
+        print(f"\n✅ Written to CSV: {sql_filename}.sql → Valid runs: {len(run_times)}/{args.runs}")
+
+    kill_remaining_duckdb(args.duckdb_bin)
+    print(f"\n{'=' * 70}")
+    print("All SQL files processed!")
+    print(f"Multi-column CSV: {args.output_csv}")
+    print("=" * 70)
+
+
+if __name__ == "__main__":
+    main()
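Each row of the output CSV holds one query's run times under the headers defined by `init_csv` above. A small sketch that folds such a file into per-query summaries; the path is that script's default output location:

```python
import csv
import statistics

# Summarize the multi-column CSV written by process_sqls.py.
path = "/home/whz/test/pixels/cpp/duckdb_benchmark_result.csv"
with open(path, newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        times = [float(v) for k, v in row.items() if k.startswith("Run ") and v]
        if times:
            print(f'{row["SQL File Name"]}: best={min(times):.3f}s '
                  f'median={statistics.median(times):.3f}s over {len(times)} runs')
```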
+ """ + # Construct the full path to the base SQL file + sql_path = os.path.join(sql_dir, sql_filename) + + with open(sql_path, "r") as f: + content = f.read() + + # Replace or add set threads=x; + # Try to replace existing one, if not found, assume it goes at the start + if re.search(r"set\s+threads\s*=\s*\d+;", content): + new_content = re.sub(r"set\s+threads\s*=\s*\d+;", f"set threads={thread_value};", content) + else: + # If no 'set threads' line is found, add it to the beginning + new_content = f"set threads={thread_value};\n{content}" + + + # Temporary file name uses the result directory + tmp_path = os.path.join(result_dir, f"{os.path.basename(sql_filename)}.tmp_threads_{thread_value}.sql") + with open(tmp_path, "w") as f: + f.write(new_content) + + return tmp_path + + +def run_perf_stat_switches(query_file, query_name, ssd_mode, thread_value, result_dir): + """ + Executes perf stat, collects context switch metrics, and outputs the result + to a file in the results directory. + """ + output_name = f"{query_name}-{ssd_mode}-threads{thread_value}-context-stat.txt" + output_path = os.path.join(result_dir, output_name) + + # PERF_CMD_BASE is defined at the top of the script + cmd = PERF_CMD_BASE + ["-o", output_path, DUCKDB_BINARY] + + print(f"\n--- 1. Running perf stat for context switches: {output_name} ---") + + try: + with open(query_file, "r") as sql_f: + subprocess.run(cmd, stdin=sql_f, check=True) # Use check=True to ensure command execution success + print(f"==> perf stat output saved to: {output_path}") + except subprocess.CalledProcessError as e: + print(f"[ERROR] perf stat failed for {output_name}: {e}") + return False + except FileNotFoundError: + print(f"[ERROR] DuckDB binary not found at {DUCKDB_BINARY}") + return False + + return True + +def run_sched_flamegraph(tmp_sql_path, query_name, ssd_mode, thread_value, result_dir): + """ + Calls an external Bash script to generate a flame graph focused on scheduling events. + """ + if not os.path.exists(FLAMEGRAPH_SCRIPT): + print(f"[WARN] Schedule FlameGraph script not found: {FLAMEGRAPH_SCRIPT}. Skipping analysis.") + return + + print(f"\n--- 2. 
+    print("\n--- 2. Running FlameGraph analysis ---")
+
+    # Ensure the Bash script has execute permission
+    if not os.access(FLAMEGRAPH_SCRIPT, os.X_OK):
+        print(f"[WARN] Adding execute permission to {FLAMEGRAPH_SCRIPT}")
+        os.chmod(FLAMEGRAPH_SCRIPT, 0o755)
+
+    # Call the Bash script with the temporary SQL file, the DuckDB binary path,
+    # and the output *directory*: the script derives its output file names
+    # (e.g., <prefix>_sched.svg) from the SQL file name itself
+    flamegraph_cmd = [FLAMEGRAPH_SCRIPT, tmp_sql_path, DUCKDB_BINARY, result_dir]
+
+    try:
+        subprocess.run(flamegraph_cmd, check=True)
+        print(f"==> Flame graphs saved under: {result_dir}")
+    except subprocess.CalledProcessError as e:
+        print(f"[ERROR] FlameGraph script failed for {tmp_sql_path}: {e}")
+    except FileNotFoundError:
+        print(f"[ERROR] FlameGraph script file not found at {FLAMEGRAPH_SCRIPT}")
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Run DuckDB benchmarks with perf stat and FlameGraph analysis.")
+
+    # Directory arguments
+    parser.add_argument("--sql-dir", required=False, default="perf-pixels",
+                        help="Directory containing the base SQL query files (e.g., test-q01-1ssd.sql).")
+    parser.add_argument("--result-dir", required=False, default="perf-pixels",
+                        help="Directory where temporary files and final results (perf stats, SVGs) are saved.")
+
+    # Optional arguments with defaults
+    parser.add_argument("--threads", type=int, nargs='+', default=THREADS,
+                        help=f"List of thread counts to test (default: {THREADS}).")
+    parser.add_argument("--queries", nargs='+', default=QUERIES,
+                        help=f"List of query IDs to test (default: {QUERIES}).")
+    parser.add_argument("--ssd-modes", nargs='+', default=SSD_MODES,
+                        help=f"List of SSD configurations (default: {SSD_MODES}).")
+
+    args = parser.parse_args()
+
+    result_dir = args.result_dir
+    sql_dir = args.sql_dir
+    threads = args.threads
+    queries = args.queries
+    ssd_modes = args.ssd_modes
+
+    # Ensure the result directory exists
+    ensure_result_dir(result_dir)
+
+    for q in queries:
+        for mode in ssd_modes:
+
+            sql_file_base = f"test-{q}-{mode}.sql"
+            # Check for the file in the specified SQL directory
+            sql_file_path_check = os.path.join(sql_dir, sql_file_base)
+
+            if not os.path.exists(sql_file_path_check):
+                print(f"[WARN] Base SQL file not found: {sql_file_path_check}, skipping")
+                continue
+
+            for t in threads:
+                print("=" * 50)
+                print(f"Starting analysis for Q={q}, Mode={mode}, Threads={t}")
+
+                # 1. Update threads and create a temporary SQL file in result_dir
+                tmp_sql = update_sql_thread(sql_dir, sql_file_base, t, result_dir)
+
+                # 2. Run perf stat to collect context switch metrics
+                success = run_perf_stat_switches(tmp_sql, q, mode, t, result_dir)
+
+                # 3. If perf stat succeeded, run the FlameGraph analysis
+                if success:
+                    run_sched_flamegraph(tmp_sql, q, mode, t, result_dir)
+
+                # 4. Clean up the temporary SQL file
+                os.remove(tmp_sql)
+                print(f"Cleaned up temporary SQL file: {tmp_sql}")
+                print("=" * 50)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
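The `-o` files written by `perf stat` above are whitespace-aligned text with lines like `1,234,567      context-switches`. A hedged parser for those counters; perf's exact layout varies across versions, so this relies only on leading "number, event-name" pairs, and the file name follows the pattern built by `run_perf_stat_switches`:

```python
import re

# Pull counter values out of a 'perf stat -o' text file; the layout varies by
# perf version, so match only "<number>  <event-name>" at the start of a line.
def read_perf_stat(path):
    counters = {}
    row = re.compile(r"^\s*([\d,.]+)\s+([\w:-]+)")
    with open(path) as f:
        for line in f:
            m = row.match(line)
            if m:
                counters[m.group(2)] = float(m.group(1).replace(",", ""))
    return counters

stats = read_perf_stat("perf-pixels/q01-1ssd-threads24-context-stat.txt")
print(stats.get("context-switches"), stats.get("page-faults"))
```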
diff --git a/cpp/testcase/single_doublebuffer_async_sync_test.py b/cpp/testcase/single_doublebuffer_async_sync_test.py
new file mode 100644
index 0000000000..bbf1e575f6
--- /dev/null
+++ b/cpp/testcase/single_doublebuffer_async_sync_test.py
@@ -0,0 +1,139 @@
+import subprocess
+import os
+
+# -------------------------------------
+# 1. Configuration Parameters
+# -------------------------------------
+# threads_list = [1, 2, 4, 8, 16, 24, 48, 64, 96]
+threads_list = [16, 24]
+benchmarks = [
+    "clickbench-pixels-e0-1ssd",
+    # "clickbench-pixels-e0-6ssd",
+    # "clickbench-pixels-e0-12ssd",
+    # "clickbench-pixels-e0-24ssd"
+]
+
+runs = 1
+# All buffer modes to be tested
+buffer_modes = ["doublebuffer", "singlebuffer"]
+properties_path = os.path.expanduser("~/opt/pixels/etc/pixels-cpp.properties")
+process_script = "process_sqls.py"
+
+# Root directory for saving results
+output_root = "single_double_buffer_results"
+
+# -------------------------------------
+# 2. Buffer Mode Switching Function
+# -------------------------------------
+def set_buffer_mode(mode):
+    """Set the 'pixels.doublebuffer' parameter in pixels-cpp.properties"""
+    assert mode in ("doublebuffer", "singlebuffer")
+
+    if not os.path.exists(properties_path):
+        raise FileNotFoundError(f"Configuration file not found: {properties_path}")
+
+    with open(properties_path, "r") as f:
+        lines = f.readlines()
+
+    new_lines = []
+    changed = False
+
+    # Determine the value to set
+    new_value = "true" if mode == "doublebuffer" else "false"
+
+    for line in lines:
+        if line.strip().startswith("pixels.doublebuffer"):
+            # Replace the existing line
+            new_lines.append(f"pixels.doublebuffer={new_value}\n")
+            changed = True
+        else:
+            new_lines.append(line)
+
+    # If the key was not found in the file, append it at the end
+    if not changed:
+        new_lines.append(f"pixels.doublebuffer={new_value}\n")
+
+    with open(properties_path, "w") as f:
+        f.writelines(new_lines)
+
+    print(f"🔄 Buffer mode switched to: {mode.upper()}")
+
+
+# -------------------------------------
+# 3. IO Mode Switching Function
+# -------------------------------------
+def set_io_mode(mode):
+    """Set the 'localfs.enable.async.io' parameter in pixels-cpp.properties"""
+    assert mode in ("async", "sync")
+
+    if not os.path.exists(properties_path):
+        raise FileNotFoundError(f"Configuration file not found: {properties_path}")
+
+    with open(properties_path, "r") as f:
+        lines = f.readlines()
+
+    new_lines = []
+    changed = False
+    new_value = "true" if mode == "async" else "false"
+    for line in lines:
+        if line.startswith("localfs.enable.async.io"):
+            new_lines.append(f"localfs.enable.async.io={new_value}\n")
+            changed = True
+        else:
+            new_lines.append(line)
+
+    if not changed:
+        new_lines.append(f"localfs.enable.async.io={new_value}\n")
+
+    with open(properties_path, "w") as f:
+        f.writelines(new_lines)
+
+    print(f"🔧 IO mode switched to: {mode.upper()}")
+
+# -------------------------------------
+# 4. Nested Loop for Test Execution
+# -------------------------------------
+for buffer_mode in buffer_modes:
+    print("\n===========================================")
+    print(f"🚀 Starting Test for Buffer Mode: {buffer_mode.upper()}")
+    print("===========================================")
+    set_buffer_mode(buffer_mode)  # set the current buffer mode
+
+    for io_mode in ["sync", "async"]:
+        print(f"\n======= Switching to {io_mode.upper()} mode =======")
+        set_io_mode(io_mode)
+
+        for benchmark in benchmarks:
+            print(f"\n===== Benchmark: {benchmark} ({buffer_mode}/{io_mode}) =====\n")
+
+            # Isolated directory: output_root/benchmark/buffer_mode/io_mode/
+            benchmark_dir = os.path.join(output_root, benchmark, buffer_mode, io_mode)
+            os.makedirs(benchmark_dir, exist_ok=True)
+            print(f"📁 Directory created: {benchmark_dir}")
+
+            for t in threads_list:
+                output_csv = os.path.join(
+                    benchmark_dir,
+                    f"duckdb_benchmark_result-{buffer_mode}-{io_mode}-{t}threads.csv"
+                )
+
+                cmd = [
+                    "python",
+                    process_script,
+                    "--benchmark", benchmark,
+                    "--runs", str(runs),
+                    "--output-csv", output_csv,
+                    "--threads", str(t),
+                ]
+
+                print(f"\n▶ Executing: {benchmark}, Buffer={buffer_mode}, IO={io_mode}, {t} threads")
+                print("Command:", " ".join(cmd))
+
+                subprocess.run(cmd, check=True)
+
+                print(f"✔ Completed: {output_csv}\n")
+
+print("\n🎉 All tasks (doublebuffer/singlebuffer, sync/async) completed successfully!")
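The driver above produces a tree nested as `output_root/benchmark/buffer_mode/io_mode/`. A sketch that walks that tree and indexes each result CSV by its configuration, parsed from the file-name pattern used above:

```python
import os
import re

# Walk single_double_buffer_results/ and list each CSV with its configuration;
# file names follow the pattern used by the driver script above.
name_re = re.compile(r"duckdb_benchmark_result-(\w+)-(\w+)-(\d+)threads\.csv")
for root, _, files in os.walk("single_double_buffer_results"):
    for fn in files:
        m = name_re.match(fn)
        if m:
            buffer_mode, io_mode, threads = m.group(1), m.group(2), int(m.group(3))
            print(f"{buffer_mode:12s} {io_mode:5s} {threads:3d} threads -> {os.path.join(root, fn)}")
```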