001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.ServiceConfigurationError;
028import java.util.ServiceLoader;
029
030import com.google.common.annotations.VisibleForTesting;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.apache.hadoop.classification.InterfaceAudience;
034import org.apache.hadoop.classification.InterfaceStability;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.io.Text;
039import org.apache.hadoop.mapred.JobConf;
040import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
041import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
042import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
043import org.apache.hadoop.mapreduce.util.ConfigUtil;
044import org.apache.hadoop.mapreduce.v2.LogParams;
045import org.apache.hadoop.security.UserGroupInformation;
046import org.apache.hadoop.security.token.SecretManager.InvalidToken;
047import org.apache.hadoop.security.token.Token;
048
049/**
050 * Provides a way to access information about the map/reduce cluster.
051 */
052@InterfaceAudience.Public
053@InterfaceStability.Evolving
054public class Cluster {
055  
056  @InterfaceStability.Evolving
057  public static enum JobTrackerStatus {INITIALIZING, RUNNING};
058
  // Provider that produced 'client'; kept so close() can hand the client
  // back to it.
  private ClientProtocolProvider clientProtocolProvider;
  // Protocol handle that the query methods of this class delegate to.
  private ClientProtocol client;
  // User obtained from UserGroupInformation.getCurrentUser() in the
  // constructor.
  private UserGroupInformation ugi;
  // Configuration passed to the constructor.
  private Configuration conf;
  // Resolved on the first getFileSystem() call; null until then.
  private FileSystem fs = null;
  // The three paths below cache the result of the matching client lookup
  // and stay null until the corresponding getter is first called.
  private Path sysDir = null;
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null;
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  // Source of candidate providers, discovered through ServiceLoader. It also
  // serves as the lock object in initProviderList().
  // NOTE(review): left non-final and package-visible presumably so tests can
  // swap it - confirm against the test code.
  @VisibleForTesting
  static Iterable<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);
  // Per-instance copy of the providers read from frameworkLoader; assigned
  // once by initProviderList() and null before that.
  private volatile List<ClientProtocolProvider> providerList = null;
073
074  private void initProviderList() {
075    if (providerList == null) {
076      synchronized (frameworkLoader) {
077        if (providerList == null) {
078          List<ClientProtocolProvider> localProviderList =
079              new ArrayList<ClientProtocolProvider>();
080          try {
081            for (ClientProtocolProvider provider : frameworkLoader) {
082              localProviderList.add(provider);
083            }
084          } catch(ServiceConfigurationError e) {
085            LOG.info("Failed to instantiate ClientProtocolProvider, please "
086                         + "check the /META-INF/services/org.apache."
087                         + "hadoop.mapreduce.protocol.ClientProtocolProvider "
088                         + "files on the classpath", e);
089          }
090          providerList = localProviderList;
091        }
092      }
093    }
094  }
095
  // Class initializer: calls ConfigUtil.loadResources() once when this class
  // is initialized, before any Cluster instance is built.
  // NOTE(review): presumably this registers the MapReduce configuration
  // resources - confirm against ConfigUtil.
  static {
    ConfigUtil.loadResources();
  }
099  
  /**
   * Creates a cluster handle without an explicit job tracker address.
   * Delegates to {@link #Cluster(InetSocketAddress, Configuration)} with a
   * <code>null</code> address.
   *
   * @param conf configuration handed to the client protocol providers
   * @throws IOException if no provider yields a client protocol
   */
  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }
103
  /**
   * Creates a cluster handle, records the current user and selects a client
   * protocol from the available providers.
   *
   * @param jobTrackAddr address passed to the providers; when
   *        <code>null</code> the providers are asked to create a client from
   *        the configuration alone
   * @param conf configuration handed to the client protocol providers
   * @throws IOException if the current user cannot be determined or no
   *         provider yields a client protocol
   */
  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }
