From ed60b6b8f18a3db5c41ebac4c7b61698cad843f0 Mon Sep 17 00:00:00 2001 From: Dong Li Date: Mon, 7 Jul 2025 12:12:29 +1000 Subject: [PATCH 1/2] Fix the kerberos issue when there is big DAG plan using HDFS --- ...GClientAMProtocolBlockingPBServerImpl.java | 46 ++++++++++--- ...GClientAMProtocolBlockingPBServerImpl.java | 67 +++++++++++++++++++ 2 files changed, 103 insertions(+), 10 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java index 72cf0d5642..00ddbc594e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.security.AccessControlException; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Map; @@ -55,18 +56,33 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProtocolBlockingPB { + private static final Logger LOG = LoggerFactory.getLogger(DAGClientAMProtocolBlockingPBServerImpl.class); + DAGClientHandler real; final FileSystem stagingFs; + UserGroupInformation amUGI = null; + UserGroupInformation rpcUGI = null; + public DAGClientAMProtocolBlockingPBServerImpl(DAGClientHandler real, FileSystem stagingFs) { this.real = real; this.stagingFs = stagingFs; + try{ + amUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + //We do not throw exception because maybe this will not be used if there is no big request + LOG.error("Exception while getting current user", e); + } } private UserGroupInformation getRPCUser() throws ServiceException { + 
//rpcUGI is expected to be null in production; unit tests inject it via reflection + if (rpcUGI != null) return rpcUGI; try { return UserGroupInformation.getCurrentUser(); } catch (IOException e) { @@ -164,17 +180,24 @@ public SubmitDAGResponseProto submitDAG(RpcController controller, real.updateLastHeartbeatTime(); try{ if (request.hasSerializedRequestPath()) { - // need to deserialize large request from hdfs + //Read the large serialized request from HDFS as the AM user. amUGI should never be null here, but if it is, + //fall back to the RPC user as a best effort (this may still fail, but there is no other choice) + UserGroupInformation userToReadHDFS = amUGI == null? user : amUGI; Path requestPath = new Path(request.getSerializedRequestPath()); - try (FSDataInputStream fsDataInputStream = stagingFs.open(requestPath)) { - CodedInputStream in = - CodedInputStream.newInstance(fsDataInputStream); - in.setSizeLimit(Integer.MAX_VALUE); - request = SubmitDAGRequestProto.parseFrom(in); - } catch (IOException e) { - throw wrapException(e); - } + LOG.debug("Using the user {} to get the DAG plan from HDFS", userToReadHDFS); + + request = userToReadHDFS.doAs((PrivilegedExceptionAction) () -> { + FileSystem fs = requestPath.getFileSystem(stagingFs.getConf()); + try (FSDataInputStream fsDataInputStream = fs.open(requestPath)) { + CodedInputStream in = CodedInputStream.newInstance(fsDataInputStream); + in.setSizeLimit(Integer.MAX_VALUE); + return SubmitDAGRequestProto.parseFrom(in); + } catch (IOException e) { + throw wrapException(e); + } + }); } + DAGPlan dagPlan = request.getDAGPlan(); Map additionalResources = null; if (request.hasAdditionalAmResources()) { @@ -183,7 +206,10 @@ public SubmitDAGResponseProto submitDAG(RpcController controller, } String dagId = real.submitDAG(dagPlan, additionalResources); return SubmitDAGResponseProto.newBuilder().setDagId(dagId).build(); - } catch(TezException e) { + } catch(IOException | TezException e) { + throw wrapException(e); + } catch (InterruptedException e){ + 
Thread.currentThread().interrupt(); throw wrapException(e); } } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java index 040ca2fb7b..6bfff63eb1 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java @@ -133,4 +133,71 @@ public void testSubmitDagInSessionWithLargeDagPlan() throws Exception { assertEquals(lrURL.getPort(), port); assertEquals(lrURL.getFile(), path); } + + @Test(timeout = 5000) + public void testSubmitDAGUserGroupInformation() throws Exception { + // Create a simple DAG plan and write it to a file + String dagPlanName = "test-dag"; + File requestFile = tmpFolder.newFile("request-file"); + TezConfiguration conf = new TezConfiguration(); + + DAGPlan dagPlan = DAG.create(dagPlanName) + .addVertex(Vertex.create("V", ProcessorDescriptor.create("P"), 1)) + .createDag(conf, null, null, null, false); + + // Write DAG plan to file + try (FileOutputStream fileOutputStream = new FileOutputStream(requestFile)) { + SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build().writeTo(fileOutputStream); + } + + // Setup mocks + DAGClientHandler dagClientHandler = mock(DAGClientHandler.class); + ACLManager aclManager = mock(ACLManager.class); + FileSystem mockFs = mock(FileSystem.class); + UserGroupInformation mockAmUgi = mock(UserGroupInformation.class); + UserGroupInformation mockRpcUgi = mock(UserGroupInformation.class); + + // DAG request with file + SubmitDAGRequestProto request = SubmitDAGRequestProto.newBuilder() + .setSerializedRequestPath(requestFile.getAbsolutePath()) + .build(); + + when(mockAmUgi.doAs(any(java.security.PrivilegedExceptionAction.class))).thenReturn(request); + 
when(mockRpcUgi.doAs(any(java.security.PrivilegedExceptionAction.class))).thenReturn(request); + + // Create spy on server impl with mocked FileSystem + DAGClientAMProtocolBlockingPBServerImpl serverImpl = spy(new DAGClientAMProtocolBlockingPBServerImpl( + dagClientHandler, mockFs)); + + // Mock behavior + when(dagClientHandler.getACLManager()).thenReturn(aclManager); + when(aclManager.checkAMModifyAccess(any(UserGroupInformation.class))).thenReturn(true); + when(dagClientHandler.submitDAG(any(DAGPlan.class), any())).thenReturn("dag-id"); + when(mockFs.getConf()).thenReturn(conf); + + //Set the RPC UGI + java.lang.reflect.Field rpcUGIField = DAGClientAMProtocolBlockingPBServerImpl.class.getDeclaredField("rpcUGI"); + rpcUGIField.setAccessible(true); + rpcUGIField.set(serverImpl, mockRpcUgi); + + // Test Case 1: When amUGI is available + // Set the amUGI field using reflection + java.lang.reflect.Field amUGIField = DAGClientAMProtocolBlockingPBServerImpl.class.getDeclaredField("amUGI"); + amUGIField.setAccessible(true); + amUGIField.set(serverImpl, mockAmUgi); + + serverImpl.submitDAG(null, request); + + // Verify amUGI was used for doAs + verify(mockAmUgi).doAs(any(java.security.PrivilegedExceptionAction.class)); + + // Test Case 2: When amUGI is null + // Set amUGI to null + amUGIField.set(serverImpl, null); + + // Submit DAG with serialized path + serverImpl.submitDAG(null, request); + // Verify RPC user (mockRpcUgi) was used for doAs + verify(mockRpcUgi).doAs(any(java.security.PrivilegedExceptionAction.class)); + } } From a926b41971adf5d17277d85c4af80013d245e60e Mon Sep 17 00:00:00 2001 From: alexdongli0829 <40448063+alexdongli0829@users.noreply.github.com> Date: Mon, 7 Jul 2025 13:31:53 +1000 Subject: [PATCH 2/2] Update TestDAGClientAMProtocolBlockingPBServerImpl.java Remove the space on line 146 --- .../client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java index 8861869385..807ba77129 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java @@ -143,7 +143,7 @@ public void testSubmitDAGUserGroupInformation() throws Exception { String dagPlanName = "test-dag"; File requestFile = tmpFolder.newFile("request-file"); TezConfiguration conf = new TezConfiguration(); - + DAGPlan dagPlan = DAG.create(dagPlanName) .addVertex(Vertex.create("V", ProcessorDescriptor.create("P"), 1)) .createDag(conf, null, null, null, false);