3 changes: 3 additions & 0 deletions .gitmodules
@@ -7,3 +7,6 @@
[submodule "cpp/third-party/googletest"]
path = cpp/third-party/googletest
url = git@github.com:google/googletest.git
[submodule "cpp/FlameGraph"]
path = cpp/FlameGraph
url = git@github.com:brendangregg/FlameGraph.git
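After checking out this change, the new submodule can be fetched with the standard git command:

```bash
git submodule update --init cpp/FlameGraph
```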
5 changes: 5 additions & 0 deletions cpp/.gitignore
@@ -5,3 +5,8 @@ plot
**/cmake-build-debug
**/CMakeCache.txt
**/CMakeFiles
# ignore perf flamegraph SVG outputs
testcase/perf-*/*.svg
*.csv
*.txt
*.svg
1 change: 1 addition & 0 deletions cpp/FlameGraph
Submodule FlameGraph added at 41fee1
2 changes: 1 addition & 1 deletion cpp/pixels-common/include/physical/BufferPool.h
@@ -65,7 +65,7 @@ class BufferPool
static thread_local bool isInitialized;
static thread_local std::map<uint32_t, std::shared_ptr < ByteBuffer>>
buffers[2];
static std::shared_ptr <DirectIoLib> directIoLib;
static thread_local std::shared_ptr <DirectIoLib> directIoLib;
static thread_local int currBufferIdx;
static thread_local int nextBufferIdx;
friend class DirectUringRandomAccessFile;
2 changes: 1 addition & 1 deletion cpp/pixels-common/lib/physical/BufferPool.cpp
@@ -34,7 +34,7 @@ BufferPool::buffers[2];
// since we call switch function first.
thread_local int BufferPool::currBufferIdx = 1;
thread_local int BufferPool::nextBufferIdx = 0;
std::shared_ptr <DirectIoLib> BufferPool::directIoLib;
thread_local std::shared_ptr <DirectIoLib> BufferPool::directIoLib;

void BufferPool::Initialize(std::vector <uint32_t> colIds, std::vector <uint64_t> bytes,
std::vector <std::string> columnNames)
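Both the declaration and the definition of `directIoLib` become `thread_local`, so each scan thread now rotates its own pair of buffers independently. A minimal sketch of what the rotation amounts to, assuming `Switch()` simply swaps the two index fields (its body is not part of this diff):

```cpp
#include <utility>  // std::swap

// Hypothetical sketch, not the actual implementation: buffers[currBufferIdx]
// is consumed while buffers[nextBufferIdx] receives prefetched data, matching
// the initial values currBufferIdx = 1, nextBufferIdx = 0 above.
void BufferPool::Switch()
{
    std::swap(currBufferIdx, nextBufferIdx);
}
```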
3 changes: 3 additions & 0 deletions cpp/pixels-core/lib/PixelsFilter.cpp
@@ -298,6 +298,9 @@ void PixelsFilter::ApplyFilter(std::shared_ptr <ColumnVector> vector, duckdb::Ta
case duckdb::TableFilterType::IS_NULL:
// TODO: support is null
break;
case duckdb::TableFilterType::OPTIONAL_FILTER:
// optional filters are best-effort in DuckDB, so it is safe to skip them
return;
default:
D_ASSERT(0);
break;
4 changes: 4 additions & 0 deletions cpp/pixels-cpp.properties
@@ -44,3 +44,7 @@ column.chunk.alignment=32

# for DuckDB, it is only effective when column.chunk.alignment also meets the alignment of the isNull bitmap
isnull.bitmap.alignment=8


# enable double buffering for table scans (see PixelsScanFunction.cpp); false keeps the original single-buffer path
pixels.doublebuffer=false
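A minimal sketch of how the new flag is consumed at runtime; the `ConfigFactory` call mirrors the usage added in `PixelsScanFunction.cpp` below, while the header path is an assumption:

```cpp
#include "utils/ConfigFactory.h"  // assumed header location for ConfigFactory

// "true" selects the new double-buffer path; the default "false" above keeps
// the original single-buffer behavior.
bool useDoubleBuffer =
    ConfigFactory::Instance().getProperty("pixels.doublebuffer") == "true";
```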
27 changes: 22 additions & 5 deletions cpp/pixels-duckdb/PixelsScanFunction.cpp
@@ -29,7 +29,7 @@
namespace duckdb
{

bool PixelsScanFunction::enable_filter_pushdown = false;
bool PixelsScanFunction::enable_filter_pushdown = true;

static idx_t PixelsScanGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p,
LocalTableFunctionState *local_state,
@@ -63,8 +63,8 @@ TableFunctionSet PixelsScanFunction::GetFunctionSet()
TableFunction table_function("pixels_scan", {LogicalType::VARCHAR}, PixelsScanImplementation, PixelsScanBind,
PixelsScanInitGlobal, PixelsScanInitLocal);
table_function.projection_pushdown = true;
// table_function.filter_pushdown = true;
//table_function.filter_prune = true;
table_function.filter_pushdown = true;
// table_function.filter_prune = true;
enable_filter_pushdown = table_function.filter_pushdown;
MultiFileReader::AddParameters(table_function);
table_function.cardinality = PixelsCardinality;
@@ -501,14 +501,25 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P
scan_data.currReader->close();
}

::BufferPool::Switch();
if (ConfigFactory::Instance().getProperty("pixels.doublebuffer") == "true")
{
::BufferPool::Switch();
}
// double- vs. single-buffer handling below

scan_data.currReader = scan_data.nextReader;
scan_data.currPixelsRecordReader = scan_data.nextPixelsRecordReader;
// asyncReadComplete is not invoked in the first run (is_init_state = true)
if (scan_data.currPixelsRecordReader != nullptr)
{
auto currPixelsRecordReader = std::static_pointer_cast<PixelsRecordReaderImpl>(
scan_data.currPixelsRecordReader);
if (ConfigFactory::Instance().getProperty("pixels.doublebuffer") == "false")
{
// single buffer: issue the read synchronously before collecting completions
currPixelsRecordReader->read();
}

currPixelsRecordReader->asyncReadComplete((int) scan_data.column_names.size());
}
if (scan_data.next_file_index < StorageInstance->getFileSum(scan_data.deviceID))
@@ -526,7 +537,13 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P
scan_data.nextPixelsRecordReader = scan_data.nextReader->read(option);
auto nextPixelsRecordReader = std::static_pointer_cast<PixelsRecordReaderImpl>(
scan_data.nextPixelsRecordReader);
nextPixelsRecordReader->read();

if (ConfigFactory::Instance().getProperty("pixels.doublebuffer") == "true")
{
// double buffer: start prefetching the next reader's data asynchronously
nextPixelsRecordReader->read();
}

} else
{
scan_data.nextReader = nullptr;
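Taken together, the added branches give the following per-transition control flow; this is a condensed sketch for readability (state bookkeeping and the `static_pointer_cast` calls are elided), with `doubleBuffer` standing in for the `pixels.doublebuffer` check:

```cpp
// Condensed sketch of the buffering logic in PixelsParallelStateNext.
if (doubleBuffer)
    ::BufferPool::Switch();                 // rotate: next buffer becomes current

scan_data.currPixelsRecordReader = scan_data.nextPixelsRecordReader;
if (scan_data.currPixelsRecordReader != nullptr)
{
    if (!doubleBuffer)
        currPixelsRecordReader->read();     // single buffer: read synchronously now
    currPixelsRecordReader->asyncReadComplete((int) scan_data.column_names.size());
}
if (scan_data.next_file_index < StorageInstance->getFileSum(scan_data.deviceID))
{
    scan_data.nextPixelsRecordReader = scan_data.nextReader->read(option);
    if (doubleBuffer)
        nextPixelsRecordReader->read();     // double buffer: prefetch into the idle buffer
}
```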
2 changes: 1 addition & 1 deletion cpp/pixels-duckdb/duckdb
Submodule duckdb updated 228 files
39 changes: 39 additions & 0 deletions cpp/testcase/README-zh.md
@@ -0,0 +1,39 @@
# Tests
This directory contains all the tests.

## Run Script
`process_sqls.py` runs the queries. The `--benchmark` argument selects the benchmark to run; the queries to run must also be specified:
```bash
usage: process_sqls.py [-h] [--runs RUNS] [--duckdb-bin DUCKDB_BIN] [--sql-dir SQL_DIR]
[--output-csv OUTPUT_CSV] [--wait-after-run WAIT_AFTER_RUN]
[--threads THREADS] [--benchmark BENCHMARK] [--benchmark-json BENCHMARK_JSON]

DuckDB ClickBench Batch Test Script (Multi-column CSV, ensures resource release)

options:
-h, --help show this help message and exit
--runs RUNS Number of runs per SQL file (default: 3)
--duckdb-bin DUCKDB_BIN
Path to duckdb executable
--sql-dir SQL_DIR Directory containing SQL files (only processes .sql files starting with 'q')
--output-csv OUTPUT_CSV
Path to output result CSV
--wait-after-run WAIT_AFTER_RUN
Seconds to wait after each run (ensures resource release, default: 2s)
--threads THREADS Number of threads to use in DuckDB (default: 96)
--benchmark BENCHMARK
Name of benchmark to use (must exist in benchmark JSON, e.g. clickbench-
pixels-e0)
--benchmark-json BENCHMARK_JSON
Path to benchmark configuration JSON file (default: ./benchmark.json)

```
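For example, to run each query three times against the 24-SSD Pixels layout defined in `benchmark.json`:

```bash
python3 process_sqls.py --runs 3 --threads 96 --benchmark clickbench-pixels-e0-24ssd
```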

## I/O Granularity Test
`blk_stat.py` launches `process_sqls.py` and, while it runs, uses blktrace and blkparse to record the I/O granularity of the underlying block device. Note that the queries to run are built into `process_sqls.py`.

## Single/Double Buffer Performance Test
`single_doublebuffer_async_sync_test.py` sets the run parameters and executes the single/double buffer comparison.

## perf Experiment

15 changes: 15 additions & 0 deletions cpp/testcase/benchmark.json
@@ -0,0 +1,15 @@
{
"tpch-pixels-e0":"",
"tpch-pixels-e1":"",
"tpch-pixels-e2":"",
"tpch-parquet-e0":"",
"tpch-parquet-e2":"",
"clickbench-parquet-e2":"",
"clickbench-parquet-e0":"CREATE VIEW hits AS SELECT * FROM parquet_scan([\n \"/data/9a3-01/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-02/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-03/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-04/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-05/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-06/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-07/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-08/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-09/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-10/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-11/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-12/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-13/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-14/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-15/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-16/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-17/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-18/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-19/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-20/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-21/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-22/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-23/clickbench/parquet-e0/hits/*\",\n \"/data/9a3-24/clickbench/parquet-e0/hits/*\"\n ]\n);",
"clickbench-pixels-e2":"",
"clickbench-pixels-e0-24ssd":"CREATE VIEW hits AS SELECT * FROM pixels_scan([\n \"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-02/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-03/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-04/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-05/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-06/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-07/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-08/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-09/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-10/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-11/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-12/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-13/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-14/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-15/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-16/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-17/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-18/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-19/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-20/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-21/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-22/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-23/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-24/clickbench/pixels-e0/hits/v-0-ordered/*\"\n ]\n);",
"clickbench-pixels-e1":"",
"clickbench-pixels-e0-1ssd": "CREATE VIEW hits AS SELECT * FROM pixels_scan([\"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\"]);\n",
"clickbench-pixels-e0-6ssd": "CREATE VIEW hits AS SELECT * FROM pixels_scan([\n \"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-02/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-03/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-04/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-05/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-06/clickbench/pixels-e0/hits/v-0-ordered/*\"\n ]\n);",
"clickbench-pixels-e0-12ssd": "CREATE VIEW hits AS SELECT * FROM pixels_scan([\n \"/data/9a3-01/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-02/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-03/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-04/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-05/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-06/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-07/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-08/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-09/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-10/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-11/clickbench/pixels-e0/hits/v-0-ordered/*\",\n \"/data/9a3-12/clickbench/pixels-e0/hits/v-0-ordered/*\"\n ]\n);"
}
96 changes: 96 additions & 0 deletions cpp/testcase/blk_stat.py
@@ -0,0 +1,96 @@
import subprocess
import time
import re
import csv
import argparse
from collections import Counter
import os  # for building output paths and creating directories


def clear_page_cache():
"""Clear Linux page cache to ensure fair benchmarking"""
try:
print("🧹 Clearing Linux page cache...")
# Synchronize filesystem caches
subprocess.run(["sync"], check=True)
# Drop caches (3 clears pagecache, dentries, and inodes)
subprocess.run(["sudo", "bash", "-c", "echo 3 > /proc/sys/vm/drop_caches"], check=True)
print("✅ Page cache cleared successfully")
except subprocess.CalledProcessError as e:
print(f"⚠️ Failed to clear page cache: {e}")


# -------------------- 1️⃣ Parse Command Line Arguments --------------------
parser = argparse.ArgumentParser(description="Monitor I/O granularity using blktrace and blkparse")
parser.add_argument("--benchmark", required=True, help="Benchmark name, used as output file prefix")
args = parser.parse_args()
benchmark_name = args.benchmark
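
# Example invocation (illustrative; blktrace/blkparse must be installed and
# sudo available, since the script elevates for blktrace and cache dropping):
#   python3 blk_stat.py --benchmark clickbench-pixels-e0-24ssd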

# -------------------- 2️⃣ Define Regex Pattern --------------------
# Pattern for capturing I/O size (in sectors) and the process name
# The current pattern targets 'G' (Get request) operations.
pattern = re.compile(r"\sG\s+RA?\s+\d+\s+\+\s+(\d+)\s+\[(duckdb|iou-sqp-\d+)\]")
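# Illustrative blkparse line this pattern would match (fields: dev cpu seq
# timestamp pid action RWBS sector + size [process]):
#   259,0  3  1042  0.001234567  8765  G   R 123456789 + 256 [duckdb]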

# -------------------- 3️⃣ Start blktrace and blkparse Pipeline --------------------
# blktrace monitors block device I/O on nvme0n1 and outputs raw data to stdout
blktrace_cmd = ["sudo", "blktrace", "-d", "/dev/nvme0n1", "-o", "-"]
# blkparse reads raw data from stdin ('-')
blkparse_cmd = ["blkparse", "-i", "-"]

p1 = subprocess.Popen(blktrace_cmd, stdout=subprocess.PIPE)
p2 = subprocess.Popen(blkparse_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, text=True)

# -------------------- 4️⃣ Clear Page Cache --------------------
clear_page_cache()

# -------------------- 5️⃣ Start Benchmark Script (process_sqls.py) --------------------
proc = subprocess.Popen(["python3", "process_sqls.py", "--runs", "1", "--benchmark", benchmark_name])

# -------------------- 6️⃣ Real-time I/O Granularity Collection --------------------
counter = Counter()
print(f"📊 Collecting I/O traces while benchmark '{benchmark_name}' is running...")

try:
    # Read blkparse output line by line
    for line in p2.stdout:
        # Search for I/O size and process name using the defined pattern
        match = pattern.search(line)

        if match:
            # Group 1 is the I/O size in sectors
            size = int(match.group(1))
            counter[size] += 1

        # Stop once the benchmark process (process_sqls.py) has finished
        if proc.poll() is not None:
            break
except KeyboardInterrupt:
    print("⏹️ Stopped manually")

# -------------------- 7️⃣ Terminate blktrace/blkparse --------------------
p1.terminate()
p2.terminate()

# -------------------- 8️⃣ Create Output Directory and Save Results --------------------
output_dir = "io_results"
output_filename = os.path.join(output_dir, f"io_granularity_stats-{benchmark_name}.csv")  # build the path with os.path.join

# --- Check for and create the output directory ---
if not os.path.exists(output_dir):
    print(f"📁 Output directory '{output_dir}' not found. Creating it...")
    # create the directory (and any missing parents)
    os.makedirs(output_dir)
# ----------------------

with open(output_filename, "w", newline="") as f:
    writer = csv.writer(f)
    # Header: I/O size in sectors, request count, and size converted to bytes (512 bytes/sector)
    writer.writerow(["IO_Size_Sectors", "Count", "IO_Size_Bytes"])
    # Write sorted results
    for s, c in sorted(counter.items()):
        writer.writerow([s, c, s * 512])

print(f"✅ Results saved to {output_filename}")


