diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java index 00cd26e2ce..3d9bb6ae0d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java @@ -893,28 +893,29 @@ public static class StaticAMView extends View { @Override public void render() { response().setContentType(MimeType.HTML); - PrintWriter pw = writer(); - pw.write(""); - pw.write(""); - pw.write(""); - pw.write("Redirecting to Tez UI"); - pw.write(""); - pw.write(""); - if (historyUrl == null || historyUrl.isEmpty()) { - pw.write("

Tez UI Url is not defined.

" + - "

To enable tracking url pointing to Tez UI, set the config " + - TezConfiguration.TEZ_HISTORY_URL_BASE + " in the tez-site.xml.

"); - } else { - pw.write("

Redirecting to Tez UI

.

If you are not redirected shortly, click " + - "here

" - ); - pw.write(""); + try(PrintWriter pw = writer()) { + pw.write(""); + pw.write(""); + pw.write(""); + pw.write("Redirecting to Tez UI"); + pw.write(""); + pw.write(""); + if (historyUrl == null || historyUrl.isEmpty()) { + pw.write("

Tez UI Url is not defined.

" + + "

To enable tracking url pointing to Tez UI, set the config " + + TezConfiguration.TEZ_HISTORY_URL_BASE + " in the tez-site.xml.

"); + } else { + pw.write("

Redirecting to Tez UI

.

If you are not redirected shortly, click " + + "here

" + ); + pw.write(""); + } + pw.write(""); + pw.write(""); + pw.flush(); } - pw.write(""); - pw.write(""); - pw.flush(); } } diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java index 5c99f3efbf..ee18977ede 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java @@ -130,12 +130,13 @@ private DAG createDAG(FileSystem fs, TezConfiguration tezConf, int numBroadcastTasks = 2; int numOneToOneTasks = 3; + int numNMs; if (doLocalityCheck) { - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(tezConf); - yarnClient.start(); - int numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size(); - yarnClient.stop(); + try (YarnClient yarnClient = YarnClient.createYarnClient()) { + yarnClient.init(tezConf); + yarnClient.start(); + numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size(); + } // create enough 1-1 tasks to run in parallel numOneToOneTasks = numNMs - numBroadcastTasks - 1;// 1 AM if (numOneToOneTasks < 1) { diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java index 92be836375..e1dd968f90 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java @@ -82,92 +82,95 @@ static int printUsage() { @SuppressWarnings("deprecation") public int run(String[] args) throws Exception { Configuration conf = getConf(); - JobClient client = new JobClient(conf); - ClusterStatus cluster = client.getClusterStatus(); - int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9); - String join_reduces = conf.get(REDUCES_PER_HOST); - if (join_reduces != null) { - num_reduces = cluster.getTaskTrackers() * - Integer.parseInt(join_reduces); - } - Job job = new Job(conf); - job.setJobName("join"); - job.setJarByClass(Sort.class); - - job.setMapperClass(Mapper.class); - job.setReducerClass(Reducer.class); - - Class inputFormatClass = - SequenceFileInputFormat.class; - Class outputFormatClass = - SequenceFileOutputFormat.class; - Class outputKeyClass = BytesWritable.class; - Class outputValueClass = TupleWritable.class; - String op = "inner"; - List otherArgs = new ArrayList(); - for(int i=0; i < args.length; ++i) { - try { - if ("-r".equals(args[i])) { - num_reduces = Integer.parseInt(args[++i]); - } else if ("-inFormat".equals(args[i])) { - inputFormatClass = - Class.forName(args[++i]).asSubclass(InputFormat.class); - } else if ("-outFormat".equals(args[i])) { - outputFormatClass = - Class.forName(args[++i]).asSubclass(OutputFormat.class); - } else if ("-outKey".equals(args[i])) { - outputKeyClass = - Class.forName(args[++i]).asSubclass(WritableComparable.class); - } else if ("-outValue".equals(args[i])) { - outputValueClass = - Class.forName(args[++i]).asSubclass(Writable.class); - } else if ("-joinOp".equals(args[i])) { - op = args[++i]; - } else { - otherArgs.add(args[i]); + try (JobClient client = new JobClient(conf)) { + + ClusterStatus cluster = client.getClusterStatus(); + int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9); + String join_reduces = conf.get(REDUCES_PER_HOST); + if (join_reduces != null) { + num_reduces = cluster.getTaskTrackers() * + Integer.parseInt(join_reduces); + } + + Job job = new Job(conf); + job.setJobName("join"); + job.setJarByClass(Sort.class); + + job.setMapperClass(Mapper.class); + job.setReducerClass(Reducer.class); + + Class inputFormatClass = + SequenceFileInputFormat.class; + Class outputFormatClass = + SequenceFileOutputFormat.class; + Class outputKeyClass = BytesWritable.class; + Class outputValueClass = TupleWritable.class; + String op = "inner"; + List otherArgs = new ArrayList(); + for (int i = 0; i < args.length; ++i) { + try { + if ("-r".equals(args[i])) { + num_reduces = Integer.parseInt(args[++i]); + } else if ("-inFormat".equals(args[i])) { + inputFormatClass = + Class.forName(args[++i]).asSubclass(InputFormat.class); + } else if ("-outFormat".equals(args[i])) { + outputFormatClass = + Class.forName(args[++i]).asSubclass(OutputFormat.class); + } else if ("-outKey".equals(args[i])) { + outputKeyClass = + Class.forName(args[++i]).asSubclass(WritableComparable.class); + } else if ("-outValue".equals(args[i])) { + outputValueClass = + Class.forName(args[++i]).asSubclass(Writable.class); + } else if ("-joinOp".equals(args[i])) { + op = args[++i]; + } else { + otherArgs.add(args[i]); + } + } catch (NumberFormatException except) { + System.out.println("ERROR: Integer expected instead of " + args[i]); + return printUsage(); + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i - 1]); + return printUsage(); // exits } - } catch (NumberFormatException except) { - System.out.println("ERROR: Integer expected instead of " + args[i]); - return printUsage(); - } catch (ArrayIndexOutOfBoundsException except) { - System.out.println("ERROR: Required parameter missing from " + - args[i-1]); - return printUsage(); // exits } - } - // Set user-supplied (possibly default) job configs - job.setNumReduceTasks(num_reduces); + // Set user-supplied (possibly default) job configs + job.setNumReduceTasks(num_reduces); - if (otherArgs.size() < 2) { - System.out.println("ERROR: Wrong number of parameters: "); - return printUsage(); - } + if (otherArgs.size() < 2) { + System.out.println("ERROR: Wrong number of parameters: "); + return printUsage(); + } - FileOutputFormat.setOutputPath(job, - new Path(otherArgs.remove(otherArgs.size() - 1))); - List plist = new ArrayList(otherArgs.size()); - for (String s : otherArgs) { - plist.add(new Path(s)); - } + FileOutputFormat.setOutputPath(job, + new Path(otherArgs.remove(otherArgs.size() - 1))); + List plist = new ArrayList(otherArgs.size()); + for (String s : otherArgs) { + plist.add(new Path(s)); + } - job.setInputFormatClass(CompositeInputFormat.class); - job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, - CompositeInputFormat.compose(op, inputFormatClass, - plist.toArray(new Path[0]))); - job.setOutputFormatClass(outputFormatClass); - - job.setOutputKeyClass(outputKeyClass); - job.setOutputValueClass(outputValueClass); - - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int ret = job.waitForCompletion(true) ? 0 : 1 ; - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " + - (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); - return ret; + job.setInputFormatClass(CompositeInputFormat.class); + job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, + CompositeInputFormat.compose(op, inputFormatClass, + plist.toArray(new Path[0]))); + job.setOutputFormatClass(outputFormatClass); + + job.setOutputKeyClass(outputKeyClass); + job.setOutputValueClass(outputValueClass); + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 0 : 1; + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) / 1000 + " seconds."); + return ret; + } } public static void main(String[] args) throws Exception { diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java index 55404ba1bd..509ebfb6d6 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java @@ -180,71 +180,72 @@ public int run(String[] args) throws Exception { } Configuration conf = getConf(); - JobClient client = new JobClient(conf); - ClusterStatus cluster = client.getClusterStatus(); - int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); - long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, - 1*1024*1024*1024); - if (numBytesToWritePerMap == 0) { - System.err.println("Cannot have " + BYTES_PER_MAP +" set to 0"); - return -2; - } - long totalBytesToWrite = conf.getLong(TOTAL_BYTES, - numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers()); - int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); - if (numMaps == 0 && totalBytesToWrite > 0) { - numMaps = 1; - conf.setLong(BYTES_PER_MAP, totalBytesToWrite); - } - conf.setInt(MRJobConfig.NUM_MAPS, numMaps); - - Job job = new Job(conf); - - job.setJarByClass(RandomTextWriter.class); - job.setJobName("random-text-writer"); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - - job.setInputFormatClass(RandomWriter.RandomInputFormat.class); - job.setMapperClass(RandomTextMapper.class); - - Class outputFormatClass = - SequenceFileOutputFormat.class; - List otherArgs = new ArrayList(); - for(int i=0; i < args.length; ++i) { - try { - if ("-outFormat".equals(args[i])) { - outputFormatClass = - Class.forName(args[++i]).asSubclass(OutputFormat.class); - } else { - otherArgs.add(args[i]); + try (JobClient client = new JobClient(conf)) { + ClusterStatus cluster = client.getClusterStatus(); + int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); + long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, + 1 * 1024 * 1024 * 1024); + if (numBytesToWritePerMap == 0) { + System.err.println("Cannot have " + BYTES_PER_MAP + " set to 0"); + return -2; + } + long totalBytesToWrite = conf.getLong(TOTAL_BYTES, + numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers()); + int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); + if (numMaps == 0 && totalBytesToWrite > 0) { + numMaps = 1; + conf.setLong(BYTES_PER_MAP, totalBytesToWrite); + } + conf.setInt(MRJobConfig.NUM_MAPS, numMaps); + + Job job = new Job(conf); + + job.setJarByClass(RandomTextWriter.class); + job.setJobName("random-text-writer"); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + job.setInputFormatClass(RandomWriter.RandomInputFormat.class); + job.setMapperClass(RandomTextMapper.class); + + Class outputFormatClass = + SequenceFileOutputFormat.class; + List otherArgs = new ArrayList(); + for (int i = 0; i < args.length; ++i) { + try { + if ("-outFormat".equals(args[i])) { + outputFormatClass = + Class.forName(args[++i]).asSubclass(OutputFormat.class); + } else { + otherArgs.add(args[i]); + } + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i - 1]); + return printUsage(); // exits } - } catch (ArrayIndexOutOfBoundsException except) { - System.out.println("ERROR: Required parameter missing from " + - args[i-1]); - return printUsage(); // exits } - } - job.setOutputFormatClass(outputFormatClass); - FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0))); - - System.out.println("Running " + numMaps + " maps."); - - // reducer NONE - job.setNumReduceTasks(0); - - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int ret = job.waitForCompletion(true) ? 0 : 1; - Date endTime = new Date(); - System.out.println("Job ended: " + endTime); - System.out.println("The job took " + - (endTime.getTime() - startTime.getTime()) /1000 + - " seconds."); - - return ret; + job.setOutputFormatClass(outputFormatClass); + FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0))); + + System.out.println("Running " + numMaps + " maps."); + + // reducer NONE + job.setNumReduceTasks(0); + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 0 : 1; + Date endTime = new Date(); + System.out.println("Job ended: " + endTime); + System.out.println("The job took " + + (endTime.getTime() - startTime.getTime()) / 1000 + + " seconds."); + + return ret; + } } public static void main(String[] args) throws Exception { diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java index 1627d688ab..e778ed889e 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java @@ -246,51 +246,52 @@ public int run(String[] args) throws Exception { Path outDir = new Path(args[0]); Configuration conf = getConf(); - JobClient client = new JobClient(conf); - ClusterStatus cluster = client.getClusterStatus(); - int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); - long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, - 1*1024*1024*1024); - if (numBytesToWritePerMap == 0) { - System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0"); - return -2; - } - long totalBytesToWrite = conf.getLong(TOTAL_BYTES, - numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers()); - int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); - if (numMaps == 0 && totalBytesToWrite > 0) { - numMaps = 1; - conf.setLong(BYTES_PER_MAP, totalBytesToWrite); - } - conf.setInt(MRJobConfig.NUM_MAPS, numMaps); + try (JobClient client = new JobClient(conf)) { + ClusterStatus cluster = client.getClusterStatus(); + int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); + long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, + 1 * 1024 * 1024 * 1024); + if (numBytesToWritePerMap == 0) { + System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0"); + return -2; + } + long totalBytesToWrite = conf.getLong(TOTAL_BYTES, + numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers()); + int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); + if (numMaps == 0 && totalBytesToWrite > 0) { + numMaps = 1; + conf.setLong(BYTES_PER_MAP, totalBytesToWrite); + } + conf.setInt(MRJobConfig.NUM_MAPS, numMaps); - Job job = new Job(conf); - - job.setJarByClass(RandomWriter.class); - job.setJobName("random-writer"); - FileOutputFormat.setOutputPath(job, outDir); - job.setOutputKeyClass(BytesWritable.class); - job.setOutputValueClass(BytesWritable.class); - job.setInputFormatClass(RandomInputFormat.class); - job.setMapperClass(RandomMapper.class); - job.setReducerClass(Reducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - - System.out.println("Running " + numMaps + " maps."); - - // reducer NONE - job.setNumReduceTasks(0); - - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int ret = job.waitForCompletion(true) ? 0 : 1; - Date endTime = new Date(); - System.out.println("Job ended: " + endTime); - System.out.println("The job took " + - (endTime.getTime() - startTime.getTime()) /1000 + - " seconds."); - - return ret; + Job job = new Job(conf); + + job.setJarByClass(RandomWriter.class); + job.setJobName("random-writer"); + FileOutputFormat.setOutputPath(job, outDir); + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(BytesWritable.class); + job.setInputFormatClass(RandomInputFormat.class); + job.setMapperClass(RandomMapper.class); + job.setReducerClass(Reducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + + System.out.println("Running " + numMaps + " maps."); + + // reducer NONE + job.setNumReduceTasks(0); + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 0 : 1; + Date endTime = new Date(); + System.out.println("Job ended: " + endTime); + System.out.println("The job took " + + (endTime.getTime() - startTime.getTime()) / 1000 + + " seconds."); + + return ret; + } } public static void main(String[] args) throws Exception {