diff --git a/.github/workflows/unit-test-cpp.yml b/.github/workflows/unit-test-cpp.yml index 08c138018..429e0b95a 100644 --- a/.github/workflows/unit-test-cpp.yml +++ b/.github/workflows/unit-test-cpp.yml @@ -104,10 +104,16 @@ jobs: core.setOutput('platform_suffix', ``) } - - name: Install clang-format + - name: Install dependencies shell: bash run: | if [[ "$RUNNER_OS" == "Linux" ]]; then + if command -v apt-get >/dev/null 2>&1; then + sudo apt-get update + sudo apt-get install -y uuid-dev + elif command -v yum >/dev/null 2>&1; then + sudo yum install -y libuuid-devel + fi sudo update-alternatives --install /usr/bin/clang-format clang-format /usr/bin/clang-format-17 100 sudo update-alternatives --set clang-format /usr/bin/clang-format-17 elif [[ "$RUNNER_OS" == "Windows" ]]; then diff --git a/.github/workflows/unit-test-python.yml b/.github/workflows/unit-test-python.yml index c99935976..dd16ae3ec 100644 --- a/.github/workflows/unit-test-python.yml +++ b/.github/workflows/unit-test-python.yml @@ -60,6 +60,17 @@ jobs: key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} restore-keys: ${{ runner.os }}-m2- + - name: Install dependencies + shell: bash + run: | + if [[ "$RUNNER_OS" == "Linux" ]]; then + if command -v apt-get >/dev/null 2>&1; then + sudo apt-get update + sudo apt-get install -y uuid-dev + elif command -v yum >/dev/null 2>&1; then + sudo yum install -y libuuid-devel + fi + fi # On Windows systems the 'mvnw' script needs an additional ".cmd" appended. - name: Calculate platform suffix id: platform_suffix diff --git a/.github/workflows/wheels.yml b/.github/workflows/wheels.yml new file mode 100644 index 000000000..6e9dacec4 --- /dev/null +++ b/.github/workflows/wheels.yml @@ -0,0 +1,169 @@ +name: Build TsFile wheels(multi-platform) + +on: + push: + branches: + - "release_v*.*.*" + pull_request: + paths: + - "cpp/**" + - "python/**" + - ".github/**" + workflow_dispatch: + +jobs: + build: + name: Build wheels on ${{ matrix.name }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + include: + - name: linux-x86_64 + os: ubuntu-22.04 + platform: linux + cibw_archs_linux: "x86_64" + + - name: linux-aarch64 + os: ubuntu-22.04-arm + platform: linux + cibw_archs_linux: "aarch64" + + - name: macos-x86_64 + os: macos-13 + platform: macos + cibw_archs_macos: "x86_64" + + - name: macos-arm64 + os: macos-14 + platform: macos + cibw_archs_macos: "arm64" +# currently, compile on windows is not supported for cibuildwheel +# - name: windows-amd64 +# os: windows-2022 +# platform: windows +# cibw_archs_windows: "AMD64" + + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + submodules: false + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.11" + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + + - name: Install system deps (macOS) + if: matrix.platform == 'macos' + run: | + set -eux + brew update + brew install pkg-config || true + + - name: Install build tools + run: | + python -m pip install -U pip wheel + python -m pip install cibuildwheel==2.21.3 + +# - name: Build C++ core via Maven(win) +# if: matrix.platform == 'windows' +# shell: bash +# run: | +# set -euxo pipefail +# chmod +x mvnw || true +# ./mvnw -Pwith-cpp clean verify package \ +# -DskipTests -Dspotless.check.skip=true -Dspotless.apply.skip=true +# test -d cpp/target/build/lib +# test -d cpp/target/build/include + + - name: Build C++ core via Maven (macOS) + if: matrix.platform == 'macos' 
+ shell: bash + env: + MACOSX_DEPLOYMENT_TARGET: "12.0" + CFLAGS: "-mmacosx-version-min=12.0" + CXXFLAGS: "-mmacosx-version-min=12.0" + LDFLAGS: "-mmacosx-version-min=12.0" + run: | + set -euxo pipefail + chmod +x mvnw || true + ./mvnw -Pwith-cpp clean verify package \ + -DskipTests -Dspotless.check.skip=true -Dspotless.apply.skip=true \ + -Dcmake.args="-DCMAKE_OSX_DEPLOYMENT_TARGET=12.0" + otool -l cpp/target/build/lib/libtsfile*.dylib | grep -A2 LC_VERSION_MIN_MACOSX || true + + - name: Build wheels via cibuildwheel + if: matrix.platform != 'macos' + env: + CIBW_ARCHS_LINUX: ${{ matrix.cibw_archs_linux }} +# CIBW_ARCHS_WINDOWS: ${{ matrix.cibw_archs_windows }} + + CIBW_BUILD: "cp39-* cp310-* cp311-* cp312-* cp313-* cp314-*" + CIBW_SKIP: "pp* *-musllinux*" + + CIBW_MANYLINUX_X86_64_IMAGE: "manylinux2014" + CIBW_MANYLINUX_AARCH64_IMAGE: "manylinux2014" + + MACOSX_DEPLOYMENT_TARGET: "12.0" + + CIBW_BEFORE_ALL_LINUX: | + set -euxo pipefail + if command -v yum >/dev/null 2>&1; then + yum install -y wget tar gzip pkgconfig libuuid-devel libblkid-devel + else + echo "Not a yum-based image?" ; exit 1 + fi + ARCH="$(uname -m)" + mkdir -p /opt/java + if [ "$ARCH" = "x86_64" ]; then + JDK_URL="https://download.oracle.com/java/17/archive/jdk-17.0.12_linux-x64_bin.tar.gz" + else + # aarch64 + JDK_URL="https://download.oracle.com/java/17/archive/jdk-17.0.12_linux-aarch64_bin.tar.gz" + fi + curl -L -o /tmp/jdk17.tar.gz "$JDK_URL" + tar -xzf /tmp/jdk17.tar.gz -C /opt/java + export JAVA_HOME=$(echo /opt/java/jdk-17.0.12*) + export PATH="$JAVA_HOME/bin:$PATH" + java -version + + chmod +x mvnw || true + ./mvnw -Pwith-cpp clean verify package \ + -DskipTests -Dspotless.check.skip=true -Dspotless.apply.skip=true + test -d cpp/target/build/lib && test -d cpp/target/build/include + + CIBW_TEST_COMMAND: > + python -c "import tsfile, tsfile.tsfile_reader as r; print('import-ok:')" + CIBW_BUILD_VERBOSITY: "1" + run: cibuildwheel --output-dir wheelhouse python + + - name: Build wheels via cibuildwheel (macOS) + if: matrix.platform == 'macos' + env: + CIBW_ARCHS_MACOS: ${{ matrix.cibw_archs_macos }} + CIBW_BUILD: "cp39-* cp310-* cp311-* cp312-* cp313-* cp314-*" +# CIBW_BUILD: "cp313-*" + CIBW_SKIP: "pp*" + CIBW_ENVIRONMENT_MACOS: "MACOSX_DEPLOYMENT_TARGET=12.0" + MACOSX_DEPLOYMENT_TARGET: "12.0" + CIBW_TEST_COMMAND: > + python -c "import tsfile, tsfile.tsfile_reader as r; print('import-ok:')" + CIBW_BUILD_VERBOSITY: "1" + run: cibuildwheel --output-dir wheelhouse python + + - name: Upload wheels as artifact + uses: actions/upload-artifact@v4 + with: + name: tsfile-wheels-${{ matrix.name }} + path: wheelhouse/*.whl + + diff --git a/cpp/.clang-format b/cpp/.clang-format index 26651a2cd..da6eee613 100644 --- a/cpp/.clang-format +++ b/cpp/.clang-format @@ -65,7 +65,6 @@ ConstructorInitializerIndentWidth: 4 ContinuationIndentWidth: 4 Cpp11BracedListStyle: true DeriveLineEnding: true -DerivePointerAlignment: true DisableFormat: false EmptyLineAfterAccessModifier: Never EmptyLineBeforeAccessModifier: LogicalBlock @@ -207,7 +206,7 @@ SpacesInParentheses: false SpacesInSquareBrackets: false SpaceBeforeSquareBrackets: false BitFieldColonSpacing: Both -Standard: Auto +Standard: Cpp11 StatementAttributeLikeMacros: - Q_EMIT StatementMacros: diff --git a/cpp/pom.xml b/cpp/pom.xml index fa6b00708..6dd8c9085 100644 --- a/cpp/pom.xml +++ b/cpp/pom.xml @@ -22,7 +22,7 @@ org.apache.tsfile tsfile-parent - 2.2.0-SNAPSHOT + 2.1.5 tsfile-cpp pom @@ -151,39 +151,6 @@ - - linux-install-uuid-dev - - - unix - Linux - - - - - - 
org.codehaus.mojo - exec-maven-plugin - - - install-uuid-dev - validate - - exec - - - - - bash - - -c - sudo apt-get update && sudo apt-get install -y uuid-dev - - - - - - jenkins-build diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 86c67f2b4..3d2d292fa 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -36,6 +36,7 @@ set(ANTLR4_WITH_STATIC_CRT OFF) set(PROJECT_INCLUDE_DIR ${CMAKE_SOURCE_DIR}/src ${THIRD_PARTY_INCLUDE}/google_snappy + ${THIRD_PARTY_INCLUDE}/zlib-1.2.13 ${CMAKE_SOURCE_DIR}/third_party/lz4 ${CMAKE_SOURCE_DIR}/third_party/lzokay ${CMAKE_SOURCE_DIR}/third_party/zlib-1.2.13 diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h index 06e7e7e42..499dd5bc7 100644 --- a/cpp/src/common/schema.h +++ b/cpp/src/common/schema.h @@ -46,8 +46,8 @@ struct MeasurementSchema { common::TSDataType data_type_; common::TSEncoding encoding_; common::CompressionType compression_type_; - storage::ChunkWriter *chunk_writer_; - ValueChunkWriter *value_chunk_writer_; + storage::ChunkWriter* chunk_writer_; + ValueChunkWriter* value_chunk_writer_; std::map props_; MeasurementSchema() @@ -58,7 +58,7 @@ struct MeasurementSchema { chunk_writer_(nullptr), value_chunk_writer_(nullptr) {} - MeasurementSchema(const std::string &measurement_name, + MeasurementSchema(const std::string& measurement_name, common::TSDataType data_type) : measurement_name_(measurement_name), data_type_(data_type), @@ -67,7 +67,7 @@ struct MeasurementSchema { chunk_writer_(nullptr), value_chunk_writer_(nullptr) {} - MeasurementSchema(const std::string &measurement_name, + MeasurementSchema(const std::string& measurement_name, common::TSDataType data_type, common::TSEncoding encoding, common::CompressionType compression_type) : measurement_name_(measurement_name), @@ -88,7 +88,7 @@ struct MeasurementSchema { } } - int serialize_to(common::ByteStream &out) { + int serialize_to(common::ByteStream& out) { int ret = common::E_OK; if (RET_FAIL( common::SerializationUtil::write_str(measurement_name_, out))) { @@ -102,7 +102,7 @@ struct MeasurementSchema { if (ret == common::E_OK) { if (RET_FAIL(common::SerializationUtil::write_ui32(props_.size(), out))) { - for (const auto &prop : props_) { + for (const auto& prop : props_) { if (RET_FAIL(common::SerializationUtil::write_str( prop.first, out))) { } else if (RET_FAIL(common::SerializationUtil::write_str( @@ -115,7 +115,7 @@ struct MeasurementSchema { return ret; } - int deserialize_from(common::ByteStream &in) { + int deserialize_from(common::ByteStream& in) { int ret = common::E_OK; uint8_t data_type = common::TSDataType::INVALID_DATATYPE, encoding = common::TSEncoding::INVALID_ENCODING, @@ -153,8 +153,8 @@ struct MeasurementSchema { } }; -typedef std::map MeasurementSchemaMap; -typedef std::map::iterator +typedef std::map MeasurementSchemaMap; +typedef std::map::iterator MeasurementSchemaMapIter; typedef std::pair MeasurementSchemaMapInsertResult; @@ -164,7 +164,7 @@ struct MeasurementSchemaGroup { // measurement_name -> MeasurementSchema MeasurementSchemaMap measurement_schema_map_; bool is_aligned_ = false; - TimeChunkWriter *time_chunk_writer_ = nullptr; + TimeChunkWriter* time_chunk_writer_ = nullptr; ~MeasurementSchemaGroup() { if (time_chunk_writer_ != nullptr) { @@ -195,11 +195,11 @@ class TableSchema { * Each ColumnSchema defines the schema for one column * in the table. 
*/ - TableSchema(const std::string &table_name, - const std::vector &column_schemas) + TableSchema(const std::string& table_name, + const std::vector& column_schemas) : table_name_(table_name) { to_lowercase_inplace(table_name_); - for (const common::ColumnSchema &column_schema : column_schemas) { + for (const common::ColumnSchema& column_schema : column_schemas) { column_schemas_.emplace_back(std::make_shared( column_schema.get_column_name(), column_schema.get_data_type())); @@ -207,16 +207,16 @@ class TableSchema { column_schema.get_column_category()); } int idx = 0; - for (const auto &measurement_schema : column_schemas_) { + for (const auto& measurement_schema : column_schemas_) { to_lowercase_inplace(measurement_schema->measurement_name_); column_pos_index_.insert( std::make_pair(measurement_schema->measurement_name_, idx++)); } } - TableSchema(const std::string &table_name, - const std::vector &column_schemas, - const std::vector &column_categories) + TableSchema(const std::string& table_name, + const std::vector& column_schemas, + const std::vector& column_categories) : table_name_(table_name), column_categories_(column_categories) { to_lowercase_inplace(table_name_); for (const auto column_schema : column_schemas) { @@ -226,34 +226,42 @@ class TableSchema { } } int idx = 0; - for (const auto &measurement_schema : column_schemas_) { + for (const auto& measurement_schema : column_schemas_) { to_lowercase_inplace(measurement_schema->measurement_name_); column_pos_index_.insert( std::make_pair(measurement_schema->measurement_name_, idx++)); } } - TableSchema(TableSchema &&other) noexcept + TableSchema(TableSchema&& other) noexcept : table_name_(std::move(other.table_name_)), column_schemas_(std::move(other.column_schemas_)), column_categories_(std::move(other.column_categories_)) {} - TableSchema(const TableSchema &other) noexcept + TableSchema(const TableSchema& other) noexcept : table_name_(other.table_name_), column_categories_(other.column_categories_) { - for (const auto &column_schema : other.column_schemas_) { + for (const auto& column_schema : other.column_schemas_) { // Just call default construction column_schemas_.emplace_back( std::make_shared(*column_schema)); } int idx = 0; - for (const auto &measurement_schema : column_schemas_) { + for (const auto& measurement_schema : column_schemas_) { column_pos_index_.insert( std::make_pair(measurement_schema->measurement_name_, idx++)); } } - int serialize_to(common::ByteStream &out) { + // In cases where data is retrieved from a tree to form the table, + // there is no table name in the tree path, so adjustments are needed for + // this scenario. This flag is used specifically for such cases. + // TODO(Colin): remove this. 
+ void set_virtual_table() { is_virtual_table_ = true; } + + bool is_virtual_table() { return is_virtual_table_; } + + int serialize_to(common::ByteStream& out) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::write_var_uint( column_schemas_.size(), out))) { @@ -271,7 +279,7 @@ class TableSchema { return ret; } - int deserialize(common::ByteStream &in) { + int deserialize(common::ByteStream& in) { int ret = common::E_OK; uint32_t num_columns; if (RET_FAIL( @@ -294,9 +302,9 @@ class TableSchema { ~TableSchema() { column_schemas_.clear(); } - const std::string &get_table_name() { return table_name_; } + const std::string& get_table_name() { return table_name_; } - void set_table_name(const std::string &table_name) { + void set_table_name(const std::string& table_name) { table_name_ = table_name; } @@ -310,7 +318,7 @@ class TableSchema { int32_t get_columns_num() const { return column_schemas_.size(); } - int find_column_index(const std::string &column_name) { + int find_column_index(const std::string& column_name) { std::string lower_case_column_name = to_lower(column_name); auto it = column_pos_index_.find(lower_case_column_name); if (it != column_pos_index_.end()) { @@ -333,10 +341,10 @@ class TableSchema { size_t get_column_pos_index_num() const { return column_pos_index_.size(); } - void update(ChunkGroupMeta *chunk_group_meta) { + void update(ChunkGroupMeta* chunk_group_meta) { for (auto iter = chunk_group_meta->chunk_meta_list_.begin(); iter != chunk_group_meta->chunk_meta_list_.end(); iter++) { - auto &chunk_meta = iter.get(); + auto& chunk_meta = iter.get(); if (chunk_meta->data_type_ == common::VECTOR) { continue; } @@ -365,7 +373,7 @@ class TableSchema { std::vector get_data_types() const { std::vector ret; - for (const auto &measurement_schema : column_schemas_) { + for (const auto& measurement_schema : column_schemas_) { ret.emplace_back(measurement_schema->data_type_); } return ret; @@ -375,12 +383,12 @@ class TableSchema { return column_categories_; } - std::vector > get_measurement_schemas() + std::vector> get_measurement_schemas() const { return column_schemas_; } - common::ColumnSchema get_column_schema(const std::string &column_name) { + common::ColumnSchema get_column_schema(const std::string& column_name) { int column_idx = find_column_index(column_name); if (column_idx == -1) { return common::ColumnSchema(); @@ -394,7 +402,7 @@ class TableSchema { } } - int32_t find_id_column_order(const std::string &column_name) { + int32_t find_id_column_order(const std::string& column_name) { std::string lower_case_column_name = to_lower(column_name); int column_order = 0; @@ -412,17 +420,18 @@ class TableSchema { private: std::string table_name_; - std::vector > column_schemas_; + std::vector> column_schemas_; std::vector column_categories_; std::map column_pos_index_; + bool is_virtual_table_ = false; }; struct Schema { - typedef std::unordered_map > + typedef std::unordered_map> TableSchemasMap; TableSchemasMap table_schema_map_; - void update_table_schema(ChunkGroupMeta *chunk_group_meta) { + void update_table_schema(ChunkGroupMeta* chunk_group_meta) { std::shared_ptr device_id = chunk_group_meta->device_id_; auto table_name = device_id->get_table_name(); if (table_schema_map_.find(table_name) == table_schema_map_.end()) { @@ -431,7 +440,7 @@ struct Schema { table_schema_map_[table_name]->update(chunk_group_meta); } void register_table_schema( - const std::shared_ptr &table_schema) { + const std::shared_ptr& table_schema) { 
table_schema_map_[table_schema->get_table_name()] = table_schema; } }; diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h index a0e94391b..dce94f8ad 100644 --- a/cpp/src/common/tsblock/tsblock.h +++ b/cpp/src/common/tsblock/tsblock.h @@ -44,7 +44,7 @@ class TsBlock { * information, such as insert scenarios, etc. Then we will use the given * number of rows */ - explicit TsBlock(TupleDesc *tupledesc, uint32_t max_row_count = 0) + explicit TsBlock(TupleDesc* tupledesc, uint32_t max_row_count = 0) : capacity_(g_config_value_.tsblock_max_memory_), row_count_(0), max_row_count_(max_row_count), @@ -60,9 +60,9 @@ class TsBlock { FORCE_INLINE uint32_t get_row_count() const { return row_count_; } - FORCE_INLINE TupleDesc *get_tuple_desc() const { return tuple_desc_; } + FORCE_INLINE TupleDesc* get_tuple_desc() const { return tuple_desc_; } - FORCE_INLINE Vector *get_vector(uint32_t index) { return vectors_[index]; } + FORCE_INLINE Vector* get_vector(uint32_t index) { return vectors_[index]; } FORCE_INLINE uint32_t get_column_count() const { return tuple_desc_->get_column_count(); @@ -104,8 +104,8 @@ class TsBlock { row_count_ = 0; } - FORCE_INLINE static int create_tsblock(TupleDesc *tupledesc, - TsBlock *&ret_tsblock, + FORCE_INLINE static int create_tsblock(TupleDesc* tupledesc, + TsBlock*& ret_tsblock, uint32_t max_row_count = 0) { int ret = common::E_OK; if (ret_tsblock == nullptr) { @@ -119,13 +119,13 @@ class TsBlock { } int init(); - void tsblock_to_json(ByteStream *byte_stream); + void tsblock_to_json(ByteStream* byte_stream); std::string debug_string(); private: int build_vector(common::TSDataType type, uint32_t row_count); - void write_data(ByteStream *__restrict byte_stream, char *__restrict val, + void write_data(ByteStream* __restrict byte_stream, char* __restrict val, uint32_t len, bool has_null, TSDataType type); private: @@ -134,13 +134,13 @@ class TsBlock { uint32_t max_row_count_; common::BitMap select_list_; - TupleDesc *tuple_desc_; - std::vector vectors_; + TupleDesc* tuple_desc_; + std::vector vectors_; }; class RowAppender { public: - explicit RowAppender(TsBlock *tsblock) : tsblock_(tsblock) {} + explicit RowAppender(TsBlock* tsblock) : tsblock_(tsblock) {} ~RowAppender() {} // todo:(yanghao) maybe need to consider select-list @@ -157,25 +157,37 @@ class RowAppender { tsblock_->row_count_--; } - FORCE_INLINE void append(uint32_t slot_index, const char *value, + FORCE_INLINE void append(uint32_t slot_index, const char* value, uint32_t len) { ASSERT(slot_index < tsblock_->tuple_desc_->get_column_count()); - Vector *vec = tsblock_->vectors_[slot_index]; - vec->append(value, len); + Vector* vec = tsblock_->vectors_[slot_index]; + // TODO(Colin): Refine this. 
+ TSDataType datatype = vec->get_vector_type(); + if (len == 4 && datatype == INT64) { + int32_t int32_val = *reinterpret_cast(value); + int64_t int64_val = static_cast(int32_val); + vec->append(reinterpret_cast(&int64_val), 8); + } else if (len == 4 && datatype == DOUBLE) { + float float_val = *reinterpret_cast(value); + double double_val = static_cast(float_val); + vec->append(reinterpret_cast(&double_val), 8); + } else { + vec->append(value, len); + } } FORCE_INLINE void append_null(uint32_t slot_index) { - Vector *vec = tsblock_->vectors_[slot_index]; + Vector* vec = tsblock_->vectors_[slot_index]; vec->set_null(tsblock_->row_count_ - 1); } private: - TsBlock *tsblock_; + TsBlock* tsblock_; }; class ColAppender { public: - ColAppender(uint32_t column_index, TsBlock *tsblock) + ColAppender(uint32_t column_index, TsBlock* tsblock) : column_index_(column_index), column_row_count_(0), tsblock_(tsblock) { ASSERT(column_index < tsblock_->tuple_desc_->get_column_count()); vec_ = tsblock_->vectors_[column_index]; @@ -194,7 +206,7 @@ class ColAppender { } } - FORCE_INLINE void append(const char *value, uint32_t len) { + FORCE_INLINE void append(const char* value, uint32_t len) { vec_->append(value, len); } @@ -211,7 +223,7 @@ class ColAppender { } return E_OK; } - FORCE_INLINE int fill(const char *value, uint32_t len, uint32_t end_index) { + FORCE_INLINE int fill(const char* value, uint32_t len, uint32_t end_index) { while (column_row_count_ < end_index) { if (!add_row()) { return E_INVALID_ARG; @@ -225,14 +237,14 @@ class ColAppender { private: uint32_t column_index_; uint32_t column_row_count_; - TsBlock *tsblock_; - Vector *vec_; + TsBlock* tsblock_; + Vector* vec_; }; // todo:(yanghao) need to deal with select-list class RowIterator { public: - explicit RowIterator(TsBlock *tsblock) : tsblock_(tsblock), row_id_(0) { + explicit RowIterator(TsBlock* tsblock) : tsblock_(tsblock), row_id_(0) { column_count_ = tsblock_->tuple_desc_->get_column_count(); } @@ -264,17 +276,17 @@ class RowIterator { FORCE_INLINE void update_row_id() { row_id_++; } - FORCE_INLINE char *read(uint32_t column_index, uint32_t *__restrict len, - bool *__restrict null) { + FORCE_INLINE char* read(uint32_t column_index, uint32_t* __restrict len, + bool* __restrict null) { ASSERT(column_index < column_count_); - Vector *vec = tsblock_->vectors_[column_index]; + Vector* vec = tsblock_->vectors_[column_index]; return vec->read(len, null, row_id_); } std::string debug_string(); // for debug private: - TsBlock *tsblock_; + TsBlock* tsblock_; uint32_t row_id_; // The line number currently being reader uint32_t column_count_; }; @@ -282,7 +294,7 @@ class RowIterator { // todo:(yanghao) need to deal with select-list class ColIterator { public: - ColIterator(uint32_t column_index, const TsBlock *tsblock) + ColIterator(uint32_t column_index, const TsBlock* tsblock) : column_index_(column_index), row_id_(0), tsblock_(tsblock) { ASSERT(column_index < tsblock_->tuple_desc_->get_column_count()); vec_ = tsblock_->vectors_[column_index]; @@ -303,22 +315,22 @@ class ColIterator { FORCE_INLINE TSDataType get_data_type() { return vec_->get_vector_type(); } - FORCE_INLINE char *read(uint32_t *__restrict len, bool *__restrict null) { + FORCE_INLINE char* read(uint32_t* __restrict len, bool* __restrict null) { return vec_->read(len, null, row_id_); } - FORCE_INLINE char *read(uint32_t *len) { return vec_->read(len); } + FORCE_INLINE char* read(uint32_t* len) { return vec_->read(len); } FORCE_INLINE uint32_t get_column_index() { return 
column_index_; } private: uint32_t column_index_; uint32_t row_id_; - const TsBlock *tsblock_; - Vector *vec_; + const TsBlock* tsblock_; + Vector* vec_; }; -int merge_tsblock_by_row(TsBlock *sea, TsBlock *river); +int merge_tsblock_by_row(TsBlock* sea, TsBlock* river); } // end namespace common #endif // COMMON_TSBLOCK_TSBLOCK_H diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h index dd22ca401..39cd027ef 100644 --- a/cpp/src/common/tsfile_common.h +++ b/cpp/src/common/tsfile_common.h @@ -39,7 +39,7 @@ namespace storage { -extern const char *MAGIC_STRING_TSFILE; +extern const char* MAGIC_STRING_TSFILE; constexpr int MAGIC_STRING_TSFILE_LEN = 6; extern const char VERSION_NUM_BYTE; extern const char CHUNK_GROUP_HEADER_MARKER; @@ -60,7 +60,7 @@ typedef int64_t TsFileID; struct PageHeader { uint32_t uncompressed_size_; uint32_t compressed_size_; - Statistic *statistic_; + Statistic* statistic_; PageHeader() : uncompressed_size_(0), compressed_size_(0), statistic_(nullptr) {} @@ -73,7 +73,7 @@ struct PageHeader { uncompressed_size_ = 0; compressed_size_ = 0; } - int deserialize_from(common::ByteStream &in, bool deserialize_stat, + int deserialize_from(common::ByteStream& in, bool deserialize_stat, common::TSDataType data_type) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_var_uint( @@ -99,7 +99,7 @@ struct PageHeader { } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, const PageHeader &h) { + friend std::ostream& operator<<(std::ostream& os, const PageHeader& h) { os << "{uncompressed_size_=" << h.uncompressed_size_ << ", compressed_size_=" << h.uncompressed_size_; if (h.statistic_ == nullptr) { @@ -132,7 +132,7 @@ struct ChunkHeader { ~ChunkHeader() = default; - int serialize_to(common::ByteStream &out) { + int serialize_to(common::ByteStream& out) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::write_char(chunk_type_, out))) { } else if (RET_FAIL(common::SerializationUtil::write_var_str( @@ -148,7 +148,7 @@ struct ChunkHeader { } return ret; } - int deserialize_from(common::ByteStream &in) { + int deserialize_from(common::ByteStream& in) { int ret = common::E_OK; in.mark_read_pos(); if (RET_FAIL(common::SerializationUtil::read_char(chunk_type_, in))) { @@ -157,18 +157,18 @@ struct ChunkHeader { } else if (RET_FAIL(common::SerializationUtil::read_var_uint(data_size_, in))) { } else if (RET_FAIL(common::SerializationUtil::read_char( - (char &)data_type_, in))) { + (char&)data_type_, in))) { } else if (RET_FAIL(common::SerializationUtil::read_char( - (char &)compression_type_, in))) { + (char&)compression_type_, in))) { } else if (RET_FAIL(common::SerializationUtil::read_char( - (char &)encoding_type_, in))) { + (char&)encoding_type_, in))) { } else { serialized_size_ = in.get_mark_len(); } return ret; } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, const ChunkHeader &h) { + friend std::ostream& operator<<(std::ostream& os, const ChunkHeader& h) { os << "{measurement_name=" << h.measurement_name_ << ", data_size=" << h.data_size_ << ", data_type=" << h.data_type_ << ", compression_type=" << h.compression_type_ @@ -197,7 +197,7 @@ struct ChunkMeta { common::String measurement_name_; common::TSDataType data_type_; int64_t offset_of_chunk_header_; - Statistic *statistic_; + Statistic* statistic_; char mask_; common::TSEncoding encoding_; common::CompressionType compression_type_; @@ -209,10 +209,10 @@ struct ChunkMeta { statistic_(nullptr), mask_(0) {} - int init(const common::String 
&measurement_name, + int init(const common::String& measurement_name, common::TSDataType data_type, int64_t offset_of_chunk_header, - Statistic *stat, char mask, common::TSEncoding encoding, - common::CompressionType compression_type, common::PageArena &pa) { + Statistic* stat, char mask, common::TSEncoding encoding, + common::CompressionType compression_type, common::PageArena& pa) { // TODO check parameter valid measurement_name_.dup_from(measurement_name, pa); data_type_ = data_type; @@ -223,10 +223,10 @@ struct ChunkMeta { compression_type_ = compression_type; return common::E_OK; } - FORCE_INLINE void clone_statistic_from(Statistic *stat) { + FORCE_INLINE void clone_statistic_from(Statistic* stat) { clone_statistic(stat, statistic_, data_type_); } - FORCE_INLINE int clone_from(ChunkMeta &that, common::PageArena *pa) { + FORCE_INLINE int clone_from(ChunkMeta& that, common::PageArena* pa) { int ret = common::E_OK; if (RET_FAIL(measurement_name_.dup_from(that.measurement_name_, *pa))) { return ret; @@ -244,7 +244,7 @@ struct ChunkMeta { mask_ = that.mask_; return ret; } - int serialize_to(common::ByteStream &out, bool serialize_statistic) { + int serialize_to(common::ByteStream& out, bool serialize_statistic) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::write_i64( offset_of_chunk_header_, out))) { @@ -253,8 +253,8 @@ struct ChunkMeta { } return ret; } - int deserialize_from(common::ByteStream &in, bool deserialize_stat, - common::PageArena *pa) { + int deserialize_from(common::ByteStream& in, bool deserialize_stat, + common::PageArena* pa) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_i64( offset_of_chunk_header_, in))) { @@ -270,7 +270,7 @@ struct ChunkMeta { return ret; } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, const ChunkMeta &cm) { + friend std::ostream& operator<<(std::ostream& os, const ChunkMeta& cm) { os << "{measurement_name=" << cm.measurement_name_ << ", data_type=" << cm.data_type_ << ", offset_of_chunk_header=" << cm.offset_of_chunk_header_ @@ -287,16 +287,16 @@ struct ChunkMeta { struct ChunkGroupMeta { std::shared_ptr device_id_; - common::SimpleList chunk_meta_list_; + common::SimpleList chunk_meta_list_; - explicit ChunkGroupMeta(common::PageArena *pa_ptr) + explicit ChunkGroupMeta(common::PageArena* pa_ptr) : chunk_meta_list_(pa_ptr) {} FORCE_INLINE int init(std::shared_ptr device_id) { device_id_ = device_id; return 0; } - FORCE_INLINE int push(ChunkMeta *cm) { + FORCE_INLINE int push(ChunkMeta* cm) { return chunk_meta_list_.push_back(cm); } }; @@ -305,13 +305,13 @@ class ITimeseriesIndex { public: ITimeseriesIndex() {} ~ITimeseriesIndex() {} - virtual common::SimpleList *get_chunk_meta_list() const { + virtual common::SimpleList* get_chunk_meta_list() const { return nullptr; } - virtual common::SimpleList *get_time_chunk_meta_list() const { + virtual common::SimpleList* get_time_chunk_meta_list() const { return nullptr; } - virtual common::SimpleList *get_value_chunk_meta_list() const { + virtual common::SimpleList* get_value_chunk_meta_list() const { return nullptr; } @@ -321,7 +321,7 @@ class ITimeseriesIndex { virtual common::TSDataType get_data_type() const { return common::INVALID_DATATYPE; } - virtual Statistic *get_statistic() const { return nullptr; } + virtual Statistic* get_statistic() const { return nullptr; } }; /* @@ -368,19 +368,18 @@ class TimeseriesIndex : public ITimeseriesIndex { } } - int add_chunk_meta(ChunkMeta *chunk_meta, bool serialize_statistic); - FORCE_INLINE int 
set_measurement_name(common::String &measurement_name, - common::PageArena &pa) { + int add_chunk_meta(ChunkMeta* chunk_meta, bool serialize_statistic); + FORCE_INLINE int set_measurement_name(common::String& measurement_name, + common::PageArena& pa) { return measurement_name_.dup_from(measurement_name, pa); } - FORCE_INLINE void set_measurement_name(common::String &measurement_name) { + FORCE_INLINE void set_measurement_name(common::String& measurement_name) { measurement_name_.shallow_copy_from(measurement_name); } FORCE_INLINE virtual common::String get_measurement_name() const { return measurement_name_; } - virtual inline common::SimpleList *get_chunk_meta_list() - const { + virtual inline common::SimpleList* get_chunk_meta_list() const { return chunk_meta_list_; } FORCE_INLINE void set_ts_meta_type(char ts_meta_type) { @@ -405,7 +404,7 @@ class TimeseriesIndex : public ITimeseriesIndex { statistic_->reset(); return common::E_OK; } - virtual Statistic *get_statistic() const { return statistic_; } + virtual Statistic* get_statistic() const { return statistic_; } common::TsID get_ts_id() const { return ts_id_; } FORCE_INLINE void finish() { @@ -413,7 +412,7 @@ class TimeseriesIndex : public ITimeseriesIndex { chunk_meta_list_serialized_buf_.total_size(); } - int serialize_to(common::ByteStream &out) { + int serialize_to(common::ByteStream& out) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::write_char( timeseries_meta_type_, out))) { @@ -430,14 +429,14 @@ class TimeseriesIndex : public ITimeseriesIndex { return ret; } - int deserialize_from(common::ByteStream &in, common::PageArena *pa) { + int deserialize_from(common::ByteStream& in, common::PageArena* pa) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_char(timeseries_meta_type_, in))) { } else if (RET_FAIL(common::SerializationUtil::read_mystring( measurement_name_, pa, in))) { } else if (RET_FAIL(common::SerializationUtil::read_char( - (char &)data_type_, in))) { + (char&)data_type_, in))) { } else if (RET_FAIL(common::SerializationUtil::read_var_uint( chunk_meta_list_data_size_, in))) { } else if (nullptr == @@ -447,22 +446,22 @@ class TimeseriesIndex : public ITimeseriesIndex { } else if (RET_FAIL(statistic_->deserialize_from(in))) { } else { statistic_from_pa_ = true; - void *chunk_meta_list_buf = pa->alloc(sizeof(*chunk_meta_list_)); + void* chunk_meta_list_buf = pa->alloc(sizeof(*chunk_meta_list_)); if (IS_NULL(chunk_meta_list_buf)) { return common::E_OOM; } const bool deserialize_chunk_meta_statistic = (timeseries_meta_type_ & 0x3F); // TODO chunk_meta_list_ = - new (chunk_meta_list_buf) common::SimpleList(pa); + new (chunk_meta_list_buf) common::SimpleList(pa); uint32_t start_pos = in.read_pos(); while (IS_SUCC(ret) && in.read_pos() < start_pos + chunk_meta_list_data_size_) { - void *cm_buf = pa->alloc(sizeof(ChunkMeta)); + void* cm_buf = pa->alloc(sizeof(ChunkMeta)); if (IS_NULL(cm_buf)) { ret = common::E_OOM; } else { - ChunkMeta *cm = new (cm_buf) ChunkMeta; + ChunkMeta* cm = new (cm_buf) ChunkMeta; cm->measurement_name_.shallow_copy_from( this->measurement_name_); cm->data_type_ = this->data_type_; @@ -477,7 +476,7 @@ class TimeseriesIndex : public ITimeseriesIndex { return ret; } - int clone_from(const TimeseriesIndex &that, common::PageArena *pa) { + int clone_from(const TimeseriesIndex& that, common::PageArena* pa) { int ret = common::E_OK; timeseries_meta_type_ = that.timeseries_meta_type_; chunk_meta_list_data_size_ = that.chunk_meta_list_data_size_; @@ -496,20 +495,20 @@ 
class TimeseriesIndex : public ITimeseriesIndex { } if (that.chunk_meta_list_ != nullptr) { - void *buf = pa->alloc(sizeof(*chunk_meta_list_)); + void* buf = pa->alloc(sizeof(*chunk_meta_list_)); if (IS_NULL(buf)) { return common::E_OOM; } - chunk_meta_list_ = new (buf) common::SimpleList(pa); - common::SimpleList::Iterator it; + chunk_meta_list_ = new (buf) common::SimpleList(pa); + common::SimpleList::Iterator it; for (it = that.chunk_meta_list_->begin(); IS_SUCC(ret) && it != that.chunk_meta_list_->end(); it++) { - ChunkMeta *cm = it.get(); - void *cm_buf = pa->alloc(sizeof(ChunkMeta)); + ChunkMeta* cm = it.get(); + void* cm_buf = pa->alloc(sizeof(ChunkMeta)); if (IS_NULL(cm_buf)) { return common::E_OOM; } else { - ChunkMeta *my_cm = new (cm_buf) ChunkMeta; + ChunkMeta* my_cm = new (cm_buf) ChunkMeta; if (RET_FAIL(my_cm->clone_from(*cm, pa))) { } else if (RET_FAIL(chunk_meta_list_->push_back(my_cm))) { } @@ -519,8 +518,8 @@ class TimeseriesIndex : public ITimeseriesIndex { return ret; } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, - const TimeseriesIndex &tsi) { + friend std::ostream& operator<<(std::ostream& os, + const TimeseriesIndex& tsi) { os << "{meta_type=" << (int)tsi.timeseries_meta_type_ << ", chunk_meta_list_data_size=" << tsi.chunk_meta_list_data_size_ << ", measurement_name=" << tsi.measurement_name_ @@ -531,7 +530,7 @@ class TimeseriesIndex : public ITimeseriesIndex { if (tsi.chunk_meta_list_) { os << ", chunk_meta_list={"; int count = 0; - common::SimpleList::Iterator it = + common::SimpleList::Iterator it = tsi.chunk_meta_list_->begin(); for (; it != tsi.chunk_meta_list_->end(); it++, count++) { if (count != 0) { @@ -565,24 +564,24 @@ class TimeseriesIndex : public ITimeseriesIndex { * TimeseriesIndex.statistic_ is duplicated with ChunkMeta.statistic_. In * this case, we do not serialize ChunkMeta.statistic_. 
*/ - Statistic *statistic_; + Statistic* statistic_; bool statistic_from_pa_; common::ByteStream chunk_meta_list_serialized_buf_; // common::PageArena page_arena_; - common::SimpleList *chunk_meta_list_; // for deserialize_from + common::SimpleList* chunk_meta_list_; // for deserialize_from }; class AlignedTimeseriesIndex : public ITimeseriesIndex { public: - TimeseriesIndex *time_ts_idx_; - TimeseriesIndex *value_ts_idx_; + TimeseriesIndex* time_ts_idx_; + TimeseriesIndex* value_ts_idx_; AlignedTimeseriesIndex() {} ~AlignedTimeseriesIndex() {} - virtual common::SimpleList *get_time_chunk_meta_list() const { + virtual common::SimpleList* get_time_chunk_meta_list() const { return time_ts_idx_->get_chunk_meta_list(); } - virtual common::SimpleList *get_value_chunk_meta_list() const { + virtual common::SimpleList* get_value_chunk_meta_list() const { return value_ts_idx_->get_chunk_meta_list(); } @@ -592,13 +591,13 @@ class AlignedTimeseriesIndex : public ITimeseriesIndex { virtual common::TSDataType get_data_type() const { return time_ts_idx_->get_data_type(); } - virtual Statistic *get_statistic() const { + virtual Statistic* get_statistic() const { return value_ts_idx_->get_statistic(); } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, - const AlignedTimeseriesIndex &tsi) { + friend std::ostream& operator<<(std::ostream& os, + const AlignedTimeseriesIndex& tsi) { os << "time_ts_idx=" << *tsi.time_ts_idx_; os << ", value_ts_idx=" << *tsi.value_ts_idx_; return os; @@ -609,7 +608,7 @@ class AlignedTimeseriesIndex : public ITimeseriesIndex { class TSMIterator { public: explicit TSMIterator( - common::SimpleList &chunk_group_meta_list) + common::SimpleList& chunk_group_meta_list) : chunk_group_meta_list_(chunk_group_meta_list), chunk_group_meta_iter_(), chunk_meta_iter_() {} @@ -617,38 +616,38 @@ class TSMIterator { // sort => iterate int init(); bool has_next() const; - int get_next(std::shared_ptr &ret_device_id, - common::String &ret_measurement_name, - TimeseriesIndex &ret_ts_index); + int get_next(std::shared_ptr& ret_device_id, + common::String& ret_measurement_name, + TimeseriesIndex& ret_ts_index); private: - common::SimpleList &chunk_group_meta_list_; - common::SimpleList::Iterator chunk_group_meta_iter_; - common::SimpleList::Iterator chunk_meta_iter_; + common::SimpleList& chunk_group_meta_list_; + common::SimpleList::Iterator chunk_group_meta_iter_; + common::SimpleList::Iterator chunk_meta_iter_; // timeseries measurenemnt chunk meta info // map >> std::map, - std::map>> + std::map>> tsm_chunk_meta_info_; // device iterator std::map, - std::map>>::iterator + std::map>>::iterator tsm_device_iter_; // measurement iterator - std::map>::iterator + std::map>::iterator tsm_measurement_iter_; }; /* =============== TsFile Index ================ */ struct IComparable { virtual ~IComparable() = default; - virtual bool operator<(const IComparable &other) const = 0; - virtual bool operator>(const IComparable &other) const = 0; - virtual bool operator==(const IComparable &other) const = 0; - virtual int compare(const IComparable &other) { + virtual bool operator<(const IComparable& other) const = 0; + virtual bool operator>(const IComparable& other) const = 0; + virtual bool operator==(const IComparable& other) const = 0; + virtual int compare(const IComparable& other) { if (this->operator<(other)) { return -1; } else if (this->operator==(other)) { @@ -663,27 +662,27 @@ struct IComparable { struct DeviceIDComparable : IComparable { std::shared_ptr device_id_; - explicit 
DeviceIDComparable(const std::shared_ptr &device_id) + explicit DeviceIDComparable(const std::shared_ptr& device_id) : device_id_(device_id) {} - bool operator<(const IComparable &other) const override { - const auto *other_device = - dynamic_cast(&other); + bool operator<(const IComparable& other) const override { + const auto* other_device = + dynamic_cast(&other); if (!other_device) throw std::runtime_error("Incompatible comparison"); return *device_id_ < *other_device->device_id_; } - bool operator>(const IComparable &other) const override { - const auto *other_device = - dynamic_cast(&other); + bool operator>(const IComparable& other) const override { + const auto* other_device = + dynamic_cast(&other); if (!other_device) throw std::runtime_error("Incompatible comparison"); return *device_id_ != *other_device->device_id_ && !(*device_id_ < *other_device->device_id_); } - bool operator==(const IComparable &other) const override { - const auto *other_device = - dynamic_cast(&other); + bool operator==(const IComparable& other) const override { + const auto* other_device = + dynamic_cast(&other); if (!other_device) throw std::runtime_error("Incompatible comparison"); return *device_id_ == *other_device->device_id_; } @@ -696,25 +695,25 @@ struct DeviceIDComparable : IComparable { struct StringComparable : IComparable { std::string value_; - explicit StringComparable(const std::string &value) : value_(value) {} + explicit StringComparable(const std::string& value) : value_(value) {} - bool operator<(const IComparable &other) const override { - const auto *other_string = - dynamic_cast(&other); + bool operator<(const IComparable& other) const override { + const auto* other_string = + dynamic_cast(&other); if (!other_string) throw std::runtime_error("Incompatible comparison"); return value_ < other_string->value_; } - bool operator>(const IComparable &other) const override { - const auto *other_string = - dynamic_cast(&other); + bool operator>(const IComparable& other) const override { + const auto* other_string = + dynamic_cast(&other); if (!other_string) throw std::runtime_error("Incompatible comparison"); return value_ > other_string->value_; } - bool operator==(const IComparable &other) const override { - const auto *other_string = - dynamic_cast(&other); + bool operator==(const IComparable& other) const override { + const auto* other_string = + dynamic_cast(&other); if (!other_string) throw std::runtime_error("Incompatible comparison"); return value_ == other_string->value_; } @@ -723,7 +722,7 @@ struct StringComparable : IComparable { }; struct IMetaIndexEntry { - static void self_destructor(IMetaIndexEntry *ptr) { + static void self_destructor(IMetaIndexEntry* ptr) { if (ptr) { ptr->~IMetaIndexEntry(); } @@ -731,9 +730,9 @@ struct IMetaIndexEntry { IMetaIndexEntry() = default; virtual ~IMetaIndexEntry() = default; - virtual int serialize_to(common::ByteStream &out) { return common::E_OK; } - virtual int deserialize_from(common::ByteStream &out, - common::PageArena *pa) { + virtual int serialize_to(common::ByteStream& out) { return common::E_OK; } + virtual int deserialize_from(common::ByteStream& out, + common::PageArena* pa) { return common::E_NOT_SUPPORT; } virtual int64_t get_offset() const = 0; @@ -743,11 +742,11 @@ struct IMetaIndexEntry { } virtual common::String get_name() const { return {}; } virtual std::shared_ptr get_device_id() const { return nullptr; } - virtual std::shared_ptr clone(common::PageArena *pa) = 0; + virtual std::shared_ptr clone(common::PageArena* pa) = 0; 
#ifndef NDEBUG - virtual void print(std::ostream &os) const {} - friend std::ostream &operator<<(std::ostream &os, - const IMetaIndexEntry &entry) { + virtual void print(std::ostream& os) const {} + friend std::ostream& operator<<(std::ostream& os, + const IMetaIndexEntry& entry) { entry.print(os); return os; } @@ -760,19 +759,19 @@ struct DeviceMetaIndexEntry : IMetaIndexEntry { DeviceMetaIndexEntry() = default; - DeviceMetaIndexEntry(const std::shared_ptr &device_id, + DeviceMetaIndexEntry(const std::shared_ptr& device_id, const int64_t offset) : device_id_(device_id), offset_(offset) {} ~DeviceMetaIndexEntry() override = default; - static void self_deleter(DeviceMetaIndexEntry *ptr) { + static void self_deleter(DeviceMetaIndexEntry* ptr) { if (ptr) { ptr->~DeviceMetaIndexEntry(); } } - int serialize_to(common::ByteStream &out) override { + int serialize_to(common::ByteStream& out) override { int ret = common::E_OK; if (RET_FAIL(device_id_->serialize(out))) { } else if (RET_FAIL( @@ -781,10 +780,10 @@ struct DeviceMetaIndexEntry : IMetaIndexEntry { return ret; } - std::shared_ptr &get_device_id() { return device_id_; } + std::shared_ptr& get_device_id() { return device_id_; } - int deserialize_from(common::ByteStream &in, - common::PageArena *pa) override { + int deserialize_from(common::ByteStream& in, + common::PageArena* pa) override { int ret = common::E_OK; device_id_ = std::make_shared("init"); if (RET_FAIL(device_id_->deserialize(in))) { @@ -804,11 +803,11 @@ struct DeviceMetaIndexEntry : IMetaIndexEntry { std::shared_ptr get_device_id() const override { return device_id_; } - std::shared_ptr clone(common::PageArena *pa) override { + std::shared_ptr clone(common::PageArena* pa) override { return std::make_shared(device_id_, offset_); } #ifndef NDEBUG - void print(std::ostream &os) const override { + void print(std::ostream& os) const override { os << "name=" << device_id_ << ", offset=" << offset_; } #endif @@ -821,19 +820,19 @@ struct MeasurementMetaIndexEntry : IMetaIndexEntry { ~MeasurementMetaIndexEntry() override = default; MeasurementMetaIndexEntry() = default; - MeasurementMetaIndexEntry(const common::String &name, const int64_t offset, - common::PageArena &pa) { + MeasurementMetaIndexEntry(const common::String& name, const int64_t offset, + common::PageArena& pa) { offset_ = offset; name_.dup_from(name, pa); } - FORCE_INLINE int init(const std::string &str, const int64_t offset, - common::PageArena &pa) { + FORCE_INLINE int init(const std::string& str, const int64_t offset, + common::PageArena& pa) { offset_ = offset; return name_.dup_from(str, pa); } - int serialize_to(common::ByteStream &out) override { + int serialize_to(common::ByteStream& out) override { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::write_mystring(name_, out))) { } else if (RET_FAIL( @@ -842,8 +841,8 @@ struct MeasurementMetaIndexEntry : IMetaIndexEntry { return ret; } - int deserialize_from(common::ByteStream &in, - common::PageArena *pa) override { + int deserialize_from(common::ByteStream& in, + common::PageArena* pa) override { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_mystring(name_, pa, in))) { } else if (RET_FAIL(common::SerializationUtil::read_i64(offset_, in))) { @@ -863,11 +862,11 @@ struct MeasurementMetaIndexEntry : IMetaIndexEntry { std::shared_ptr get_device_id() const override { return nullptr; } - std::shared_ptr clone(common::PageArena *pa) override { + std::shared_ptr clone(common::PageArena* pa) override { return std::make_shared(name_, 
offset_, *pa); } #ifndef NDEBUG - void print(std::ostream &os) const override { + void print(std::ostream& os) const override { os << "name=" << name_ << ", offset=" << offset_; } #endif @@ -881,7 +880,7 @@ enum MetaIndexNodeType { INVALID_META_NODE_TYPE = 4, }; #ifndef NDEBUG -static const char *meta_index_node_type_names[5] = { +static const char* meta_index_node_type_names[5] = { "INTERNAL_DEVICE", "LEAF_DEVICE", "INTERNAL_MEASUREMENT", "LEAF_MEASUREMENT", "INVALID_META_NODE_TYPE"}; #endif @@ -892,9 +891,9 @@ struct MetaIndexNode { std::vector> children_; int64_t end_offset_; MetaIndexNodeType node_type_; - common::PageArena *pa_; + common::PageArena* pa_; - explicit MetaIndexNode(common::PageArena *pa) + explicit MetaIndexNode(common::PageArena* pa) : children_(), end_offset_(0), node_type_(), pa_(pa) {} std::shared_ptr peek() { @@ -906,7 +905,7 @@ struct MetaIndexNode { ~MetaIndexNode() {} - static void self_deleter(MetaIndexNode *ptr) { + static void self_deleter(MetaIndexNode* ptr) { if (ptr) { ptr->~MetaIndexNode(); } @@ -914,10 +913,10 @@ struct MetaIndexNode { int binary_search_children( std::shared_ptr key, bool exact_search, - std::shared_ptr &ret_index_entry, - int64_t &ret_end_offset); + std::shared_ptr& ret_index_entry, + int64_t& ret_end_offset); - int serialize_to(common::ByteStream &out) { + int serialize_to(common::ByteStream& out) { int ret = common::E_OK; #if DEBUG_SE int64_t start_pos = out.total_size(); @@ -946,12 +945,12 @@ struct MetaIndexNode { return ret; } - int deserialize_from(const char *buf, int len) { + int deserialize_from(const char* buf, int len) { common::ByteStream bs; bs.wrap_from(buf, len); return deserialize_from(bs); } - int deserialize_from(common::ByteStream &in) { + int deserialize_from(common::ByteStream& in) { int ret = common::E_OK; uint32_t children_size = 0; if (RET_FAIL( @@ -959,7 +958,7 @@ struct MetaIndexNode { return ret; } for (uint32_t i = 0; i < children_size && IS_SUCC(ret); i++) { - void *entry_buf = pa_->alloc(sizeof(MeasurementMetaIndexEntry)); + void* entry_buf = pa_->alloc(sizeof(MeasurementMetaIndexEntry)); if (IS_NULL(entry_buf)) { return common::E_OOM; } @@ -987,12 +986,12 @@ struct MetaIndexNode { #endif return ret; } - int device_deserialize_from(const char *buf, int len) { + int device_deserialize_from(const char* buf, int len) { common::ByteStream bs; bs.wrap_from(buf, len); return device_deserialize_from(bs); } - int device_deserialize_from(common::ByteStream &in) { + int device_deserialize_from(common::ByteStream& in) { int ret = common::E_OK; uint32_t children_size = 0; if (RET_FAIL( @@ -1000,11 +999,11 @@ struct MetaIndexNode { return ret; } for (uint32_t i = 0; i < children_size && IS_SUCC(ret); i++) { - void *entry_buf = pa_->alloc(sizeof(DeviceMetaIndexEntry)); + void* entry_buf = pa_->alloc(sizeof(DeviceMetaIndexEntry)); if (IS_NULL(entry_buf)) { return common::E_OOM; } - auto *entry_ptr = new (entry_buf) DeviceMetaIndexEntry(); + auto* entry_ptr = new (entry_buf) DeviceMetaIndexEntry(); auto entry = std::shared_ptr( entry_ptr, DeviceMetaIndexEntry::self_deleter); if (RET_FAIL(entry->deserialize_from(in, pa_))) { @@ -1030,8 +1029,8 @@ struct MetaIndexNode { } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, - const MetaIndexNode &node) { + friend std::ostream& operator<<(std::ostream& os, + const MetaIndexNode& node) { os << "end_offset=" << node.end_offset_ << ", node_type=" << meta_index_node_type_names[node.node_type_]; @@ -1073,16 +1072,16 @@ struct TsFileMeta { DeviceNodeMap; std::map> 
table_metadata_index_node_map_; - std::unordered_map tsfile_properties_; + std::unordered_map tsfile_properties_; typedef std::unordered_map> TableSchemasMap; TableSchemasMap table_schemas_; int64_t meta_offset_; - BloomFilter *bloom_filter_; - common::PageArena *page_arena_; + BloomFilter* bloom_filter_; + common::PageArena* page_arena_; - int get_table_metaindex_node(const std::string &table_name, - MetaIndexNode *&ret_node) { + int get_table_metaindex_node(const std::string& table_name, + MetaIndexNode*& ret_node) { std::map>::iterator it = table_metadata_index_node_map_.find(table_name); if (it == table_metadata_index_node_map_.end()) { @@ -1092,8 +1091,8 @@ struct TsFileMeta { return common::E_OK; } - int get_table_schema(const std::string &table_name, - std::shared_ptr &ret_schema) { + int get_table_schema(const std::string& table_name, + std::shared_ptr& ret_schema) { TableSchemasMap::iterator it = table_schemas_.find(table_name); if (it == table_schemas_.end()) { return common::E_TABLE_NOT_EXIST; @@ -1105,7 +1104,7 @@ struct TsFileMeta { TsFileMeta() : meta_offset_(0), bloom_filter_(nullptr), page_arena_(nullptr) {} - explicit TsFileMeta(common::PageArena *pa) + explicit TsFileMeta(common::PageArena* pa) : meta_offset_(0), bloom_filter_(nullptr), page_arena_(pa) {} ~TsFileMeta() { if (bloom_filter_ != nullptr) { @@ -1114,19 +1113,21 @@ struct TsFileMeta { for (auto properties : tsfile_properties_) { if (properties.second != nullptr) { delete properties.second; + properties.second = nullptr; } } + tsfile_properties_.clear(); table_metadata_index_node_map_.clear(); table_schemas_.clear(); } - int serialize_to(common::ByteStream &out); + int serialize_to(common::ByteStream& out); - int deserialize_from(common::ByteStream &in); + int deserialize_from(common::ByteStream& in); #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, - const TsFileMeta &tsfile_meta) { + friend std::ostream& operator<<(std::ostream& os, + const TsFileMeta& tsfile_meta) { os << "meta_offset=" << tsfile_meta.meta_offset_; return os; } diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 1b09db49c..ebe8107ab 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -72,7 +72,7 @@ int set_global_compression(uint8_t compression) { return common::set_global_compression(compression); } -WriteFile write_file_new(const char *pathname, ERRNO *err_code) { +WriteFile write_file_new(const char* pathname, ERRNO* err_code) { int ret; init_tsfile_config(); @@ -86,14 +86,14 @@ WriteFile write_file_new(const char *pathname, ERRNO *err_code) { flags |= O_BINARY; #endif mode_t mode = 0666; - storage::WriteFile *file = new storage::WriteFile; + storage::WriteFile* file = new storage::WriteFile; ret = file->create(pathname, flags, mode); *err_code = ret; return file; } -TsFileWriter tsfile_writer_new(WriteFile file, TableSchema *schema, - ERRNO *err_code) { +TsFileWriter tsfile_writer_new(WriteFile file, TableSchema* schema, + ERRNO* err_code) { if (schema->column_num == 0) { *err_code = common::E_INVALID_SCHEMA; return nullptr; @@ -121,19 +121,19 @@ TsFileWriter tsfile_writer_new(WriteFile file, TableSchema *schema, static_cast(cur_schema.column_category)); } - storage::TableSchema *table_schema = + storage::TableSchema* table_schema = new storage::TableSchema(schema->table_name, column_schemas); auto table_writer = new storage::TsFileTableWriter( - static_cast(file), table_schema); + static_cast(file), table_schema); delete table_schema; 
*err_code = common::E_OK; return table_writer; } TsFileWriter tsfile_writer_new_with_memory_threshold(WriteFile file, - TableSchema *schema, + TableSchema* schema, uint64_t memory_threshold, - ERRNO *err_code) { + ERRNO* err_code) { if (schema->column_num == 0) { *err_code = common::E_INVALID_SCHEMA; return nullptr; @@ -154,17 +154,17 @@ TsFileWriter tsfile_writer_new_with_memory_threshold(WriteFile file, static_cast(cur_schema.column_category)); } - storage::TableSchema *table_schema = + storage::TableSchema* table_schema = new storage::TableSchema(schema->table_name, column_schemas); - auto table_writer = - new storage::TsFileTableWriter(static_cast(file), - table_schema, memory_threshold); + auto table_writer = new storage::TsFileTableWriter( + static_cast(file), table_schema, memory_threshold); *err_code = common::E_OK; delete table_schema; return table_writer; } -TsFileReader tsfile_reader_new(const char *pathname, ERRNO *err_code) { + +TsFileReader tsfile_reader_new(const char* pathname, ERRNO* err_code) { init_tsfile_config(); auto reader = new storage::TsFileReader(); int ret = reader->open(pathname); @@ -180,7 +180,7 @@ ERRNO tsfile_writer_close(TsFileWriter writer) { if (writer == nullptr) { return common::E_OK; } - auto *w = static_cast(writer); + auto* w = static_cast(writer); int ret = w->flush(); if (ret != common::E_OK) { return ret; @@ -194,12 +194,12 @@ ERRNO tsfile_writer_close(TsFileWriter writer) { } ERRNO tsfile_reader_close(TsFileReader reader) { - auto *ts_reader = static_cast(reader); + auto* ts_reader = static_cast(reader); delete ts_reader; return common::E_OK; } -Tablet tablet_new(char **column_name_list, TSDataType *data_types, +Tablet tablet_new(char** column_name_list, TSDataType* data_types, uint32_t column_num, uint32_t max_rows) { std::vector measurement_list; std::vector data_type_list; @@ -212,20 +212,20 @@ Tablet tablet_new(char **column_name_list, TSDataType *data_types, } uint32_t tablet_get_cur_row_size(Tablet tablet) { - return static_cast(tablet)->get_cur_row_size(); + return static_cast(tablet)->get_cur_row_size(); } ERRNO tablet_add_timestamp(Tablet tablet, uint32_t row_index, Timestamp timestamp) { - return static_cast(tablet)->add_timestamp(row_index, - timestamp); + return static_cast(tablet)->add_timestamp(row_index, + timestamp); } #define TABLET_ADD_VALUE_BY_NAME_DEF(type) \ ERRNO tablet_add_value_by_name_##type(Tablet tablet, uint32_t row_index, \ - const char *column_name, \ + const char* column_name, \ const type value) { \ - return static_cast(tablet)->add_value( \ + return static_cast(tablet)->add_value( \ row_index, storage::to_lower(column_name), value); \ } TABLET_ADD_VALUE_BY_NAME_DEF(int32_t); @@ -235,9 +235,9 @@ TABLET_ADD_VALUE_BY_NAME_DEF(double); TABLET_ADD_VALUE_BY_NAME_DEF(bool); ERRNO tablet_add_value_by_name_string(Tablet tablet, uint32_t row_index, - const char *column_name, - const char *value) { - return static_cast(tablet)->add_value( + const char* column_name, + const char* value) { + return static_cast(tablet)->add_value( row_index, storage::to_lower(column_name), common::String(value)); } @@ -245,14 +245,14 @@ ERRNO tablet_add_value_by_name_string(Tablet tablet, uint32_t row_index, ERRNO tablet_add_value_by_index_##type(Tablet tablet, uint32_t row_index, \ uint32_t column_index, \ const type value) { \ - return static_cast(tablet)->add_value( \ + return static_cast(tablet)->add_value( \ row_index, column_index, value); \ } ERRNO tablet_add_value_by_index_string(Tablet tablet, uint32_t row_index, uint32_t 
column_index, - const char *value) { - return static_cast(tablet)->add_value( + const char* value) { + return static_cast(tablet)->add_value( row_index, column_index, common::String(value)); } @@ -263,16 +263,16 @@ TABLE_ADD_VALUE_BY_INDEX_DEF(double); TABLE_ADD_VALUE_BY_INDEX_DEF(bool); // TsRecord API -TsRecord _ts_record_new(const char *device_id, Timestamp timestamp, +TsRecord _ts_record_new(const char* device_id, Timestamp timestamp, int timeseries_num) { - auto *record = new storage::TsRecord(timestamp, device_id, timeseries_num); + auto* record = new storage::TsRecord(timestamp, device_id, timeseries_num); return record; } #define INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(type) \ ERRNO _insert_data_into_ts_record_by_name_##type( \ - TsRecord data, const char *measurement_name, type value) { \ - auto *record = (storage::TsRecord *)data; \ + TsRecord data, const char* measurement_name, type value) { \ + auto* record = (storage::TsRecord*)data; \ storage::DataPoint point(measurement_name, value); \ if (record->points_.size() + 1 > record->points_.capacity()) \ return common::E_BUF_NOT_ENOUGH; \ @@ -302,8 +302,8 @@ return writer; */ ERRNO tsfile_writer_write(TsFileWriter writer, Tablet tablet) { - auto *w = static_cast(writer); - auto *tbl = static_cast(tablet); + auto* w = static_cast(writer); + auto* tbl = static_cast(tablet); return w->write_table(*tbl); } @@ -314,12 +314,12 @@ ERRNO tsfile_writer_write(TsFileWriter writer, Tablet tablet) { // Query -ResultSet tsfile_query_table(TsFileReader reader, const char *table_name, - char **columns, uint32_t column_num, +ResultSet tsfile_query_table(TsFileReader reader, const char* table_name, + char** columns, uint32_t column_num, Timestamp start_time, Timestamp end_time, - ERRNO *err_code) { - auto *r = static_cast(reader); - storage::ResultSet *table_result_set = nullptr; + ERRNO* err_code) { + auto* r = static_cast(reader); + storage::ResultSet* table_result_set = nullptr; std::vector column_names; for (uint32_t i = 0; i < column_num; i++) { column_names.emplace_back(columns[i]); @@ -329,8 +329,22 @@ ResultSet tsfile_query_table(TsFileReader reader, const char *table_name, return table_result_set; } -bool tsfile_result_set_next(ResultSet result_set, ERRNO *err_code) { - auto *r = static_cast(result_set); +ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, + uint32_t column_num, Timestamp start_time, + Timestamp end_time, ERRNO* err_code) { + auto* r = static_cast(reader); + storage::ResultSet* table_result_set = nullptr; + std::vector column_names; + for (uint32_t i = 0; i < column_num; i++) { + column_names.emplace_back(columns[i]); + } + *err_code = r->query_table_on_tree(column_names, start_time, end_time, + table_result_set); + return table_result_set; +} + +bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { + auto* r = static_cast(result_set); bool has_next = true; int ret = common::E_OK; ret = r->next(has_next); @@ -343,8 +357,8 @@ bool tsfile_result_set_next(ResultSet result_set, ERRNO *err_code) { #define TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(type) \ type tsfile_result_set_get_value_by_name_##type(ResultSet result_set, \ - const char *column_name) { \ - auto *r = static_cast(result_set); \ + const char* column_name) { \ + auto* r = static_cast(result_set); \ std::string column_name_(column_name); \ return r->get_value(column_name_); \ } @@ -354,13 +368,13 @@ TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(int32_t); TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(int64_t); 
TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(float); TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(double); -char *tsfile_result_set_get_value_by_name_string(ResultSet result_set, - const char *column_name) { - auto *r = static_cast(result_set); +char* tsfile_result_set_get_value_by_name_string(ResultSet result_set, + const char* column_name) { + auto* r = static_cast(result_set); std::string column_name_(column_name); - common::String *ret = r->get_value(column_name_); + common::String* ret = r->get_value(column_name_); // Caller should free return's char* 's space. - char *dup = (char *)malloc(ret->len_ + 1); + char* dup = (char*)malloc(ret->len_ + 1); if (dup) { memcpy(dup, ret->buf_, ret->len_); dup[ret->len_] = '\0'; @@ -371,7 +385,7 @@ char *tsfile_result_set_get_value_by_name_string(ResultSet result_set, #define TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(type) \ type tsfile_result_set_get_value_by_index_##type(ResultSet result_set, \ uint32_t column_index) { \ - auto *r = static_cast(result_set); \ + auto* r = static_cast(result_set); \ return r->get_value(column_index); \ } @@ -381,12 +395,12 @@ TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(float); TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(double); TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(bool); -char *tsfile_result_set_get_value_by_index_string(ResultSet result_set, +char* tsfile_result_set_get_value_by_index_string(ResultSet result_set, uint32_t column_index) { - auto *r = static_cast(result_set); - common::String *ret = r->get_value(column_index); + auto* r = static_cast(result_set); + common::String* ret = r->get_value(column_index); // Caller should free return's char* 's space. - char *dup = (char *)malloc(ret->len_ + 1); + char* dup = (char*)malloc(ret->len_ + 1); if (dup) { memcpy(dup, ret->buf_, ret->len_); dup[ret->len_] = '\0'; @@ -395,19 +409,19 @@ char *tsfile_result_set_get_value_by_index_string(ResultSet result_set, } bool tsfile_result_set_is_null_by_name(ResultSet result_set, - const char *column_name) { - auto *r = static_cast(result_set); + const char* column_name) { + auto* r = static_cast(result_set); return r->is_null(column_name); } bool tsfile_result_set_is_null_by_index(const ResultSet result_set, const uint32_t column_index) { - auto *r = static_cast(result_set); + auto* r = static_cast(result_set); return r->is_null(column_index); } ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set) { - auto *r = static_cast(result_set); + auto* r = static_cast(result_set); if (result_set == NULL) { return ResultSetMetaData(); } @@ -417,8 +431,8 @@ ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set) { r->get_metadata(); meta_data.column_num = result_set_metadata->get_column_count(); meta_data.column_names = - static_cast(malloc(meta_data.column_num * sizeof(char *))); - meta_data.data_types = static_cast( + static_cast(malloc(meta_data.column_num * sizeof(char*))); + meta_data.data_types = static_cast( malloc(meta_data.column_num * sizeof(TSDataType))); for (int i = 0; i < meta_data.column_num; i++) { meta_data.column_names[i] = @@ -429,7 +443,7 @@ ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set) { return meta_data; } -char *tsfile_result_set_metadata_get_column_name(ResultSetMetaData result_set, +char* tsfile_result_set_metadata_get_column_name(ResultSetMetaData result_set, uint32_t column_index) { if (column_index > (uint32_t)result_set.column_num) { return nullptr; @@ -482,15 +496,15 @@ int tsfile_result_set_metadata_get_column_num(ResultSetMetaData result_set) { // } 
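Illustrative only and not part of the patch: a caller might exercise the new tree-model entry point `tsfile_query_table_on_tree` together with the existing result-set accessors roughly as sketched below. The include path, file name, and the use of `0` as the success value are taken from the accompanying test rather than guaranteed by the header, and string getters return a malloc'ed copy that the caller must free.

```cpp
// Hypothetical caller of the new C-wrapper tree query; the file name and
// include path are placeholders, error handling is abbreviated.
#include <cstdio>
#include <cstdlib>

#include "cwrapper/tsfile_cwrapper.h"

int main() {
    ERRNO code = 0;
    TsFileReader reader = tsfile_reader_new("tree_model.tsfile", &code);
    if (code != 0) return 1;

    // nullptr/0 selects every measurement, mirroring the test added below.
    ResultSet rs =
        tsfile_query_table_on_tree(reader, nullptr, 0, 0, 100, &code);
    ResultSetMetaData meta = tsfile_result_set_get_metadata(rs);

    while (tsfile_result_set_next(rs, &code) && code == 0) {
        // Value getters are 1-based; the metadata arrays are 0-based.
        for (int i = 1; i <= meta.column_num; i++) {
            if (tsfile_result_set_is_null_by_index(rs, i)) continue;
            if (meta.data_types[i - 1] == TS_DATATYPE_STRING) {
                char* s = tsfile_result_set_get_value_by_index_string(rs, i);
                std::printf("%s ", s);
                std::free(s);  // the wrapper malloc()s the copy; caller frees it
            }
        }
    }
    // Freeing the metadata struct is omitted in this sketch.
    free_tsfile_result_set(&rs);
    tsfile_reader_close(reader);
    return 0;
}
```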
TableSchema tsfile_reader_get_table_schema(TsFileReader reader, - const char *table_name) { - auto *r = static_cast(reader); + const char* table_name) { + auto* r = static_cast(reader); auto table_shcema = r->get_table_schema(table_name); TableSchema ret_schema; ret_schema.table_name = strdup(table_shcema->get_table_name().c_str()); int column_num = table_shcema->get_columns_num(); ret_schema.column_num = column_num; ret_schema.column_schemas = - static_cast(malloc(sizeof(ColumnSchema) * column_num)); + static_cast(malloc(sizeof(ColumnSchema) * column_num)); for (int i = 0; i < column_num; i++) { auto column_schema = table_shcema->get_measurement_schemas()[i]; ret_schema.column_schemas[i].column_name = @@ -504,18 +518,18 @@ TableSchema tsfile_reader_get_table_schema(TsFileReader reader, return ret_schema; } -TableSchema *tsfile_reader_get_all_table_schemas(TsFileReader reader, - uint32_t *size) { - auto *r = static_cast(reader); +TableSchema* tsfile_reader_get_all_table_schemas(TsFileReader reader, + uint32_t* size) { + auto* r = static_cast(reader); auto table_schemas = r->get_all_table_schemas(); size_t table_num = table_schemas.size(); - TableSchema *ret = - static_cast(malloc(sizeof(TableSchema) * table_num)); + TableSchema* ret = + static_cast(malloc(sizeof(TableSchema) * table_num)); for (size_t i = 0; i < table_schemas.size(); i++) { ret[i].table_name = strdup(table_schemas[i]->get_table_name().c_str()); int column_num = table_schemas[i]->get_columns_num(); ret[i].column_num = column_num; - ret[i].column_schemas = static_cast( + ret[i].column_schemas = static_cast( malloc(column_num * sizeof(ColumnSchema))); auto column_schemas = table_schemas[i]->get_measurement_schemas(); for (int j = 0; j < column_num; j++) { @@ -533,23 +547,23 @@ TableSchema *tsfile_reader_get_all_table_schemas(TsFileReader reader, } // delete pointer -void _free_tsfile_ts_record(TsRecord *record) { +void _free_tsfile_ts_record(TsRecord* record) { if (*record != nullptr) { - delete static_cast(*record); + delete static_cast(*record); } *record = nullptr; } -void free_tablet(Tablet *tablet) { +void free_tablet(Tablet* tablet) { if (*tablet != nullptr) { - delete static_cast(*tablet); + delete static_cast(*tablet); } *tablet = nullptr; } -void free_tsfile_result_set(ResultSet *result_set) { +void free_tsfile_result_set(ResultSet* result_set) { if (*result_set != nullptr) { - delete static_cast(*result_set); + delete static_cast(*result_set); } *result_set = nullptr; } @@ -583,15 +597,15 @@ void free_table_schema(TableSchema schema) { } void free_column_schema(ColumnSchema schema) { free(schema.column_name); } -void free_write_file(WriteFile *write_file) { - auto f = static_cast(*write_file); +void free_write_file(WriteFile* write_file) { + auto f = static_cast(*write_file); delete f; *write_file = nullptr; } // For Python API -TsFileWriter _tsfile_writer_new(const char *pathname, uint64_t memory_threshold, - ERRNO *err_code) { +TsFileWriter _tsfile_writer_new(const char* pathname, uint64_t memory_threshold, + ERRNO* err_code) { init_tsfile_config(); auto writer = new storage::TsFileWriter(); int flags = O_WRONLY | O_CREAT | O_TRUNC; @@ -608,9 +622,9 @@ TsFileWriter _tsfile_writer_new(const char *pathname, uint64_t memory_threshold, return writer; } -Tablet _tablet_new_with_target_name(const char *device_id, - char **column_name_list, - TSDataType *data_types, int column_num, +Tablet _tablet_new_with_target_name(const char* device_id, + char** column_name_list, + TSDataType* data_types, int column_num, int 
max_rows) { std::vector measurement_list; std::vector data_type_list; @@ -627,27 +641,27 @@ Tablet _tablet_new_with_target_name(const char *device_id, } } -ERRNO _tsfile_writer_register_table(TsFileWriter writer, TableSchema *schema) { - std::vector measurement_schemas; +ERRNO _tsfile_writer_register_table(TsFileWriter writer, TableSchema* schema) { + std::vector measurement_schemas; std::vector column_categories; measurement_schemas.resize(schema->column_num); for (int i = 0; i < schema->column_num; i++) { - ColumnSchema *cur_schema = schema->column_schemas + i; + ColumnSchema* cur_schema = schema->column_schemas + i; measurement_schemas[i] = new storage::MeasurementSchema( cur_schema->column_name, static_cast(cur_schema->data_type)); column_categories.push_back( static_cast(cur_schema->column_category)); } - auto tsfile_writer = static_cast(writer); + auto tsfile_writer = static_cast(writer); return tsfile_writer->register_table(std::make_shared( schema->table_name, measurement_schemas, column_categories)); } ERRNO _tsfile_writer_register_timeseries(TsFileWriter writer, - const char *device_id, - const TimeseriesSchema *schema) { - auto *w = static_cast(writer); + const char* device_id, + const TimeseriesSchema* schema) { + auto* w = static_cast(writer); int ret = w->register_timeseries( device_id, @@ -660,8 +674,8 @@ ERRNO _tsfile_writer_register_timeseries(TsFileWriter writer, } ERRNO _tsfile_writer_register_device(TsFileWriter writer, - const device_schema *device_schema) { - auto *w = static_cast(writer); + const device_schema* device_schema) { + auto* w = static_cast(writer); for (int column_id = 0; column_id < device_schema->timeseries_num; column_id++) { TimeseriesSchema schema = device_schema->timeseries_schema[column_id]; @@ -680,26 +694,26 @@ ERRNO _tsfile_writer_register_device(TsFileWriter writer, } ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet) { - auto *w = static_cast(writer); - const auto *tbl = static_cast(tablet); + auto* w = static_cast(writer); + const auto* tbl = static_cast(tablet); return w->write_tablet(*tbl); } ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet) { - auto *w = static_cast(writer); - auto *tbl = static_cast(tablet); + auto* w = static_cast(writer); + auto* tbl = static_cast(tablet); return w->write_table(*tbl); } ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) { - auto *w = static_cast(writer); - const storage::TsRecord *record = static_cast(data); + auto* w = static_cast(writer); + const storage::TsRecord* record = static_cast(data); const int ret = w->write_record(*record); return ret; } ERRNO _tsfile_writer_close(TsFileWriter writer) { - auto *w = static_cast(writer); + auto* w = static_cast(writer); int ret = w->flush(); if (ret != common::E_OK) { return ret; @@ -713,23 +727,23 @@ ERRNO _tsfile_writer_close(TsFileWriter writer) { } ERRNO _tsfile_writer_flush(TsFileWriter writer) { - auto *w = static_cast(writer); + auto* w = static_cast(writer); return w->flush(); } ResultSet _tsfile_reader_query_device(TsFileReader reader, - const char *device_name, - char **sensor_name, uint32_t sensor_num, + const char* device_name, + char** sensor_name, uint32_t sensor_num, Timestamp start_time, Timestamp end_time, - ERRNO *err_code) { - auto *r = static_cast(reader); + ERRNO* err_code) { + auto* r = static_cast(reader); std::vector selected_paths; selected_paths.reserve(sensor_num); for (uint32_t i = 0; i < sensor_num; i++) { selected_paths.push_back(std::string(device_name) + "." 
+ std::string(sensor_name[i])); } - storage::ResultSet *qds = nullptr; + storage::ResultSet* qds = nullptr; *err_code = r->query(selected_paths, start_time, end_time, qds); return qds; } diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 75dc03643..f94325aa4 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -428,6 +428,10 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name, char** columns, uint32_t column_num, Timestamp start_time, Timestamp end_time, ERRNO* err_code); + +ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, + uint32_t column_num, Timestamp start_time, + Timestamp end_time, ERRNO* err_code); // ResultSet tsfile_reader_query_device(TsFileReader reader, // const char* device_name, // char** sensor_name, uint32_t sensor_num, diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index 273f09a44..e16b6b4a2 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -24,7 +24,7 @@ using namespace common; namespace storage { -int TsFileIOReader::init(const std::string &file_path) { +int TsFileIOReader::init(const std::string& file_path) { int ret = E_OK; read_file_ = new ReadFile; read_file_created_ = true; @@ -33,7 +33,7 @@ int TsFileIOReader::init(const std::string &file_path) { return ret; } -int TsFileIOReader::init(ReadFile *read_file) { +int TsFileIOReader::init(ReadFile* read_file) { if (IS_NULL(read_file)) { ASSERT(false); return E_INVALID_ARG; @@ -56,9 +56,9 @@ void TsFileIOReader::reset() { } int TsFileIOReader::alloc_ssi(std::shared_ptr device_id, - const std::string &measurement_name, - TsFileSeriesScanIterator *&ssi, - common::PageArena &pa, Filter *time_filter) { + const std::string& measurement_name, + TsFileSeriesScanIterator*& ssi, + common::PageArena& pa, Filter* time_filter) { int ret = E_OK; if (RET_FAIL(load_tsfile_meta_if_necessary())) { } else { @@ -80,7 +80,7 @@ int TsFileIOReader::alloc_ssi(std::shared_ptr device_id, return ret; } -void TsFileIOReader::revert_ssi(TsFileSeriesScanIterator *ssi) { +void TsFileIOReader::revert_ssi(TsFileSeriesScanIterator* ssi) { if (ssi != nullptr) { ssi->destroy(); delete ssi; @@ -89,12 +89,12 @@ void TsFileIOReader::revert_ssi(TsFileSeriesScanIterator *ssi) { int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta( std::shared_ptr device_id, - std::vector ×eries_indexs, PageArena &pa) { + std::vector& timeseries_indexs, PageArena& pa) { int ret = E_OK; load_tsfile_meta_if_necessary(); std::shared_ptr meta_index_entry; int64_t end_offset; - std::vector, int64_t> > + std::vector, int64_t>> meta_index_entry_list; if (RET_FAIL(load_device_index_entry( std::make_shared(device_id), meta_index_entry, @@ -108,8 +108,8 @@ int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta( return ret; } -bool TsFileIOReader::filter_stasify(ITimeseriesIndex *ts_index, - Filter *time_filter) { +bool TsFileIOReader::filter_stasify(ITimeseriesIndex* ts_index, + Filter* time_filter) { ASSERT(ts_index->get_statistic() != nullptr); return time_filter->satisfy(ts_index->get_statistic()); } @@ -141,7 +141,7 @@ int TsFileIOReader::load_tsfile_meta() { // Step 1: reader the tsfile_meta_size // 1.1 prepare reader buffer int32_t alloc_size = UTIL_MIN(TSFILE_READ_IO_SIZE, file_size()); - char *read_buf = (char *)mem_alloc(alloc_size, MOD_TSFILE_READER); + char* read_buf = (char*)mem_alloc(alloc_size, MOD_TSFILE_READER); if (IS_NULL(read_buf)) { return E_OOM; 
} @@ -159,7 +159,7 @@ int TsFileIOReader::load_tsfile_meta() { // 1.3 deserialize tsfile_meta_size if (IS_SUCC(ret)) { // deserialize tsfile_meta_size - char *size_buf = read_buf + alloc_size - TAIL_MAGIC_AND_META_SIZE_SIZE; + char* size_buf = read_buf + alloc_size - TAIL_MAGIC_AND_META_SIZE_SIZE; tsfile_meta_size = SerializationUtil::read_ui32(size_buf); ASSERT(tsfile_meta_size > 0 && tsfile_meta_size <= (1ll << 20)); } @@ -167,12 +167,12 @@ int TsFileIOReader::load_tsfile_meta() { // Step 2: reader TsFileMeta if (IS_SUCC(ret)) { // 2.1 prepare enough buffer (use the previous buffer if can). - char *tsfile_meta_buf = nullptr; + char* tsfile_meta_buf = nullptr; if (tsfile_meta_size + TAIL_MAGIC_AND_META_SIZE_SIZE > (uint32_t)alloc_size) { // prepare buffer to re-reader from start of tsfile_meta - char *old_read_buf = read_buf; - read_buf = (char *)mem_realloc(read_buf, tsfile_meta_size); + char* old_read_buf = read_buf; + read_buf = (char*)mem_realloc(read_buf, tsfile_meta_size); if (IS_NULL(read_buf)) { read_buf = old_read_buf; ret = E_OOM; @@ -211,8 +211,8 @@ int TsFileIOReader::load_tsfile_meta() { } int TsFileIOReader::load_timeseries_index_for_ssi( - std::shared_ptr device_id, const std::string &measurement_name, - TsFileSeriesScanIterator *&ssi) { + std::shared_ptr device_id, const std::string& measurement_name, + TsFileSeriesScanIterator*& ssi) { int ret = E_OK; std::shared_ptr device_index_entry; int64_t device_ie_end_offset = 0; @@ -224,19 +224,19 @@ int TsFileIOReader::load_timeseries_index_for_ssi( device_ie_end_offset))) { return ret; } - auto &pa = ssi->timeseries_index_pa_; + auto& pa = ssi->timeseries_index_pa_; int start_offset = device_index_entry->get_offset(), end_offset = device_ie_end_offset; ASSERT(start_offset < end_offset); const int32_t read_size = end_offset - start_offset; int32_t ret_read_len = 0; - char *data_buf = (char *)pa.alloc(read_size); - void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)pa.alloc(read_size); + void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { return E_OOM; } - auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); + auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); auto top_node = std::shared_ptr(top_node_ptr, MetaIndexNode::self_deleter); @@ -248,7 +248,7 @@ int TsFileIOReader::load_timeseries_index_for_ssi( } bool is_aligned = is_aligned_device(top_node); - TimeseriesIndex *timeseries_index = nullptr; + TimeseriesIndex* timeseries_index = nullptr; if (is_aligned) { if (RET_FAIL( get_time_column_metadata(top_node, timeseries_index, pa))) { @@ -267,8 +267,8 @@ int TsFileIOReader::load_timeseries_index_for_ssi( return ret; } if (is_aligned) { - auto *aligned_timeseries_index = - dynamic_cast(ssi->itimeseries_index_); + auto* aligned_timeseries_index = + dynamic_cast(ssi->itimeseries_index_); if (aligned_timeseries_index) { aligned_timeseries_index->time_ts_idx_ = timeseries_index; } @@ -277,10 +277,10 @@ int TsFileIOReader::load_timeseries_index_for_ssi( #if DEBUG_SE if (measurement_index_entry.name_.len_) { std::cout << "load timeseries index: " - << *((TimeseriesIndex *)ssi->itimeseries_index_) << std::endl; + << *((TimeseriesIndex*)ssi->itimeseries_index_) << std::endl; } else { std::cout << "load aligned timeseries index: " - << *((AlignedTimeseriesIndex *)ssi->itimeseries_index_) + << *((AlignedTimeseriesIndex*)ssi->itimeseries_index_) << std::endl; } #endif @@ -289,7 +289,7 @@ int 
TsFileIOReader::load_timeseries_index_for_ssi( int TsFileIOReader::load_device_index_entry( std::shared_ptr device_name, - std::shared_ptr &device_index_entry, int64_t &end_offset) { + std::shared_ptr& device_index_entry, int64_t& end_offset) { int ret = E_OK; std::shared_ptr device_id_comparable = std::dynamic_pointer_cast(device_name); @@ -322,10 +322,10 @@ int TsFileIOReader::load_device_index_entry( } int TsFileIOReader::load_measurement_index_entry( - const std::string &measurement_name_str, + const std::string& measurement_name_str, std::shared_ptr top_node, - std::shared_ptr &ret_measurement_index_entry, - int64_t &ret_end_offset) { + std::shared_ptr& ret_measurement_index_entry, + int64_t& ret_end_offset) { int ret = E_OK; // search from top_node in top-down way auto measurement_name = @@ -346,9 +346,9 @@ int TsFileIOReader::load_measurement_index_entry( } int TsFileIOReader::load_all_measurement_index_entry( - int64_t start_offset, int64_t end_offset, common::PageArena &pa, - std::vector, int64_t> > - &ret_measurement_index_entry) { + int64_t start_offset, int64_t end_offset, common::PageArena& pa, + std::vector, int64_t>>& + ret_measurement_index_entry) { #if DEBUG_SE std::cout << "load_measurement_index_entry: measurement_name_str= " << ", start_offset=" << start_offset @@ -359,12 +359,12 @@ int TsFileIOReader::load_all_measurement_index_entry( // 1. load top measuremnt_index_node const int32_t read_size = (int32_t)(end_offset - start_offset); int32_t ret_read_len = 0; - char *data_buf = (char *)pa.alloc(read_size); - void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)pa.alloc(read_size); + void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { return E_OOM; } - auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); + auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); auto top_node = std::shared_ptr(top_node_ptr, MetaIndexNode::self_deleter); if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size, @@ -389,15 +389,15 @@ int TsFileIOReader::load_all_measurement_index_entry( int TsFileIOReader::read_device_meta_index(int32_t start_offset, int32_t end_offset, - common::PageArena &pa, - MetaIndexNode *&device_meta_index, + common::PageArena& pa, + MetaIndexNode*& device_meta_index, bool leaf) { int ret = E_OK; ASSERT(start_offset < end_offset); const int32_t read_size = (int32_t)(end_offset - start_offset); int32_t ret_read_len = 0; - char *data_buf = (char *)pa.alloc(read_size); - void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)pa.alloc(read_size); + void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { return E_OOM; } @@ -415,8 +415,8 @@ int TsFileIOReader::read_device_meta_index(int32_t start_offset, int TsFileIOReader::get_timeseries_indexes( std::shared_ptr device_id, - const std::unordered_set &measurement_names, - std::vector ×eries_indexs, common::PageArena &pa) { + const std::unordered_set& measurement_names, + std::vector& timeseries_indexs, common::PageArena& pa) { int ret = E_OK; std::shared_ptr device_index_entry; int64_t device_ie_end_offset = 0; @@ -433,12 +433,12 @@ int TsFileIOReader::get_timeseries_indexes( ASSERT(start_offset < end_offset); const int32_t read_size = end_offset - start_offset; int32_t ret_read_len = 0; - char *data_buf = (char *)pa.alloc(read_size); - void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)pa.alloc(read_size); + 
void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { return E_OOM; } - auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); + auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); auto top_node = std::shared_ptr(top_node_ptr, MetaIndexNode::self_deleter); @@ -450,24 +450,26 @@ int TsFileIOReader::get_timeseries_indexes( } bool is_aligned = is_aligned_device(top_node); - TimeseriesIndex *timeseries_index = nullptr; + TimeseriesIndex* timeseries_index = nullptr; if (is_aligned) { get_time_column_metadata(top_node, timeseries_index, pa); } int64_t idx = 0; - for (const auto &measurement_name : measurement_names) { + for (const auto& measurement_name : measurement_names) { if (RET_FAIL(load_measurement_index_entry(measurement_name, top_node, measurement_index_entry, measurement_ie_end_offset))) { - } else if (RET_FAIL(do_load_timeseries_index( + } else if (do_load_timeseries_index( measurement_name, measurement_index_entry->get_offset(), measurement_ie_end_offset, pa, timeseries_indexs[idx], - is_aligned))) { + is_aligned) == E_NOT_EXIST) { + idx++; + continue; } if (is_aligned) { - AlignedTimeseriesIndex *aligned_timeseries_index = - dynamic_cast(timeseries_indexs[idx]); + AlignedTimeseriesIndex* aligned_timeseries_index = + dynamic_cast(timeseries_indexs[idx]); if (aligned_timeseries_index) { aligned_timeseries_index->time_ts_idx_ = timeseries_index; } @@ -485,8 +487,8 @@ int TsFileIOReader::get_timeseries_indexes( int TsFileIOReader::search_from_leaf_node( std::shared_ptr target_name, std::shared_ptr index_node, - std::shared_ptr &ret_index_entry, - int64_t &ret_end_offset) { + std::shared_ptr& ret_index_entry, + int64_t& ret_end_offset) { int ret = E_OK; ret = index_node->binary_search_children(target_name, true, ret_index_entry, ret_end_offset); @@ -496,8 +498,8 @@ int TsFileIOReader::search_from_leaf_node( int TsFileIOReader::search_from_internal_node( std::shared_ptr target_name, bool is_device, std::shared_ptr index_node, - std::shared_ptr &ret_index_entry, - int64_t &ret_end_offset) { + std::shared_ptr& ret_index_entry, + int64_t& ret_end_offset) { int ret = E_OK; std::shared_ptr index_entry; int64_t end_offset = 0; @@ -519,12 +521,12 @@ int TsFileIOReader::search_from_internal_node( #endif ASSERT(read_size > 0 && read_size < (1 << 30)); PageArena cur_level_index_node_pa; - void *buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode)); - char *data_buf = (char *)cur_level_index_node_pa.alloc(read_size); + void* buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)cur_level_index_node_pa.alloc(read_size); if (IS_NULL(buf) || IS_NULL(data_buf)) { return E_OOM; } - MetaIndexNode *cur_level_index_node = + MetaIndexNode* cur_level_index_node = new (buf) MetaIndexNode(&cur_level_index_node_pa); int32_t ret_read_len = 0; if (RET_FAIL(read_file_->read(index_entry->get_offset(), data_buf, @@ -569,12 +571,12 @@ bool TsFileIOReader::is_aligned_device( int TsFileIOReader::get_time_column_metadata( std::shared_ptr measurement_node, - TimeseriesIndex *&ret_timeseries_index, PageArena &pa) { + TimeseriesIndex*& ret_timeseries_index, PageArena& pa) { int ret = E_OK; if (!is_aligned_device(measurement_node)) { return ret; } - char *ti_buf = nullptr; + char* ti_buf = nullptr; int start_idx = 0, end_idx = 0; int ret_read_len = 0; if (measurement_node->node_type_ == LEAF_MEASUREMENT) { @@ -597,7 +599,7 @@ int TsFileIOReader::get_time_column_metadata( } } buffer.wrap_from(ti_buf, end_idx - 
start_idx); - void *buf = pa.alloc(sizeof(TimeseriesIndex)); + void* buf = pa.alloc(sizeof(TimeseriesIndex)); if (IS_NULL(buf)) { return E_OOM; } @@ -621,14 +623,14 @@ int TsFileIOReader::get_time_column_metadata( } int TsFileIOReader::do_load_timeseries_index( - const std::string &measurement_name_str, int64_t start_offset, - int64_t end_offset, PageArena &in_timeseries_index_pa, - ITimeseriesIndex *&ret_timeseries_index, bool is_aligned) { + const std::string& measurement_name_str, int64_t start_offset, + int64_t end_offset, PageArena& in_timeseries_index_pa, + ITimeseriesIndex*& ret_timeseries_index, bool is_aligned) { ASSERT(end_offset > start_offset); int ret = E_OK; int32_t read_size = (int32_t)(end_offset - start_offset); int32_t ret_read_len = 0; - char *ti_buf = (char *)mem_alloc(read_size, MOD_TSFILE_READER); + char* ti_buf = (char*)mem_alloc(read_size, MOD_TSFILE_READER); if (IS_NULL(ti_buf)) { return E_OOM; } @@ -638,7 +640,7 @@ int TsFileIOReader::do_load_timeseries_index( ByteStream bs; bs.wrap_from(ti_buf, read_size); const String target_measurement_name( - (char *)measurement_name_str.c_str(), + (char*)measurement_name_str.c_str(), strlen(measurement_name_str.c_str())); bool found = false; #if DEBUG_SE @@ -654,12 +656,12 @@ int TsFileIOReader::do_load_timeseries_index( } else if (is_aligned && cur_timeseries_index.get_measurement_name().equal_to( target_measurement_name)) { - void *buf = in_timeseries_index_pa.alloc( + void* buf = in_timeseries_index_pa.alloc( sizeof(AlignedTimeseriesIndex)); if (IS_NULL(buf)) { return E_OOM; } - AlignedTimeseriesIndex *aligned_ts_idx = + AlignedTimeseriesIndex* aligned_ts_idx = new (buf) AlignedTimeseriesIndex; buf = in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex)); if (IS_NULL(buf)) { @@ -674,7 +676,7 @@ int TsFileIOReader::do_load_timeseries_index( } else if (!is_aligned && cur_timeseries_index.get_measurement_name().equal_to( target_measurement_name)) { - void *buf = + void* buf = in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex)); auto ts_idx = new (buf) TimeseriesIndex; ts_idx->clone_from(cur_timeseries_index, @@ -693,17 +695,17 @@ int TsFileIOReader::do_load_timeseries_index( } int TsFileIOReader::do_load_all_timeseries_index( - std::vector, int64_t> > - &index_node_entry_list, - common::PageArena &in_timeseries_index_pa, - std::vector &ts_indexs) { + std::vector, int64_t>>& + index_node_entry_list, + common::PageArena& in_timeseries_index_pa, + std::vector& ts_indexs) { int ret = E_OK; - for (const auto &index_node_entry : index_node_entry_list) { + for (const auto& index_node_entry : index_node_entry_list) { int64_t start_offset = index_node_entry.first->get_offset(), end_offset = index_node_entry.second; int32_t read_size = (int32_t)(end_offset - start_offset); int32_t ret_read_len = 0; - char *ti_buf = in_timeseries_index_pa.alloc(read_size); + char* ti_buf = in_timeseries_index_pa.alloc(read_size); if (IS_NULL(ti_buf)) { return E_OOM; } @@ -714,7 +716,7 @@ int TsFileIOReader::do_load_all_timeseries_index( ByteStream bs; bs.wrap_from(ti_buf, read_size); while (bs.has_remaining()) { - void *buf = in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex)); + void* buf = in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex)); auto ts_idx = new (buf) TimeseriesIndex; if (RET_FAIL( ts_idx->deserialize_from(bs, &in_timeseries_index_pa))) { @@ -729,8 +731,8 @@ int TsFileIOReader::do_load_all_timeseries_index( int TsFileIOReader::get_all_leaf( std::shared_ptr index_node, - std::vector, int64_t> > - &index_node_entry_list) { + 
std::vector, int64_t>>& + index_node_entry_list) { int ret = E_OK; if (index_node->node_type_ == LEAF_MEASUREMENT || index_node->node_type_ == LEAF_DEVICE) { @@ -760,12 +762,12 @@ int TsFileIOReader::get_all_leaf( #endif ASSERT(read_size > 0 && read_size < (1 << 30)); PageArena cur_level_index_node_pa; - void *buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode)); - char *data_buf = (char *)cur_level_index_node_pa.alloc(read_size); + void* buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)cur_level_index_node_pa.alloc(read_size); if (IS_NULL(buf) || IS_NULL(data_buf)) { return E_OOM; } - auto *cur_level_index_node_ptr = + auto* cur_level_index_node_ptr = new (buf) MetaIndexNode(&cur_level_index_node_pa); auto cur_level_index_node = std::shared_ptr( cur_level_index_node_ptr, MetaIndexNode::self_deleter); diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc index 1df563cd8..8ae22b9ba 100644 --- a/cpp/src/reader/block/single_device_tsblock_reader.cc +++ b/cpp/src/reader/block/single_device_tsblock_reader.cc @@ -82,8 +82,8 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task, device_query_task->get_column_mapping()->get_id_columns()) { const auto& column_pos_in_result = device_query_task->get_column_mapping()->get_column_pos(id_column); - int column_pos_in_id = - table_schema->find_id_column_order(id_column) + 1; + int column_pos_in_id = table_schema->find_id_column_order(id_column) + + (!table_schema->is_virtual_table()); id_column_contexts_.insert(std::make_pair( id_column, IdColumnContext(column_pos_in_result, column_pos_in_id))); @@ -214,8 +214,14 @@ int SingleDeviceTsBlockReader::fill_ids() { const auto& id_column_context = entry.second; for (int32_t pos : id_column_context.pos_in_result_) { std::string* device_tag = nullptr; - device_tag = device_query_task_->get_device_id()->get_segments().at( - id_column_context.pos_in_device_id_); + const auto& segments = + device_query_task_->get_device_id()->get_segments(); + int32_t pos_in_device_id = id_column_context.pos_in_device_id_; + if (pos_in_device_id >= 0 && + static_cast(pos_in_device_id) < segments.size()) { + device_tag = segments[pos_in_device_id]; + } + if (device_tag == nullptr) { ret = col_appenders_[pos + 1]->fill_null( current_block_->get_row_count()); diff --git a/cpp/src/reader/device_meta_iterator.cc b/cpp/src/reader/device_meta_iterator.cc index 4f47341c1..642afdbc3 100644 --- a/cpp/src/reader/device_meta_iterator.cc +++ b/cpp/src/reader/device_meta_iterator.cc @@ -43,7 +43,7 @@ int DeviceMetaIterator::next( } int DeviceMetaIterator::load_results() { - bool is_root_idx_node = true; + int root_num = meta_index_nodes_.size(); while (!meta_index_nodes_.empty()) { // To avoid ASan overflow. // using `const auto&` creates a reference @@ -58,12 +58,9 @@ int DeviceMetaIterator::load_results() { } else { return common::E_INVALID_NODE_TYPE; } - // The first MetaIndexNode is the root and is not loaded here, so no - // need to destruct it here. 
- if (!is_root_idx_node) { + if (root_num-- <= 0) { meta_data_index_node->~MetaIndexNode(); } - is_root_idx_node = false; } return common::E_OK; diff --git a/cpp/src/reader/device_meta_iterator.h b/cpp/src/reader/device_meta_iterator.h index 55f209134..8b39006b2 100644 --- a/cpp/src/reader/device_meta_iterator.h +++ b/cpp/src/reader/device_meta_iterator.h @@ -37,6 +37,16 @@ class DeviceMetaIterator { pa_.init(512, common::MOD_DEVICE_META_ITER); } + DeviceMetaIterator(TsFileIOReader *io_reader, + const std::vector &meta_index_node_list, + const Filter *id_filter) + : io_reader_(io_reader), id_filter_(id_filter) { + for (auto meta_index_node : meta_index_node_list) { + meta_index_nodes_.push(meta_index_node); + } + pa_.init(512, common::MOD_DEVICE_META_ITER); + } + ~DeviceMetaIterator() { pa_.destroy(); } bool has_next(); diff --git a/cpp/src/reader/imeta_data_querier.h b/cpp/src/reader/imeta_data_querier.h index 73a005e84..c034f9151 100644 --- a/cpp/src/reader/imeta_data_querier.h +++ b/cpp/src/reader/imeta_data_querier.h @@ -57,7 +57,10 @@ class IMetadataQuerier { virtual std::unique_ptr device_iterator( MetaIndexNode* root, const Filter* id_filter) = 0; -}; + // FIXME(Colin): refine this. + virtual std::unique_ptr device_iterator( + std::vector root, const Filter* id_filter) = 0; +}; } // end namespace storage #endif // READER_IMETA_DATA_QUERIER_H diff --git a/cpp/src/reader/meta_data_querier.cc b/cpp/src/reader/meta_data_querier.cc index 5a32b9221..0accbdde9 100644 --- a/cpp/src/reader/meta_data_querier.cc +++ b/cpp/src/reader/meta_data_querier.cc @@ -98,6 +98,12 @@ std::unique_ptr MetadataQuerier::device_iterator( new DeviceMetaIterator(io_reader_, root, id_filter)); } +std::unique_ptr MetadataQuerier::device_iterator( + std::vector root, const Filter* id_filter) { + return std::unique_ptr( + new DeviceMetaIterator(io_reader_, root, id_filter)); +} + int MetadataQuerier::load_chunk_meta( const std::pair& key, std::vector& chunk_meta_list) { diff --git a/cpp/src/reader/meta_data_querier.h b/cpp/src/reader/meta_data_querier.h index b4eed3501..525ecf86e 100644 --- a/cpp/src/reader/meta_data_querier.h +++ b/cpp/src/reader/meta_data_querier.h @@ -61,6 +61,9 @@ class MetadataQuerier : public IMetadataQuerier { std::unique_ptr device_iterator( MetaIndexNode* root, const Filter* id_filter) override; + std::unique_ptr device_iterator( + std::vector root, const Filter* id_filter) override; + void clear() override; private: diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc index d09a5c904..78c682db2 100644 --- a/cpp/src/reader/table_query_executor.cc +++ b/cpp/src/reader/table_query_executor.cc @@ -19,17 +19,19 @@ #include "reader/table_query_executor.h" +#include "utils/db_utils.h" + namespace storage { -int TableQueryExecutor::query(const std::string &table_name, - const std::vector &columns, - Filter *time_filter, Filter *id_filter, - Filter *field_filter, ResultSet *&ret_qds) { +int TableQueryExecutor::query(const std::string& table_name, + const std::vector& columns, + Filter* time_filter, Filter* id_filter, + Filter* field_filter, ResultSet*& ret_qds) { int ret = common::E_OK; - TsFileMeta *file_metadata = nullptr; + TsFileMeta* file_metadata = nullptr; file_metadata = tsfile_io_reader_->get_tsfile_meta(); common::PageArena pa; pa.init(512, common::MOD_TSFILE_READER); - MetaIndexNode *table_root = nullptr; + MetaIndexNode* table_root = nullptr; std::shared_ptr table_schema; if (RET_FAIL( file_metadata->get_table_metaindex_node(table_name, 
table_root))) { @@ -42,7 +44,7 @@ int TableQueryExecutor::query(const std::string &table_name, return ret; } std::vector lower_case_column_names(columns); - for (auto &column : lower_case_column_names) { + for (auto& column : lower_case_column_names) { to_lowercase_inplace(column); } std::shared_ptr column_mapping = @@ -85,6 +87,117 @@ int TableQueryExecutor::query(const std::string &table_name, return ret; } -void TableQueryExecutor::destroy_query_data_set(ResultSet *qds) { delete qds; } +int TableQueryExecutor::query_on_tree( + const std::vector>& devices, + const std::vector& tag_columns, + const std::vector& field_columns, Filter* time_filter, + ResultSet*& ret_qds) { + common::PageArena pa; + pa.init(512, common::MOD_TSFILE_READER); + int ret = common::E_OK; + TsFileMeta* file_meta = tsfile_io_reader_->get_tsfile_meta(); + std::unordered_set table_inodes_dedu; + for (auto const& device : devices) { + MetaIndexNode* table_inode; + if (RET_FAIL(file_meta->get_table_metaindex_node( + device->get_table_name(), table_inode))) { + }; + table_inodes_dedu.insert(table_inode); + } + std::vector table_inodes(table_inodes_dedu.begin(), + table_inodes_dedu.end()); + + std::vector col_schema; + for (auto const& tag : tag_columns) { + col_schema.emplace_back(tag, common::TSDataType::STRING, + common::ColumnCategory::TAG); + } + + std::unordered_map column_types_map; + + for (auto const& device : devices) { + bool all_collected = true; + for (const auto& field_col : field_columns) { + if (column_types_map.find(field_col) == column_types_map.end()) { + all_collected = false; + break; + } + } + if (all_collected) { + break; + } + + std::unordered_set measurements(field_columns.begin(), + field_columns.end()); + std::vector index(measurements.size()); + if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes( + device, measurements, index, pa))) { + assert(0); + } + + for (auto* ts_index : index) { + if (ts_index != nullptr) { + std::string measurement_name = + ts_index->get_measurement_name().to_std_string(); + if (column_types_map.find(measurement_name) == + column_types_map.end()) { + common::TSDataType type = ts_index->get_data_type(); + // TODO(Colin): Fix type missmatch. 
+ // if (type == common::TSDataType::INT32 || + // type == common::TSDataType::INT64 || + // type == common::TSDataType::TIMESTAMP || + // type == common::TSDataType::DATE) { + // type = common::TSDataType::INT64; + // } else if (type == common::TSDataType::FLOAT) { + // type = common::TSDataType::DOUBLE; + // } + column_types_map[measurement_name] = type; + } + } + } + } + + for (const auto& field_col : field_columns) { + if (column_types_map.find(field_col) != column_types_map.end()) { + col_schema.emplace_back(field_col, column_types_map[field_col], + common::ColumnCategory::FIELD); + } else { + col_schema.emplace_back(field_col, + common::TSDataType::INVALID_DATATYPE, + common::ColumnCategory::FIELD); + } + } + + auto schema = std::make_shared("default", col_schema); + schema->set_virtual_table(); + std::shared_ptr column_mapping = + std::make_shared(); + for (size_t i = 0; i < col_schema.size(); ++i) { + column_mapping->add(col_schema[i].column_name_, i, *schema); + } + std::vector datatypes = schema->get_data_types(); + auto device_task_iterator = + std::unique_ptr(new DeviceTaskIterator( + schema->get_measurement_names(), table_inodes, column_mapping, + meta_data_querier_, nullptr, schema)); + std::unique_ptr tsblock_reader; + switch (table_query_ordering_) { + case TableQueryOrdering::DEVICE: + tsblock_reader = std::unique_ptr( + new DeviceOrderedTsBlockReader( + std::move(device_task_iterator), meta_data_querier_, + block_size_, tsfile_io_reader_, time_filter, nullptr)); + break; + case TableQueryOrdering::TIME: + default: + ret = common::E_UNSUPPORTED_ORDER; + } + assert(tsblock_reader != nullptr); + ret_qds = new TableResultSet(std::move(tsblock_reader), + schema->get_measurement_names(), + schema->get_data_types()); + return ret; +} +void TableQueryExecutor::destroy_query_data_set(ResultSet* qds) { delete qds; } } // end namespace storage diff --git a/cpp/src/reader/table_query_executor.h b/cpp/src/reader/table_query_executor.h index 83a82fe56..974e6b45b 100644 --- a/cpp/src/reader/table_query_executor.h +++ b/cpp/src/reader/table_query_executor.h @@ -37,15 +37,15 @@ class TableQueryExecutor { public: enum class TableQueryOrdering { TIME, DEVICE }; - TableQueryExecutor(IMetadataQuerier *meta_data_querier, - TsFileIOReader *tsfile_io_reader, + TableQueryExecutor(IMetadataQuerier* meta_data_querier, + TsFileIOReader* tsfile_io_reader, TableQueryOrdering table_query_ordering, int block_size = 1024) : meta_data_querier_(meta_data_querier), tsfile_io_reader_(tsfile_io_reader), table_query_ordering_(table_query_ordering), block_size_(block_size) {} - TableQueryExecutor(ReadFile *read_file) { + TableQueryExecutor(ReadFile* read_file) { tsfile_io_reader_ = new TsFileIOReader(); tsfile_io_reader_->init(read_file); meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_); @@ -62,14 +62,18 @@ class TableQueryExecutor { tsfile_io_reader_ = nullptr; } } - int query(const std::string &table_name, - const std::vector &columns, Filter *time_filter, - Filter *id_filter, Filter *field_filter, ResultSet *&ret_qds); - void destroy_query_data_set(ResultSet *qds); + int query(const std::string& table_name, + const std::vector& columns, Filter* time_filter, + Filter* id_filter, Filter* field_filter, ResultSet*& ret_qds); + int query_on_tree(const std::vector>& devices, + const std::vector& tag_columns, + const std::vector& field_columns, + Filter* time_filter, ResultSet*& ret_qds); + void destroy_query_data_set(ResultSet* qds); private: - IMetadataQuerier *meta_data_querier_; - TsFileIOReader 
*tsfile_io_reader_; + IMetadataQuerier* meta_data_querier_; + TsFileIOReader* tsfile_io_reader_; TableQueryOrdering table_query_ordering_; int32_t block_size_; }; diff --git a/cpp/src/reader/task/device_task_iterator.h b/cpp/src/reader/task/device_task_iterator.h index a5079877f..ec30a472d 100644 --- a/cpp/src/reader/task/device_task_iterator.h +++ b/cpp/src/reader/task/device_task_iterator.h @@ -43,6 +43,21 @@ class DeviceTaskIterator { table_schema_(table_schema) { pa_.init(512, common::MOD_DEVICE_TASK_ITER); } + + DeviceTaskIterator(std::vector column_names, + std::vector index_roots, + std::shared_ptr column_mapping, + IMetadataQuerier *metadata_querier, + const Filter *id_filter, + std::shared_ptr table_schema) + : column_names_(column_names), + column_mapping_(column_mapping), + device_meta_iterator_( + metadata_querier->device_iterator(index_roots, id_filter)), + table_schema_(table_schema) { + pa_.init(512, common::MOD_DEVICE_TASK_ITER); + } + ~DeviceTaskIterator() { pa_.destroy(); } bool has_next() const; diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index 6da094303..769b7072c 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -99,8 +99,6 @@ int TsFileReader::query(const std::string& table_name, return E_TABLE_NOT_EXIST; } - std::vector data_types = table_schema->get_data_types(); - Filter* time_filter = new TimeBetween(start_time, end_time, false); ret = table_query_executor_->query(to_lower(table_name), columns_names, @@ -108,6 +106,61 @@ int TsFileReader::query(const std::string& table_name, return ret; } +int TsFileReader::query_table_on_tree( + const std::vector& measurement_names, int64_t star_time, + int64_t end_time, ResultSet*& result_set) { + int ret = E_OK; + TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta(); + if (tsfile_meta == nullptr) { + return E_TSFILE_WRITER_META_ERR; + } + auto device_names = this->get_all_device_ids(); + std::vector> device_ids; + std::unordered_set measurement_names_dudp; + size_t max_len = 0; + for (auto& device_name : device_names) { + std::vector schemas; + this->get_timeseries_schema(device_name, schemas); + if (measurement_names.empty()) { + device_ids.push_back(device_name); + } + for (auto schema : schemas) { + if (measurement_names.empty()) { + measurement_names_dudp.insert(schema.measurement_name_); + if (device_name->get_segments().size() > max_len) { + max_len = device_name->get_segments().size(); + } + continue; + } + if (std::find(measurement_names.begin(), measurement_names.end(), + schema.measurement_name_) != + measurement_names.end()) { + device_ids.push_back(device_name); + if (device_name->get_segments().size() > max_len) { + max_len = device_name->get_segments().size(); + } + break; + } + } + } + std::vector mea_names; + if (measurement_names.empty() && !measurement_names_dudp.empty()) { + for (auto& measurement_name : measurement_names_dudp) { + mea_names.push_back(measurement_name); + } + } else { + mea_names = measurement_names; + } + std::vector columns_names(max_len); + for (int i = 0; i < max_len; i++) { + columns_names[i] = "l_" + std::to_string(i); + } + Filter* time_filter = new TimeBetween(star_time, end_time, false); + ret = table_query_executor_->query_on_tree( + device_ids, columns_names, mea_names, time_filter, result_set); + return ret; +} + void TsFileReader::destroy_query_data_set(storage::ResultSet* qds) { tsfile_executor_->destroy_query_data_set(qds); } diff --git a/cpp/src/reader/tsfile_reader.h 
b/cpp/src/reader/tsfile_reader.h index eb6a7b70e..d316ce3a6 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -54,7 +54,7 @@ class TsFileReader { * @param file_path the path of the tsfile which will be opened * @return Returns 0 on success, or a non-zero error code on failure. */ - int open(const std::string &file_path); + int open(const std::string& file_path); /** * @brief close the tsfile, this method should be called after the * query is finished @@ -70,7 +70,7 @@ class TsFileReader { * @param [out] ret_qds the result set * @return Returns 0 on success, or a non-zero error code on failure. */ - int query(storage::QueryExpression *qe, ResultSet *&ret_qds); + int query(storage::QueryExpression* qe, ResultSet*& ret_qds); /** * @brief query the tsfile by the path list, start time and end time * this method is used to query the tsfile by the tree model. @@ -80,8 +80,8 @@ class TsFileReader { * @param [in] end_time the end time * @param [out] result_set the result set */ - int query(std::vector &path_list, int64_t start_time, - int64_t end_time, ResultSet *&result_set); + int query(std::vector& path_list, int64_t start_time, + int64_t end_time, ResultSet*& result_set); /** * @brief query the tsfile by the table name, columns names, start time * and end time. this method is used to query the tsfile by the table @@ -93,19 +93,23 @@ class TsFileReader { * @param [in] end_time the end time * @param [out] result_set the result set */ - int query(const std::string &table_name, - const std::vector &columns_names, int64_t start_time, - int64_t end_time, ResultSet *&result_set); + int query(const std::string& table_name, + const std::vector& columns_names, int64_t start_time, + int64_t end_time, ResultSet*& result_set); + + int query_table_on_tree(const std::vector& measurement_names, + int64_t star_time, int64_t end_time, + ResultSet*& result_set); /** * @brief destroy the result set, this method should be called after the * query is finished and result_set * * @param qds the result set */ - void destroy_query_data_set(ResultSet *qds); - ResultSet *read_timeseries( - const std::shared_ptr &device_id, - const std::vector &measurement_name); + void destroy_query_data_set(ResultSet* qds); + ResultSet* read_timeseries( + const std::shared_ptr& device_id, + const std::vector& measurement_name); /** * @brief get all devices in the tsfile * @@ -131,7 +135,7 @@ class TsFileReader { * @return Returns 0 on success, or a non-zero error code on failure. 
*/ int get_timeseries_schema(std::shared_ptr device_id, - std::vector &result); + std::vector& result); /** * @brief get the table schema by the table name * @@ -139,7 +143,7 @@ class TsFileReader { * @return std::shared_ptr the table schema */ std::shared_ptr get_table_schema( - const std::string &table_name); + const std::string& table_name); /** * @brief get all table schemas in the tsfile * @@ -148,12 +152,12 @@ class TsFileReader { std::vector> get_all_table_schemas(); private: - int get_all_devices(std::vector> &device_ids, + int get_all_devices(std::vector>& device_ids, std::shared_ptr index_node, - common::PageArena &pa); - storage::ReadFile *read_file_; - storage::TsFileExecutor *tsfile_executor_; - storage::TableQueryExecutor *table_query_executor_; + common::PageArena& pa); + storage::ReadFile* read_file_; + storage::TsFileExecutor* tsfile_executor_; + storage::TableQueryExecutor* table_query_executor_; }; } // namespace storage diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index 1beb310d7..c48866413 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -80,6 +80,7 @@ include_directories( ${LIBRARY_INCLUDE_DIR} ${THIRD_PARTY_INCLUDE} ${THIRD_PARTY_INCLUDE}/google_snappy + ${THIRD_PARTY_INCLUDE}/zlib-1.2.13 ${CMAKE_SOURCE_DIR}/third_party/lz4 ${CMAKE_SOURCE_DIR}/third_party/google_snappy ${CMAKE_SOURCE_DIR}/third_party/lzokay diff --git a/cpp/test/cwrapper/cwrapper_test.cc b/cpp/test/cwrapper/cwrapper_test.cc index 90a93fb42..644dbfb60 100644 --- a/cpp/test/cwrapper/cwrapper_test.cc +++ b/cpp/test/cwrapper/cwrapper_test.cc @@ -182,4 +182,43 @@ TEST_F(CWrapperTest, WriterFlushTabletAndReadData) { free(data_types); free_write_file(&file); } +TEST_F(CWrapperTest, WriterFlushTabletAndReadData1) { + int code = RET_OK; + TsFileReader reader = tsfile_reader_new( + "/Users/colin/dev/tsfile/python/tests/record_write_and_read.tsfile", + &code); + ASSERT_EQ(code, 0); + char** columns = (char**)malloc(2 * sizeof(char*)); + // columns[0] = strdup("level2"); + // columns[1] = strdup("level1"); + // ResultSet result_set = + // tsfile_query_table_on_tree(reader, columns, 2, 0, 100, &code); + + ResultSet result_set = + tsfile_query_table_on_tree(reader, nullptr, 0, 0, 100, &code); + auto schema = tsfile_result_set_get_metadata(result_set); + while (tsfile_result_set_next(result_set, &code) && code == RET_OK) { + for (int i = 1; i < schema.column_num + 1; i++) { + char* ret = nullptr; + if (tsfile_result_set_is_null_by_index(result_set, i)) { + std::cout << "null "; + continue; + } + switch (schema.data_types[i - 1]) { + case TS_DATATYPE_STRING: + std::cout << tsfile_result_set_get_value_by_index_string( + result_set, i); + break; + case TS_DATATYPE_INT64: + std::cout << tsfile_result_set_get_value_by_index_int64_t( + result_set, i); + break; + default: + break; + } + } + } + ASSERT_EQ(code, 0); +} + } // namespace cwrapper \ No newline at end of file diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc index 477ab24b5..c5bb77b3a 100644 --- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc +++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc @@ -32,6 +32,7 @@ namespace storage { class QDSWithoutTimeGenerator; } + using namespace storage; using namespace common; @@ -49,7 +50,8 @@ class TsFileTreeReaderTest : public ::testing::Test { mode_t mode = 0666; write_file_.create(file_name_, flags, mode); } - void TearDown() override { remove(file_name_.c_str()); } + + void TearDown() override {} 
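As a hedged sketch (again not part of the patch), the reader-side counterpart `TsFileReader::query_table_on_tree` added above could be driven as follows; the template arguments stripped by the diff (`std::vector<std::string>`, the typed `get_value<>()` accessor) and the header path and file name are assumptions, not guarantees.

```cpp
// Hypothetical reader-side usage of query_table_on_tree; "tree_model.tsfile"
// is a placeholder and the typed get_value<>() accessor is assumed from the
// C-wrapper hunks earlier in this patch.
#include <cstdint>
#include <string>
#include <vector>

#include "reader/tsfile_reader.h"

void dump_tree_view(const std::string& path) {
    storage::TsFileReader reader;
    if (reader.open(path) != common::E_OK) return;

    storage::ResultSet* result = nullptr;
    // Restrict the query to one measurement; an empty vector selects all
    // measurements, as the second query in the test below demonstrates.
    int ret = reader.query_table_on_tree({"temperature"}, INT64_MIN, INT64_MAX,
                                         result);
    if (ret != common::E_OK) return;

    bool has_next = false;
    while (result->next(has_next) == common::E_OK && has_next) {
        // Column 1 of the synthesized table carries the timestamp.
        int64_t ts = result->get_value<int64_t>(1);
        (void)ts;
    }
    reader.destroy_query_data_set(result);
    reader.close();
}
```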
std::string file_name_; WriteFile write_file_; @@ -108,6 +110,138 @@ TEST_F(TsFileTreeReaderTest, BasicTest) { reader.close(); } +TEST_F(TsFileTreeReaderTest, ReadTreeByTable) { + TsFileTreeWriter writer(&write_file_); + std::vector device_ids = {"root.db1.t1", "root.db2.t1", + "root.db3.t2.t3", "root.db3.t3", + "device"}; + std::vector measurement_ids = {"temperature", "hudi", "level"}; + for (auto& device_id : device_ids) { + TsRecord record(device_id, 0); + TsRecord record1(device_id, 1); + for (auto const& measurement : measurement_ids) { + auto schema = + new storage::MeasurementSchema(measurement, TSDataType::INT32); + ASSERT_EQ(E_OK, writer.register_timeseries(device_id, schema)); + delete schema; + record.add_point(measurement, static_cast(1)); + record1.add_point(measurement, static_cast(2)); + } + ASSERT_EQ(E_OK, writer.write(record)); + ASSERT_EQ(E_OK, writer.write(record1)); + } + writer.flush(); + writer.close(); + + TsFileReader reader; + reader.open(file_name_); + ResultSet* result; + int ret = reader.query_table_on_tree({"temperature", "hudi"}, INT64_MIN, + INT64_MAX, result); + ASSERT_EQ(ret, E_OK); + + auto* table_result_set = (storage::TableResultSet*)result; + bool has_next = false; + int num = table_result_set->get_metadata()->get_column_count(); + std::unordered_map res; + res["root.db1"] = "t1"; + res["root.db2"] = "t1"; + res["root.db3.t2"] = "t3"; + res["root.db3"] = "t3"; + res["device"] = "null"; + int cnt = 0; + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + auto t = table_result_set->get_value(1); + ASSERT_TRUE(t == 0 || t == 1); + std::string key = ""; + std::string value = ""; + for (int i = 1; i < num + 1; ++i) { + switch (table_result_set->get_metadata()->get_column_type(i)) { + case INT64: + ASSERT_TRUE(table_result_set->get_value(i) == 1 || + table_result_set->get_value(i) == 0); + break; + case INT32: + ASSERT_TRUE(table_result_set->get_value(i) == 1 || + table_result_set->get_value(i) == 2); + break; + case STRING: { + common::String* str = + table_result_set->get_value(i); + if (i == 2) { + key = std::string(str->buf_, str->len_); + ASSERT_TRUE(res.find(key) != res.end()); + } + if (i == 3) { + if (str == nullptr) { + value = "null"; + } else { + value = std::string(str->buf_, str->len_); + } + ASSERT_TRUE(res.find(key) != res.end()); + ASSERT_TRUE(res[key] == value); + } + } break; + default: + break; + } + } + cnt++; + } + ASSERT_EQ(cnt, 10); + reader.destroy_query_data_set(result); + std::vector t; + ret = reader.query_table_on_tree(t, INT64_MIN, INT64_MAX, result); + + ASSERT_EQ(ret, E_OK); + table_result_set = (storage::TableResultSet*)result; + has_next = false; + num = table_result_set->get_metadata()->get_column_count(); + ASSERT_EQ(num, 6); + + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + auto t = table_result_set->get_value(1); + ASSERT_TRUE(t == 0 || t == 1); + std::string key = ""; + std::string value = ""; + for (int i = 1; i < num + 1; ++i) { + switch (table_result_set->get_metadata()->get_column_type(i)) { + case INT64: + ASSERT_TRUE(table_result_set->get_value(i) == 1 || + table_result_set->get_value(i) == 0); + break; + case INT32: + ASSERT_TRUE(table_result_set->get_value(i) == 1 || + table_result_set->get_value(i) == 2); + break; + case STRING: { + common::String* str = + table_result_set->get_value(i); + if (i == 2) { + key = std::string(str->buf_, str->len_); + ASSERT_TRUE(res.find(key) != res.end()); + } + if (i == 3) { + if (str == nullptr) { + value = "null"; + } else { + value = 
std::string(str->buf_, str->len_); + } + ASSERT_TRUE(res.find(key) != res.end()); + ASSERT_TRUE(res[key] == value); + } + } break; + default: + break; + } + } + cnt++; + } + + reader.destroy_query_data_set(result); + reader.close(); +} + TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) { TsFileTreeWriter writer(&write_file_); std::vector device_ids = {"device_1", "device_2", "device_3"}; diff --git a/pom.xml b/pom.xml index 51fbe3145..f85b9f245 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ org.apache.tsfile tsfile-parent - 2.2.0-SNAPSHOT + 2.1.5 pom Apache TsFile Project Parent POM @@ -135,6 +135,10 @@ **/tsfile.egg-info/** **/third_party/** + **/.python-version + **/**venv-py**/** + **/.python-version + python/.python-version @@ -445,6 +449,8 @@ 4 **/target/** + python/.python-version + **/.python-version @@ -587,7 +593,6 @@ with-python - cpp python @@ -751,8 +756,8 @@ win windows-amd64 MinGW Makefiles - venv/Scripts/ - python + + python.exe diff --git a/python/how_to_build_and_release_py_tsfile.md b/python/how_to_build_and_release_py_tsfile.md new file mode 100644 index 000000000..ea7cea42b --- /dev/null +++ b/python/how_to_build_and_release_py_tsfile.md @@ -0,0 +1,28 @@ + + +## Build on local + +## Build on docker + +## Build for release + +## Release TsFile to Pypi and Conda \ No newline at end of file diff --git a/python/pom.xml b/python/pom.xml index 765084f76..aa251adb0 100644 --- a/python/pom.xml +++ b/python/pom.xml @@ -22,7 +22,7 @@ org.apache.tsfile tsfile-parent - 2.2.0-SNAPSHOT + 2.1.5 tsfile-python pom @@ -41,21 +41,6 @@ exec-maven-plugin - - python-venv - initialize - - exec - - - ${python.exe.bin} - - -m - venv - ${project.basedir}/venv - - - python-upgrade-pip initialize @@ -63,7 +48,7 @@ exec - ${python.venv.bin}${python.exe.bin} + ${python.exe.bin} -m pip @@ -80,7 +65,7 @@ exec - ${python.venv.bin}${python.exe.bin} + ${python.exe.bin} -m pip @@ -97,7 +82,7 @@ exec - ${python.venv.bin}${python.exe.bin} + ${python.exe.bin} setup.py build_ext @@ -112,7 +97,7 @@ exec - ${python.venv.bin}${python.exe.bin} + ${python.exe.bin} -m pip @@ -121,19 +106,6 @@ - - run-python-tests - test - - exec - - - ${python.venv.bin}pytest - - ${project.basedir}/tests - - - build-whl package @@ -141,7 +113,7 @@ exec - ${python.venv.bin}${python.exe.bin} + ${python.exe.bin} setup.py bdist_wheel diff --git a/python/pyproject.toml b/python/pyproject.toml new file mode 100644 index 000000000..f14350b66 --- /dev/null +++ b/python/pyproject.toml @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +[build-system] +requires = [ + "setuptools>=69", + "wheel", + "Cython>=3", + "numpy>=1.20" + ] + +build-backend = "setuptools.build_meta" + +[project] +name = "tsfile" +version = "2.1.5" +requires-python = ">=3.9" +description = "TsFile Python" +readme = {file = "README.md", content-type = "text/markdown"} +maintainers = [ + {name = "Apache TsFile Developers", email = "dev@tsfile.apache.org"} +] +dependencies = [ + "numpy>=1.20", + "pandas>=2.0" +] + +[project.urls] +Homepage = "https://tsfile.apache.org/" +Documentation = "https://tsfile.apache.org/zh/UserGuide/latest/QuickStart/Navigating_Time_Series_Data.html" +Repository = "https://github.com/apache/tsfile" +Issues = "https://github.com/apache/tsfile/issues" + +[tool.setuptools] +package-dir = {"" = "."} + +[tool.setuptools.packages.find] +where = ["."] +include = ["tsfile*"] + +[tool.setuptools.package-data] +tsfile = [ + "*.pxd", + "*.pxi", + "*.so", + "*.so.*", + "*.dylib", + "*.dylib.*", + "*.dll", + "*.dll.a", + "*.lib", + "*.lib.a", + "include/**/*" +] diff --git a/python/requirements.txt b/python/requirements.txt index 06f90808e..08baaac71 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -18,8 +18,8 @@ # cython==3.0.10 -numpy==1.26.4 -pandas==2.2.2 +numpy +pandas setuptools==78.1.1 wheel==0.45.1 diff --git a/python/setup.py b/python/setup.py index 329cc2aae..bfbe8a704 100644 --- a/python/setup.py +++ b/python/setup.py @@ -19,151 +19,120 @@ import os import platform import shutil - +import sys import numpy as np + +from pathlib import Path from Cython.Build import cythonize from setuptools import setup, Extension from setuptools.command.build_ext import build_ext -version = "2.1.0.dev0" -system = platform.system() - - -def copy_tsfile_lib(source_dir, target_dir, suffix): - lib_file_name = f"libtsfile.{suffix}" - source = os.path.join(source_dir, lib_file_name) - target = os.path.join(target_dir, lib_file_name) - - if os.path.exists(source): - shutil.copyfile(source, target) - - if system == "Linux": - link_name = os.path.join(target_dir, "libtsfile.so") - if os.path.exists(link_name): - os.remove(link_name) - os.symlink(lib_file_name, link_name) - elif system == "Darwin": - link_name = os.path.join(target_dir, "libtsfile.dylib") - if os.path.exists(link_name): - os.remove(link_name) - os.symlink(lib_file_name, link_name) - - -project_dir = os.path.dirname(os.path.abspath(__file__)) -tsfile_py_include = os.path.join(project_dir, "tsfile", "include") - -if os.path.exists(tsfile_py_include): - shutil.rmtree(tsfile_py_include) - -shutil.copytree( - os.path.join(project_dir, "..", "cpp", "target", "build", "include"), - os.path.join(tsfile_py_include, ""), -) - - -def copy_tsfile_header(source): - for file in source: - if os.path.exists(file): - target = os.path.join(tsfile_py_include, os.path.basename(file)) - shutil.copyfile(file, target) +ROOT = Path(__file__).parent.resolve() +PKG = ROOT / "tsfile" +CPP_OUT = ROOT / ".." / "cpp" / "target" / "build" +CPP_LIB = CPP_OUT / "lib" +CPP_INC = CPP_OUT / "include" +version = "2.1.5" +system = platform.system() -## Copy C wrapper header. 
-# tsfile/cpp/src/cwrapper/tsfile_cwrapper.h -source_headers = [ - os.path.join(project_dir, "..", "cpp", "src", "cwrapper", "tsfile_cwrapper.h"), -] - -copy_tsfile_header(source_headers) - -## Copy shared library -tsfile_shared_source_dir = os.path.join(project_dir, "..", "cpp", "target", "build", "lib") -tsfile_shared_dir = os.path.join(project_dir, "tsfile") - -if system == "Darwin": - copy_tsfile_lib(tsfile_shared_source_dir, tsfile_shared_dir, version + ".dylib") -elif system == "Linux": - copy_tsfile_lib(tsfile_shared_source_dir, tsfile_shared_dir, "so." + version) +(PKG / "include").mkdir(exist_ok=True) +if (PKG / "include").exists() and CPP_INC.exists(): + shutil.rmtree(PKG / "include") + shutil.copytree(CPP_INC, PKG / "include") +if sys.platform.startswith("linux"): + candidates = sorted(CPP_LIB.glob("libtsfile.so*"), key=lambda p: len(p.name), reverse=True) + if not candidates: + raise FileNotFoundError("missing libtsfile.so* in build output") + src = candidates[0] + dst = PKG / src.name + shutil.copy2(src, dst) + link_name = PKG / "libtsfile.so" + shutil.copy2(src, link_name) + +elif sys.platform == "darwin": + candidates = sorted(CPP_LIB.glob("libtsfile.*.dylib")) or list(CPP_LIB.glob("libtsfile.dylib")) + if not candidates: + raise FileNotFoundError("missing libtsfile*.dylib in build output") + src = candidates[0] + dst = PKG / src.name + shutil.copy2(src, dst) + link_name = PKG / "libtsfile.dylib" + shutil.copy2(src, link_name) +elif sys.platform == "win32": + for base_name in ("libtsfile",): + dll_candidates = sorted(CPP_LIB.glob(f"{base_name}*.dll"), key=lambda p: len(p.name), reverse=True) + dll_a_candidates = sorted(CPP_LIB.glob(f"{base_name}*.dll.a"), key=lambda p: len(p.name), reverse=True) + + if not dll_candidates: + raise FileNotFoundError(f"missing {base_name}*.dll in build output") + if not dll_a_candidates: + raise FileNotFoundError(f"missing {base_name}*.dll.a in build output") + + dll_src = dll_candidates[0] + dll_a_src = dll_a_candidates[0] + + shutil.copy2(dll_src, PKG / f"{base_name}.dll") + shutil.copy2(dll_a_src, PKG / f"{base_name}.dll.a") else: - copy_tsfile_lib(tsfile_shared_source_dir, tsfile_shared_dir, "dll") - -tsfile_include_dir = os.path.join(project_dir, "tsfile", "include") - -ext_modules_tsfile = [ - # utils: from python to c or c to python. 
- Extension( - "tsfile.tsfile_py_cpp", - sources=[os.path.join("tsfile", "tsfile_py_cpp.pyx")], - libraries=["tsfile"], - library_dirs=[tsfile_shared_dir], - include_dirs=[tsfile_include_dir, np.get_include()], - runtime_library_dirs=[tsfile_shared_dir] if system != "Windows" else None, - extra_compile_args=( - ["-std=c++11"] if system != "Windows" else ["-std=c++11", "-DMS_WIN64"] - ), - language="c++", - ), - # query data and describe schema: tsfile reader module - Extension( - "tsfile.tsfile_reader", - sources=[os.path.join("tsfile", "tsfile_reader.pyx")], - libraries=["tsfile"], - library_dirs=[tsfile_shared_dir], - depends=[os.path.join("tsfile", "tsfile_py_cpp.pxd")], - include_dirs=[tsfile_include_dir, np.get_include()], - runtime_library_dirs=[tsfile_shared_dir] if system != "Windows" else None, - extra_compile_args=( - ["-std=c++11"] if system != "Windows" else ["-std=c++11", "-DMS_WIN64"] - ), - language="c++", - ), - # write data and register schema: tsfile writer module - Extension( - "tsfile.tsfile_writer", - sources=[os.path.join("tsfile", "tsfile_writer.pyx")], - libraries=["tsfile"], - library_dirs=[tsfile_shared_dir], - depends=[os.path.join("tsfile", "tsfile_py_cpp.pxd")], - include_dirs=[tsfile_include_dir, np.get_include()], - runtime_library_dirs=[tsfile_shared_dir] if system != "Windows" else None, - extra_compile_args=( - ["-std=c++11"] if system != "Windows" else ["-std=c++11", "-DMS_WIN64"] - ), - language="c++", - ) -] + raise RuntimeError(f"Unsupported platform: {sys.platform}") class BuildExt(build_ext): - def build_extensions(self): - numpy_include = np.get_include() - for ext in self.extensions: - ext.include_dirs.append(numpy_include) - super().build_extensions() + def run(self): + super().run() def finalize_options(self): - if system == "Windows": + if sys.platform == "win32": self.compiler = "mingw32" super().finalize_options() +extra_compile_args = [] +extra_link_args = [] +runtime_library_dirs = [] +libraries = [] +library_dirs = [str(PKG)] +include_dirs = [str(PKG), np.get_include(), str(PKG / "include")] + +if sys.platform.startswith("linux"): + libraries = ["tsfile"] + extra_compile_args += ["-O3", "-std=c++11", "-fvisibility=hidden", "-fPIC"] + runtime_library_dirs = ["$ORIGIN"] + extra_link_args += ["-Wl,-rpath,$ORIGIN"] +elif sys.platform == "darwin": + libraries = ["tsfile"] + extra_compile_args += ["-O3", "-std=c++11", "-fvisibility=hidden", "-fPIC"] + extra_link_args += ["-Wl,-rpath,@loader_path", "-stdlib=libc++"] +elif sys.platform == "win32": + libraries = ["Tsfile"] + extra_compile_args += ["-O2", "-std=c++11", "-DSIZEOF_VOID_P=8", "-D__USE_MINGW_ANSI_STDIO=1", "-DMS_WIN64", + "-D_WIN64"] +else: + raise RuntimeError(f"Unsupported platform: {sys.platform}") + +common = dict( + language="c++", + include_dirs=include_dirs, + library_dirs=library_dirs, + libraries=libraries, + extra_compile_args=extra_compile_args, + extra_link_args=extra_link_args, + runtime_library_dirs=runtime_library_dirs, +) + +exts = [ + Extension("tsfile.tsfile_py_cpp", ["tsfile/tsfile_py_cpp.pyx"], **common), + Extension("tsfile.tsfile_reader", ["tsfile/tsfile_reader.pyx"], **common), + Extension("tsfile.tsfile_writer", ["tsfile/tsfile_writer.pyx"], **common), +] + setup( name="tsfile", - version=version, - description="Tsfile reader and writer for python", - url="https://tsfile.apache.org", - author='"Apache TsFile"', + version="2.1.0", packages=["tsfile"], - license="Apache 2.0", - ext_modules=cythonize(ext_modules_tsfile), - cmdclass={"build_ext": BuildExt}, - 
include_dirs=[np.get_include()], - package_dir={"tsfile": "./tsfile"}, - package_data={ - "tsfile": [ - "libtsfile.*", - "*.pxd" - ] - }, + package_dir={"": "."}, include_package_data=True, + ext_modules=cythonize(exts, compiler_directives={"language_level": 3}), + cmdclass={"build_ext": BuildExt}, ) diff --git a/python/tests/test_write_and_read.py b/python/tests/test_write_and_read.py index e5c87ab92..b5f4e07b6 100644 --- a/python/tests/test_write_and_read.py +++ b/python/tests/test_write_and_read.py @@ -20,6 +20,7 @@ import numpy as np import pytest + from tsfile import ColumnSchema, TableSchema, TSEncoding from tsfile import Compressor from tsfile import TSDataType @@ -34,17 +35,18 @@ def test_row_record_write_and_read(): try: writer = TsFileWriter("record_write_and_read.tsfile") - timeseries = TimeseriesSchema("level1", TSDataType.INT64) - writer.register_timeseries("root.device1", timeseries) + writer.register_timeseries("root.device1", TimeseriesSchema("level1", TSDataType.INT64)) writer.register_timeseries("root.device1", TimeseriesSchema("level2", TSDataType.DOUBLE)) - writer.register_timeseries("root.device1", TimeseriesSchema("level3", TSDataType.INT32)) + writer.register_timeseries("root.device2", TimeseriesSchema("level1", TSDataType.INT32)) max_row_num = 1000 for i in range(max_row_num): row = RowRecord("root.device1", i, [Field("level1", i + 1, TSDataType.INT64), - Field("level2", i * 1.1, TSDataType.DOUBLE), - Field("level3", i * 2, TSDataType.INT32)]) + Field("level2", i * 1.1, TSDataType.DOUBLE)]) + writer.write_row_record(row) + row = RowRecord("root.device2", i, + [Field("level1", i + 1, TSDataType.INT32)]) writer.write_row_record(row) writer.close() @@ -52,12 +54,16 @@ def test_row_record_write_and_read(): reader = TsFileReader("record_write_and_read.tsfile") result = reader.query_timeseries("root.device1", ["level1", "level2"], 10, 100) i = 10 - while result.next(): - print(result.get_value_by_index(1)) - print(reader.get_active_query_result()) result.close() + result2 = reader.query_table_on_tree(["level1", "level2"], 40, 50) + ret = result2.read_data_frame() + print(ret) + result2.close() print(reader.get_active_query_result()) reader.close() + + + finally: if os.path.exists("record_write_and_read.tsfile"): os.remove("record_write_and_read.tsfile") @@ -305,3 +311,30 @@ def test_tsfile_to_df(): to_dataframe("table_write_to_df.tsfile", "test_table", ["device1"]) finally: os.remove("table_write_to_df.tsfile") + + +def test_tsfile_tree_to_df(): + try: + if os.path.exists("record_write_and_read.tsfile"): + os.remove("record_write_and_read.tsfile") + writer = TsFileWriter("record_write_and_read.tsfile") + writer.register_timeseries("root.device1", TimeseriesSchema("level1", TSDataType.INT64)) + writer.register_timeseries("root.device1", TimeseriesSchema("level2", TSDataType.DOUBLE)) + writer.register_timeseries("root.device2", TimeseriesSchema("level1", TSDataType.INT64)) + + max_row_num = 100 + for i in range(max_row_num): + row = RowRecord("root.device1", i, + [Field("level1", i + 1, TSDataType.INT64), + Field("level2", i * 1.1, TSDataType.DOUBLE)]) + writer.write_row_record(row) + row = RowRecord("root.device2", i, + [Field("level1", i + 1, TSDataType.INT64)]) + writer.write_row_record(row) + + writer.close() + + df = to_dataframe("record_write_and_read.tsfile") + assert df.shape == (2 * max_row_num, 5) + finally: + print("error") diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py index bf755fcef..5b120c0db 100644 --- a/python/tsfile/__init__.py +++ 
b/python/tsfile/__init__.py @@ -19,10 +19,6 @@ import ctypes import os import platform -system = platform.system() -if system == "Windows": - ctypes.WinDLL(os.path.join(os.path.dirname(__file__), "libtsfile.dll"), winmode=0) - from .constants import * from .schema import * from .row_record import * diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 1b04051c9..e7f45fea3 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -22,7 +22,7 @@ from libc.stdint cimport uint32_t, int32_t, int64_t, uint64_t, uint8_t ctypedef int32_t ErrorCode # import symbols from tsfile_cwrapper.h -cdef extern from "./tsfile_cwrapper.h": +cdef extern from "cwrapper/tsfile_cwrapper.h": # common ctypedef int64_t timestamp @@ -167,6 +167,12 @@ cdef extern from "./tsfile_cwrapper.h": const char * table_name, const char** columns, uint32_t column_num, int64_t start_time, int64_t end_time, ErrorCode *err_code) + + ResultSet tsfile_query_table_on_tree(TsFileReader reader, + char** columns, uint32_t column_num, + int64_t start_time, int64_t end_time, + ErrorCode* err_code); + ResultSet _tsfile_reader_query_device(TsFileReader reader, const char *device_name, char ** sensor_name, uint32_t sensor_num, @@ -196,7 +202,7 @@ cdef extern from "./tsfile_cwrapper.h": -cdef extern from "./common/config/config.h" namespace "common": +cdef extern from "common/config/config.h" namespace "common": cdef cppclass ConfigValue: uint32_t tsblock_mem_inc_step_size_ uint32_t tsblock_max_memory_ @@ -218,7 +224,7 @@ cdef extern from "./common/config/config.h" namespace "common": uint8_t string_encoding_type_; uint8_t default_compression_type_; -cdef extern from "./common/global.h" namespace "common": +cdef extern from "common/global.h" namespace "common": ConfigValue g_config_value_ int set_datatype_encoding(uint8_t data_type, uint8_t encoding) int set_global_compression(uint8_t compression) diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index ce907a796..6ed038382 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -49,6 +49,8 @@ cdef public api ErrorCode tsfile_writer_register_table_py_cpp(TsFileWriter write cdef public api bint tsfile_result_set_is_null_by_name_c(ResultSet result_set, object name) cdef public api ResultSet tsfile_reader_query_table_c(TsFileReader reader, object table_name, object column_list, int64_t start_time, int64_t end_time) +cdef public api ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reader, object column_list, + int64_t start_time, int64_t end_time) cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_name, object sensor_list, int64_t start_time, int64_t end_time) cdef public api object get_table_schema(TsFileReader reader, object table_name) diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index e17430399..1d7af3859 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -515,6 +515,32 @@ cdef ResultSet tsfile_reader_query_table_c(TsFileReader reader, object table_nam free(columns) columns = NULL +cdef ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reader, object column_list, + int64_t start_time, int64_t end_time): + cdef ResultSet result + cdef int column_num = len(column_list) + cdef char** columns = malloc(sizeof(char*) * column_num) + cdef int i + cdef ErrorCode code = 0 + if columns == NULL: + raise MemoryError("Failed to allocate memory for columns") + try: + for i in 
range(column_num): + columns[i] = strdup((column_list[i]).encode('utf-8')) + if columns[i] == NULL: + raise MemoryError("Failed to allocate memory for column name") + result = tsfile_query_table_on_tree(reader, columns, column_num, start_time, end_time, &code) + check_error(code) + return result + finally: + if columns != NULL: + for i in range(column_num): + free(columns[i]) + columns[i] = NULL + free(columns) + columns = NULL + + cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_name, object sensor_list, int64_t start_time, int64_t end_time): cdef ResultSet result diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index e8d38d7df..730305d6f 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -97,7 +97,6 @@ cdef class ResultSetPy: self.check_result_set_invalid() column_names = self.metadata.get_column_list() column_num = self.metadata.get_column_num() - date_columns = [ column_names[i] for i in range(column_num) @@ -135,7 +134,6 @@ cdef class ResultSetPy: cur_line += 1 else: break - df = pd.DataFrame(data_container) data_type_dict = {col: dtype for col, dtype in zip(column_names, data_type)} df = df.astype(data_type_dict) @@ -291,6 +289,21 @@ cdef class TsFileReaderPy: pyresult.init_c(result, table_name) self.activate_result_set_list.add(pyresult) return pyresult + + def query_table_on_tree(self, column_names : List[str], + start_time : int = INT64_MIN, end_time : int = INT64_MAX) -> ResultSetPy: + """ + Execute a time range query on specified columns on tree structure. + :return: query result handler. + """ + cdef ResultSet result; + result = tsfile_reader_query_table_on_tree_c(self.reader, + [column_name.lower() for column_name in column_names], start_time, + end_time) + pyresult = ResultSetPy(self, True) + pyresult.init_c(result, "root") + self.activate_result_set_list.add(pyresult) + return pyresult def query_timeseries(self, device_name : str, sensor_list : List[str], start_time : int = 0, end_time : int = 0) -> ResultSetPy: diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py index 3d2366061..2f9e96e80 100644 --- a/python/tsfile/utils.py +++ b/python/tsfile/utils.py @@ -29,28 +29,39 @@ def to_dataframe(file_path: str, with TsFileReaderPy(file_path) as reader: total_rows = 0 table_schema = reader.get_all_table_schemas() - if len(table_schema) == 0: - raise TableNotExistError("Not found any table in the TsFile.") - if table_name is None: - # get the first table name by default - table_name, columns = next(iter(table_schema.items())) + + is_tree_model = len(table_schema) == 0 + + if is_tree_model: + if column_names is None: + print("columns name is None, return all columns") else: - if table_name not in table_schema: - raise TableNotExistError(table_name) - columns = table_schema[table_name] + if table_name is None: + table_name, columns = next(iter(table_schema.items())) + else: + if table_name not in table_schema: + raise TableNotExistError(table_name) + columns = table_schema[table_name] - column_names_in_file = columns.get_column_names() + column_names_in_file = columns.get_column_names() - if column_names is not None: - for column in column_names: - if column not in column_names_in_file: - raise ColumnNotExistError(column) - else: - column_names = column_names_in_file + if column_names is not None: + for column in column_names: + if column not in column_names_in_file: + raise ColumnNotExistError(column) + else: + column_names = column_names_in_file df_list: list[pd.DataFrame] = [] - - 
with reader.query_table(table_name, column_names) as result: + + if is_tree_model: + if column_names is None: + column_names = [] + query_result = reader.query_table_on_tree(column_names) + else: + query_result = reader.query_table(table_name, column_names) + + with query_result as result: while result.next(): if max_row_num is not None: remaining_rows = max_row_num - total_rows @@ -63,5 +74,6 @@ def to_dataframe(file_path: str, else: df = result.read_data_frame() df_list.append(df) + df = pd.concat(df_list, ignore_index=True) return df
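
The tree-model query path added by this change can be exercised end to end from Python. The following is a minimal sketch modelled on the updated pytest case; the file name is illustrative, and it assumes TsFileWriter, TsFileReader, TimeseriesSchema, RowRecord, Field and TSDataType are exported from the tsfile package the same way the existing tests import them.

    import os
    from tsfile import (TsFileWriter, TsFileReader, TimeseriesSchema,
                        RowRecord, Field, TSDataType)

    path = "tree_query_example.tsfile"  # illustrative file name
    try:
        writer = TsFileWriter(path)
        writer.register_timeseries("root.device1", TimeseriesSchema("level1", TSDataType.INT64))
        writer.register_timeseries("root.device1", TimeseriesSchema("level2", TSDataType.DOUBLE))
        for i in range(100):
            writer.write_row_record(RowRecord("root.device1", i,
                                              [Field("level1", i + 1, TSDataType.INT64),
                                               Field("level2", i * 1.1, TSDataType.DOUBLE)]))
        writer.close()

        reader = TsFileReader(path)
        # query_table_on_tree lower-cases the requested column names and
        # defaults to the full time range when start/end are omitted.
        result = reader.query_table_on_tree(["level1", "level2"], 40, 50)
        df = result.read_data_frame()
        print(df)
        result.close()
        reader.close()
    finally:
        if os.path.exists(path):
            os.remove(path)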
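to_dataframe() now falls back to the tree model when the file contains no table schemas: with no column names it passes an empty list to query_table_on_tree, which selects every column. A short usage sketch follows, reusing the file written above and assuming to_dataframe and the column_names/max_row_num parameter names are exposed as they appear in utils.py and the tests.

    from tsfile import to_dataframe  # assumed package-level export, as in the tests

    # Whole file, all columns: the tree-model branch queries via query_table_on_tree.
    df = to_dataframe("tree_query_example.tsfile")

    # A subset of columns goes through the same tree-model branch.
    df_subset = to_dataframe("tree_query_example.tsfile", column_names=["level1"])
    print(df.shape, df_subset.shape)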
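On the packaging side, setup.py now copies libtsfile next to the compiled extensions and links them with $ORIGIN / @loader_path rpaths, which is why the explicit WinDLL preload in __init__.py could be removed. A small sanity-check sketch for a built wheel, assuming nothing beyond the standard library and an installed tsfile wheel:

    import pathlib

    # Importing the package loads the compiled extensions; on Linux/macOS they
    # resolve libtsfile through the embedded rpath, and the bundled library is
    # expected to sit inside the package directory on all platforms.
    import tsfile

    pkg_dir = pathlib.Path(tsfile.__file__).parent
    print(sorted(p.name for p in pkg_dir.glob("libtsfile*")))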