/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job} and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example of how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance();
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING};
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
      "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
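
  // Illustrative sketch (not part of the API): both poll intervals are
  // client-side settings and can be tuned in the job configuration before
  // submission, e.g.
  //
  //   Configuration conf = new Configuration();
  //   conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 10000);      // check every 10s
  //   conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 2000); // report every 2s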
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
      "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
      "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
      "mapreduce.client.submit.file.replication";
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;
  public static final String USE_WILDCARD_FOR_LIBJARS =
      "mapreduce.client.libjars.wildcard";
  public static final boolean DEFAULT_USE_WILDCARD_FOR_LIBJARS = true;

  @InterfaceStability.Evolving
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new JobConf(new Configuration()));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(new JobConf(conf));
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }
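
  // Illustrative sketch (the key name "example.key" is hypothetical): because
  // getInstance(Configuration) copies the passed-in conf, later edits to the
  // original object are not visible to the job; edit job.getConfiguration()
  // instead.
  //
  //   Configuration conf = new Configuration();
  //   Job job = Job.getInstance(conf);
  //   conf.setInt("example.key", 1);                    // not seen by 'job'
  //   job.getConfiguration().setInt("example.key", 1);  // seen by 'job'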

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the name for the job instance
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored unused parameter, retained for compatibility
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored unused parameter, retained for compatibility
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state "+ this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object.  Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need to update status immediately. So, refresh
   * immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return JobStatus#State
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL(){
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }
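
  // Illustrative sketch: the getters above require a submitted (RUNNING) job;
  // several of them force an RPC via updateStatus(), while others reuse the
  // cached status until it is older than MAX_JOBSTATUS_AGE.
  //
  //   JobStatus.State state = job.getJobState();  // refreshes the status
  //   System.out.println("Tracking: " + job.getTrackingURL());
  //   System.out.println("Started:  " + job.getStartTime());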

  /**
   * Get priority info of the job.
   *
   * @return the priority info of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE || status == null) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Render the job's current status as a human-readable string.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonforFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonforFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore: render the summary with the information gathered so far
    } catch (InterruptedException ie) {
      // ignore: render the summary with the information gathered so far
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonforFailure);
    return sb.toString();
  }

  /**
   * @return a message describing the task that caused the job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 1;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason and the reason "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
        failCount + " times. For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return the list of task reports for the given task type.
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been killed as
   * well.  If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of the job.
   * @param jobPriority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority jobPriority) throws IOException,
      InterruptedException {
    if (state == JobState.DEFINE) {
      if (jobPriority == JobPriority.UNDEFINED_PRIORITY) {
        conf.setJobPriorityAsInteger(convertPriorityToInteger(jobPriority));
      } else {
        conf.setJobPriority(org.apache.hadoop.mapred.JobPriority
            .valueOf(jobPriority.name()));
      }
    } else {
      ensureState(JobState.RUNNING);
      final int tmpPriority = convertPriorityToInteger(jobPriority);
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient()
              .setJobPriority(getJobID(), Integer.toString(tmpPriority));
          return null;
        }
      });
    }
  }

  /**
   * Set the priority of the job as an integer.
   *
   * @param jobPriority
   *          the new priority for the job.
   * @throws IOException
   */
  public void setPriorityAsInteger(int jobPriority) throws IOException,
      InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriorityAsInteger(jobPriority);
    } else {
      ensureState(JobState.RUNNING);
      final int tmpPriority = jobPriority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient()
              .setJobPriority(getJobID(), Integer.toString(tmpPriority));
          return null;
        }
      });
    }
  }

  private int convertPriorityToInteger(JobPriority jobPriority) {
    switch (jobPriority) {
    case VERY_HIGH :
      return 5;
    case HIGH :
      return 4;
    case NORMAL :
      return 3;
    case LOW :
      return 2;
    case VERY_LOW :
      return 1;
    case DEFAULT :
      return 0;
    default:
      break;
    }
    // For UNDEFINED_PRIORITY, we can set it to default for better handling
    return 0;
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }
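
  // Paging sketch (illustrative): completion events are fetched in windows;
  // advance the offset by the number of events returned, as
  // monitorAndPrintJob() does with a window of 10.
  //
  //   int from = 0;
  //   TaskCompletionEvent[] batch;
  //   while ((batch = job.getTaskCompletionEvents(from, 10)).length > 0) {
  //     from += batch.length;
  //     // inspect batch...
  //   }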

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
      getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   w/o affecting job failure status.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
      final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }
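
  // Diagnostics sketch (illustrative): pair failed completion events with
  // their attempt diagnostics, similar to what printTaskEvents() does below.
  //
  //   for (TaskCompletionEvent e : job.getTaskCompletionEvents(0, 10)) {
  //     if (e.getStatus() == TaskCompletionEvent.Status.FAILED) {
  //       for (String d : job.getTaskDiagnostics(e.getTaskAttemptId())) {
  //         System.err.println(d);
  //       }
  //     }
  //   }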

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }
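
  // Configuration sketch (illustrative; TokenizerMapper and IntSumReducer are
  // hypothetical user classes): a typical DEFINE-state setup using the
  // setters above.
  //
  //   job.setMapperClass(TokenizerMapper.class);
  //   job.setCombinerClass(IntSumReducer.class);
  //   job.setReducerClass(IntSumReducer.class);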

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }
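
  // Secondary-sort sketch (illustrative; the comparator classes are
  // hypothetical): sort by the full composite key, but group reduce() calls
  // by its natural part.
  //
  //   job.setSortComparatorClass(CompositeKeyComparator.class);
  //   job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);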

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter}, else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }
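
  // Distributed-cache sketch (illustrative URIs): cached files and archives
  // are localized on each node before tasks start; archives are additionally
  // unpacked. The '#lookup' fragment names the symlink created in the task's
  // working directory.
  //
  //   job.addCacheFile(new URI("/apps/data/lookup.dat#lookup"));
  //   job.addCacheArchive(new URI("/apps/data/dictionaries.zip"));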

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   * @param isMap whether the ranges apply to map tasks (reduce tasks otherwise)
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   * @param value whether the delegation tokens should be canceled
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }
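
  // Consistency sketch (illustrative; OldMapper and NewMapper are
  // hypothetical): setUseNewAPI() below infers which API the job uses from
  // the attributes that are set, and submit() fails fast if the two APIs are
  // mixed.
  //
  //   conf.set("mapred.mapper.class", OldMapper.class.getName()); // old API
  //   job.setMapperClass(NewMapper.class);  // new API: submit() would throw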

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reduce attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
          ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run()
                throws IOException, InterruptedException,
                       ClassNotFoundException {
              return new Cluster(getConfiguration());
            }
          });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
      throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }
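
  // Driver sketch (illustrative): the canonical submit-and-wait pattern built
  // on submit()/waitForCompletion(), as in the class-level example.
  //
  //   Job job = Job.getInstance(new Configuration(), "myjob");
  //   // ... configure the job ...
  //   System.exit(job.waitForCompletion(true) ? 0 : 1);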

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
          Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore the interruption and poll again
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
        Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
          (" map " + StringUtils.formatPercent(mapProgress(), 0) +
           " reduce " +
           StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
          getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }
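
  // Filter sketch (illustrative): choose which task completion events
  // monitorAndPrintJob() logs by setting the output filter on the client
  // configuration before submission.
  //
  //   Job.setTaskOutputFilter(conf, Job.TaskStatusFilter.ALL);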

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
        PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
        COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the reservationId, the identifier of the job's reservation; null
   *         if the job does not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }

}