From c597a4e7ae519288d9e9b1ecaa9e4e48b3ac8059 Mon Sep 17 00:00:00 2001 From: Adrian Tanase Date: Wed, 26 Mar 2025 07:17:49 +0200 Subject: [PATCH 1/2] bump DF --- Cargo.lock | 150 +++++++++++++++++++++++++++++------------------------ 1 file changed, 81 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7bad13e4a..b31fbfe98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -282,9 +282,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e899dade2c3b7f5642eb8366cfd898958bcca099cde6dfea543c7e8d3ad88d4" +checksum = "bc6ed265c73f134a583d02c3cab5e16afab9446d8048ede8707e31f85fad58a0" dependencies = [ "bytes", "half", @@ -330,9 +330,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a329fb064477c9ec5f0870d2f5130966f91055c7c5bce2b3a084f116bc28c3b" +checksum = "5f2cebf504bb6a92a134a87fff98f01b14fbb3a93ecf7aef90cd0f888c5fffa4" dependencies = [ "arrow-buffer", "arrow-schema", @@ -402,9 +402,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85934a9d0261e0fa5d4e2a5295107d743b543a6e0484a835d4b8db2da15306f9" +checksum = "a5c53775bba63f319189f366d2b86e9a8889373eb198f07d8544938fc9f8ed9a" dependencies = [ "bitflags 2.9.0", "serde", @@ -794,6 +794,16 @@ dependencies = [ "aws-smithy-types", ] +[[package]] +name = "aws-smithy-observability" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "445d065e76bc1ef54963db400319f1dd3ebb3e0a74af20f7f7630625b0cc7cc0" +dependencies = [ + "aws-smithy-runtime-api", + "once_cell", +] + [[package]] name = "aws-smithy-query" version = "0.60.7" @@ -806,13 +816,14 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6328865e36c6fd970094ead6b05efd047d3a80ec5fc3be5e743910da9f2ebf8" +checksum = "0152749e17ce4d1b47c7747bdfec09dac1ccafdcbc741ebf9daa2a373356730f" dependencies = [ "aws-smithy-async", "aws-smithy-http", "aws-smithy-http-client", + "aws-smithy-observability", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", @@ -1107,9 +1118,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.16" +version = "1.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c" +checksum = "1fcb57c740ae1daf453ae85f16e37396f672b039e00d9d866e07ddb24e328e3a" dependencies = [ "jobserver", "libc", @@ -1154,9 +1165,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.10.1" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c6ac4f2c0bf0f44e9161aec9675e1050aa4a530663c4a9e37e108fa948bca9f" +checksum = "efdce149c370f133a071ca8ef6ea340b7b88748ab0810097a9e2976eaa34b4f3" dependencies = [ "chrono", "chrono-tz-build", @@ -1165,9 +1176,9 @@ dependencies = [ [[package]] name = "chrono-tz-build" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e94fea34d77a245229e7746bd2beb786cd2a896f306ff491fb8cecb3074b10a7" +checksum = "8f10f8c9340e31fc120ff885fcdb54a0b48e474bbd77cab557f0c30a3e569402" dependencies = [ "parse-zoneinfo", "phf_codegen", @@ -1384,7 +1395,7 @@ dependencies = [ [[package]] name = "datafusion" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "apache-avro", "arrow", @@ -1436,7 +1447,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "async-trait", @@ -1455,7 +1466,7 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "async-trait", @@ -1476,7 +1487,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "ahash", "apache-avro", @@ -1502,7 +1513,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "log", "tokio", @@ -1511,7 +1522,7 @@ dependencies = [ [[package]] name = "datafusion-datasource" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "async-compression", @@ -1544,12 +1555,12 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" [[package]] name = "datafusion-execution" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "dashmap", @@ -1567,7 +1578,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "chrono", @@ -1588,7 +1599,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "datafusion-common", @@ -1619,7 +1630,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "arrow-buffer", @@ -1647,7 +1658,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "ahash", "arrow", @@ -1667,7 +1678,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "ahash", "arrow", @@ -1679,7 +1690,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "arrow-ord", @@ -1699,7 +1710,7 @@ dependencies = [ [[package]] name = "datafusion-functions-table" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "async-trait", @@ -1714,7 +1725,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "datafusion-common", "datafusion-doc", @@ -1730,7 +1741,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -1739,7 +1750,7 @@ dependencies = [ [[package]] name = "datafusion-macros" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "datafusion-expr", "quote", @@ -1749,7 +1760,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "chrono", @@ -1770,7 +1781,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "ahash", "arrow", @@ -1791,7 +1802,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "ahash", "arrow", @@ -1804,7 +1815,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "datafusion-common", @@ -1822,7 +1833,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "ahash", "arrow", @@ -1851,7 +1862,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "chrono", @@ -1866,7 +1877,7 @@ dependencies = [ [[package]] name = "datafusion-proto-common" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "datafusion-common", @@ -1901,7 +1912,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "arrow", "bigdecimal", @@ -1917,7 +1928,7 @@ dependencies = [ [[package]] name = "datafusion-substrait" version = "46.0.0" -source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#4b23a6fb47238f64b0704fef2e9d9e90271422e9" +source = "git+https://github.com/hstack/arrow-datafusion.git?branch=main#98b93f342b4a61d52b756144da424bdbf272d95b" dependencies = [ "async-recursion", "async-trait", @@ -2094,9 +2105,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +checksum = "28cfac68e08048ae1883171632c2aef3ebc555621ae56fbccce1cbf22dd7f058" dependencies = [ "powerfmt", ] @@ -2685,14 +2696,15 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.61" +version = "0.1.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" +checksum = "b2fd658b06e56721792c5df4475705b6cda790e9298d19d2f8af083457bcd127" dependencies = [ "android_system_properties", "core-foundation-sys", "iana-time-zone-haiku", "js-sys", + "log", "wasm-bindgen", "windows-core", ] @@ -2914,9 +2926,9 @@ checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" [[package]] name = "jiff" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d699bc6dfc879fb1bf9bdff0d4c56f0884fc6f0d0eb0fba397a6d00cd9a6b85e" +checksum = "c102670231191d07d37a35af3eb77f1f0dbf7a71be51a962dcd57ea607be7260" dependencies = [ "jiff-static", "log", @@ -2927,9 +2939,9 @@ dependencies = [ [[package]] name = "jiff-static" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d16e75759ee0aa64c57a56acbf43916987b20c77373cb7e808979e02b93c9f9" +checksum = "4cdde31a9d349f1b1f51a0b3714a5940ac022976f4b49485fc04be052b183b4c" dependencies = [ "proc-macro2", "quote", @@ -3127,9 +3139,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.26" +version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" [[package]] name = "lz4_flex" @@ -3609,7 +3621,7 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" dependencies = [ - "zerocopy 0.8.23", + "zerocopy 0.8.24", ] [[package]] @@ -3785,9 +3797,9 @@ checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" [[package]] name = "quick-xml" -version = "0.37.2" +version = "0.37.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "165859e9e55f79d67b96c5d96f4e88b6f2695a1972849c15a6a3f5c59fc2c003" +checksum = "bf763ab1c7a3aa408be466efc86efe35ed1bd3dd74173ed39d6b0d0a6f0ba148" dependencies = [ "memchr", "serde", @@ -3835,9 +3847,9 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.10" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e46f3055866785f6b92bc6164b76be02ca8f2eb4b002c0354b28cf4c119e5944" +checksum = "541d0f57c6ec747a90738a52741d3221f7960e8ac2f0ff4b1a63680e033b4ab5" dependencies = [ "cfg_aliases", "libc", @@ -3881,7 +3893,7 @@ checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.3", - "zerocopy 0.8.23", + "zerocopy 0.8.24", ] [[package]] @@ -4155,7 +4167,7 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.103.0", + "rustls-webpki 0.103.1", "subtle", "zeroize", ] @@ -4223,9 +4235,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.0" +version = "0.103.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0aa4eeac2588ffff23e9d7a7e9b3f971c5fb5b7ebc9452745e0c232c64f83b2f" +checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03" dependencies = [ "aws-lc-rs", "ring", @@ -4757,9 +4769,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.40" +version = "0.3.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d9c75b47bdff86fa3334a3db91356b8d7d86a9b839dab7d0bdc5c3d3a077618" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" dependencies = [ "deranged", "num-conv", @@ -4777,9 +4789,9 @@ checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" [[package]] name = "time-macros" -version = "0.2.21" +version = "0.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29aa485584182073ed57fd5004aa09c371f021325014694e432313345865fd04" +checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" dependencies = [ "num-conv", "time-core", @@ -5605,11 +5617,11 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.23" +version = "0.8.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6" +checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" dependencies = [ - "zerocopy-derive 0.8.23", + "zerocopy-derive 0.8.24", ] [[package]] @@ -5625,9 +5637,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.8.23" +version = "0.8.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154" +checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be" dependencies = [ "proc-macro2", "quote", From edff9574c36320f6a5f84e15d054e12abdc93e98 Mon Sep 17 00:00:00 2001 From: Adrian Tanase Date: Mon, 24 Mar 2025 11:11:11 +0200 Subject: [PATCH 2/2] [HSTACK] FIXUP distributed plan serde cleanup --- python/datafusion/__init__.py | 3 +- src/dataframe.rs | 150 ++++++++++++++-------------------- src/lib.rs | 1 - 3 files changed, 61 insertions(+), 93 deletions(-) diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py index ea7527376..f11ce54a6 100644 --- a/python/datafusion/__init__.py +++ b/python/datafusion/__init__.py @@ -29,7 +29,7 @@ from . import functions, object_store, substrait # The following imports are okay to remain as opaque to the user. -from ._internal import Config, partition_stream +from ._internal import Config from .catalog import Catalog, Database, Table from .common import ( DFSchema, @@ -86,7 +86,6 @@ "read_avro", "read_csv", "read_json", - "partition_stream", ] diff --git a/src/dataframe.rs b/src/dataframe.rs index a218a7b22..c1bd00c13 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -29,38 +29,37 @@ use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow}; use datafusion::arrow::util::pretty; use datafusion::common::stats::Precision; -use datafusion::common::{DFSchema, DataFusionError, Statistics, UnnestOptions}; use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::common::{DFSchema, DataFusionError, Statistics, UnnestOptions}; use datafusion::config::{CsvOptions, TableParquetOptions}; use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; use datafusion::datasource::memory::DataSourceExec; -use datafusion::datasource::TableProvider; use datafusion::datasource::physical_plan::FileScanConfig; use datafusion::datasource::source::DataSource; -use datafusion::execution::{SendableRecordBatchStream}; +use datafusion::datasource::TableProvider; +use datafusion::execution::SendableRecordBatchStream; use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel}; use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use datafusion::prelude::*; -use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec}; -use datafusion_proto::protobuf::PhysicalPlanNode; -use deltalake::delta_datafusion::DeltaPhysicalCodec; -use prost::Message; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; -use pyo3::types::{PyBytes, PyCapsule, PyDict, PyTuple, PyTupleMethods}; +use pyo3::types::{PyCapsule, PyTuple, PyTupleMethods}; use tokio::task::JoinHandle; use crate::catalog::PyTable; use crate::common::df_schema::PyDFSchema; use crate::errors::{py_datafusion_err, PyDataFusionError}; use crate::expr::sort_expr::to_sort_expressions; -use crate::physical_plan::{ codec, PyExecutionPlan } ; +use crate::physical_plan::PyExecutionPlan; use crate::record_batch::PyRecordBatchStream; use crate::sql::logical::PyLogicalPlan; use crate::utils::{get_tokio_runtime, validate_pycapsule, wait_for_future}; -use crate::{errors::PyDataFusionResult, expr::{sort_expr::PySortExpr, PyExpr}}; +use crate::{ + errors::PyDataFusionResult, + expr::{sort_expr::PySortExpr, PyExpr}, +}; // https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116 // - we have not decided on the table_provider approach yet @@ -712,7 +711,6 @@ impl PyDataFrame { let future_plan = DistributedPlan::try_new(self.df.as_ref()); wait_for_future(py, future_plan).map_err(py_datafusion_err) } - } #[pyclass(get_all)] @@ -724,37 +722,16 @@ pub struct DistributedPlan { #[pymethods] impl DistributedPlan { - - fn marshal(&self, py: Python) -> PyResult { - let bytes = PhysicalPlanNode::try_from_physical_plan(self.plan().clone(), codec()) - .map(|node| node.encode_to_vec()) - .map_err(py_datafusion_err)?; - let state = PyDict::new(py); - state.set_item("plan", PyBytes::new(py, bytes.as_slice()))?; - state.set_item("min_size", self.min_size)?; - Ok(state.into()) - } - #[new] - fn unmarshal(state: Bound) -> PyResult{ - let ctx = SessionContext::new(); - let serialized_plan = state.get_item("plan")? - .expect("missing key `plan` from state"); - let serialized_plan = serialized_plan - .downcast::()? - .as_bytes(); - let min_size = state.get_item("min_size")? - .expect("missing key `min_size` from state") - .extract::()?; - let plan = deserialize_plan(serialized_plan, &ctx)?; + fn new(physical_plan: PyExecutionPlan, min_size: usize) -> PyResult { Ok(Self { min_size, - physical_plan: PyExecutionPlan::new(plan), + physical_plan, }) } fn partition_count(&self) -> usize { - self.plan().output_partitioning().partition_count() + self.physical_plan.partition_count() } fn num_bytes(&self) -> Option { @@ -772,51 +749,68 @@ impl DistributedPlan { } fn set_desired_parallelism(&mut self, desired_parallelism: usize) -> PyResult<()> { - let updated_plan = self.plan().clone().transform_up(|node| { - if let Some(exec) = node.as_any().downcast_ref::() { - // Remove redundant ranges from partition files because FileScanConfig refuses to repartition - // if any file has a range defined (even when the range actually covers the entire file). - // The EnforceDistribution optimizer rule adds ranges for both full and partial files, - // so this tries to revert that in order to trigger a repartition when no files are actually split. - if let Some(file_scan) = exec.data_source().as_any().downcast_ref::() { - let mut range_free_file_scan = file_scan.clone(); - let mut total_size: usize = 0; - for group in range_free_file_scan.file_groups.iter_mut() { - for file in group.iter_mut() { - if let Some(range) = &file.range { - total_size += (range.end - range.start) as usize; - if range.start == 0 && range.end == file.object_meta.size as i64 { - file.range = None; // remove redundant range + let updated_plan = self + .plan() + .clone() + .transform_up(|node| { + if let Some(exec) = node.as_any().downcast_ref::() { + // Remove redundant ranges from partition files because FileScanConfig refuses to repartition + // if any file has a range defined (even when the range actually covers the entire file). + // The EnforceDistribution optimizer rule adds ranges for both full and partial files, + // so this tries to revert that in order to trigger a repartition when no files are actually split. + if let Some(file_scan) = + exec.data_source().as_any().downcast_ref::() + { + let mut range_free_file_scan = file_scan.clone(); + let mut total_size: usize = 0; + for group in range_free_file_scan.file_groups.iter_mut() { + for file in group.iter_mut() { + if let Some(range) = &file.range { + total_size += (range.end - range.start) as usize; + if range.start == 0 && range.end == file.object_meta.size as i64 + { + file.range = None; // remove redundant range + } + } else { + total_size += file.object_meta.size; } - } else { - total_size += file.object_meta.size; } } - } - let min_size_buckets = max(1, total_size.div_ceil(self.min_size)); - let partitions = min(min_size_buckets, desired_parallelism); - let ordering = range_free_file_scan.eq_properties().output_ordering(); - if let Some(repartitioned) = range_free_file_scan - .repartitioned(partitions, 1, ordering)? { - return Ok(Transformed::yes(Arc::new(DataSourceExec::new(repartitioned)))) + let min_size_buckets = max(1, total_size.div_ceil(self.min_size)); + let partitions = min(min_size_buckets, desired_parallelism); + let ordering = range_free_file_scan.eq_properties().output_ordering(); + if let Some(repartitioned) = + range_free_file_scan.repartitioned(partitions, 1, ordering)? + { + return Ok(Transformed::yes(Arc::new(DataSourceExec::new( + repartitioned, + )))); + } } } - } - Ok(Transformed::no(node)) - }).map_err(py_datafusion_err)?.data; + Ok(Transformed::no(node)) + }) + .map_err(py_datafusion_err)? + .data; self.physical_plan = PyExecutionPlan::new(updated_plan); Ok(()) } } impl DistributedPlan { - async fn try_new(df: &DataFrame) -> Result { let (mut session_state, logical_plan) = df.clone().into_parts(); - let min_size = session_state.config_options().optimizer.repartition_file_min_size; + let min_size = session_state + .config_options() + .optimizer + .repartition_file_min_size; // Create the physical plan with a single partition, to ensure that no files are split into ranges. // Otherwise, any subsequent repartition attempt would fail (see the comment in `set_desired_parallelism`) - session_state.config_mut().options_mut().execution.target_partitions = 1; + session_state + .config_mut() + .options_mut() + .execution + .target_partitions = 1; let physical_plan = session_state.create_physical_plan(&logical_plan).await?; let physical_plan = PyExecutionPlan::new(physical_plan); Ok(Self { @@ -830,7 +824,7 @@ impl DistributedPlan { } fn stats_field(&self, field: fn(Statistics) -> Precision) -> Option { - if let Ok(stats) = self.physical_plan.plan.statistics() { + if let Ok(stats) = self.plan().statistics() { match field(stats) { Precision::Exact(n) => Some(n), _ => None, @@ -839,30 +833,6 @@ impl DistributedPlan { None } } - -} - -fn deserialize_plan(serialized_plan: &[u8], ctx: &SessionContext) -> PyResult> { - deltalake::ensure_initialized(); - let node = PhysicalPlanNode::decode(serialized_plan) - .map_err(|e| DataFusionError::External(Box::new(e))) - .map_err(py_datafusion_err)?; - let plan = node.try_into_physical_plan(ctx, ctx.runtime_env().as_ref(), codec()) - .map_err(py_datafusion_err)?; - Ok(plan) -} - -#[pyfunction] -pub fn partition_stream(serialized_plan: &[u8], partition: usize, py: Python) -> PyResult { - let ctx = SessionContext::new(); - let plan = deserialize_plan(serialized_plan, &ctx)?; - let stream_with_runtime = get_tokio_runtime().0.spawn(async move { - plan.execute(partition, ctx.task_ctx()) - }); - wait_for_future(py, stream_with_runtime) - .map_err(py_datafusion_err)? - .map(PyRecordBatchStream::new) - .map_err(py_datafusion_err) } /// Print DataFrame diff --git a/src/lib.rs b/src/lib.rs index 43f154d85..04d6f45d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,7 +117,6 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { setup_substrait_module(py, &m)?; m.add_class::()?; - m.add_wrapped(wrap_pyfunction!(dataframe::partition_stream))?; Ok(()) }