diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java index e6a728790..0628cdea7 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.TreeMap; /** TSFileMetaData collects all metadata info and saves in its data structure. */ @@ -43,6 +44,7 @@ public class TsFileMetadata { // List of private Map tableMetadataIndexNodeMap; private Map tableSchemaMap; + private int tableSchemaNum; private boolean hasTableSchemaMapCache; private Map tsFileProperties; @@ -57,6 +59,8 @@ public class TsFileMetadata { private String encryptType; + private long tableStatisticsOffset = -1; + public static TsFileMetadata deserializeAndCacheTableSchemaMap( ByteBuffer buffer, DeserializeConfig context) { return deserializeFrom(buffer, context, true); @@ -90,9 +94,9 @@ public static TsFileMetadata deserializeFrom( fileMetaData.setTableMetadataIndexNodeMap(tableIndexNodeMap); // tableSchemas - int tableSchemaNum = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer); + fileMetaData.tableSchemaNum = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer); Map tableSchemaMap = new HashMap<>(); - for (int i = 0; i < tableSchemaNum; i++) { + for (int i = 0; i < fileMetaData.tableSchemaNum; i++) { String tableName = ReadWriteIOUtils.readVarIntString(buffer); TableSchema tableSchema = context.tableSchemaBufferDeserializer.deserialize(buffer, context); if (needTableSchemaMap) { @@ -122,6 +126,10 @@ public static TsFileMetadata deserializeFrom( String value = ReadWriteIOUtils.readVarIntString(buffer); propertiesMap.put(key, value); } + String tableStatisticsOffsetStr = propertiesMap.get("tableStatisticsOffset"); + if (tableStatisticsOffsetStr != null) { + fileMetaData.tableStatisticsOffset = Long.parseLong(tableStatisticsOffsetStr); + } // if the file is not encrypted, set the default value(for compatible reason) if (!propertiesMap.containsKey("encryptLevel") || propertiesMap.get("encryptLevel") == null) { propertiesMap.put("encryptLevel", "0"); @@ -289,4 +297,12 @@ public Map getTableSchemaMap() { public Map getTsFileProperties() { return tsFileProperties; } + + public Optional getTableStatisticsOffset() { + return tableStatisticsOffset > 0 ? 
Optional.of(tableStatisticsOffset) : Optional.empty(); } + + public int getTableSchemaNum() { + return tableSchemaNum; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/BinaryStatistics.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/BinaryStatistics.java index 9915121d2..129668867 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/BinaryStatistics.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/BinaryStatistics.java @@ -81,9 +81,9 @@ private void updateLastStats(Binary lastValue) { private void updateStats(Binary firstValue, Binary lastValue, long startTime, long endTime) { // only if endTime greater or equals to the current endTime need we update the last value - // only if startTime less or equals to the current startTime need we update the first value + // only if startTime is less than the current startTime need we update the first value // otherwise, just ignore - if (startTime <= this.getStartTime()) { + if (startTime < this.getStartTime()) { this.firstValue = firstValue; } if (endTime >= this.getEndTime()) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/BooleanStatistics.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/BooleanStatistics.java index d519a6ca9..87d22dd2e 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/BooleanStatistics.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/BooleanStatistics.java @@ -75,9 +75,9 @@ private void updateStats(boolean lastValue, long sum) { private void updateStats( boolean firstValue, boolean lastValue, long startTime, long endTime, long sum) { // only if endTime greater or equals to the current endTime need we update the last value - // only if startTime less or equals to the current startTime need we update the first value + // only if startTime is less than the current startTime need we update the first value // otherwise, just ignore - if (startTime <= this.getStartTime()) { + if (startTime < this.getStartTime()) { this.firstValue = firstValue; } if (endTime >= this.getEndTime()) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/DoubleStatistics.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/DoubleStatistics.java index 6d406d5f1..30d12d014 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/DoubleStatistics.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/DoubleStatistics.java @@ -101,9 +101,9 @@ private void updateStats( } this.sumValue += sumValue; // only if endTime greater or equals to the current endTime need we update the last value - // only if startTime less or equals to the current startTime need we update the first value + // only if startTime is less than the current startTime need we update the first value // otherwise, just ignore - if (startTime <= this.getStartTime()) { + if (startTime < this.getStartTime()) { this.firstValue = firstValue; } if (endTime >= this.getEndTime()) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/FloatStatistics.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/FloatStatistics.java index fd7643493..43ab8680b 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/FloatStatistics.java +++ 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/FloatStatistics.java @@ -92,9 +92,9 @@ private void updateStats( } this.sumValue += sumValue; // only if endTime greater or equals to the current endTime need we update the last value - // only if startTime less or equals to the current startTime need we update the first value + // only if startTime is less than the current startTime need we update the first value // otherwise, just ignore - if (startTime <= this.getStartTime()) { + if (startTime < this.getStartTime()) { this.firstValue = first; } if (endTime >= this.getEndTime()) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/IntegerStatistics.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/IntegerStatistics.java index b5cf408be..a3a5d683a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/IntegerStatistics.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/IntegerStatistics.java @@ -93,9 +93,9 @@ private void updateStats( } this.sumValue += sumValue; // only if endTime greater or equals to the current endTime need we update the last value - // only if startTime less or equals to the current startTime need we update the first value + // only if startTime is less than the current startTime need we update the first value // otherwise, just ignore - if (startTime <= this.getStartTime()) { + if (startTime < this.getStartTime()) { this.firstValue = firstValue; } if (endTime >= this.getEndTime()) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/LongStatistics.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/LongStatistics.java index 5f3624253..ea3488f8f 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/LongStatistics.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/LongStatistics.java @@ -150,9 +150,9 @@ private void updateStats( } this.sumValue += sumValue; // only if endTime greater or equals to the current endTime need we update the last value - // only if startTime less or equals to the current startTime need we update the first value + // only if startTime is less than the current startTime need we update the first value // otherwise, just ignore - if (startTime <= this.getStartTime()) { + if (startTime < this.getStartTime()) { this.firstValue = firstValue; } if (endTime >= this.getEndTime()) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/StringStatistics.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/StringStatistics.java index 5da57af15..c93d15217 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/StringStatistics.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/StringStatistics.java @@ -95,7 +95,7 @@ private void updateStats( long startTime, long endTime) { // only if endTime greater or equals to the current endTime need we update the last value - // only if startTime less or equals to the current startTime need we update the first value + // only if startTime is less than the current startTime need we update the first value // otherwise, just ignore if (this.minValue.compareTo(minValue) > 0) { this.minValue = minValue; @@ -103,7 +103,7 @@ private void updateStats( if (this.maxValue.compareTo(maxValue) < 0) { this.maxValue = maxValue; } - if (startTime <= this.getStartTime()) { + if (startTime < this.getStartTime()) 
{ this.firstValue = firstValue; } if (endTime >= this.getEndTime()) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TableStatistics.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TableStatistics.java new file mode 100644 index 000000000..9917a4f68 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TableStatistics.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.file.metadata.statistics; + +import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +public class TableStatistics { + private final Map> fieldColumnStatisticsMap = + new TreeMap<>(); + + public void updateStatistics( + String fieldColumnName, Statistics statistics) { + fieldColumnStatisticsMap + .computeIfAbsent(fieldColumnName, k -> Statistics.getStatsByType(statistics.getType())) + .mergeStatistics(statistics); + } + + public int columnCount() { + return fieldColumnStatisticsMap.size(); + } + + public TimeStatistics getTimeStatistics() { + return (TimeStatistics) fieldColumnStatisticsMap.get(TsFileConstant.TIME_COLUMN_ID); + } + + public Statistics getStatistics(String fieldName) { + return fieldColumnStatisticsMap.get(fieldName); + } + + public static TableStatistics deserialize(InputStream inputStream, Set queriedColumns) + throws IOException { + TableStatistics tableStatistics = new TableStatistics(); + List columnNameList = ReadWriteIOUtils.readStringList(inputStream); + List dataTypeList = new ArrayList<>(columnNameList.size()); + List statisticsSizeList = new ArrayList<>(columnNameList.size()); + for (int i = 0; i < columnNameList.size(); i++) { + dataTypeList.add(ReadWriteIOUtils.readDataType(inputStream)); + } + for (int i = 0; i < columnNameList.size(); i++) { + statisticsSizeList.add(ReadWriteForEncodingUtils.readVarInt(inputStream)); + } + + for (int i = 0; i < columnNameList.size(); i++) { + String columnName = columnNameList.get(i); + if (queriedColumns != null) { + if (tableStatistics.columnCount() >= queriedColumns.size()) { + break; + } + if (!queriedColumns.contains(columnName)) { + inputStream.skip(statisticsSizeList.get(i)); + continue; + } + } + Statistics columnStatistics = + Statistics.deserialize(inputStream, dataTypeList.get(i)); + tableStatistics.updateStatistics(columnName, 
columnStatistics); + } + return tableStatistics; + } + + public static TableStatistics deserialize(ByteBuffer byteBuffer, Set queriedColumns) + throws IOException { + TableStatistics tableStatistics = new TableStatistics(); + List columnNameList = ReadWriteIOUtils.readStringList(byteBuffer); + List dataTypeList = new ArrayList<>(columnNameList.size()); + List statisticsSizeList = new ArrayList<>(columnNameList.size()); + for (int i = 0; i < columnNameList.size(); i++) { + dataTypeList.add(ReadWriteIOUtils.readDataType(byteBuffer)); + } + for (int i = 0; i < columnNameList.size(); i++) { + statisticsSizeList.add(ReadWriteForEncodingUtils.readVarInt(byteBuffer)); + } + + for (int i = 0; i < columnNameList.size(); i++) { + String columnName = columnNameList.get(i); + if (queriedColumns != null) { + if (tableStatistics.columnCount() >= queriedColumns.size()) { + break; + } + if (!queriedColumns.contains(columnName)) { + byteBuffer.position(byteBuffer.position() + statisticsSizeList.get(i)); + continue; + } + } + Statistics columnStatistics = + Statistics.deserialize(byteBuffer, dataTypeList.get(i)); + tableStatistics.updateStatistics(columnName, columnStatistics); + } + return tableStatistics; + } + + public void serializeTo(OutputStream outputStream) throws IOException { + ReadWriteIOUtils.write(fieldColumnStatisticsMap.size(), outputStream); + for (String fieldName : fieldColumnStatisticsMap.keySet()) { + ReadWriteIOUtils.write(fieldName, outputStream); + } + for (Statistics statistics : fieldColumnStatisticsMap.values()) { + ReadWriteIOUtils.write(statistics.getType(), outputStream); + } + for (Statistics statistics : fieldColumnStatisticsMap.values()) { + ReadWriteForEncodingUtils.writeVarInt(statistics.getSerializedSize(), outputStream); + } + for (Statistics statistics : fieldColumnStatisticsMap.values()) { + statistics.serialize(outputStream); + } + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/ITsFileTableStatisticsReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/ITsFileTableStatisticsReader.java new file mode 100644 index 000000000..168280337 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/ITsFileTableStatisticsReader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read; + +import org.apache.tsfile.file.metadata.statistics.TableStatistics; + +import java.io.IOException; +import java.util.Map; + +public interface ITsFileTableStatisticsReader { + TableStatistics getTableStatistics(String tableName) throws IOException; + + TableStatistics getTableFieldColumnStatistics(String tableName, String... 
fieldNames) + throws IOException; + + Map getAllTableStatistics() throws IOException; +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index 73b5082a4..b5f5f297a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -101,6 +101,7 @@ import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.TreeMap; @@ -145,6 +146,8 @@ public class TsFileSequenceReader implements AutoCloseable { private EncryptParameter dataEncryptParam = null; + private volatile ITsFileTableStatisticsReader tsFileTableStatisticsReader = null; + /** * Create a file reader of the given file. The reader will read the tail of the file to get the * file metadata size.Then the reader will skip the first @@ -685,14 +688,16 @@ public TimeseriesMetadata readTimeseriesMetadata( } // when the buffer length is over than Integer.MAX_VALUE, // using tsFileInput to get timeseriesMetadataList - tsFileInput.position(metadataIndexPair.left.getOffset()); - while (tsFileInput.position() < metadataIndexPair.right) { - try { - timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(tsFileInput, true)); - } catch (Exception e1) { - logger.error( - "Something error happened while deserializing TimeseriesMetadata of file {}", file); - throw e1; + synchronized (this) { + tsFileInput.position(metadataIndexPair.left.getOffset()); + while (tsFileInput.position() < metadataIndexPair.right) { + try { + timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(tsFileInput, true)); + } catch (Exception e1) { + logger.error( + "Something error happened while deserializing TimeseriesMetadata of file {}", file); + throw e1; + } } } } @@ -1540,7 +1545,7 @@ private void generateMetadataIndexUsingTsFileInput( needChunkMetadata); } - private void generateMetadataIndexUsingTsFileInput( + private synchronized void generateMetadataIndexUsingTsFileInput( IMetadataIndexEntry metadataIndex, long start, long end, @@ -2732,6 +2737,33 @@ public List getChunkMetadataList(Path path) throws IOException { return getChunkMetadataList(path, true); } + public TsFileInput getTsFileInput() { + return tsFileInput; + } + + public boolean hasTableStatistics() throws IOException { + return readFileMetadata().getTableStatisticsOffset().isPresent(); + } + + public Optional getTsFileTableStatisticsReader() + throws IOException { + TsFileMetadata tsFileMetadata = readFileMetadata(); + Optional tableStatisticsBlockOffset = tsFileMetadata.getTableStatisticsOffset(); + if (!tableStatisticsBlockOffset.isPresent()) { + return Optional.empty(); + } + + if (tsFileTableStatisticsReader == null) { + synchronized (this) { + if (tsFileTableStatisticsReader == null) { + tsFileTableStatisticsReader = + new TsFileTableStatisticsReader(this, tableStatisticsBlockOffset.get()); + } + } + } + return Optional.of(this.tsFileTableStatisticsReader); + } + /** * Get AlignedChunkMetadata of sensors under one device. Notice: if all the value chunks is empty * chunk, then return empty list. 
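Editor's note: the reader-side entry points above (TsFileSequenceReader.hasTableStatistics(), getTsFileTableStatisticsReader(), and the ITsFileTableStatisticsReader interface) can be exercised roughly as sketched below. This is an illustrative sketch based only on the signatures visible in this patch, not code taken from it: the file path "example.tsfile", the class name TableStatisticsUsageSketch, and the table/field names "t1" and "s2" are placeholders, and the generic parameters (Optional<ITsFileTableStatisticsReader>, Map<String, TableStatistics>, Statistics<? extends Serializable>) are reconstructed here because they were lost from the extracted diff text.

import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.file.metadata.statistics.TableStatistics;
import org.apache.tsfile.read.ITsFileTableStatisticsReader;
import org.apache.tsfile.read.TsFileSequenceReader;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;

public class TableStatisticsUsageSketch {
  public static void main(String[] args) throws IOException {
    try (TsFileSequenceReader reader = new TsFileSequenceReader("example.tsfile")) {
      // Files without table-model data (or written before this change) carry no statistics block.
      if (!reader.hasTableStatistics()) {
        return;
      }
      Optional<ITsFileTableStatisticsReader> statsReader = reader.getTsFileTableStatisticsReader();
      if (!statsReader.isPresent()) {
        return;
      }
      // Full statistics of one table; the implementation that follows returns null for an unknown table.
      TableStatistics t1Stats = statsReader.get().getTableStatistics("t1");
      if (t1Stats != null) {
        System.out.println("t1 row count: " + t1Stats.getTimeStatistics().getCount());
      }
      // Column-pruned read: only the time column and the requested fields are deserialized.
      TableStatistics pruned = statsReader.get().getTableFieldColumnStatistics("t1", "s2");
      if (pruned != null) {
        Statistics<? extends Serializable> s2Stats = pruned.getStatistics("s2");
        System.out.println("s2 min=" + s2Stats.getMinValue() + ", max=" + s2Stats.getMaxValue());
      }
      // Statistics of every table-model table in the file, keyed by table name.
      Map<String, TableStatistics> all = statsReader.get().getAllTableStatistics();
      all.forEach((table, stats) -> System.out.println(table + ": " + stats.columnCount() + " columns"));
    }
  }
}

As the implementation in TsFileTableStatisticsReader below shows, the statistics reader is created lazily and cached on the TsFileSequenceReader, so the snippet above can be called repeatedly without re-reading the offset block.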
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileTableStatisticsReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileTableStatisticsReader.java new file mode 100644 index 000000000..a1b05525e --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileTableStatisticsReader.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read; + +import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.file.metadata.statistics.TableStatistics; +import org.apache.tsfile.read.reader.TsFileInput; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class TsFileTableStatisticsReader implements ITsFileTableStatisticsReader { + + private static final String treeModelStartStr = + TsFileConstant.PATH_ROOT + TsFileConstant.PATH_SEPARATOR; + private final TsFileSequenceReader reader; + private final List tableStatisticOffsets; + private final List tableStatisticSizes; + private final boolean hasTableAndTreeData; + + public TsFileTableStatisticsReader(TsFileSequenceReader reader, long tableStatisticsBlockOffset) + throws IOException { + this.reader = reader; + int tableNum = reader.readFileMetadata().getTableSchemaNum(); + this.hasTableAndTreeData = + tableNum != reader.readFileMetadata().getTableMetadataIndexNodeMap().size(); + this.tableStatisticOffsets = new ArrayList<>(tableNum); + this.tableStatisticSizes = new ArrayList<>(tableNum); + ByteBuffer offsetListBuffer = + reader.readData(tableStatisticsBlockOffset, tableNum * Long.BYTES * 2); + for (int i = 0; i < tableNum; i++) { + this.tableStatisticOffsets.add(ReadWriteIOUtils.readLong(offsetListBuffer)); + } + for (int i = 0; i < tableNum; i++) { + this.tableStatisticSizes.add(ReadWriteIOUtils.readLong(offsetListBuffer)); + } + } + + public TableStatistics getTableStatistics(String tableName) throws IOException { + Optional tableIndex = findTableIndex(tableName); + if (!tableIndex.isPresent()) { + return null; + } + int index = tableIndex.get(); + return readTableStatistics( + tableStatisticOffsets.get(index), tableStatisticSizes.get(index), null); + } + + public TableStatistics getTableFieldColumnStatistics(String tableName, String... 
fieldNames) + throws IOException { + Optional tableIndex = findTableIndex(tableName); + if (!tableIndex.isPresent()) { + return null; + } + int index = tableIndex.get(); + Set queriedColumns = new HashSet<>(fieldNames.length + 1); + queriedColumns.add(TsFileConstant.TIME_COLUMN_ID); + queriedColumns.addAll(Arrays.asList(fieldNames)); + return readTableStatistics( + tableStatisticOffsets.get(index), tableStatisticSizes.get(index), queriedColumns); + } + + public Map getAllTableStatistics() throws IOException { + Map tableStatisticsMap = new LinkedHashMap<>(); + int i = 0; + for (String tableName : reader.tsFileMetaData.getTableMetadataIndexNodeMap().keySet()) { + if (hasTableAndTreeData + && tableName.startsWith(TsFileConstant.PATH_ROOT + TsFileConstant.PATH_SEPARATOR)) { + continue; + } + long offset = tableStatisticOffsets.get(i); + long size = tableStatisticSizes.get(i++); + tableStatisticsMap.put(tableName, readTableStatistics(offset, size, null)); + } + return tableStatisticsMap; + } + + private Optional findTableIndex(String tableName) { + int index = 0; + boolean found = false; + for (String key : reader.tsFileMetaData.getTableMetadataIndexNodeMap().keySet()) { + if (hasTableAndTreeData + && key.startsWith(TsFileConstant.PATH_ROOT + TsFileConstant.PATH_SEPARATOR)) { + continue; + } + if (key.equals(tableName)) { + found = true; + break; + } + index++; + } + if (!found) { + return Optional.empty(); + } + return Optional.of(index); + } + + private TableStatistics readTableStatistics( + long tableStatisticsOffset, long length, Set queriedColumns) throws IOException { + if (length > Integer.MAX_VALUE) { + synchronized (reader) { + TsFileInput tsFileInput = reader.getTsFileInput(); + tsFileInput.position(tableStatisticsOffset + Long.BYTES); + InputStream inputStream = tsFileInput.wrapAsInputStream(); + return TableStatistics.deserialize(inputStream, queriedColumns); + } + } else { + ByteBuffer contentBuffer = + reader.readData(tableStatisticsOffset, tableStatisticsOffset + length); + return TableStatistics.deserialize(contentBuffer, queriedColumns); + } + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java index 96cc383e6..af30f4c5a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java @@ -41,6 +41,7 @@ import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.file.metadata.statistics.TableStatistics; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.Path; @@ -58,6 +59,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.io.Serializable; import java.util.ArrayDeque; import java.util.ArrayList; @@ -474,6 +476,9 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException { TSMIterator tsmIterator = getTSMIterator(); Map deviceMetadataIndexMap = new TreeMap<>(); Queue measurementMetadataIndexQueue = new ArrayDeque<>(); + String prevTableName = null; + TreeMap> tableStatisticsMap = new TreeMap<>(); + TableStatistics currentTableStatistics = new TableStatistics(); IDeviceID currentDevice = null; IDeviceID prevDevice = null; Path 
currentPath = null; @@ -495,6 +500,7 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException { filter.add(currentPath); // construct the index tree node for the series currentDevice = currentPath.getIDeviceID(); + boolean isTableModel = currentDevice.isTableModel(); if (!currentDevice.equals(prevDevice)) { if (prevDevice != null) { addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out); @@ -503,6 +509,19 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException { generateRootNode( measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT)); currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT); + + String currentTableName = isTableModel ? currentDevice.getTableName() : null; + if (!Objects.equals(currentTableName, prevTableName)) { + if (prevTableName != null) { + long statisticsStartPosition = out.getPosition(); + currentTableStatistics.serializeTo(out.wrapAsStream()); + currentTableStatistics = new TableStatistics(); + tableStatisticsMap.put( + prevTableName, + new Pair<>(statisticsStartPosition, out.getPosition() - statisticsStartPosition)); + } + prevTableName = currentTableName; + } } measurementMetadataIndexQueue = new ArrayDeque<>(); seriesIdxForCurrDevice = 0; @@ -525,6 +544,10 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException { seriesIdxForCurrDevice++; // serialize the timeseries metadata to file timeseriesMetadata.serializeTo(out.wrapAsStream()); + if (isTableModel) { + currentTableStatistics.updateStatistics( + timeseriesMetadata.getMeasurementId(), timeseriesMetadata.getStatistics()); + } } addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out); @@ -533,6 +556,15 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException { prevDevice, generateRootNode( measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT)); + + prevTableName = prevDevice.isTableModel() ? prevDevice.getTableName() : null; + if (prevTableName != null) { + long statisticsStartPosition = out.getPosition(); + currentTableStatistics.serializeTo(out.wrapAsStream()); + tableStatisticsMap.put( + prevTableName, + new Pair<>(statisticsStartPosition, out.getPosition() - statisticsStartPosition)); + } } Map> tableDeviceNodesMap = @@ -553,6 +585,12 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException { tsFileMetadata.addProperty("encryptType", encryptType); tsFileMetadata.addProperty("encryptKey", encryptKey); + if (!tableStatisticsMap.isEmpty()) { + long position = out.getPosition(); + serializeTableStatisticsBlock(tableStatisticsMap); + tsFileMetadata.addProperty("tableStatisticsOffset", position + ""); + } + int size = tsFileMetadata.serializeTo(out.wrapAsStream()); // write TsFileMetaData size @@ -566,6 +604,17 @@ protected TSMIterator getTSMIterator() throws IOException { : TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList); } + private void serializeTableStatisticsBlock( + TreeMap> statisticsOffsetSizeMap) throws IOException { + OutputStream outputStream = out.wrapAsStream(); + for (Pair pair : statisticsOffsetSizeMap.values()) { + ReadWriteIOUtils.write(pair.left, outputStream); + } + for (Pair pair : statisticsOffsetSizeMap.values()) { + ReadWriteIOUtils.write(pair.right, outputStream); + } + } + /** * get the length of normal OutputStream. 
* diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java index 5dd1a096c..4340633fe 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java @@ -28,7 +28,7 @@ import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.utils.Pair; -import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.tsfile.utils.TsFileGeneratorForTest; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.apache.tsfile.write.writer.TsFileIOWriter; @@ -39,7 +39,6 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -112,30 +111,9 @@ private void generateDevice(TsFileIOWriter writer, String tableName, int deviceN IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(new String[] {tableName, "d" + i}); writer.startChunkGroup(deviceID); - generateSimpleAlignedSeriesToCurrentDevice( + TsFileGeneratorForTest.generateSimpleInt64AlignedSeriesToCurrentDevice( writer, Arrays.asList("s1", "s2", "s3", "s4"), new TimeRange[] {new TimeRange(10, 20)}); writer.endChunkGroup(); } } - - public void generateSimpleAlignedSeriesToCurrentDevice( - TsFileIOWriter writer, List measurementNames, TimeRange[] toGenerateChunkTimeRanges) - throws IOException { - List measurementSchemas = new ArrayList<>(); - for (String measurementName : measurementNames) { - measurementSchemas.add( - new MeasurementSchema( - measurementName, TSDataType.INT64, TSEncoding.RLE, CompressionType.LZ4)); - } - for (TimeRange toGenerateChunk : toGenerateChunkTimeRanges) { - AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemas); - for (long time = toGenerateChunk.getMin(); time <= toGenerateChunk.getMax(); time++) { - alignedChunkWriter.getTimeChunkWriter().write(time); - for (int i = 0; i < measurementNames.size(); i++) { - alignedChunkWriter.getValueChunkWriterByIndex(i).write(time, time, false); - } - } - alignedChunkWriter.writeToFileWriter(writer); - } - } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTableStatisticsTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTableStatisticsTest.java new file mode 100644 index 000000000..b35e877ab --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTableStatisticsTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tsfile.read; + +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.constant.TestConstant; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.ColumnSchema; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.file.metadata.statistics.LongStatistics; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.file.metadata.statistics.TableStatistics; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.TsFileGeneratorForTest; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.writer.TsFileIOWriter; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class TsFileTableStatisticsTest { + + private static final String FILE_PATH = + TestConstant.BASE_OUTPUT_PATH.concat("TsFileTableStatisticsTest.tsfile"); + + @After + public void teardown() { + new File(FILE_PATH).delete(); + } + + @Test + public void testTableModelTsFile() throws IOException { + try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH))) { + for (int i = 1; i <= 10; i++) { + String tableName = "table" + i; + registerTableSchema(writer, tableName, i); + int deviceNum = i; + if (i % 2 == 0) { + deviceNum *= 10000; + } else { + deviceNum *= 10; + } + generateDevice(writer, tableName, deviceNum, i); + } + writer.endFile(); + } + try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) { + Assert.assertTrue(reader.hasTableStatistics()); + Optional optional = reader.getTsFileTableStatisticsReader(); + Assert.assertTrue(optional.isPresent()); + ITsFileTableStatisticsReader tsFileTableStatisticsReader = optional.get(); + Map allTableStatistics = + tsFileTableStatisticsReader.getAllTableStatistics(); + Assert.assertEquals(10, allTableStatistics.size()); + for (int i = 1; i <= 10; i++) { + String tableName = "table" + i; + TableStatistics tableStatistics = allTableStatistics.get(tableName); + int deviceNum = i; + if (i % 2 == 0) { + deviceNum *= 10000; + } else { + deviceNum *= 10; + } + Statistics timeStatistics = tableStatistics.getStatistics(""); + Assert.assertEquals(deviceNum, timeStatistics.getCount()); + Assert.assertEquals(0L, timeStatistics.getMinValue()); + Assert.assertEquals((long) deviceNum - 1, timeStatistics.getMaxValue()); + Assert.assertEquals(0L, timeStatistics.getStartTime()); + Assert.assertEquals(deviceNum - 1, timeStatistics.getEndTime()); + int measurementNum = i; + for (int j = 0; j < measurementNum; j++) { + String measurement = "s" + j; + Statistics fieldStatistics = + tableStatistics.getStatistics(measurement); + Assert.assertEquals(deviceNum, fieldStatistics.getCount()); + Assert.assertEquals(0L, fieldStatistics.getMinValue()); + 
Assert.assertEquals((long) deviceNum - 1, fieldStatistics.getMaxValue()); + Assert.assertEquals(0L, fieldStatistics.getStartTime()); + Assert.assertEquals(deviceNum - 1, fieldStatistics.getEndTime()); + Assert.assertEquals(0L, fieldStatistics.getFirstValue()); + Assert.assertEquals((long) deviceNum - 1, fieldStatistics.getLastValue()); + } + } + } + } + + @Test + public void testTableAndTreeTsFile() throws IOException, WriteProcessException { + try (TsFileWriter writer = new TsFileWriter(new File(FILE_PATH))) { + writer.registerTimeseries("root.test.d1", new MeasurementSchema("s1", TSDataType.INT64)); + writer.registerTableSchema( + new TableSchema( + "t1", + Arrays.asList( + new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + new ColumnSchema("s1", TSDataType.INT64, ColumnCategory.FIELD), + new ColumnSchema("s2", TSDataType.INT64, ColumnCategory.FIELD)))); + Tablet treeTablet = + new Tablet( + "root.test.d1", + Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT64))); + treeTablet.addTimestamp(0, 1); + treeTablet.addValue("s1", 0, 1L); + writer.writeTree(treeTablet); + Tablet tableTablet = + new Tablet( + "t1", + Arrays.asList("device", "s1", "s2"), + Arrays.asList(TSDataType.STRING, TSDataType.INT64, TSDataType.INT64), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD)); + tableTablet.addTimestamp(0, 1); + tableTablet.addValue("device", 0, new Binary("d1", TSFileConfig.STRING_CHARSET)); + tableTablet.addValue("s1", 0, 2L); + tableTablet.addValue("s2", 0, 3L); + writer.writeTable(tableTablet); + writer.flush(); + } + try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) { + Assert.assertTrue(reader.hasTableStatistics()); + Optional optional = reader.getTsFileTableStatisticsReader(); + Assert.assertTrue(optional.isPresent()); + ITsFileTableStatisticsReader tsFileTableStatisticsReader = optional.get(); + Statistics timeStatistics = + tsFileTableStatisticsReader.getTableStatistics("t1").getTimeStatistics(); + Assert.assertEquals(1, timeStatistics.getCount()); + Assert.assertEquals(1, timeStatistics.getStartTime()); + Assert.assertEquals(1, timeStatistics.getEndTime()); + LongStatistics s1Statistics = + (LongStatistics) + tsFileTableStatisticsReader + .getTableFieldColumnStatistics("t1", "s2") + .getStatistics("s2"); + Assert.assertEquals(1, s1Statistics.getCount()); + Assert.assertEquals(3L, s1Statistics.getMinValue().longValue()); + Assert.assertEquals(3L, s1Statistics.getMaxValue().longValue()); + Assert.assertNull( + tsFileTableStatisticsReader + .getTableFieldColumnStatistics("t1", "s2") + .getStatistics("s1")); + + Map allTableStatistics = + tsFileTableStatisticsReader.getAllTableStatistics(); + Assert.assertEquals(1, allTableStatistics.size()); + } + } + + private void generateDevice( + TsFileIOWriter writer, String tableName, int deviceNum, int measurementNum) + throws IOException { + List measurements = new ArrayList<>(measurementNum); + for (int i = 0; i < measurementNum; i++) { + measurements.add("s" + i); + } + for (int i = 0; i < deviceNum; i++) { + IDeviceID deviceID = + IDeviceID.Factory.DEFAULT_FACTORY.create(new String[] {tableName, "d" + i}); + writer.startChunkGroup(deviceID); + TsFileGeneratorForTest.generateSimpleInt64AlignedSeriesToCurrentDevice( + writer, measurements, new TimeRange[] {new TimeRange(i, i)}); + writer.endChunkGroup(); + } + } + + private void registerTableSchema(TsFileIOWriter writer, String tableName, int measurementNum) { + List schemaList = new ArrayList<>(measurementNum 
+ 1); + schemaList.add( + new MeasurementSchema( + "id", TSDataType.STRING, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED)); + List columnCategories = new ArrayList<>(measurementNum + 1); + columnCategories.add(ColumnCategory.TAG); + for (int i = 0; i < measurementNum; i++) { + schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64)); + columnCategories.add(ColumnCategory.FIELD); + } + TableSchema tableSchema = new TableSchema(tableName, schemaList, columnCategories); + writer.getSchema().registerTableSchema(tableSchema); + } +} diff --git a/java/tsfile/src/test/java/org/apache/tsfile/utils/TsFileGeneratorForTest.java b/java/tsfile/src/test/java/org/apache/tsfile/utils/TsFileGeneratorForTest.java index 4ec6870ed..fa05910d1 100755 --- a/java/tsfile/src/test/java/org/apache/tsfile/utils/TsFileGeneratorForTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/utils/TsFileGeneratorForTest.java @@ -29,11 +29,14 @@ import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.fileSystem.fsFactory.FSFactory; import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.record.TSRecord; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.apache.tsfile.write.schema.Schema; +import org.apache.tsfile.write.writer.TsFileIOWriter; import org.junit.Assert; import org.junit.Ignore; @@ -330,6 +333,27 @@ public static void generateAlignedTsFile(int rowCount, int chunkGroupSize, int p } } + public static void generateSimpleInt64AlignedSeriesToCurrentDevice( + TsFileIOWriter writer, List measurementNames, TimeRange[] toGenerateChunkTimeRanges) + throws IOException { + List measurementSchemas = new ArrayList<>(); + for (String measurementName : measurementNames) { + measurementSchemas.add( + new MeasurementSchema( + measurementName, TSDataType.INT64, TSEncoding.RLE, CompressionType.LZ4)); + } + for (TimeRange toGenerateChunk : toGenerateChunkTimeRanges) { + AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemas); + for (long time = toGenerateChunk.getMin(); time <= toGenerateChunk.getMax(); time++) { + alignedChunkWriter.getTimeChunkWriter().write(time); + for (int i = 0; i < measurementNames.size(); i++) { + alignedChunkWriter.getValueChunkWriterByIndex(i).write(time, time, false); + } + } + alignedChunkWriter.writeToFileWriter(writer); + } + } + public static void closeAlignedTsFile() { File file = fsFactory.getFile(alignedOutputDataFile); if (file.exists()) {
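Editor's note: for reviewers of the TsFileIOWriter changes above, the on-disk layout implied by serializeTableStatisticsBlock together with TableStatistics.serializeTo reads as follows. This is an annotated sketch of one reading of the patch, not normative format documentation; the bracketed labels are descriptive only and do not come from the code.

// Per table-model table, serialized into the metadata section once the writer finishes the
// last device of that table (the writer remembers each block's start offset and length):
//   int                       column count   (ReadWriteIOUtils.write of the TreeMap size)
//   String[column count]      column names, in sorted order; the time column is stored under
//                             TsFileConstant.TIME_COLUMN_ID
//   TSDataType[column count]  data type of each column
//   varInt[column count]      serialized size of each column's Statistics
//   Statistics[column count]  the column statistics themselves, in the same order
//
// Table-statistics block, written just before TsFileMetadata; its start position is recorded
// in the file properties under "tableStatisticsOffset":
//   long[table count]  start offset of each table's TableStatistics, in sorted table-name order
//   long[table count]  length of each table's TableStatistics, in the same order
//
// On the read side, TsFileTableStatisticsReader loads the two long arrays once in its
// constructor; a column-pruned request deserializes the name/type/size headers but skips the
// Statistics payload of any column that was not asked for, using the recorded per-column sizes.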