Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,24 +124,26 @@ protected Transformation<Object> translateToPlanInternal(
}

@Override
protected RowType getPhysicalRowType(ResolvedSchema schema) {
protected final RowType getInputRowType() {
// row-level modification may only write partial columns,
// so we try to prune the RowType to get the real RowType containing
// the physical columns to be written
if (tableSinkSpec.getSinkAbilities() != null) {
final ResolvedSchema schema =
tableSinkSpec.getContextResolvedTable().getResolvedSchema();
for (SinkAbilitySpec sinkAbilitySpec : tableSinkSpec.getSinkAbilities()) {
if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
RowLevelUpdateSpec rowLevelUpdateSpec = (RowLevelUpdateSpec) sinkAbilitySpec;
return getPhysicalRowType(
return getPersistedRowType(
schema, rowLevelUpdateSpec.getRequiredPhysicalColumnIndices());
} else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
RowLevelDeleteSpec rowLevelDeleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec;
return getPhysicalRowType(
return getPersistedRowType(
schema, rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());
}
}
}
return (RowType) schema.toPhysicalRowDataType().getLogicalType();
return (RowType) getInputEdges().get(0).getOutputType();
}

@Override
Expand Down Expand Up @@ -183,7 +185,7 @@ protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema)
}

/** Get the physical row type with given column indices. */
private RowType getPhysicalRowType(ResolvedSchema schema, int[] columnIndices) {
private RowType getPersistedRowType(ResolvedSchema schema, int[] columnIndices) {
List<Column> columns = schema.getColumns();
List<Column> requireColumns = new ArrayList<>();
for (int columnIndex : columnIndices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ protected Transformation<Object> createSinkTransformation(
tableSink.getSinkRuntimeProvider(
new SinkRuntimeProviderContext(
isBounded, tableSinkSpec.getTargetColumns()));
final RowType physicalRowType = getPhysicalRowType(schema);
final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema);
final RowType inputRowType = getInputRowType();
final int[] primaryKeys = getPrimaryKeyIndices(inputRowType, schema);
final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider);
sinkParallelismConfigured = isParallelismConfigured(runtimeProvider);
final int inputParallelism = inputTransform.getParallelism();
Expand Down Expand Up @@ -190,7 +190,7 @@ protected Transformation<Object> createSinkTransformation(
final boolean needMaterialization = !inputInsertOnly && upsertMaterialize;

Transformation<RowData> sinkTransform =
applyConstraintValidations(inputTransform, config, physicalRowType);
applyConstraintValidations(inputTransform, config, inputRowType);

if (hasPk) {
sinkTransform =
Expand All @@ -212,7 +212,7 @@ protected Transformation<Object> createSinkTransformation(
sinkParallelism,
config,
classLoader,
physicalRowType,
inputRowType,
inputUpsertKey);
}

Expand Down Expand Up @@ -542,9 +542,7 @@ protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema)
.orElse(new int[0]);
}

protected RowType getPhysicalRowType(ResolvedSchema schema) {
return (RowType) schema.toPhysicalRowDataType().getLogicalType();
}
protected abstract RowType getInputRowType();

/**
* Get the target row-kind that the row data should change to, assuming the current row kind is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ protected Transformation<RowData> applyUpsertMaterialize(
return materializeTransform;
}

@Override
protected RowType getInputRowType() {
return (RowType) getInputEdges().get(0).getOutputType();
}

private OneInputStreamOperator<RowData, RowData> createSumOperator(
ExecNodeConfig config,
RowType physicalRowType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class SinkSemanticTests extends SemanticTestBase {
public List<TableTestProgram> programs() {
return List.of(
SinkTestPrograms.INSERT_RETRACT_WITHOUT_PK,
SinkTestPrograms.INSERT_RETRACT_WITH_PK);
SinkTestPrograms.INSERT_RETRACT_WITH_PK,
SinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY;

/** Tests for verifying sink semantics. */
public class SinkTestPrograms {

Expand Down Expand Up @@ -86,4 +90,56 @@ public class SinkTestPrograms {
.runSql(
"INSERT INTO sink_t SELECT UPPER(name), SUM(score) FROM source_t GROUP BY name")
.build();

public static final TableTestProgram INSERT_RETRACT_WITH_WRITABLE_METADATA =
TableTestProgram.of(
"insert-into-upsert-with-sink-upsert-materializer-writable-metadata",
"The query requires a sink upsert materializer and the sink"
+ " uses writable metadata columns. The scenario showcases a"
+ " bug where a wrong type was used in sinks which did not"
+ " consider metadata columns. There needs to be multiple"
+ " requirements for the bug to show up. 1. We need to use "
+ " rocksdb, so that we use a serializer when putting records"
+ " into state in SinkUpsertMaterializer. 2. We need to retract"
+ " to a previous value taken from the state, otherwise we"
+ " forward the incoming record. 3. There need to be persisted"
+ " metadata columns.")
.setupConfig(
TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY,
SinkUpsertMaterializeStrategy.LEGACY)
.setupConfig(STATE_BACKEND, "rocksdb")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
.addSchema("name STRING", "score INT")
.addOption("changelog-mode", "I,UB,UA,D")
.producedValues(
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.INSERT, "Bob", 6),
// retract the last record, which should roll back to
// the previous state
Row.ofKind(RowKind.DELETE, "Bob", 6))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema(
"name STRING PRIMARY KEY NOT ENFORCED",
"score BIGINT",
"scoreMetadata BIGINT METADATA",
"nameMetadata STRING METADATA")
.addOption("sink-changelog-mode-enforced", "I,UA,D")
// The test sink lists metadata columns
// (SupportsWritingMetadata#listWritableMetadata) in
// alphabetical order, this is also the order in the record of
// a sink, irrespective of the table schema
.addOption(
"writable-metadata",
"nameMetadata:STRING,scoreMetadata:BIGINT")
.consumedValues(
"+I[BOB, 5, Bob, 5]",
"+U[BOB, 6, Bob, 6]",
"+U[BOB, 5, Bob, 5]")
.build())
.runSql(
"INSERT INTO sink_t SELECT UPPER(name), score, score, name FROM source_t")
.build();
}