/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

/**
 * The job submitter's view of the Job.
 * 
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted; afterwards they will throw an 
 * IllegalStateException.</p>
 * 
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 * 
 * <p>Here is an example on how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance();
 *     job.setJarByClass(MyJob.class);
 *     
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *     
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *     
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public enum JobState {DEFINE, RUNNING}
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY = 
    "mapreduce.client.completion.pollinterval";
  
  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
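
  // A minimal illustrative sketch (client code, not part of this class):
  // both poll intervals can be tuned through the job Configuration before
  // the Job is created; these keys are the ones read back by
  // getCompletionPollInterval() and getProgressPollInterval() below.
  //
  //   Configuration conf = new Configuration();
  //   conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 2000);       // check completion every 2s
  //   conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 500);  // report progress every 500ms
  //   Job job = Job.getInstance(conf);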

  public static final String USED_GENERIC_PARSER = 
      "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION = 
      "mapreduce.client.submit.file.replication";
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;
  public static final String USE_WILDCARD_FOR_LIBJARS =
      "mapreduce.client.libjars.wildcard";
  public static final boolean DEFAULT_USE_WILDCARD_FOR_LIBJARS = true;

  @InterfaceStability.Evolving
  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new JobConf(new Configuration()));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(new JobConf(conf));
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   * 
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a 
   * given {@link Configuration}.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * A Cluster will be created from the conf parameter only when it's needed.
   * 
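   * <p>A minimal sketch of the copy semantics (the property name is just an
   * illustration): changes made to the passed-in <code>Configuration</code>
   * after this call are not seen by the job.</p>
   * <p><blockquote><pre>
   *     Configuration conf = new Configuration();
   *     conf.set("example.key", "before");
   *     Job job = Job.getInstance(conf);
   *     conf.set("example.key", "after");
   *     // job.getConfiguration().get("example.key") still returns "before"
   * </pre></blockquote></p>
   * 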
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * @param conf the configuration
   * @param jobName the job name to set
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
           throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }
  
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf) 
  throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the default configuration only when it's
   * needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * @param ignored the cluster argument, which is ignored
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }
  
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * @param ignored the cluster argument, which is ignored
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf) 
      throws IOException {
    return getInstance(conf);
  }
  
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status, 
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state " + this.state + 
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it if necessary.
   */
  synchronized void ensureFreshStatus() 
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }
    
  /** Some methods need to update the status immediately, so refresh it
   * right away.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }
  
  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  /**
   * Returns the current state of the Job.
   * 
   * @return the {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState() 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }
  
  /**
   * Get the URL where some job progress information will be displayed.
   * 
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL(){
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   * 
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get the start time of the job.
   * 
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get the finish time of the job.
   * 
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get the scheduling info of the job.
   * 
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   * 
   * @return the priority info of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE || status == null) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }
  
  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonForFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore and report with the defaults gathered so far
    } catch (InterruptedException ie) {
      // ignore and report with the defaults gathered so far
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return the taskid which caused the job to fail
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 0;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new 
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
          InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason, which "
          + "can be found in the logs.";
    }
    // strip the leading "attempt" prefix and the trailing attempt number
    // to recover the task id from the task attempt id
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
      failCount + " times. For details check tasktracker at: " +
      lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the current state of the tasks of a job.
   * 
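   * <p>A small illustrative sketch: print the progress of every map task of
   * a running job (the accessor names follow {@link TaskReport}).</p>
   * <p><blockquote><pre>
   *     for (TaskReport report : job.getTaskReports(TaskType.MAP)) {
   *       System.out.println(report.getTaskID() + " : " + report.getProgress());
   *     }
   * </pre></blockquote></p>
   * 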
   * @param type Type of the task
   * @return the task reports for the given type of task
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type) 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   * 
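   * <p>A minimal polling sketch (the one-second interval here is arbitrary;
   * see {@link #getProgressPollInterval(Configuration)} for the configured
   * one):</p>
   * <p><blockquote><pre>
   *     while (!job.isComplete()) {
   *       System.out.println("map " + job.mapProgress() +
   *           " reduce " + job.reduceProgress());
   *       Thread.sleep(1000);
   *     }
   * </pre></blockquote></p>
   * 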
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   * 
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   * 
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   * 
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not. 
   * This is a non-blocking call.
   * 
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully. 
   * 
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   * 
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of the job, either before it is submitted or while it
   * is running.
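   * <p>A brief sketch of the two paths: before submission the priority is
   * simply recorded in the job configuration; on a running job it is
   * forwarded to the cluster.</p>
   * <p><blockquote><pre>
   *     Job job = Job.getInstance(new Configuration());
   *     job.setPriority(JobPriority.HIGH);   // recorded in the job conf
   *     job.submit();
   *     job.setPriority(JobPriority.LOW);    // sent to the cluster
   * </pre></blockquote></p>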
   * @param jobPriority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority jobPriority) throws IOException,
      InterruptedException {
    if (state == JobState.DEFINE) {
      if (jobPriority == JobPriority.UNDEFINED_PRIORITY) {
        conf.setJobPriorityAsInteger(convertPriorityToInteger(jobPriority));
      } else {
        conf.setJobPriority(org.apache.hadoop.mapred.JobPriority
            .valueOf(jobPriority.name()));
      }
    } else {
      ensureState(JobState.RUNNING);
      final int tmpPriority = convertPriorityToInteger(jobPriority);
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient()
              .setJobPriority(getJobID(), Integer.toString(tmpPriority));
          return null;
        }
      });
    }
  }

  /**
   * Set the priority of the job as an integer, either before it is submitted
   * or while it is running.
   *
   * @param jobPriority
   *          the new priority for the job.
   * @throws IOException
   */
  public void setPriorityAsInteger(int jobPriority) throws IOException,
      InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriorityAsInteger(jobPriority);
    } else {
      ensureState(JobState.RUNNING);
      final int tmpPriority = jobPriority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient()
              .setJobPriority(getJobID(), Integer.toString(tmpPriority));
          return null;
        }
      });
    }
  }

  private int convertPriorityToInteger(JobPriority jobPriority) {
    switch (jobPriority) {
    case VERY_HIGH :
      return 5;
    case HIGH :
      return 4;
    case NORMAL :
      return 3;
    case LOW :
      return 2;
    case VERY_LOW :
      return 1;
    case DEFAULT :
      return 0;
    default:
      break;
    }
    // For UNDEFINED_PRIORITY, fall back to the default priority
    return 0;
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *  
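   * <p>A minimal sketch of paging through all events, ten at a time:</p>
   * <p><blockquote><pre>
   *     int from = 0;
   *     TaskCompletionEvent[] events;
   *     do {
   *       events = job.getTaskCompletionEvents(from, 10);
   *       for (TaskCompletionEvent event : events) {
   *         System.out.println(event.getTaskAttemptId() + " " + event.getStatus());
   *       }
   *       from += events.length;
   *     } while (events.length > 0);
   * </pre></blockquote></p>
   *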
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *  
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
    getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill the indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to the failed tasks list, otherwise it is just killed,
   *                   without affecting job failure status.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
                          final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill the indicated task attempt.
   * 
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail the indicated task attempt.
   * 
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   * 
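   * <p>An illustrative sketch of reading a built-in counter (the null check
   * matters because retired jobs may no longer report counters):</p>
   * <p><blockquote><pre>
   *     Counters counters = job.getCounters();
   *     if (counters != null) {
   *       Counter inputRecords = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS);
   *       System.out.println(inputRecords.getDisplayName() + ": " + inputRecords.getValue());
   *     }
   * </pre></blockquote></p>
   * 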
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters() 
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid) 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   * 
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, 
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   * 
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls, 
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   * 
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   * 
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   * 
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   * 
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
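   * <p>As an illustrative sketch, sorting <code>LongWritable</code> keys in
   * descending order with a stock comparator from
   * <code>org.apache.hadoop.io</code>:</p>
   * <p><blockquote><pre>
   *     job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
   * </pre></blockquote></p>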
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to 
   * {@link Reducer#reduce(Object, Iterable, 
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}.
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   * 
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job. 
   * 
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter}, 
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives to be localized.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files to be localized.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }
  
  /**
   * Add a file to be localized.
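   * <p>A short sketch (the HDFS path and the <code>#lookup</code> symlink
   * fragment are placeholders): the file is shipped to every task and
   * appears in the task working directory under the fragment name.</p>
   * <p><blockquote><pre>
   *     job.addCacheFile(new URI("hdfs:///apps/data/lookup.dat#lookup"));
   * </pre></blockquote></p>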
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   * 
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to the classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   * 
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }
  
  /** 
   * Expert: Set the maximum number of attempts that will be made to run a
   * map task.
   * 
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /** 
   * Expert: Set the maximum number of attempts that will be made to run a
   * reduce task.
   * 
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of 
   * the tasks in this job. The information is stored in the user log 
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
   * must also be called.
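   * <p>A minimal sketch; the range syntax follows
   * {@link Configuration.IntegerRanges}, e.g. "0-2" or "0-1,4,6-":</p>
   * <p><blockquote><pre>
   *     job.setProfileEnabled(true);
   *     job.setProfileTaskRange(true, "0-2");   // profile map tasks 0, 1 and 2
   * </pre></blockquote></p>
   * @param isMap true to set the range for map tasks, false for reduce tasks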
   * @param newValue a set of integer ranges of the map ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }
  
  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs, 
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }
  
  /**
   * Submit the job to the cluster and wait for it to finish.
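   * <p>This is typically the last call in a driver's <code>main</code>. A
   * common sketch (assumes the usual convention of a non-zero exit code on
   * failure):</p>
   * <p><blockquote><pre>
   *     System.exit(job.waitForCompletion(true) ? 0 : 1);
   * </pre></blockquote></p>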
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore the interruption and keep polling
        }
      }
    }
    return isSuccessful();
  }
  
  /**
   * Monitor a job and print status in real-time as progress is made and tasks 
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob() 
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis = 
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report = 
        (" map " + StringUtils.formatPercent(mapProgress(), 0)+
            " reduce " + 
            StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events = 
        getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() + 
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() == 
          TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() == 
          TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status. */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + 
        " has been set to an invalid value; "
        + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) { 
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 
       " has been set to an invalid value; "
       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   * 
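   * <p>An illustrative sketch of the round trip (the filter defaults to
   * FAILED when the <code>mapreduce.client.output.filter</code> key is
   * unset):</p>
   * <p><blockquote><pre>
   *     Job.setTaskOutputFilter(conf, Job.TaskStatusFilter.ALL);
   *     Job.TaskStatusFilter filter = Job.getTaskOutputFilter(conf);  // ALL
   * </pre></blockquote></p>
   * 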
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   * 
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf, 
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the reservationId, the identifier of the job's reservation, or
   *         null if the job does not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }
  
}