110  
111  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
112      throws IOException {
113
114    initProviderList();
115    final IOException initEx = new IOException(
116        "Cannot initialize Cluster. Please check your configuration for "
117            + MRConfig.FRAMEWORK_NAME
118            + " and the correspond server addresses.");
119    if (jobTrackAddr != null) {
120      LOG.info(
121          "Initializing cluster for Job Tracker=" + jobTrackAddr.toString());
122    }
123    for (ClientProtocolProvider provider : providerList) {
124      LOG.debug("Trying ClientProtocolProvider : "
125          + provider.getClass().getName());
126      ClientProtocol clientProtocol = null;
127      try {
128        if (jobTrackAddr == null) {
129          clientProtocol = provider.create(conf);
130        } else {
131          clientProtocol = provider.create(jobTrackAddr, conf);
132        }
133
134        if (clientProtocol != null) {
135          clientProtocolProvider = provider;
136          client = clientProtocol;
137          LOG.debug("Picked " + provider.getClass().getName()
138              + " as the ClientProtocolProvider");
139          break;
140        } else {
141          LOG.debug("Cannot pick " + provider.getClass().getName()
142              + " as the ClientProtocolProvider - returned null protocol");
143        }
144      } catch (Exception e) {
145        final String errMsg = "Failed to use " + provider.getClass().getName()
146            + " due to error: ";
147        initEx.addSuppressed(new IOException(errMsg, e));
148        LOG.info(errMsg, e);
149      }
150    }
151
152    if (null == clientProtocolProvider || null == client) {
153      throw initEx;
154    }
155  }
156
  /**
   * Package-private accessor for the client protocol selected during
   * initialization.
   *
   * @return the client protocol this cluster delegates to
   */
  ClientProtocol getClient() {
    return client;
  }
160  
  /**
   * Package-private accessor for the configuration passed to the
   * constructor.
   *
   * @return the configuration of this cluster
   */
  Configuration getConf() {
    return conf;
  }
164  
  /**
   * Close the <code>Cluster</code> by passing the client protocol to the
   * <code>close</code> method of the provider that created it.
   *
   * @throws IOException propagated from the provider's close call
   */
  public synchronized void close() throws IOException {
    clientProtocolProvider.close(client);
  }
172
173  private Job[] getJobs(JobStatus[] stats) throws IOException {
174    List<Job> jobs = new ArrayList<Job>();
175    for (JobStatus stat : stats) {
176      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
177    }
178    return jobs.toArray(new Job[0]);
179  }
180
181  /**
182   * Get the file system where job-specific files are stored
183   * 
184   * @return object of FileSystem
185   * @throws IOException
186   * @throws InterruptedException
187   */
188  public synchronized FileSystem getFileSystem() 
189      throws IOException, InterruptedException {
190    if (this.fs == null) {
191      try {
192        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
193          public FileSystem run() throws IOException, InterruptedException {
194            final Path sysDir = new Path(client.getSystemDir());
195            return sysDir.getFileSystem(getConf());
196          }
197        });
198      } catch (InterruptedException e) {
199        throw new RuntimeException(e);
200      }
201    }
202    return fs;
203  }
204
205  /**
206   * Get job corresponding to jobid.
207   * 
208   * @param jobId
209   * @return object of {@link Job}
210   * @throws IOException
211   * @throws InterruptedException
212   */
213  public Job getJob(JobID jobId) throws IOException, InterruptedException {
214    JobStatus status = client.getJobStatus(jobId);
215    if (status != null) {
216      final JobConf conf = new JobConf();
217      final Path jobPath = new Path(client.getFilesystemName(),
218          status.getJobFile());
219      final FileSystem fs = FileSystem.get(jobPath.toUri(), getConf());
220      try {
221        conf.addResource(fs.open(jobPath), jobPath.toString());
222      } catch (FileNotFoundException fnf) {
223        if (LOG.isWarnEnabled()) {
224          LOG.warn("Job conf missing on cluster", fnf);
225        }
226      }
227      return Job.getInstance(this, status, conf);
228    }
229    return null;
230  }
231  
  /**
   * Get all the queues in cluster, as reported by the client.
   * 
   * @return array of {@link QueueInfo}
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
    return client.getQueues();
  }
242  
  /**
   * Get queue information for the specified name, as reported by the client.
   * 
   * @param name queuename
   * @return object of {@link QueueInfo}
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public QueueInfo getQueue(String name) 
      throws IOException, InterruptedException {
    return client.getQueue(name);
  }
255
  /**
   * Get log parameters for the specified jobID or taskAttemptID.
   * Both arguments are forwarded unchanged to the client.
   *
   * @param jobID the job id.
   * @param taskAttemptID the task attempt id. Optional.
   * @return the LogParams
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
      throws IOException, InterruptedException {
    return client.getLogFileParams(jobID, taskAttemptID);
  }
268
  /**
   * Get current cluster status, i.e. the cluster metrics reported by the
   * client.
   * 
   * @return object of {@link ClusterMetrics}
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
    return client.getClusterMetrics();
  }
279  
  /**
   * Get all active trackers in the cluster, as reported by the client.
   * 
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public TaskTrackerInfo[] getActiveTaskTrackers() 
      throws IOException, InterruptedException  {
    return client.getActiveTrackers();
  }
291  
  /**
   * Get blacklisted trackers, as reported by the client.
   * 
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public TaskTrackerInfo[] getBlackListedTaskTrackers() 
      throws IOException, InterruptedException  {
    return client.getBlacklistedTrackers();
  }
303  
  /**
   * Get all the jobs in cluster. The statuses returned by the client are
   * each converted into a {@link Job}.
   * 
   * @return array of {@link Job}
   * @throws IOException propagated from the client call or from job creation
   * @throws InterruptedException propagated from the client call
   * @deprecated Use {@link #getAllJobStatuses()} instead.
   */
  @Deprecated
  public Job[] getAllJobs() throws IOException, InterruptedException {
    return getJobs(client.getAllJobs());
  }
