Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/uu/sort/src/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ pub struct LineData<'a> {
pub num_infos: Vec<NumInfo>,
pub parsed_floats: Vec<GeneralBigDecimalParseResult>,
pub line_num_floats: Vec<Option<f64>>,
pub filtered_lines_data: Vec<u8>,
pub filtered_line_ranges: Vec<(usize, usize)>,
}

impl LineData<'_> {
    /// Returns the filtered bytes recorded for the line at position `index`.
    ///
    /// Each entry of `filtered_line_ranges` is an `(offset, length)` pair
    /// describing a slice of the shared `filtered_lines_data` buffer.
    ///
    /// # Panics
    ///
    /// Panics if no range was recorded for `index`, or if the recorded range
    /// does not lie within `filtered_lines_data`.
    pub fn filtered_line(&self, index: usize) -> &[u8] {
        let (offset, length) = self.filtered_line_ranges[index];
        let end = offset + length;
        &self.filtered_lines_data[offset..end]
    }
}

impl Chunk {
Expand All @@ -54,6 +63,8 @@ impl Chunk {
contents.line_data.num_infos.clear();
contents.line_data.parsed_floats.clear();
contents.line_data.line_num_floats.clear();
contents.line_data.filtered_lines_data.clear();
contents.line_data.filtered_line_ranges.clear();
let lines = unsafe {
// SAFETY: It is safe to (temporarily) transmute to a vector of lines with a longer lifetime,
// because the vector is empty.
Expand Down Expand Up @@ -192,6 +203,8 @@ pub fn read<T: Read>(
num_infos,
parsed_floats,
line_num_floats,
filtered_lines_data: Vec::new(),
filtered_line_ranges: Vec::new(),
};
parse_lines(read, &mut lines, &mut line_data, separator, settings);
Ok(ChunkContents { lines, line_data })
Expand Down
22 changes: 22 additions & 0 deletions src/uu/sort/src/custom_str_cmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,25 @@ pub fn custom_str_cmp(
}
}
}

/// Appends the filtered form of `line` to `output` and returns the number of
/// bytes that were appended.
///
/// Bytes rejected by `filter_char` (driven by `ignore_non_printing` and
/// `ignore_non_dictionary`) are skipped. When `ignore_case` is set, every
/// kept byte is converted to ASCII uppercase before being stored.
pub fn append_filtered_line_to(
    line: &[u8],
    ignore_non_printing: bool,
    ignore_non_dictionary: bool,
    ignore_case: bool,
    output: &mut Vec<u8>,
) -> usize {
    let len_before = output.len();
    // At most `line.len()` bytes survive filtering, so reserve once up front.
    output.reserve(line.len());
    let kept = line
        .iter()
        .copied()
        .filter(|&byte| filter_char(byte, ignore_non_printing, ignore_non_dictionary))
        .map(|byte| {
            if ignore_case {
                byte.to_ascii_uppercase()
            } else {
                byte
            }
        });
    output.extend(kept);
    output.len() - len_before
}
71 changes: 49 additions & 22 deletions src/uu/sort/src/ext_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,54 @@ use crate::{Line, print_sorted};
// Note: update `test_sort::test_start_buffer` if this size is changed
const START_BUFFER_SIZE: usize = 8_000;

/// Largest requested buffer size that is used unchanged; `normalized_buffer_size`
/// halves any request above this value.
const BUFFER_CAP_LIMIT: usize = 512 * 1024 * 1024;
/// Lower bound `normalized_buffer_size` applies when the buffer size was not
/// set explicitly.
const BUFFER_MIN_WITHOUT_USER: usize = 8 * 1024 * 1024;
/// Number of buffer bytes per pipeline slot when `tuned_pipeline_depth` derives
/// a depth from the buffer size (rounded up).
const PIPELINE_DEPTH_CHUNK: usize = 4 * 1024 * 1024;
/// Smallest pipeline depth `tuned_pipeline_depth` returns.
const MIN_DYNAMIC_PIPELINE_DEPTH: usize = 2;
/// Largest pipeline depth `tuned_pipeline_depth` returns.
const MAX_DYNAMIC_PIPELINE_DEPTH: usize = 8;

/// Settings computed once in `ext_sort` and handed to the reader/writer side.
struct ReaderWriterConfig {
/// Normalized buffer size: passed to `chunks::read` as the size limit and
/// used to cap the size of the initial recycled chunks.
buffer_size: usize,
}

/// Computes the buffer size actually used for reading chunks.
///
/// A requested size above `BUFFER_CAP_LIMIT` is halved; anything else is kept.
/// When the size was not set explicitly, the result is additionally raised to
/// at least `BUFFER_MIN_WITHOUT_USER`.
fn normalized_buffer_size(settings: &GlobalSettings) -> usize {
    let capped = match settings.buffer_size {
        oversized if oversized > BUFFER_CAP_LIMIT => oversized / 2,
        requested => requested,
    };
    if settings.buffer_size_is_explicit {
        capped
    } else {
        // No explicit request: give the automatic heuristic a sensible floor.
        capped.max(BUFFER_MIN_WITHOUT_USER)
    }
}

/// Chooses the capacity used for the chunk channels.
///
/// The configured depth is clamped to
/// `MIN_DYNAMIC_PIPELINE_DEPTH..=MAX_DYNAMIC_PIPELINE_DEPTH`. If the depth was
/// set explicitly, that clamped value is returned. Otherwise the result is the
/// larger of the clamped value and a depth derived from `buffer_size`
/// (one slot per `PIPELINE_DEPTH_CHUNK` bytes, rounded up and clamped to the
/// same bounds).
fn tuned_pipeline_depth(settings: &GlobalSettings, buffer_size: usize) -> usize {
    let configured = settings
        .pipeline_depth
        .clamp(MIN_DYNAMIC_PIPELINE_DEPTH, MAX_DYNAMIC_PIPELINE_DEPTH);
    if settings.pipeline_depth_is_explicit {
        return configured;
    }

    let from_buffer = buffer_size
        .div_ceil(PIPELINE_DEPTH_CHUNK)
        .clamp(MIN_DYNAMIC_PIPELINE_DEPTH, MAX_DYNAMIC_PIPELINE_DEPTH);
    // Both operands are already within the bounds, so their maximum is too.
    configured.max(from_buffer)
}

/// Sort files by using auxiliary files for storing intermediate chunks (if needed), and output the result.
pub fn ext_sort(
files: &mut impl Iterator<Item = UResult<Box<dyn Read + Send>>>,
settings: &GlobalSettings,
output: Output,
tmp_dir: &mut TmpDirWrapper,
) -> UResult<()> {
let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
let buffer_size = normalized_buffer_size(settings);
let pipeline_depth = tuned_pipeline_depth(settings, buffer_size);
let config = ReaderWriterConfig { buffer_size };
let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(pipeline_depth);
let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(pipeline_depth);
thread::spawn({
let settings = settings.clone();
move || sorter(&recycled_receiver, &sorted_sender, &settings)
Expand Down Expand Up @@ -87,6 +126,7 @@ pub fn ext_sort(
recycled_sender,
output,
tmp_dir,
&config,
)
} else {
reader_writer::<_, WriteablePlainTmpFile>(
Expand All @@ -96,6 +136,7 @@ pub fn ext_sort(
recycled_sender,
output,
tmp_dir,
&config,
)
}
}
Expand All @@ -110,26 +151,12 @@ fn reader_writer<
sender: SyncSender<Chunk>,
output: Output,
tmp_dir: &mut TmpDirWrapper,
config: &ReaderWriterConfig,
) -> UResult<()> {
let separator = settings.line_ending.into();

// Cap oversized buffer requests to avoid unnecessary allocations and give the automatic
// heuristic room to grow when the user does not provide an explicit value.
let mut buffer_size = match settings.buffer_size {
size if size <= 512 * 1024 * 1024 => size,
size => size / 2,
};
if !settings.buffer_size_is_explicit {
buffer_size = buffer_size.max(8 * 1024 * 1024);
}
let read_result: ReadResult<Tmp> = read_write_loop(
files,
tmp_dir,
separator,
buffer_size,
settings,
receiver,
sender,
files, tmp_dir, separator, config, settings, receiver, sender,
)?;
match read_result {
ReadResult::WroteChunksToFile { tmp_files } => {
Expand Down Expand Up @@ -214,7 +241,7 @@ fn read_write_loop<I: WriteableTmpFile>(
mut files: impl Iterator<Item = UResult<Box<dyn Read + Send>>>,
tmp_dir: &mut TmpDirWrapper,
separator: u8,
buffer_size: usize,
config: &ReaderWriterConfig,
settings: &GlobalSettings,
receiver: &Receiver<Chunk>,
sender: SyncSender<Chunk>,
Expand All @@ -226,12 +253,12 @@ fn read_write_loop<I: WriteableTmpFile>(
for _ in 0..2 {
let should_continue = chunks::read(
&sender,
RecycledChunk::new(if START_BUFFER_SIZE < buffer_size {
RecycledChunk::new(if START_BUFFER_SIZE < config.buffer_size {
START_BUFFER_SIZE
} else {
buffer_size
config.buffer_size
}),
Some(buffer_size),
Some(config.buffer_size),
&mut carry_over,
&mut file,
&mut files,
Expand Down
66 changes: 64 additions & 2 deletions src/uu/sort/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use bigdecimal::BigDecimal;
use chunks::LineData;
use clap::builder::ValueParser;
use clap::{Arg, ArgAction, Command};
use custom_str_cmp::custom_str_cmp;
use custom_str_cmp::{append_filtered_line_to, custom_str_cmp};
use ext_sort::ext_sort;
use fnv::FnvHasher;
use numeric_str_cmp::{NumInfo, NumInfoParseSettings, human_numeric_str_cmp, numeric_str_cmp};
Expand All @@ -39,6 +39,7 @@ use std::ops::Range;
use std::path::Path;
use std::path::PathBuf;
use std::str::Utf8Error;
use std::thread;
use thiserror::Error;
use uucore::display::Quotable;
use uucore::error::{FromIo, strip_errno};
Expand Down Expand Up @@ -290,6 +291,8 @@ pub struct GlobalSettings {
compress_prog: Option<String>,
merge_batch_size: usize,
precomputed: Precomputed,
pipeline_depth: usize,
pipeline_depth_is_explicit: bool,
}

/// Data needed for sorting. Should be computed once before starting to sort
Expand All @@ -302,6 +305,7 @@ struct Precomputed {
selections_per_line: usize,
fast_lexicographic: bool,
fast_ascii_insensitive: bool,
needs_filtered_view: bool,
}

impl GlobalSettings {
Expand Down Expand Up @@ -345,6 +349,7 @@ impl GlobalSettings {

self.precomputed.fast_lexicographic = self.can_use_fast_lexicographic();
self.precomputed.fast_ascii_insensitive = self.can_use_fast_ascii_insensitive();
self.precomputed.needs_filtered_view = self.can_use_filtered_view();
}

/// Returns true when the fast lexicographic path can be used safely.
Expand Down Expand Up @@ -384,6 +389,12 @@ impl GlobalSettings {
&& !selector.settings.ignore_blanks
}
}

/// Returns true when lines should get a pre-filtered copy for comparison.
///
/// This requires the default sort mode, no selectors, and at least one of
/// `ignore_case`, `dictionary_order` or `ignore_non_printing` being set.
///
/// NOTE(review): blank-ignoring settings are not consulted here — confirm
/// that this is intended.
fn can_use_filtered_view(&self) -> bool {
    let any_filter_enabled =
        self.ignore_case || self.dictionary_order || self.ignore_non_printing;
    any_filter_enabled && self.selectors.is_empty() && self.mode == SortMode::Default
}
}

impl Default for GlobalSettings {
Expand Down Expand Up @@ -411,6 +422,8 @@ impl Default for GlobalSettings {
compress_prog: None,
merge_batch_size: default_merge_batch_size(),
precomputed: Precomputed::default(),
pipeline_depth: default_pipeline_depth(),
pipeline_depth_is_explicit: false,
}
}
}
Expand Down Expand Up @@ -547,6 +560,17 @@ impl<'a> Line<'a> {
}
}
}
if settings.precomputed.needs_filtered_view {
let start = line_data.filtered_lines_data.len();
let len = append_filtered_line_to(
line,
settings.ignore_non_printing,
settings.dictionary_order,
settings.ignore_case,
&mut line_data.filtered_lines_data,
);
line_data.filtered_line_ranges.push((start, len));
}
Self { line, index }
}

Expand Down Expand Up @@ -1261,6 +1285,26 @@ fn default_merge_batch_size() -> usize {
}
}

/// Lower bound for pipeline depths computed by `default_pipeline_depth` and
/// `pipeline_depth_from_parallel_arg`; also the fallback when the available
/// parallelism cannot be queried.
const MIN_PIPELINE_DEPTH: usize = 2;
/// Upper bound for pipeline depths computed by `default_pipeline_depth` and
/// `pipeline_depth_from_parallel_arg`.
const MAX_PIPELINE_DEPTH: usize = 8;

/// Returns the pipeline depth used when none is requested.
///
/// This is the available parallelism clamped to
/// `MIN_PIPELINE_DEPTH..=MAX_PIPELINE_DEPTH`, or `MIN_PIPELINE_DEPTH` if the
/// parallelism cannot be determined.
fn default_pipeline_depth() -> usize {
    match thread::available_parallelism() {
        Ok(parallelism) => parallelism
            .get()
            .clamp(MIN_PIPELINE_DEPTH, MAX_PIPELINE_DEPTH),
        Err(_) => MIN_PIPELINE_DEPTH,
    }
}

fn pipeline_depth_from_parallel_arg(arg: Option<&String>) -> usize {
if let Some(raw) = arg {
if let Ok(parsed) = raw.parse::<usize>() {
if parsed > 0 {
return parsed.clamp(MIN_PIPELINE_DEPTH, MAX_PIPELINE_DEPTH);
}
}
}
default_pipeline_depth()
}

#[uucore::main]
#[allow(clippy::cognitive_complexity)]
pub fn uumain(args: impl uucore::Args) -> UResult<()> {
Expand Down Expand Up @@ -1375,7 +1419,8 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {

settings.dictionary_order = matches.get_flag(options::DICTIONARY_ORDER);
settings.ignore_non_printing = matches.get_flag(options::IGNORE_NONPRINTING);
if matches.contains_id(options::PARALLEL) {
let parallel_override = matches.contains_id(options::PARALLEL);
if parallel_override {
// "0" is default - threads = num of cores
settings.threads = matches
.get_one::<String>(options::PARALLEL)
Expand All @@ -1391,6 +1436,10 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
.build_global();
}

settings.pipeline_depth =
pipeline_depth_from_parallel_arg(matches.get_one::<String>(options::PARALLEL));
settings.pipeline_depth_is_explicit = parallel_override;

if let Some(size_str) = matches.get_one::<String>(options::BUF_SIZE) {
settings.buffer_size = GlobalSettings::parse_byte_count(size_str).map_err(|e| {
USimpleError::new(2, format_error_message(&e, size_str, options::BUF_SIZE))
Expand Down Expand Up @@ -1874,6 +1923,19 @@ fn compare_by<'a>(
}
}

if global_settings.precomputed.needs_filtered_view {
let a_filtered = a_line_data.filtered_line(a.index);
let b_filtered = b_line_data.filtered_line(b.index);
let cmp = a_filtered.cmp(b_filtered);
if cmp != Ordering::Equal || a.line == b.line {
return if global_settings.reverse {
cmp.reverse()
} else {
cmp
};
}
}

let mut selection_index = 0;
let mut num_info_index = 0;
let mut parsed_float_index = 0;
Expand Down
Loading