Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ impl CanSplitDoBetter {

/// Searches multiple splits, potentially in multiple indexes, sitting on different storages and
/// having different doc mappings.
#[instrument(skip_all, fields(index = ?leaf_search_request.search_request.as_ref().unwrap().index_id_patterns))]
#[instrument(skip_all, fields(index = ?PrettySample::new(&leaf_search_request.search_request.as_ref().unwrap().index_id_patterns, 5)))]
pub async fn multi_index_leaf_search(
searcher_context: Arc<SearcherContext>,
leaf_search_request: LeafSearchRequest,
Expand Down
127 changes: 78 additions & 49 deletions quickwit/quickwit-search/src/metrics_trackers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,78 @@ use std::task::{Context, Poll, ready};
use std::time::Instant;

use pin_project::{pin_project, pinned_drop};
use quickwit_proto::search::LeafSearchResponse;
use quickwit_proto::search::{LeafSearchResponse, SearchResponse};

use crate::SearchError;
use crate::metrics::SEARCH_METRICS;

// root
// planning

pub enum RootSearchMetricsStep {
Plan,
Exec { num_targeted_splits: usize },
/// Wrapper around the plan future to tracks error/cancellation metrics.
/// Planning phase success isn't explicitely recorded as it can be deduced from
/// the search phase metrics.
#[pin_project(PinnedDrop)]
pub struct SearchPlanMetricsFuture<F> {
#[pin]
pub tracked: F,
pub start: Instant,
pub is_success: Option<bool>,
}

#[pinned_drop]
impl<F> PinnedDrop for SearchPlanMetricsFuture<F> {
fn drop(self: Pin<&mut Self>) {
let status = match self.is_success {
// this is a partial success, actual status will be recorded during the search step
Some(true) => return,
Some(false) => "plan-error",
None => "plan-cancelled",
};

let label_values = [status];
SEARCH_METRICS
.root_search_requests_total
.with_label_values(label_values)
.inc();
SEARCH_METRICS
.root_search_request_duration_seconds
.with_label_values(label_values)
.observe(self.start.elapsed().as_secs_f64());
}
}

/// Wrapper around the plan and search futures to track metrics.
impl<F, R> Future for SearchPlanMetricsFuture<F>
where F: Future<Output = crate::Result<R>>
{
type Output = crate::Result<R>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let response = ready!(this.tracked.poll(cx));
if let Err(err) = &response {
tracing::error!(?err, "root search planning failed");
}
*this.is_success = Some(response.is_ok());
Poll::Ready(Ok(response?))
}
}

// root search

/// Wrapper around the root search futures to track metrics.
#[pin_project(PinnedDrop)]
pub struct RootSearchMetricsFuture<F> {
#[pin]
pub tracked: F,
pub start: Instant,
pub step: RootSearchMetricsStep,
pub is_success: Option<bool>,
pub num_targeted_splits: usize,
pub status: Option<&'static str>,
}

#[pinned_drop]
impl<F> PinnedDrop for RootSearchMetricsFuture<F> {
fn drop(self: Pin<&mut Self>) {
let (num_targeted_splits, status) = match (&self.step, self.is_success) {
// is is a partial success, actual success is recorded during the search step
(RootSearchMetricsStep::Plan, Some(true)) => return,
(RootSearchMetricsStep::Plan, Some(false)) => (0, "plan-error"),
(RootSearchMetricsStep::Plan, None) => (0, "plan-cancelled"),
(
RootSearchMetricsStep::Exec {
num_targeted_splits,
},
Some(true),
) => (*num_targeted_splits, "success"),
(
RootSearchMetricsStep::Exec {
num_targeted_splits,
},
Some(false),
) => (*num_targeted_splits, "error"),
(
RootSearchMetricsStep::Exec {
num_targeted_splits,
},
None,
) => (*num_targeted_splits, "cancelled"),
};

let status = self.status.unwrap_or("cancelled");
let label_values = [status];
SEARCH_METRICS
.root_search_requests_total
Expand All @@ -81,30 +103,39 @@ impl<F> PinnedDrop for RootSearchMetricsFuture<F> {
SEARCH_METRICS
.root_search_targeted_splits
.with_label_values(label_values)
.observe(num_targeted_splits as f64);
.observe(self.num_targeted_splits as f64);
}
}

impl<F, R, E> Future for RootSearchMetricsFuture<F>
where F: Future<Output = Result<R, E>>
impl<F> Future for RootSearchMetricsFuture<F>
where F: Future<Output = crate::Result<SearchResponse>>
{
type Output = Result<R, E>;
type Output = crate::Result<SearchResponse>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let response = ready!(this.tracked.poll(cx));
*this.is_success = Some(response.is_ok());
if let Err(err) = &response {
tracing::error!(?err, "root search failed");
}
if let Ok(resp) = &response {
if resp.failed_splits.is_empty() {
*this.status = Some("success");
} else {
*this.status = Some("partial-success");
}
} else {
*this.status = Some("error");
}
Poll::Ready(Ok(response?))
}
}

// leaf
// leaf search

/// Wrapper around the search future to track metrics.
#[pin_project(PinnedDrop)]
pub struct LeafSearchMetricsFuture<F>
where F: Future<Output = Result<LeafSearchResponse, SearchError>>
{
pub struct LeafSearchMetricsFuture<F> {
#[pin]
pub tracked: F,
pub start: Instant,
Expand All @@ -113,9 +144,7 @@ where F: Future<Output = Result<LeafSearchResponse, SearchError>>
}

#[pinned_drop]
impl<F> PinnedDrop for LeafSearchMetricsFuture<F>
where F: Future<Output = Result<LeafSearchResponse, SearchError>>
{
impl<F> PinnedDrop for LeafSearchMetricsFuture<F> {
fn drop(self: Pin<&mut Self>) {
let label_values = [self.status.unwrap_or("cancelled")];
SEARCH_METRICS
Expand All @@ -141,10 +170,10 @@ where F: Future<Output = Result<LeafSearchResponse, SearchError>>
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let response = ready!(this.tracked.poll(cx));
*this.status = if response.is_ok() {
Some("success")
} else {
Some("error")
*this.status = match &response {
Ok(resp) if !resp.failed_splits.is_empty() => Some("partial-success"),
Ok(_) => Some("success"),
Err(_) => Some("error"),
};
Poll::Ready(Ok(response?))
}
Expand Down
11 changes: 4 additions & 7 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use tracing::{debug, info, info_span, instrument};

use crate::cluster_client::ClusterClient;
use crate::collector::{QuickwitAggregations, make_merge_collector};
use crate::metrics_trackers::{RootSearchMetricsFuture, RootSearchMetricsStep};
use crate::metrics_trackers::{RootSearchMetricsFuture, SearchPlanMetricsFuture};
use crate::scroll_context::{ScrollContext, ScrollKeyAndStartOffset};
use crate::search_job_placer::{Job, group_by, group_jobs_by_index_id};
use crate::search_response_rest::StorageRequestCount;
Expand Down Expand Up @@ -1201,11 +1201,10 @@ pub async fn root_search(
) -> crate::Result<SearchResponse> {
let start_instant = Instant::now();

let (split_metadatas, indexes_meta_for_leaf_search) = RootSearchMetricsFuture {
let (split_metadatas, indexes_meta_for_leaf_search) = SearchPlanMetricsFuture {
start: start_instant,
tracked: plan_splits_for_root_search(&mut search_request, &mut metastore),
is_success: None,
step: RootSearchMetricsStep::Plan,
}
.await?;

Expand Down Expand Up @@ -1233,10 +1232,8 @@ pub async fn root_search(
split_metadatas,
cluster_client,
),
is_success: None,
step: RootSearchMetricsStep::Exec {
num_targeted_splits: num_splits,
},
status: None,
num_targeted_splits: num_splits,
}
.await;

Expand Down