316
  /**
   * Get job status for all jobs in the cluster, exactly as returned by the
   * client.
   *
   * @return job status for all jobs in cluster
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
    return client.getAllJobs();
  }
326
327  /**
328   * Grab the jobtracker system directory path where 
329   * job-specific files will  be placed.
330   * 
331   * @return the system directory where job-specific files are to be placed.
332   */
333  public Path getSystemDir() throws IOException, InterruptedException {
334    if (sysDir == null) {
335      sysDir = new Path(client.getSystemDir());
336    }
337    return sysDir;
338  }
339  
340  /**
341   * Grab the jobtracker's view of the staging directory path where 
342   * job-specific files will  be placed.
343   * 
344   * @return the staging directory where job-specific files are to be placed.
345   */
346  public Path getStagingAreaDir() throws IOException, InterruptedException {
347    if (stagingAreaDir == null) {
348      stagingAreaDir = new Path(client.getStagingAreaDir());
349    }
350    return stagingAreaDir;
351  }
352
353  /**
354   * Get the job history file path for a given job id. The job history file at 
355   * this path may or may not be existing depending on the job completion state.
356   * The file is present only for the completed jobs.
357   * @param jobId the JobID of the job submitted by the current user.
358   * @return the file path of the job history file
359   * @throws IOException
360   * @throws InterruptedException
361   */
362  public String getJobHistoryUrl(JobID jobId) throws IOException, 
363    InterruptedException {
364    if (jobHistoryDir == null) {
365      jobHistoryDir = new Path(client.getJobHistoryDir());
366    }
367    return new Path(jobHistoryDir, jobId.toString() + "_"
368                    + ugi.getShortUserName()).toString();
369  }
370
  /**
   * Gets the Queue ACLs for current user, as reported by the client.
   *
   * @return array of QueueAclsInfo object for current user.
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() 
      throws IOException, InterruptedException  {
    return client.getQueueAclsForCurrentUser();
  }
380
  /**
   * Gets the root level queues, as reported by the client.
   *
   * @return array of {@link QueueInfo} objects for the root level queues.
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
    return client.getRootQueues();
  }
389  
  /**
   * Returns immediate children of queueName, as reported by the client.
   *
   * @param queueName name of the queue whose children are requested
   * @return array of {@link QueueInfo} which are children of queueName
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public QueueInfo[] getChildQueues(String queueName) 
      throws IOException, InterruptedException {
    return client.getChildQueues(queueName);
  }
400  
  /**
   * Get the JobTracker's status, as reported by the client.
   * 
   * @return {@link JobTrackerStatus} of the JobTracker
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public JobTrackerStatus getJobTrackerStatus() throws IOException,
      InterruptedException {
    return client.getJobTrackerStatus();
  }
412  
  /**
   * Get the tasktracker expiry interval for the cluster, as reported by the
   * client.
   *
   * @return the expiry interval in msec
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public long getTaskTrackerExpiryInterval() throws IOException,
      InterruptedException {
    return client.getTaskTrackerExpiryInterval();
  }
421
  /**
   * Get a delegation token for the user from the JobTracker.
   * The request is forwarded to the client and its token is returned as is.
   *
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public Token<DelegationTokenIdentifier> 
      getDelegationToken(Text renewer) throws IOException, InterruptedException{
    // client has already set the service
    return client.getDelegationToken(renewer);
  }
433
434  /**
435   * Renew a delegation token
436   * @param token the token to renew
437   * @return the new expiration time
438   * @throws InvalidToken
439   * @throws IOException
440   * @deprecated Use {@link Token#renew} instead
441   */
442  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
443                                   ) throws InvalidToken, IOException,
444                                            InterruptedException {
445    return token.renew(getConf());
446  }
447
448  /**
449   * Cancel a delegation token from the JobTracker
450   * @param token the token to cancel
451   * @throws IOException
452   * @deprecated Use {@link Token#cancel} instead
453   */
454  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
455                                    ) throws IOException,
456                                             InterruptedException {
457    token.cancel(getConf());
458  }
459
460}