From 896867502e92f5586715cefa67ebe1bfff35d052 Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 26 Jul 2025 23:11:27 +0800 Subject: [PATCH 1/4] fix: patch futures buffer_by_ordered Signed-off-by: tison Co-authored-by: Andy Lok Signed-off-by: tison --- bin/oli/Cargo.lock | 1 + core/Cargo.lock | 1 + core/Cargo.toml | 1 + core/src/lib.rs | 5 +- core/src/patches/buffer_by_ordered.rs | 158 ++++++++++++++++++++++++++ core/src/patches/mod.rs | 19 ++++ core/src/patches/stream.rs | 34 ++++++ core/src/raw/futures_util.rs | 14 ++- core/src/raw/oio/list/page_list.rs | 1 + core/src/types/read/reader.rs | 6 +- 10 files changed, 232 insertions(+), 8 deletions(-) create mode 100644 core/src/patches/buffer_by_ordered.rs create mode 100644 core/src/patches/mod.rs create mode 100644 core/src/patches/stream.rs diff --git a/bin/oli/Cargo.lock b/bin/oli/Cargo.lock index faf8fa562697..cb3f63654da2 100644 --- a/bin/oli/Cargo.lock +++ b/bin/oli/Cargo.lock @@ -1397,6 +1397,7 @@ dependencies = [ "log", "md-5", "percent-encoding", + "pin-project-lite", "prost", "quick-xml 0.38.0", "reqsign", diff --git a/core/Cargo.lock b/core/Cargo.lock index c6edf5426554..db1b6b061eaf 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -5419,6 +5419,7 @@ dependencies = [ "ouroboros", "percent-encoding", "persy", + "pin-project-lite", "pretty_assertions", "probe", "prometheus 0.14.0", diff --git a/core/Cargo.toml b/core/Cargo.toml index 50e4c897a13f..9bd23f2b32b6 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -255,6 +255,7 @@ http-body = "1" log = "0.4" md-5 = "0.10" percent-encoding = "2" +pin-project-lite = { version = "0.2.16"} quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] } reqwest = { version = "0.12.22", features = [ "stream", diff --git a/core/src/lib.rs b/core/src/lib.rs index 864dfb4527a9..1ecaf22a224b 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -153,7 +153,10 @@ //! - [Performance Guide][crate::docs::performance] // Make sure all our public APIs have docs. -#![warn(missing_docs)] +#![deny(missing_docs)] + +// Private modules, they will not be accessed by users. +mod patches; // Private module with public types, they will be accessed via `opendal::Xxxx` mod types; diff --git a/core/src/patches/buffer_by_ordered.rs b/core/src/patches/buffer_by_ordered.rs new file mode 100644 index 000000000000..3f6b0da01159 --- /dev/null +++ b/core/src/patches/buffer_by_ordered.rs @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::VecDeque; +use std::fmt; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use futures::future::Future; +use futures::stream::Fuse; +use futures::stream::FuturesOrdered; +use futures::stream::Stream; +use futures::stream::StreamExt; + +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`buffer_by_ordered`] method. + /// + /// [`buffer_by_ordered`]: crate::StreamExt::buffer_by_ordered + #[must_use = "streams do nothing unless polled"] + pub struct BufferByOrdered + where + St: Stream, + F: Future, + { + #[pin] + stream: Fuse, + in_progress_queue: FuturesOrdered>, + ready_queue: VecDeque<(F::Output, usize)>, + max_size: usize, + current_size: usize, + } +} + +impl fmt::Debug for BufferByOrdered +where + St: Stream + fmt::Debug, + F: Future, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("BufferByOrdered") + .field("stream", &self.stream) + .field("in_progress_queue", &self.in_progress_queue) + .field("max_size", &self.max_size) + .field("current_size", &self.current_size) + .finish() + } +} + +impl BufferByOrdered +where + St: Stream, + F: Future, +{ + pub(crate) fn new(stream: St, max_size: usize) -> Self { + Self { + stream: stream.fuse(), + in_progress_queue: FuturesOrdered::new(), + ready_queue: VecDeque::new(), + max_size, + current_size: 0, + } + } +} + +impl Stream for BufferByOrdered +where + St: Stream, + F: Future, +{ + type Item = F::Output; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + // First up, try to spawn off as many futures as possible by filling up + // our queue of futures. + while this.current_size < this.max_size { + match this.stream.as_mut().poll_next(cx) { + Poll::Ready(Some((future, size))) => { + *this.current_size += size; + this.in_progress_queue + .push_back(SizedFuture { future, size }); + } + Poll::Ready(None) => break, + Poll::Pending => break, + } + } + + // Try to poll all ready futures in the in_progress_queue. + loop { + match this.in_progress_queue.poll_next_unpin(cx) { + Poll::Ready(Some(output)) => { + this.ready_queue.push_back(output); + } + Poll::Ready(None) => break, + Poll::Pending => break, + } + } + + if let Some((output, size)) = this.ready_queue.pop_front() { + // If we have any ready outputs, return the first one. + *this.current_size -= size; + Poll::Ready(Some(output)) + } else if this.stream.is_done() && this.in_progress_queue.is_empty() { + Poll::Ready(None) + } else { + Poll::Pending + } + } + + fn size_hint(&self) -> (usize, Option) { + let queue_len = self.in_progress_queue.len() + self.ready_queue.len(); + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(queue_len); + let upper = match upper { + Some(x) => x.checked_add(queue_len), + None => None, + }; + (lower, upper) + } +} + +pin_project! { + struct SizedFuture { + #[pin] + future: F, + size: usize, + } +} + +impl Future for SizedFuture { + type Output = (F::Output, usize); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.future.poll(cx) { + Poll::Ready(output) => Poll::Ready((output, *this.size)), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/core/src/patches/mod.rs b/core/src/patches/mod.rs new file mode 100644 index 000000000000..90992883ef26 --- /dev/null +++ b/core/src/patches/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(crate) mod buffer_by_ordered; +pub(crate) mod stream; diff --git a/core/src/patches/stream.rs b/core/src/patches/stream.rs new file mode 100644 index 000000000000..55c0c67c4263 --- /dev/null +++ b/core/src/patches/stream.rs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::patches::buffer_by_ordered::BufferByOrdered; + +use futures::Stream; +use std::future::Future; + +pub trait StreamExt: Stream { + fn buffer_by_ordered(self, max_size: usize) -> BufferByOrdered + where + Self: Sized, + Self: Stream, + F: Future, + { + BufferByOrdered::new(self, max_size) + } +} + +impl StreamExt for T where T: Stream {} diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs index 44deab8d9016..1c6cd9386d18 100644 --- a/core/src/raw/futures_util.rs +++ b/core/src/raw/futures_util.rs @@ -25,19 +25,17 @@ use futures::FutureExt; use crate::*; /// BoxedFuture is the type alias of [`futures::future::BoxFuture`]. -/// -/// We will switch to [`futures::future::LocalBoxFuture`] on wasm32 target. #[cfg(not(target_arch = "wasm32"))] pub type BoxedFuture<'a, T> = futures::future::BoxFuture<'a, T>; #[cfg(target_arch = "wasm32")] +/// BoxedFuture is the type alias of [`futures::future::LocalBoxFuture`]. pub type BoxedFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>; /// BoxedStaticFuture is the type alias of [`futures::future::BoxFuture`]. -/// -/// We will switch to [`futures::future::LocalBoxFuture`] on wasm32 target. #[cfg(not(target_arch = "wasm32"))] pub type BoxedStaticFuture = futures::future::BoxFuture<'static, T>; #[cfg(target_arch = "wasm32")] +/// BoxedStaticFuture is the type alias of [`futures::future::LocalBoxFuture`]. pub type BoxedStaticFuture = futures::future::LocalBoxFuture<'static, T>; /// MaybeSend is a marker to determine whether a type is `Send` or not. @@ -49,6 +47,14 @@ pub type BoxedStaticFuture = futures::future::LocalBoxFuture<'static, T>; /// And it's empty trait on wasm32 target to indicate that a type is not `Send`. #[cfg(not(target_arch = "wasm32"))] pub trait MaybeSend: Send {} + +/// MaybeSend is a marker to determine whether a type is `Send` or not. +/// We use this trait to wrap the `Send` requirement for wasm32 target. +/// +/// # Safety +/// +/// [`MaybeSend`] is equivalent to `Send` on non-wasm32 target. +/// And it's empty trait on wasm32 target to indicate that a type is not `Send`. #[cfg(target_arch = "wasm32")] pub trait MaybeSend {} diff --git a/core/src/raw/oio/list/page_list.rs b/core/src/raw/oio/list/page_list.rs index 2e146e33ce64..efdd826e3c23 100644 --- a/core/src/raw/oio/list/page_list.rs +++ b/core/src/raw/oio/list/page_list.rs @@ -36,6 +36,7 @@ pub trait PageList: Send + Sync + Unpin + 'static { #[cfg(not(target_arch = "wasm32"))] fn next_page(&self, ctx: &mut PageContext) -> impl Future> + MaybeSend; #[cfg(target_arch = "wasm32")] + /// next_page is used to fetch next page of entries from underlying storage. fn next_page(&self, ctx: &mut PageContext) -> impl Future>; } diff --git a/core/src/types/read/reader.rs b/core/src/types/read/reader.rs index 34e8cc67fb49..c827e9fd3885 100644 --- a/core/src/types/read/reader.rs +++ b/core/src/types/read/reader.rs @@ -21,9 +21,9 @@ use std::sync::Arc; use bytes::BufMut; use futures::stream; -use futures::StreamExt; use futures::TryStreamExt; +use crate::patches::stream::StreamExt; use crate::*; /// Reader is designed to read data from given path in an asynchronous @@ -147,8 +147,8 @@ impl Reader { let merged_ranges = self.merge_ranges(ranges.clone()); let merged_bufs: Vec<_> = - stream::iter(merged_ranges.clone().into_iter().map(|v| self.read(v))) - .buffered(self.ctx.options().concurrent()) + stream::iter(merged_ranges.clone().into_iter().map(|v| (self.read(v), 1))) + .buffer_by_ordered(self.ctx.options().concurrent()) .try_collect() .await?; From ffdf53aa059d3a9c5e476150d6f90c27cf757ab3 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 27 Jul 2025 09:42:05 +0800 Subject: [PATCH 2/4] feat: fetch concurrency by ConcurrentTasks Signed-off-by: tison --- core/src/lib.rs | 3 - core/src/patches/buffer_by_ordered.rs | 158 -------------------------- core/src/patches/mod.rs | 19 ---- core/src/patches/stream.rs | 34 ------ core/src/types/read/reader.rs | 35 ++++-- 5 files changed, 28 insertions(+), 221 deletions(-) delete mode 100644 core/src/patches/buffer_by_ordered.rs delete mode 100644 core/src/patches/mod.rs delete mode 100644 core/src/patches/stream.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index 1ecaf22a224b..f7b490f9dd62 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -155,9 +155,6 @@ // Make sure all our public APIs have docs. #![deny(missing_docs)] -// Private modules, they will not be accessed by users. -mod patches; - // Private module with public types, they will be accessed via `opendal::Xxxx` mod types; pub use types::*; diff --git a/core/src/patches/buffer_by_ordered.rs b/core/src/patches/buffer_by_ordered.rs deleted file mode 100644 index 3f6b0da01159..000000000000 --- a/core/src/patches/buffer_by_ordered.rs +++ /dev/null @@ -1,158 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::VecDeque; -use std::fmt; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -use futures::future::Future; -use futures::stream::Fuse; -use futures::stream::FuturesOrdered; -use futures::stream::Stream; -use futures::stream::StreamExt; - -use pin_project_lite::pin_project; - -pin_project! { - /// Stream for the [`buffer_by_ordered`] method. - /// - /// [`buffer_by_ordered`]: crate::StreamExt::buffer_by_ordered - #[must_use = "streams do nothing unless polled"] - pub struct BufferByOrdered - where - St: Stream, - F: Future, - { - #[pin] - stream: Fuse, - in_progress_queue: FuturesOrdered>, - ready_queue: VecDeque<(F::Output, usize)>, - max_size: usize, - current_size: usize, - } -} - -impl fmt::Debug for BufferByOrdered -where - St: Stream + fmt::Debug, - F: Future, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("BufferByOrdered") - .field("stream", &self.stream) - .field("in_progress_queue", &self.in_progress_queue) - .field("max_size", &self.max_size) - .field("current_size", &self.current_size) - .finish() - } -} - -impl BufferByOrdered -where - St: Stream, - F: Future, -{ - pub(crate) fn new(stream: St, max_size: usize) -> Self { - Self { - stream: stream.fuse(), - in_progress_queue: FuturesOrdered::new(), - ready_queue: VecDeque::new(), - max_size, - current_size: 0, - } - } -} - -impl Stream for BufferByOrdered -where - St: Stream, - F: Future, -{ - type Item = F::Output; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - // First up, try to spawn off as many futures as possible by filling up - // our queue of futures. - while this.current_size < this.max_size { - match this.stream.as_mut().poll_next(cx) { - Poll::Ready(Some((future, size))) => { - *this.current_size += size; - this.in_progress_queue - .push_back(SizedFuture { future, size }); - } - Poll::Ready(None) => break, - Poll::Pending => break, - } - } - - // Try to poll all ready futures in the in_progress_queue. - loop { - match this.in_progress_queue.poll_next_unpin(cx) { - Poll::Ready(Some(output)) => { - this.ready_queue.push_back(output); - } - Poll::Ready(None) => break, - Poll::Pending => break, - } - } - - if let Some((output, size)) = this.ready_queue.pop_front() { - // If we have any ready outputs, return the first one. - *this.current_size -= size; - Poll::Ready(Some(output)) - } else if this.stream.is_done() && this.in_progress_queue.is_empty() { - Poll::Ready(None) - } else { - Poll::Pending - } - } - - fn size_hint(&self) -> (usize, Option) { - let queue_len = self.in_progress_queue.len() + self.ready_queue.len(); - let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(queue_len); - let upper = match upper { - Some(x) => x.checked_add(queue_len), - None => None, - }; - (lower, upper) - } -} - -pin_project! { - struct SizedFuture { - #[pin] - future: F, - size: usize, - } -} - -impl Future for SizedFuture { - type Output = (F::Output, usize); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.future.poll(cx) { - Poll::Ready(output) => Poll::Ready((output, *this.size)), - Poll::Pending => Poll::Pending, - } - } -} diff --git a/core/src/patches/mod.rs b/core/src/patches/mod.rs deleted file mode 100644 index 90992883ef26..000000000000 --- a/core/src/patches/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -pub(crate) mod buffer_by_ordered; -pub(crate) mod stream; diff --git a/core/src/patches/stream.rs b/core/src/patches/stream.rs deleted file mode 100644 index 55c0c67c4263..000000000000 --- a/core/src/patches/stream.rs +++ /dev/null @@ -1,34 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::patches::buffer_by_ordered::BufferByOrdered; - -use futures::Stream; -use std::future::Future; - -pub trait StreamExt: Stream { - fn buffer_by_ordered(self, max_size: usize) -> BufferByOrdered - where - Self: Sized, - Self: Stream, - F: Future, - { - BufferByOrdered::new(self, max_size) - } -} - -impl StreamExt for T where T: Stream {} diff --git a/core/src/types/read/reader.rs b/core/src/types/read/reader.rs index c827e9fd3885..904e5f9a280a 100644 --- a/core/src/types/read/reader.rs +++ b/core/src/types/read/reader.rs @@ -20,10 +20,10 @@ use std::ops::RangeBounds; use std::sync::Arc; use bytes::BufMut; -use futures::stream; use futures::TryStreamExt; -use crate::patches::stream::StreamExt; +use crate::raw::Access; +use crate::raw::ConcurrentTasks; use crate::*; /// Reader is designed to read data from given path in an asynchronous @@ -146,11 +146,32 @@ impl Reader { pub async fn fetch(&self, ranges: Vec>) -> Result> { let merged_ranges = self.merge_ranges(ranges.clone()); - let merged_bufs: Vec<_> = - stream::iter(merged_ranges.clone().into_iter().map(|v| (self.read(v), 1))) - .buffer_by_ordered(self.ctx.options().concurrent()) - .try_collect() - .await?; + #[derive(Clone)] + struct FetchInput { + reader: Reader, + range: Range, + } + + let mut tasks = ConcurrentTasks::new( + self.ctx.accessor().info().executor(), + self.ctx.options().concurrent(), + |input: FetchInput| { + Box::pin(async move { + let FetchInput { range, reader } = input.clone(); + (input, reader.read(range).await) + }) + }, + ); + + for range in merged_ranges.clone() { + let reader = self.clone(); + tasks.create_task(FetchInput { reader, range }); + } + + let mut merged_bufs = vec![]; + while let Some(b) = tasks.next().await { + merged_bufs.push(b?); + } let mut bufs = Vec::with_capacity(ranges.len()); for range in ranges { From 8cd6e16df9d36ebd0355258d22d46f2e3b5a5cb8 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 27 Jul 2025 10:40:13 +0800 Subject: [PATCH 3/4] update con task Signed-off-by: tison --- core/src/types/read/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/types/read/reader.rs b/core/src/types/read/reader.rs index 904e5f9a280a..15c1cd46b1c8 100644 --- a/core/src/types/read/reader.rs +++ b/core/src/types/read/reader.rs @@ -165,7 +165,7 @@ impl Reader { for range in merged_ranges.clone() { let reader = self.clone(); - tasks.create_task(FetchInput { reader, range }); + tasks.execute(FetchInput { reader, range }).await?; } let mut merged_bufs = vec![]; From 044e9a91a63e1a67d3838040e14cfe2972b0ff29 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 27 Jul 2025 13:51:22 +0800 Subject: [PATCH 4/4] drop unused dep Signed-off-by: tison --- bin/oli/Cargo.lock | 1 - core/Cargo.lock | 1 - core/Cargo.toml | 1 - 3 files changed, 3 deletions(-) diff --git a/bin/oli/Cargo.lock b/bin/oli/Cargo.lock index cb3f63654da2..faf8fa562697 100644 --- a/bin/oli/Cargo.lock +++ b/bin/oli/Cargo.lock @@ -1397,7 +1397,6 @@ dependencies = [ "log", "md-5", "percent-encoding", - "pin-project-lite", "prost", "quick-xml 0.38.0", "reqsign", diff --git a/core/Cargo.lock b/core/Cargo.lock index db1b6b061eaf..c6edf5426554 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -5419,7 +5419,6 @@ dependencies = [ "ouroboros", "percent-encoding", "persy", - "pin-project-lite", "pretty_assertions", "probe", "prometheus 0.14.0", diff --git a/core/Cargo.toml b/core/Cargo.toml index 9bd23f2b32b6..50e4c897a13f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -255,7 +255,6 @@ http-body = "1" log = "0.4" md-5 = "0.10" percent-encoding = "2" -pin-project-lite = { version = "0.2.16"} quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] } reqwest = { version = "0.12.22", features = [ "stream",