diff --git a/src/uu/sort/src/chunks.rs b/src/uu/sort/src/chunks.rs
index 837cb1fa95c..37f47c35d00 100644
--- a/src/uu/sort/src/chunks.rs
+++ b/src/uu/sort/src/chunks.rs
@@ -43,6 +43,15 @@ pub struct LineData<'a> {
     pub num_infos: Vec<NumInfo>,
     pub parsed_floats: Vec,
     pub line_num_floats: Vec>,
+    pub filtered_lines_data: Vec<u8>,
+    pub filtered_line_ranges: Vec<(usize, usize)>,
+}
+
+impl LineData<'_> {
+    pub fn filtered_line(&self, index: usize) -> &[u8] {
+        let (start, len) = self.filtered_line_ranges[index];
+        &self.filtered_lines_data[start..start + len]
+    }
 }
 
 impl Chunk {
@@ -54,6 +63,8 @@ impl Chunk {
         contents.line_data.num_infos.clear();
         contents.line_data.parsed_floats.clear();
         contents.line_data.line_num_floats.clear();
+        contents.line_data.filtered_lines_data.clear();
+        contents.line_data.filtered_line_ranges.clear();
         let lines = unsafe {
             // SAFETY: It is safe to (temporarily) transmute to a vector of lines with a longer lifetime,
             // because the vector is empty.
@@ -192,6 +203,8 @@ pub fn read(
         num_infos,
         parsed_floats,
         line_num_floats,
+        filtered_lines_data: Vec::new(),
+        filtered_line_ranges: Vec::new(),
     };
     parse_lines(read, &mut lines, &mut line_data, separator, settings);
     Ok(ChunkContents { lines, line_data })
diff --git a/src/uu/sort/src/custom_str_cmp.rs b/src/uu/sort/src/custom_str_cmp.rs
index aa4f73ea7bb..be50757a5da 100644
--- a/src/uu/sort/src/custom_str_cmp.rs
+++ b/src/uu/sort/src/custom_str_cmp.rs
@@ -60,3 +60,25 @@ pub fn custom_str_cmp(
         }
     }
 }
+
+pub fn append_filtered_line_to(
+    line: &[u8],
+    ignore_non_printing: bool,
+    ignore_non_dictionary: bool,
+    ignore_case: bool,
+    output: &mut Vec<u8>,
+) -> usize {
+    let start = output.len();
+    output.reserve(line.len());
+    for &c in line {
+        if !filter_char(c, ignore_non_printing, ignore_non_dictionary) {
+            continue;
+        }
+        output.push(if ignore_case {
+            c.to_ascii_uppercase()
+        } else {
+            c
+        });
+    }
+    output.len() - start
+}
diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs
index d61f7d2008d..59e3c8c66ca 100644
--- a/src/uu/sort/src/ext_sort.rs
+++ b/src/uu/sort/src/ext_sort.rs
@@ -39,6 +39,42 @@ use crate::{Line, print_sorted};
 // Note: update `test_sort::test_start_buffer` if this size is changed
 const START_BUFFER_SIZE: usize = 8_000;
 
+const BUFFER_CAP_LIMIT: usize = 512 * 1024 * 1024;
+const BUFFER_MIN_WITHOUT_USER: usize = 8 * 1024 * 1024;
+const PIPELINE_DEPTH_CHUNK: usize = 4 * 1024 * 1024;
+const MIN_DYNAMIC_PIPELINE_DEPTH: usize = 2;
+const MAX_DYNAMIC_PIPELINE_DEPTH: usize = 8;
+
+struct ReaderWriterConfig {
+    buffer_size: usize,
+}
+
+fn normalized_buffer_size(settings: &GlobalSettings) -> usize {
+    let mut buffer_size = if settings.buffer_size <= BUFFER_CAP_LIMIT {
+        settings.buffer_size
+    } else {
+        settings.buffer_size / 2
+    };
+    if !settings.buffer_size_is_explicit {
+        buffer_size = buffer_size.max(BUFFER_MIN_WITHOUT_USER);
+    }
+    buffer_size
+}
+
+fn tuned_pipeline_depth(settings: &GlobalSettings, buffer_size: usize) -> usize {
+    let base = settings
+        .pipeline_depth
+        .clamp(MIN_DYNAMIC_PIPELINE_DEPTH, MAX_DYNAMIC_PIPELINE_DEPTH);
+    if settings.pipeline_depth_is_explicit {
+        base
+    } else {
+        let size_based = buffer_size
+            .div_ceil(PIPELINE_DEPTH_CHUNK)
+            .clamp(MIN_DYNAMIC_PIPELINE_DEPTH, MAX_DYNAMIC_PIPELINE_DEPTH);
+        base.max(size_based).min(MAX_DYNAMIC_PIPELINE_DEPTH)
+    }
+}
+
 /// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
 pub fn ext_sort(
     files: &mut impl Iterator>>,
@@ -46,8 +82,11 @@ pub fn ext_sort(
     output: Output,
     tmp_dir: &mut TmpDirWrapper,
 ) -> UResult<()> {
-    let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
-    let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
+    let buffer_size = normalized_buffer_size(settings);
+    let pipeline_depth = tuned_pipeline_depth(settings, buffer_size);
+    let config = ReaderWriterConfig { buffer_size };
+    let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(pipeline_depth);
+    let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(pipeline_depth);
     thread::spawn({
         let settings = settings.clone();
         move || sorter(&recycled_receiver, &sorted_sender, &settings)
@@ -87,6 +126,7 @@ pub fn ext_sort(
             recycled_sender,
             output,
             tmp_dir,
+            &config,
         )
     } else {
         reader_writer::<_, WriteablePlainTmpFile>(
@@ -96,6 +136,7 @@ pub fn ext_sort(
             recycled_sender,
             output,
             tmp_dir,
+            &config,
         )
     }
 }
@@ -110,26 +151,12 @@ fn reader_writer<
     sender: SyncSender,
     output: Output,
     tmp_dir: &mut TmpDirWrapper,
+    config: &ReaderWriterConfig,
 ) -> UResult<()> {
     let separator = settings.line_ending.into();
 
-    // Cap oversized buffer requests to avoid unnecessary allocations and give the automatic
-    // heuristic room to grow when the user does not provide an explicit value.
-    let mut buffer_size = match settings.buffer_size {
-        size if size <= 512 * 1024 * 1024 => size,
-        size => size / 2,
-    };
-    if !settings.buffer_size_is_explicit {
-        buffer_size = buffer_size.max(8 * 1024 * 1024);
-    }
     let read_result: ReadResult = read_write_loop(
-        files,
-        tmp_dir,
-        separator,
-        buffer_size,
-        settings,
-        receiver,
-        sender,
+        files, tmp_dir, separator, config, settings, receiver, sender,
     )?;
     match read_result {
         ReadResult::WroteChunksToFile { tmp_files } => {
@@ -214,7 +241,7 @@ fn read_write_loop(
     mut files: impl Iterator>>,
     tmp_dir: &mut TmpDirWrapper,
     separator: u8,
-    buffer_size: usize,
+    config: &ReaderWriterConfig,
     settings: &GlobalSettings,
     receiver: &Receiver,
     sender: SyncSender,
@@ -226,12 +253,12 @@ fn read_write_loop(
     for _ in 0..2 {
         let should_continue = chunks::read(
             &sender,
-            RecycledChunk::new(if START_BUFFER_SIZE < buffer_size {
+            RecycledChunk::new(if START_BUFFER_SIZE < config.buffer_size {
                 START_BUFFER_SIZE
             } else {
-                buffer_size
+                config.buffer_size
             }),
-            Some(buffer_size),
+            Some(config.buffer_size),
             &mut carry_over,
             &mut file,
             &mut files,
diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs
index 6122089e2f3..3c25df41915 100644
--- a/src/uu/sort/src/sort.rs
+++ b/src/uu/sort/src/sort.rs
@@ -22,7 +22,7 @@ use bigdecimal::BigDecimal;
 use chunks::LineData;
 use clap::builder::ValueParser;
 use clap::{Arg, ArgAction, Command};
-use custom_str_cmp::custom_str_cmp;
+use custom_str_cmp::{append_filtered_line_to, custom_str_cmp};
 use ext_sort::ext_sort;
 use fnv::FnvHasher;
 use numeric_str_cmp::{NumInfo, NumInfoParseSettings, human_numeric_str_cmp, numeric_str_cmp};
@@ -39,6 +39,7 @@ use std::ops::Range;
 use std::path::Path;
 use std::path::PathBuf;
 use std::str::Utf8Error;
+use std::thread;
 use thiserror::Error;
 use uucore::display::Quotable;
 use uucore::error::{FromIo, strip_errno};
@@ -290,6 +291,8 @@ pub struct GlobalSettings {
     compress_prog: Option<String>,
     merge_batch_size: usize,
     precomputed: Precomputed,
+    pipeline_depth: usize,
+    pipeline_depth_is_explicit: bool,
 }
 
 /// Data needed for sorting. Should be computed once before starting to sort
@@ -302,6 +305,7 @@ struct Precomputed {
     selections_per_line: usize,
     fast_lexicographic: bool,
     fast_ascii_insensitive: bool,
+    needs_filtered_view: bool,
 }
 
 impl GlobalSettings {
@@ -345,6 +349,7 @@ impl GlobalSettings {
 
         self.precomputed.fast_lexicographic = self.can_use_fast_lexicographic();
         self.precomputed.fast_ascii_insensitive = self.can_use_fast_ascii_insensitive();
+        self.precomputed.needs_filtered_view = self.can_use_filtered_view();
     }
 
     /// Returns true when the fast lexicographic path can be used safely.
@@ -384,6 +389,12 @@ impl GlobalSettings {
                 && !selector.settings.ignore_blanks
         }
     }
+
+    fn can_use_filtered_view(&self) -> bool {
+        self.mode == SortMode::Default
+            && self.selectors.is_empty()
+            && (self.ignore_case || self.dictionary_order || self.ignore_non_printing)
+    }
 }
 
 impl Default for GlobalSettings {
@@ -411,6 +422,8 @@ impl Default for GlobalSettings {
             compress_prog: None,
             merge_batch_size: default_merge_batch_size(),
             precomputed: Precomputed::default(),
+            pipeline_depth: default_pipeline_depth(),
+            pipeline_depth_is_explicit: false,
         }
     }
 }
@@ -547,6 +560,17 @@ impl<'a> Line<'a> {
                 }
             }
         }
+        if settings.precomputed.needs_filtered_view {
+            let start = line_data.filtered_lines_data.len();
+            let len = append_filtered_line_to(
+                line,
+                settings.ignore_non_printing,
+                settings.dictionary_order,
+                settings.ignore_case,
+                &mut line_data.filtered_lines_data,
+            );
+            line_data.filtered_line_ranges.push((start, len));
+        }
         Self { line, index }
     }
 
@@ -1261,6 +1285,26 @@ fn default_merge_batch_size() -> usize {
     }
 }
 
+const MIN_PIPELINE_DEPTH: usize = 2;
+const MAX_PIPELINE_DEPTH: usize = 8;
+
+fn default_pipeline_depth() -> usize {
+    thread::available_parallelism()
+        .map(|n| n.get().clamp(MIN_PIPELINE_DEPTH, MAX_PIPELINE_DEPTH))
+        .unwrap_or(MIN_PIPELINE_DEPTH)
+}
+
+fn pipeline_depth_from_parallel_arg(arg: Option<&String>) -> usize {
+    if let Some(raw) = arg {
+        if let Ok(parsed) = raw.parse::<usize>() {
+            if parsed > 0 {
+                return parsed.clamp(MIN_PIPELINE_DEPTH, MAX_PIPELINE_DEPTH);
+            }
+        }
+    }
+    default_pipeline_depth()
+}
+
 #[uucore::main]
 #[allow(clippy::cognitive_complexity)]
 pub fn uumain(args: impl uucore::Args) -> UResult<()> {
@@ -1375,7 +1419,8 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
     settings.dictionary_order = matches.get_flag(options::DICTIONARY_ORDER);
     settings.ignore_non_printing = matches.get_flag(options::IGNORE_NONPRINTING);
 
-    if matches.contains_id(options::PARALLEL) {
+    let parallel_override = matches.contains_id(options::PARALLEL);
+    if parallel_override {
         // "0" is default - threads = num of cores
         settings.threads = matches
             .get_one::<String>(options::PARALLEL)
@@ -1391,6 +1436,10 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
            .build_global();
     }
+    settings.pipeline_depth =
+        pipeline_depth_from_parallel_arg(matches.get_one::<String>(options::PARALLEL));
+    settings.pipeline_depth_is_explicit = parallel_override;
+
     if let Some(size_str) = matches.get_one::<String>(options::BUF_SIZE) {
         settings.buffer_size = GlobalSettings::parse_byte_count(size_str).map_err(|e| {
             USimpleError::new(2, format_error_message(&e, size_str, options::BUF_SIZE))
         })
@@ -1874,6 +1923,19 @@ fn compare_by<'a>(
         }
     }
 
+    if global_settings.precomputed.needs_filtered_view {
+        let a_filtered = a_line_data.filtered_line(a.index);
+        let b_filtered = b_line_data.filtered_line(b.index);
+        let cmp = a_filtered.cmp(b_filtered);
+        if cmp != Ordering::Equal || a.line == b.line {
+            return if global_settings.reverse {
+                cmp.reverse()
+            } else {
+                cmp
+            };
+        }
+    }
+
     let mut selection_index = 0;
     let mut num_info_index = 0;
     let mut parsed_float_index = 0;
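
Reviewer note on the filtered-view path: the `chunks.rs`, `custom_str_cmp.rs`, and `sort.rs` changes precompute one filtered (and optionally case-folded) byte key per line when the default comparison is only affected by `-f`, `-d`, and/or `-i`, so the hot comparison becomes a plain `&[u8]` ordering instead of re-filtering both lines on every call. The sketch below is illustrative only, not repository code: `keep_char` and `filtered_key` are hypothetical stand-ins for `filter_char` and `append_filtered_line_to`, and the `-d`/`-i` character classes are a simplified model.

```rust
use std::cmp::Ordering;

/// Simplified model of the byte filter: `-d` keeps blanks and alphanumerics,
/// `-i` keeps printable ASCII. (Stand-in for `filter_char`.)
fn keep_char(c: u8, ignore_non_printing: bool, ignore_non_dictionary: bool) -> bool {
    let dictionary_ok =
        !ignore_non_dictionary || c == b' ' || c == b'\t' || c.is_ascii_alphanumeric();
    let printing_ok = !ignore_non_printing || (0x20..=0x7e).contains(&c);
    dictionary_ok && printing_ok
}

/// Build the per-line key once, mirroring what `append_filtered_line_to`
/// appends into `filtered_lines_data`. (Stand-in; returns an owned Vec here.)
fn filtered_key(
    line: &[u8],
    ignore_non_printing: bool,
    ignore_non_dictionary: bool,
    fold_case: bool,
) -> Vec<u8> {
    line.iter()
        .copied()
        .filter(|&c| keep_char(c, ignore_non_printing, ignore_non_dictionary))
        .map(|c| if fold_case { c.to_ascii_uppercase() } else { c })
        .collect()
}

fn main() {
    // Under `-f -d`, "foo-bar" and "Foo_Bar" reduce to the same key, so the
    // repeated comparisons collapse to a single byte-slice ordering.
    let a = filtered_key(b"foo-bar", false, true, true);
    let b = filtered_key(b"Foo_Bar", false, true, true);
    assert_eq!(a, b"FOOBAR");
    assert_eq!(a.cmp(&b), Ordering::Equal);
}
```

This is also why the `compare_by` hunk guards the early return with `cmp != Ordering::Equal || a.line == b.line`: two distinct lines can share a filtered key (as in the example above), and in that case the comparison still falls through to the existing tie-breaking logic instead of reporting equality.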
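Reviewer note on the channel-depth heuristic: the `ext_sort.rs` changes size the reader/sorter `sync_channel`s from the (clamped) `--parallel` value and, when no explicit value is given, also from the normalized buffer size. As a rough illustration of the arithmetic, the constants below are copied from the diff, while `implied_depth` is a hypothetical helper that inlines the non-explicit branch of `tuned_pipeline_depth` with the parallelism-derived base passed in directly.

```rust
const PIPELINE_DEPTH_CHUNK: usize = 4 * 1024 * 1024;
const MIN_DYNAMIC_PIPELINE_DEPTH: usize = 2;
const MAX_DYNAMIC_PIPELINE_DEPTH: usize = 8;

/// Same arithmetic as the non-explicit branch of `tuned_pipeline_depth`:
/// roughly one in-flight chunk per 4 MiB of read buffer, clamped to [2, 8],
/// and never below the (clamped) parallelism-derived base.
fn implied_depth(base: usize, buffer_size: usize) -> usize {
    let size_based = buffer_size
        .div_ceil(PIPELINE_DEPTH_CHUNK)
        .clamp(MIN_DYNAMIC_PIPELINE_DEPTH, MAX_DYNAMIC_PIPELINE_DEPTH);
    base.clamp(MIN_DYNAMIC_PIPELINE_DEPTH, MAX_DYNAMIC_PIPELINE_DEPTH)
        .max(size_based)
        .min(MAX_DYNAMIC_PIPELINE_DEPTH)
}

fn main() {
    // 8 MiB buffer (the new floor when -S is not given) on a dual-core machine.
    assert_eq!(implied_depth(2, 8 * 1024 * 1024), 2);
    // Large buffer: the size-based estimate hits the 8-slot cap.
    assert_eq!(implied_depth(2, 64 * 1024 * 1024), 8);
    // Many cores but a tiny buffer: the core-derived base (clamped to 8) wins.
    assert_eq!(implied_depth(16, 1024 * 1024), 8);
}
```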