From c349a7d457eed0e5d6179a384e9b200aa0e685e6 Mon Sep 17 00:00:00 2001 From: mattsu Date: Sun, 16 Nov 2025 23:46:29 +0900 Subject: [PATCH 1/7] feat(sort): enhance sorting with character filtering and dynamic pipeline tuning Add support for filtering non-printing and non-dictionary characters in sort keys, along with options to ignore case. Implement dynamic buffer size normalization and pipeline depth tuning based on user settings and file size for improved performance. Add new fields to LineData for caching filtered lines and UTF-8 data. This improves sort accuracy and efficiency for large file sorting scenarios. --- src/uu/sort/src/chunks.rs | 6 +++ src/uu/sort/src/custom_str_cmp.rs | 20 ++++++++ src/uu/sort/src/ext_sort.rs | 71 ++++++++++++++++++++--------- src/uu/sort/src/sort.rs | 76 +++++++++++++++++++++++++++++-- 4 files changed, 148 insertions(+), 25 deletions(-) diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs index 837cb1fa95c..71e352dcba4 100644 --- a/src/uu/sort/src/chunks.rs +++ b/src/uu/sort/src/chunks.rs @@ -43,6 +43,8 @@ pub struct LineData<'a> { pub num_infos: Vec, pub parsed_floats: Vec, pub line_num_floats: Vec>, + pub utf8_cache: Vec>, + pub filtered_lines: Vec>, } impl Chunk { @@ -54,6 +56,8 @@ impl Chunk { contents.line_data.num_infos.clear(); contents.line_data.parsed_floats.clear(); contents.line_data.line_num_floats.clear(); + contents.line_data.utf8_cache.clear(); + contents.line_data.filtered_lines.clear(); let lines = unsafe { // SAFETY: It is safe to (temporarily) transmute to a vector of lines with a longer lifetime, // because the vector is empty. @@ -192,6 +196,8 @@ pub fn read( num_infos, parsed_floats, line_num_floats, + utf8_cache: Vec::new(), + filtered_lines: Vec::new(), }; parse_lines(read, &mut lines, &mut line_data, separator, settings); Ok(ChunkContents { lines, line_data }) diff --git a/src/uu/sort/src/custom_str_cmp.rs b/src/uu/sort/src/custom_str_cmp.rs index aa4f73ea7bb..6241a817791 100644 --- a/src/uu/sort/src/custom_str_cmp.rs +++ b/src/uu/sort/src/custom_str_cmp.rs @@ -60,3 +60,23 @@ pub fn custom_str_cmp( } } } + +pub fn build_filtered_line( + line: &[u8], + ignore_non_printing: bool, + ignore_non_dictionary: bool, + ignore_case: bool, +) -> Vec { + let mut filtered = Vec::with_capacity(line.len()); + for &c in line { + if !filter_char(c, ignore_non_printing, ignore_non_dictionary) { + continue; + } + filtered.push(if ignore_case { + c.to_ascii_uppercase() + } else { + c + }); + } + filtered +} diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs index d61f7d2008d..59e3c8c66ca 100644 --- a/src/uu/sort/src/ext_sort.rs +++ b/src/uu/sort/src/ext_sort.rs @@ -39,6 +39,42 @@ use crate::{Line, print_sorted}; // Note: update `test_sort::test_start_buffer` if this size is changed const START_BUFFER_SIZE: usize = 8_000; +const BUFFER_CAP_LIMIT: usize = 512 * 1024 * 1024; +const BUFFER_MIN_WITHOUT_USER: usize = 8 * 1024 * 1024; +const PIPELINE_DEPTH_CHUNK: usize = 4 * 1024 * 1024; +const MIN_DYNAMIC_PIPELINE_DEPTH: usize = 2; +const MAX_DYNAMIC_PIPELINE_DEPTH: usize = 8; + +struct ReaderWriterConfig { + buffer_size: usize, +} + +fn normalized_buffer_size(settings: &GlobalSettings) -> usize { + let mut buffer_size = if settings.buffer_size <= BUFFER_CAP_LIMIT { + settings.buffer_size + } else { + settings.buffer_size / 2 + }; + if !settings.buffer_size_is_explicit { + buffer_size = buffer_size.max(BUFFER_MIN_WITHOUT_USER); + } + buffer_size +} + +fn tuned_pipeline_depth(settings: &GlobalSettings, buffer_size: usize) -> usize { + let base = settings + .pipeline_depth + .clamp(MIN_DYNAMIC_PIPELINE_DEPTH, MAX_DYNAMIC_PIPELINE_DEPTH); + if settings.pipeline_depth_is_explicit { + base + } else { + let size_based = buffer_size + .div_ceil(PIPELINE_DEPTH_CHUNK) + .clamp(MIN_DYNAMIC_PIPELINE_DEPTH, MAX_DYNAMIC_PIPELINE_DEPTH); + base.max(size_based).min(MAX_DYNAMIC_PIPELINE_DEPTH) + } +} + /// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result. pub fn ext_sort( files: &mut impl Iterator>>, @@ -46,8 +82,11 @@ pub fn ext_sort( output: Output, tmp_dir: &mut TmpDirWrapper, ) -> UResult<()> { - let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1); - let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1); + let buffer_size = normalized_buffer_size(settings); + let pipeline_depth = tuned_pipeline_depth(settings, buffer_size); + let config = ReaderWriterConfig { buffer_size }; + let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(pipeline_depth); + let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(pipeline_depth); thread::spawn({ let settings = settings.clone(); move || sorter(&recycled_receiver, &sorted_sender, &settings) @@ -87,6 +126,7 @@ pub fn ext_sort( recycled_sender, output, tmp_dir, + &config, ) } else { reader_writer::<_, WriteablePlainTmpFile>( @@ -96,6 +136,7 @@ pub fn ext_sort( recycled_sender, output, tmp_dir, + &config, ) } } @@ -110,26 +151,12 @@ fn reader_writer< sender: SyncSender, output: Output, tmp_dir: &mut TmpDirWrapper, + config: &ReaderWriterConfig, ) -> UResult<()> { let separator = settings.line_ending.into(); - // Cap oversized buffer requests to avoid unnecessary allocations and give the automatic - // heuristic room to grow when the user does not provide an explicit value. - let mut buffer_size = match settings.buffer_size { - size if size <= 512 * 1024 * 1024 => size, - size => size / 2, - }; - if !settings.buffer_size_is_explicit { - buffer_size = buffer_size.max(8 * 1024 * 1024); - } let read_result: ReadResult = read_write_loop( - files, - tmp_dir, - separator, - buffer_size, - settings, - receiver, - sender, + files, tmp_dir, separator, config, settings, receiver, sender, )?; match read_result { ReadResult::WroteChunksToFile { tmp_files } => { @@ -214,7 +241,7 @@ fn read_write_loop( mut files: impl Iterator>>, tmp_dir: &mut TmpDirWrapper, separator: u8, - buffer_size: usize, + config: &ReaderWriterConfig, settings: &GlobalSettings, receiver: &Receiver, sender: SyncSender, @@ -226,12 +253,12 @@ fn read_write_loop( for _ in 0..2 { let should_continue = chunks::read( &sender, - RecycledChunk::new(if START_BUFFER_SIZE < buffer_size { + RecycledChunk::new(if START_BUFFER_SIZE < config.buffer_size { START_BUFFER_SIZE } else { - buffer_size + config.buffer_size }), - Some(buffer_size), + Some(config.buffer_size), &mut carry_over, &mut file, &mut files, diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index ec9ab5b9305..5850623d05a 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -22,7 +22,7 @@ use bigdecimal::BigDecimal; use chunks::LineData; use clap::builder::ValueParser; use clap::{Arg, ArgAction, Command}; -use custom_str_cmp::custom_str_cmp; +use custom_str_cmp::{build_filtered_line, custom_str_cmp}; use ext_sort::ext_sort; use fnv::FnvHasher; #[cfg(target_os = "linux")] @@ -41,6 +41,7 @@ use std::ops::Range; use std::path::Path; use std::path::PathBuf; use std::str::Utf8Error; +use std::thread; use thiserror::Error; use uucore::display::Quotable; use uucore::error::{FromIo, strip_errno}; @@ -291,6 +292,8 @@ pub struct GlobalSettings { compress_prog: Option, merge_batch_size: usize, precomputed: Precomputed, + pipeline_depth: usize, + pipeline_depth_is_explicit: bool, } /// Data needed for sorting. Should be computed once before starting to sort @@ -303,6 +306,7 @@ struct Precomputed { selections_per_line: usize, fast_lexicographic: bool, fast_ascii_insensitive: bool, + needs_filtered_view: bool, } impl GlobalSettings { @@ -346,6 +350,7 @@ impl GlobalSettings { self.precomputed.fast_lexicographic = self.can_use_fast_lexicographic(); self.precomputed.fast_ascii_insensitive = self.can_use_fast_ascii_insensitive(); + self.precomputed.needs_filtered_view = self.can_use_filtered_view(); } /// Returns true when the fast lexicographic path can be used safely. @@ -385,6 +390,12 @@ impl GlobalSettings { && !selector.settings.ignore_blanks } } + + fn can_use_filtered_view(&self) -> bool { + self.mode == SortMode::Default + && self.selectors.is_empty() + && (self.ignore_case || self.dictionary_order || self.ignore_non_printing) + } } impl Default for GlobalSettings { @@ -412,6 +423,8 @@ impl Default for GlobalSettings { compress_prog: None, merge_batch_size: default_merge_batch_size(), precomputed: Precomputed::default(), + pipeline_depth: default_pipeline_depth(), + pipeline_depth_is_explicit: false, } } } @@ -548,6 +561,19 @@ impl<'a> Line<'a> { } } } + if settings.precomputed.needs_filtered_view { + line_data.filtered_lines.push(build_filtered_line( + line, + settings.ignore_non_printing, + settings.dictionary_order, + settings.ignore_case, + )); + } + if settings.precomputed.fast_lexicographic { + line_data.utf8_cache.push(std::str::from_utf8(line).ok()); + } else { + line_data.utf8_cache.push(None); + } Self { line, index } } @@ -1111,6 +1137,26 @@ fn default_merge_batch_size() -> usize { } } +const MIN_PIPELINE_DEPTH: usize = 2; +const MAX_PIPELINE_DEPTH: usize = 8; + +fn default_pipeline_depth() -> usize { + thread::available_parallelism() + .map(|n| n.get().clamp(MIN_PIPELINE_DEPTH, MAX_PIPELINE_DEPTH)) + .unwrap_or(MIN_PIPELINE_DEPTH) +} + +fn pipeline_depth_from_parallel_arg(arg: Option<&String>) -> usize { + if let Some(raw) = arg { + if let Ok(parsed) = raw.parse::() { + if parsed > 0 { + return parsed.clamp(MIN_PIPELINE_DEPTH, MAX_PIPELINE_DEPTH); + } + } + } + default_pipeline_depth() +} + #[uucore::main] #[allow(clippy::cognitive_complexity)] pub fn uumain(args: impl uucore::Args) -> UResult<()> { @@ -1221,7 +1267,8 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> { settings.dictionary_order = matches.get_flag(options::DICTIONARY_ORDER); settings.ignore_non_printing = matches.get_flag(options::IGNORE_NONPRINTING); - if matches.contains_id(options::PARALLEL) { + let parallel_override = matches.contains_id(options::PARALLEL); + if parallel_override { // "0" is default - threads = num of cores settings.threads = matches .get_one::(options::PARALLEL) @@ -1231,6 +1278,10 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> { } } + settings.pipeline_depth = + pipeline_depth_from_parallel_arg(matches.get_one::(options::PARALLEL)); + settings.pipeline_depth_is_explicit = parallel_override; + if let Some(size_str) = matches.get_one::(options::BUF_SIZE) { settings.buffer_size = GlobalSettings::parse_byte_count(size_str).map_err(|e| { USimpleError::new(2, format_error_message(&e, size_str, options::BUF_SIZE)) @@ -1687,7 +1738,13 @@ fn compare_by<'a>( b_line_data: &LineData<'a>, ) -> Ordering { if global_settings.precomputed.fast_lexicographic { - let cmp = a.line.cmp(b.line); + let cmp = if let (Ok(a_str), Ok(b_str)) = + (std::str::from_utf8(a.line), std::str::from_utf8(b.line)) + { + a_str.cmp(b_str) + } else { + b.line.cmp(a.line) + }; return if global_settings.reverse { cmp.reverse() } else { @@ -1706,6 +1763,19 @@ fn compare_by<'a>( } } + if global_settings.precomputed.needs_filtered_view { + let a_filtered = &a_line_data.filtered_lines[a.index]; + let b_filtered = &b_line_data.filtered_lines[b.index]; + let cmp = a_filtered.cmp(b_filtered); + if cmp != Ordering::Equal || a.line == b.line { + return if global_settings.reverse { + cmp.reverse() + } else { + cmp + }; + } + } + let mut selection_index = 0; let mut num_info_index = 0; let mut parsed_float_index = 0; From 15105faf862bfe4df53c5e612610d93fba53d5f1 Mon Sep 17 00:00:00 2001 From: mattsu Date: Mon, 17 Nov 2025 08:38:35 +0900 Subject: [PATCH 2/7] perf(uu/sort): optimize lexicographic comparison with UTF-8 caching Use utf8_cache to retrieve precomputed UTF-8 strings in fast lexicographic mode, falling back to standard from_utf8 conversion if not available. This reduces redundant UTF-8 validations and improves performance for repeated comparisons. --- src/uu/sort/src/sort.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index 5850623d05a..e77dd33ffaa 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -1738,7 +1738,12 @@ fn compare_by<'a>( b_line_data: &LineData<'a>, ) -> Ordering { if global_settings.precomputed.fast_lexicographic { - let cmp = if let (Ok(a_str), Ok(b_str)) = + let cmp = if let (Some(a_str), Some(b_str)) = ( + a_line_data.utf8_cache.get(a.index).and_then(|cached| *cached), + b_line_data.utf8_cache.get(b.index).and_then(|cached| *cached), + ) { + a_str.cmp(b_str) + } else if let (Ok(a_str), Ok(b_str)) = (std::str::from_utf8(a.line), std::str::from_utf8(b.line)) { a_str.cmp(b_str) From 90ac61a580265bc66cb027cdd6e01b82c2e675f1 Mon Sep 17 00:00:00 2001 From: mattsu Date: Mon, 17 Nov 2025 08:39:16 +0900 Subject: [PATCH 3/7] refactor(sort): improve readability of method chaining in compare_by function Format long chain method calls over multiple lines for better code clarity and maintainability. --- src/uu/sort/src/sort.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index e77dd33ffaa..04fae48267c 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -1739,8 +1739,14 @@ fn compare_by<'a>( ) -> Ordering { if global_settings.precomputed.fast_lexicographic { let cmp = if let (Some(a_str), Some(b_str)) = ( - a_line_data.utf8_cache.get(a.index).and_then(|cached| *cached), - b_line_data.utf8_cache.get(b.index).and_then(|cached| *cached), + a_line_data + .utf8_cache + .get(a.index) + .and_then(|cached| *cached), + b_line_data + .utf8_cache + .get(b.index) + .and_then(|cached| *cached), ) { a_str.cmp(b_str) } else if let (Ok(a_str), Ok(b_str)) = From 17e93de4c8690f7836e2ec5b145a4a510c5e2153 Mon Sep 17 00:00:00 2001 From: mattsu Date: Mon, 17 Nov 2025 09:22:39 +0900 Subject: [PATCH 4/7] refactor(sort): optimize filtered lines storage to reduce memory allocations Refactored LineData to use a single Vec for filtered_lines_data and a Vec<(usize, usize)> for ranges, instead of Vec>. Updated build_filtered_line to append_filtered_line_to for appending. This reduces per-line allocations and improves memory efficiency in sorting operations. --- src/uu/sort/src/chunks.rs | 16 +++++++++++++--- src/uu/sort/src/custom_str_cmp.rs | 12 +++++++----- src/uu/sort/src/sort.rs | 13 ++++++++----- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs index 71e352dcba4..da99b4f9228 100644 --- a/src/uu/sort/src/chunks.rs +++ b/src/uu/sort/src/chunks.rs @@ -44,7 +44,15 @@ pub struct LineData<'a> { pub parsed_floats: Vec, pub line_num_floats: Vec>, pub utf8_cache: Vec>, - pub filtered_lines: Vec>, + pub filtered_lines_data: Vec, + pub filtered_line_ranges: Vec<(usize, usize)>, +} + +impl<'a> LineData<'a> { + pub fn filtered_line(&self, index: usize) -> &[u8] { + let (start, len) = self.filtered_line_ranges[index]; + &self.filtered_lines_data[start..start + len] + } } impl Chunk { @@ -57,7 +65,8 @@ impl Chunk { contents.line_data.parsed_floats.clear(); contents.line_data.line_num_floats.clear(); contents.line_data.utf8_cache.clear(); - contents.line_data.filtered_lines.clear(); + contents.line_data.filtered_lines_data.clear(); + contents.line_data.filtered_line_ranges.clear(); let lines = unsafe { // SAFETY: It is safe to (temporarily) transmute to a vector of lines with a longer lifetime, // because the vector is empty. @@ -197,7 +206,8 @@ pub fn read( parsed_floats, line_num_floats, utf8_cache: Vec::new(), - filtered_lines: Vec::new(), + filtered_lines_data: Vec::new(), + filtered_line_ranges: Vec::new(), }; parse_lines(read, &mut lines, &mut line_data, separator, settings); Ok(ChunkContents { lines, line_data }) diff --git a/src/uu/sort/src/custom_str_cmp.rs b/src/uu/sort/src/custom_str_cmp.rs index 6241a817791..be50757a5da 100644 --- a/src/uu/sort/src/custom_str_cmp.rs +++ b/src/uu/sort/src/custom_str_cmp.rs @@ -61,22 +61,24 @@ pub fn custom_str_cmp( } } -pub fn build_filtered_line( +pub fn append_filtered_line_to( line: &[u8], ignore_non_printing: bool, ignore_non_dictionary: bool, ignore_case: bool, -) -> Vec { - let mut filtered = Vec::with_capacity(line.len()); + output: &mut Vec, +) -> usize { + let start = output.len(); + output.reserve(line.len()); for &c in line { if !filter_char(c, ignore_non_printing, ignore_non_dictionary) { continue; } - filtered.push(if ignore_case { + output.push(if ignore_case { c.to_ascii_uppercase() } else { c }); } - filtered + output.len() - start } diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index 04fae48267c..55e2dffd82e 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -22,7 +22,7 @@ use bigdecimal::BigDecimal; use chunks::LineData; use clap::builder::ValueParser; use clap::{Arg, ArgAction, Command}; -use custom_str_cmp::{build_filtered_line, custom_str_cmp}; +use custom_str_cmp::{append_filtered_line_to, custom_str_cmp}; use ext_sort::ext_sort; use fnv::FnvHasher; #[cfg(target_os = "linux")] @@ -562,12 +562,15 @@ impl<'a> Line<'a> { } } if settings.precomputed.needs_filtered_view { - line_data.filtered_lines.push(build_filtered_line( + let start = line_data.filtered_lines_data.len(); + let len = append_filtered_line_to( line, settings.ignore_non_printing, settings.dictionary_order, settings.ignore_case, - )); + &mut line_data.filtered_lines_data, + ); + line_data.filtered_line_ranges.push((start, len)); } if settings.precomputed.fast_lexicographic { line_data.utf8_cache.push(std::str::from_utf8(line).ok()); @@ -1775,8 +1778,8 @@ fn compare_by<'a>( } if global_settings.precomputed.needs_filtered_view { - let a_filtered = &a_line_data.filtered_lines[a.index]; - let b_filtered = &b_line_data.filtered_lines[b.index]; + let a_filtered = a_line_data.filtered_line(a.index); + let b_filtered = b_line_data.filtered_line(b.index); let cmp = a_filtered.cmp(b_filtered); if cmp != Ordering::Equal || a.line == b.line { return if global_settings.reverse { From b2456850a221104c4b856c45dacd33f552fa589f Mon Sep 17 00:00:00 2001 From: mattsu Date: Mon, 17 Nov 2025 09:58:57 +0900 Subject: [PATCH 5/7] refactor(sort): replace named lifetime with anonymous one in LineData impl Use '_' instead of 'a' for the lifetime parameter in the impl block to simplify and modernize the code without changing behavior. --- src/uu/sort/src/chunks.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs index da99b4f9228..bcd4a7f14d6 100644 --- a/src/uu/sort/src/chunks.rs +++ b/src/uu/sort/src/chunks.rs @@ -48,7 +48,7 @@ pub struct LineData<'a> { pub filtered_line_ranges: Vec<(usize, usize)>, } -impl<'a> LineData<'a> { +impl LineData<'_> { pub fn filtered_line(&self, index: usize) -> &[u8] { let (start, len) = self.filtered_line_ranges[index]; &self.filtered_lines_data[start..start + len] From e54fe8267e38a61a0b9bae9e99b7636f233e2e09 Mon Sep 17 00:00:00 2001 From: mattsu Date: Tue, 2 Dec 2025 20:38:37 +0900 Subject: [PATCH 6/7] refactor(sort): simplify lexicographic comparison by removing UTF-8 cache Remove the unused utf8_cache field from LineData struct and related code for caching UTF-8 strings during line parsing. This simplifies the lexicographic comparison logic in compare_by to always perform byte-level comparison, reducing code complexity and potential maintenance overhead without affecting sorting functionality. The previous cache was intended for faster lexical sorting but is no longer needed. --- src/uu/sort/src/chunks.rs | 3 --- src/uu/sort/src/sort.rs | 24 +----------------------- 2 files changed, 1 insertion(+), 26 deletions(-) diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs index bcd4a7f14d6..37f47c35d00 100644 --- a/src/uu/sort/src/chunks.rs +++ b/src/uu/sort/src/chunks.rs @@ -43,7 +43,6 @@ pub struct LineData<'a> { pub num_infos: Vec, pub parsed_floats: Vec, pub line_num_floats: Vec>, - pub utf8_cache: Vec>, pub filtered_lines_data: Vec, pub filtered_line_ranges: Vec<(usize, usize)>, } @@ -64,7 +63,6 @@ impl Chunk { contents.line_data.num_infos.clear(); contents.line_data.parsed_floats.clear(); contents.line_data.line_num_floats.clear(); - contents.line_data.utf8_cache.clear(); contents.line_data.filtered_lines_data.clear(); contents.line_data.filtered_line_ranges.clear(); let lines = unsafe { @@ -205,7 +203,6 @@ pub fn read( num_infos, parsed_floats, line_num_floats, - utf8_cache: Vec::new(), filtered_lines_data: Vec::new(), filtered_line_ranges: Vec::new(), }; diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index 55e2dffd82e..78cda508616 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -572,11 +572,6 @@ impl<'a> Line<'a> { ); line_data.filtered_line_ranges.push((start, len)); } - if settings.precomputed.fast_lexicographic { - line_data.utf8_cache.push(std::str::from_utf8(line).ok()); - } else { - line_data.utf8_cache.push(None); - } Self { line, index } } @@ -1741,24 +1736,7 @@ fn compare_by<'a>( b_line_data: &LineData<'a>, ) -> Ordering { if global_settings.precomputed.fast_lexicographic { - let cmp = if let (Some(a_str), Some(b_str)) = ( - a_line_data - .utf8_cache - .get(a.index) - .and_then(|cached| *cached), - b_line_data - .utf8_cache - .get(b.index) - .and_then(|cached| *cached), - ) { - a_str.cmp(b_str) - } else if let (Ok(a_str), Ok(b_str)) = - (std::str::from_utf8(a.line), std::str::from_utf8(b.line)) - { - a_str.cmp(b_str) - } else { - b.line.cmp(a.line) - }; + let cmp = a.line.cmp(b.line); return if global_settings.reverse { cmp.reverse() } else { From d8c228b6469e7bd8cd19f9b0b15129b6f41c48dd Mon Sep 17 00:00:00 2001 From: mattsu Date: Wed, 24 Dec 2025 19:59:17 +0900 Subject: [PATCH 7/7] feat(sort): add key filtering and dynamic pipeline tuning --- src/uu/sort/src/chunks.rs | 13 ++++++ src/uu/sort/src/custom_str_cmp.rs | 22 ++++++++++ src/uu/sort/src/ext_sort.rs | 71 +++++++++++++++++++++---------- src/uu/sort/src/sort.rs | 66 +++++++++++++++++++++++++++- 4 files changed, 148 insertions(+), 24 deletions(-) diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs index 837cb1fa95c..37f47c35d00 100644 --- a/src/uu/sort/src/chunks.rs +++ b/src/uu/sort/src/chunks.rs @@ -43,6 +43,15 @@ pub struct LineData<'a> { pub num_infos: Vec, pub parsed_floats: Vec, pub line_num_floats: Vec>, + pub filtered_lines_data: Vec, + pub filtered_line_ranges: Vec<(usize, usize)>, +} + +impl LineData<'_> { + pub fn filtered_line(&self, index: usize) -> &[u8] { + let (start, len) = self.filtered_line_ranges[index]; + &self.filtered_lines_data[start..start + len] + } } impl Chunk { @@ -54,6 +63,8 @@ impl Chunk { contents.line_data.num_infos.clear(); contents.line_data.parsed_floats.clear(); contents.line_data.line_num_floats.clear(); + contents.line_data.filtered_lines_data.clear(); + contents.line_data.filtered_line_ranges.clear(); let lines = unsafe { // SAFETY: It is safe to (temporarily) transmute to a vector of lines with a longer lifetime, // because the vector is empty. @@ -192,6 +203,8 @@ pub fn read( num_infos, parsed_floats, line_num_floats, + filtered_lines_data: Vec::new(), + filtered_line_ranges: Vec::new(), }; parse_lines(read, &mut lines, &mut line_data, separator, settings); Ok(ChunkContents { lines, line_data }) diff --git a/src/uu/sort/src/custom_str_cmp.rs b/src/uu/sort/src/custom_str_cmp.rs index aa4f73ea7bb..be50757a5da 100644 --- a/src/uu/sort/src/custom_str_cmp.rs +++ b/src/uu/sort/src/custom_str_cmp.rs @@ -60,3 +60,25 @@ pub fn custom_str_cmp( } } } + +pub fn append_filtered_line_to( + line: &[u8], + ignore_non_printing: bool, + ignore_non_dictionary: bool, + ignore_case: bool, + output: &mut Vec, +) -> usize { + let start = output.len(); + output.reserve(line.len()); + for &c in line { + if !filter_char(c, ignore_non_printing, ignore_non_dictionary) { + continue; + } + output.push(if ignore_case { + c.to_ascii_uppercase() + } else { + c + }); + } + output.len() - start +} diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs index d61f7d2008d..59e3c8c66ca 100644 --- a/src/uu/sort/src/ext_sort.rs +++ b/src/uu/sort/src/ext_sort.rs @@ -39,6 +39,42 @@ use crate::{Line, print_sorted}; // Note: update `test_sort::test_start_buffer` if this size is changed const START_BUFFER_SIZE: usize = 8_000; +const BUFFER_CAP_LIMIT: usize = 512 * 1024 * 1024; +const BUFFER_MIN_WITHOUT_USER: usize = 8 * 1024 * 1024; +const PIPELINE_DEPTH_CHUNK: usize = 4 * 1024 * 1024; +const MIN_DYNAMIC_PIPELINE_DEPTH: usize = 2; +const MAX_DYNAMIC_PIPELINE_DEPTH: usize = 8; + +struct ReaderWriterConfig { + buffer_size: usize, +} + +fn normalized_buffer_size(settings: &GlobalSettings) -> usize { + let mut buffer_size = if settings.buffer_size <= BUFFER_CAP_LIMIT { + settings.buffer_size + } else { + settings.buffer_size / 2 + }; + if !settings.buffer_size_is_explicit { + buffer_size = buffer_size.max(BUFFER_MIN_WITHOUT_USER); + } + buffer_size +} + +fn tuned_pipeline_depth(settings: &GlobalSettings, buffer_size: usize) -> usize { + let base = settings + .pipeline_depth + .clamp(MIN_DYNAMIC_PIPELINE_DEPTH, MAX_DYNAMIC_PIPELINE_DEPTH); + if settings.pipeline_depth_is_explicit { + base + } else { + let size_based = buffer_size + .div_ceil(PIPELINE_DEPTH_CHUNK) + .clamp(MIN_DYNAMIC_PIPELINE_DEPTH, MAX_DYNAMIC_PIPELINE_DEPTH); + base.max(size_based).min(MAX_DYNAMIC_PIPELINE_DEPTH) + } +} + /// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result. pub fn ext_sort( files: &mut impl Iterator>>, @@ -46,8 +82,11 @@ pub fn ext_sort( output: Output, tmp_dir: &mut TmpDirWrapper, ) -> UResult<()> { - let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1); - let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1); + let buffer_size = normalized_buffer_size(settings); + let pipeline_depth = tuned_pipeline_depth(settings, buffer_size); + let config = ReaderWriterConfig { buffer_size }; + let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(pipeline_depth); + let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(pipeline_depth); thread::spawn({ let settings = settings.clone(); move || sorter(&recycled_receiver, &sorted_sender, &settings) @@ -87,6 +126,7 @@ pub fn ext_sort( recycled_sender, output, tmp_dir, + &config, ) } else { reader_writer::<_, WriteablePlainTmpFile>( @@ -96,6 +136,7 @@ pub fn ext_sort( recycled_sender, output, tmp_dir, + &config, ) } } @@ -110,26 +151,12 @@ fn reader_writer< sender: SyncSender, output: Output, tmp_dir: &mut TmpDirWrapper, + config: &ReaderWriterConfig, ) -> UResult<()> { let separator = settings.line_ending.into(); - // Cap oversized buffer requests to avoid unnecessary allocations and give the automatic - // heuristic room to grow when the user does not provide an explicit value. - let mut buffer_size = match settings.buffer_size { - size if size <= 512 * 1024 * 1024 => size, - size => size / 2, - }; - if !settings.buffer_size_is_explicit { - buffer_size = buffer_size.max(8 * 1024 * 1024); - } let read_result: ReadResult = read_write_loop( - files, - tmp_dir, - separator, - buffer_size, - settings, - receiver, - sender, + files, tmp_dir, separator, config, settings, receiver, sender, )?; match read_result { ReadResult::WroteChunksToFile { tmp_files } => { @@ -214,7 +241,7 @@ fn read_write_loop( mut files: impl Iterator>>, tmp_dir: &mut TmpDirWrapper, separator: u8, - buffer_size: usize, + config: &ReaderWriterConfig, settings: &GlobalSettings, receiver: &Receiver, sender: SyncSender, @@ -226,12 +253,12 @@ fn read_write_loop( for _ in 0..2 { let should_continue = chunks::read( &sender, - RecycledChunk::new(if START_BUFFER_SIZE < buffer_size { + RecycledChunk::new(if START_BUFFER_SIZE < config.buffer_size { START_BUFFER_SIZE } else { - buffer_size + config.buffer_size }), - Some(buffer_size), + Some(config.buffer_size), &mut carry_over, &mut file, &mut files, diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index 6122089e2f3..3c25df41915 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -22,7 +22,7 @@ use bigdecimal::BigDecimal; use chunks::LineData; use clap::builder::ValueParser; use clap::{Arg, ArgAction, Command}; -use custom_str_cmp::custom_str_cmp; +use custom_str_cmp::{append_filtered_line_to, custom_str_cmp}; use ext_sort::ext_sort; use fnv::FnvHasher; use numeric_str_cmp::{NumInfo, NumInfoParseSettings, human_numeric_str_cmp, numeric_str_cmp}; @@ -39,6 +39,7 @@ use std::ops::Range; use std::path::Path; use std::path::PathBuf; use std::str::Utf8Error; +use std::thread; use thiserror::Error; use uucore::display::Quotable; use uucore::error::{FromIo, strip_errno}; @@ -290,6 +291,8 @@ pub struct GlobalSettings { compress_prog: Option, merge_batch_size: usize, precomputed: Precomputed, + pipeline_depth: usize, + pipeline_depth_is_explicit: bool, } /// Data needed for sorting. Should be computed once before starting to sort @@ -302,6 +305,7 @@ struct Precomputed { selections_per_line: usize, fast_lexicographic: bool, fast_ascii_insensitive: bool, + needs_filtered_view: bool, } impl GlobalSettings { @@ -345,6 +349,7 @@ impl GlobalSettings { self.precomputed.fast_lexicographic = self.can_use_fast_lexicographic(); self.precomputed.fast_ascii_insensitive = self.can_use_fast_ascii_insensitive(); + self.precomputed.needs_filtered_view = self.can_use_filtered_view(); } /// Returns true when the fast lexicographic path can be used safely. @@ -384,6 +389,12 @@ impl GlobalSettings { && !selector.settings.ignore_blanks } } + + fn can_use_filtered_view(&self) -> bool { + self.mode == SortMode::Default + && self.selectors.is_empty() + && (self.ignore_case || self.dictionary_order || self.ignore_non_printing) + } } impl Default for GlobalSettings { @@ -411,6 +422,8 @@ impl Default for GlobalSettings { compress_prog: None, merge_batch_size: default_merge_batch_size(), precomputed: Precomputed::default(), + pipeline_depth: default_pipeline_depth(), + pipeline_depth_is_explicit: false, } } } @@ -547,6 +560,17 @@ impl<'a> Line<'a> { } } } + if settings.precomputed.needs_filtered_view { + let start = line_data.filtered_lines_data.len(); + let len = append_filtered_line_to( + line, + settings.ignore_non_printing, + settings.dictionary_order, + settings.ignore_case, + &mut line_data.filtered_lines_data, + ); + line_data.filtered_line_ranges.push((start, len)); + } Self { line, index } } @@ -1261,6 +1285,26 @@ fn default_merge_batch_size() -> usize { } } +const MIN_PIPELINE_DEPTH: usize = 2; +const MAX_PIPELINE_DEPTH: usize = 8; + +fn default_pipeline_depth() -> usize { + thread::available_parallelism() + .map(|n| n.get().clamp(MIN_PIPELINE_DEPTH, MAX_PIPELINE_DEPTH)) + .unwrap_or(MIN_PIPELINE_DEPTH) +} + +fn pipeline_depth_from_parallel_arg(arg: Option<&String>) -> usize { + if let Some(raw) = arg { + if let Ok(parsed) = raw.parse::() { + if parsed > 0 { + return parsed.clamp(MIN_PIPELINE_DEPTH, MAX_PIPELINE_DEPTH); + } + } + } + default_pipeline_depth() +} + #[uucore::main] #[allow(clippy::cognitive_complexity)] pub fn uumain(args: impl uucore::Args) -> UResult<()> { @@ -1375,7 +1419,8 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> { settings.dictionary_order = matches.get_flag(options::DICTIONARY_ORDER); settings.ignore_non_printing = matches.get_flag(options::IGNORE_NONPRINTING); - if matches.contains_id(options::PARALLEL) { + let parallel_override = matches.contains_id(options::PARALLEL); + if parallel_override { // "0" is default - threads = num of cores settings.threads = matches .get_one::(options::PARALLEL) @@ -1391,6 +1436,10 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> { .build_global(); } + settings.pipeline_depth = + pipeline_depth_from_parallel_arg(matches.get_one::(options::PARALLEL)); + settings.pipeline_depth_is_explicit = parallel_override; + if let Some(size_str) = matches.get_one::(options::BUF_SIZE) { settings.buffer_size = GlobalSettings::parse_byte_count(size_str).map_err(|e| { USimpleError::new(2, format_error_message(&e, size_str, options::BUF_SIZE)) @@ -1874,6 +1923,19 @@ fn compare_by<'a>( } } + if global_settings.precomputed.needs_filtered_view { + let a_filtered = a_line_data.filtered_line(a.index); + let b_filtered = b_line_data.filtered_line(b.index); + let cmp = a_filtered.cmp(b_filtered); + if cmp != Ordering::Equal || a.line == b.line { + return if global_settings.reverse { + cmp.reverse() + } else { + cmp + }; + } + } + let mut selection_index = 0; let mut num_info_index = 0; let mut parsed_float_index = 0;