Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,14 @@ public EdgeManagerPluginDescriptor getEdgeManagerDescriptor() {
return edgeManagerDescriptor;
}

/**
* Returns a new EdgeProperty with the given EdgeManagerPluginDescriptor.
*/
public EdgeProperty withDescriptor(EdgeManagerPluginDescriptor newDescriptor) {
return new EdgeProperty(newDescriptor, this.dataMovementType, this.dataSourceType,
this.schedulingType, this.outputDescriptor, this.inputDescriptor);
}

@Override
public String toString() {
return "{ " + dataMovementType + " : " + inputDescriptor.getClassName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1998,7 +1998,12 @@ private void setParallelismWrapper(int parallelism, VertexLocationHint vertexLoc
Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
Edge edge = sourceVertices.get(sourceVertex);
try {
edge.setEdgeProperty(entry.getValue());
if (edge != null) {
edge.setEdgeProperty(entry.getValue());
} else {
LOG.warn("Edge is null, sourceVertex = {}, entry.getValue() = {}",
sourceVertex, entry.getValue());
}
} catch (Exception e) {
throw new TezUncheckedException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPluginContext;
Expand All @@ -52,6 +53,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

/**
* Starts scheduling tasks when number of completed source tasks crosses
Expand Down Expand Up @@ -520,6 +522,30 @@ ReconfigVertexParams computeRouting() {
for(Map.Entry<String, SourceVertexInfo> entry : bipartiteItr) {
entry.getValue().newDescriptor = descriptor;
}

// Additionally, update custom edges.
Map<String, EdgeProperty> outputEdges = getContext().getOutputVertexEdgeProperties();
Map<String, EdgeProperty> updatedEdges = new HashMap<>();
for (Map.Entry<String, EdgeProperty> entry : outputEdges.entrySet()) {
if (entry.getValue().getDataMovementType() == EdgeProperty.DataMovementType.CUSTOM) {
// Build a new custom edge manager configuration with updated parallelism.
CustomShuffleEdgeManagerConfig customConfig = new CustomShuffleEdgeManagerConfig(
currentParallelism, finalTaskParallelism, basePartitionRange,
(remainderRangeForLastShuffler > 0 ? remainderRangeForLastShuffler : basePartitionRange));
EdgeManagerPluginDescriptor newDescriptor = EdgeManagerPluginDescriptor.create(CustomShuffleEdgeManager.class.getName());
newDescriptor.setUserPayload(customConfig.toUserPayload());

// Update the EdgeProperty with the new descriptor.
EdgeProperty updatedProp = entry.getValue().withDescriptor(newDescriptor);
updatedEdges.put(entry.getKey(), updatedProp);
}
}

// If any custom edges were updated, propagate the new configuration.
if (!updatedEdges.isEmpty()) {
getContext().reconfigureVertex(finalTaskParallelism, null, updatedEdges);
}

ReconfigVertexParams params =
new ReconfigVertexParams(finalTaskParallelism, null);
return params;
Expand Down