/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;

/**
 * Provides a way to access information about the map/reduce cluster.
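 *
 * <p>A minimal usage sketch (illustrative only; it assumes the
 * {@link Configuration} on the classpath points at a running cluster):</p>
 * <pre>
 *   Configuration conf = new Configuration();
 *   Cluster cluster = new Cluster(conf);
 *   try {
 *     ClusterMetrics metrics = cluster.getClusterStatus();
 *     System.out.println("Running maps: " + metrics.getRunningMaps());
 *   } finally {
 *     cluster.close();
 *   }
 * </pre>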
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {

  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING};

  private ClientProtocolProvider clientProtocolProvider;
  private ClientProtocol client;
  private UserGroupInformation ugi;
  private Configuration conf;
  private FileSystem fs = null;
  private Path sysDir = null;
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null;
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  @VisibleForTesting
  static Iterable<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);
  private volatile List<ClientProtocolProvider> providerList = null;

  // Lazily discovers ClientProtocolProvider implementations through the
  // ServiceLoader, using double-checked locking so the lookup happens once.
  private void initProviderList() {
    if (providerList == null) {
      synchronized (frameworkLoader) {
        if (providerList == null) {
          List<ClientProtocolProvider> localProviderList =
              new ArrayList<ClientProtocolProvider>();
          try {
            for (ClientProtocolProvider provider : frameworkLoader) {
              localProviderList.add(provider);
            }
          } catch (ServiceConfigurationError e) {
            LOG.info("Failed to instantiate ClientProtocolProvider, please "
                + "check the /META-INF/services/org.apache."
                + "hadoop.mapreduce.protocol.ClientProtocolProvider "
                + "files on the classpath", e);
          }
          providerList = localProviderList;
        }
      }
    }
  }

  static {
    ConfigUtil.loadResources();
  }

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }

  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    initProviderList();
    final IOException initEx = new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the corresponding server addresses.");
    if (jobTrackAddr != null) {
      LOG.info(
          "Initializing cluster for Job Tracker=" + jobTrackAddr.toString());
    }
    // Try each discovered provider in turn until one returns a usable
    // ClientProtocol; failures are recorded as suppressed exceptions.
    for (ClientProtocolProvider provider : providerList) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }

        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        final String errMsg = "Failed to use " + provider.getClass().getName()
            + " due to error: ";
        initEx.addSuppressed(new IOException(errMsg, e));
        LOG.info(errMsg, e);
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw initEx;
    }
  }

  ClientProtocol getClient() {
    return client;
  }

  Configuration getConf() {
    return conf;
  }

  /**
   * Close the <code>Cluster</code>.
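   * <p>Closing releases the underlying {@link ClientProtocol} through the
   * {@link ClientProtocolProvider} that was selected during
   * initialization.</p>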
   * @throws IOException
   */
  public synchronized void close() throws IOException {
    clientProtocolProvider.close(client);
  }

  private Job[] getJobs(JobStatus[] stats) throws IOException {
    List<Job> jobs = new ArrayList<Job>();
    for (JobStatus stat : stats) {
      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
    }
    return jobs.toArray(new Job[0]);
  }

  /**
   * Get the file system where job-specific files are stored.
   *
   * @return object of {@link FileSystem}
   * @throws IOException
   * @throws InterruptedException
   */
  public synchronized FileSystem getFileSystem()
      throws IOException, InterruptedException {
    if (this.fs == null) {
      try {
        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
          public FileSystem run() throws IOException, InterruptedException {
            final Path sysDir = new Path(client.getSystemDir());
            return sysDir.getFileSystem(getConf());
          }
        });
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
    return fs;
  }

  /**
   * Get the job corresponding to the given job id.
   *
   * @param jobId the job id.
   * @return object of {@link Job}
   * @throws IOException
   * @throws InterruptedException
   */
  public Job getJob(JobID jobId) throws IOException, InterruptedException {
    JobStatus status = client.getJobStatus(jobId);
    if (status != null) {
      final JobConf conf = new JobConf();
      final Path jobPath = new Path(client.getFilesystemName(),
          status.getJobFile());
      final FileSystem fs = FileSystem.get(jobPath.toUri(), getConf());
      try {
        conf.addResource(fs.open(jobPath), jobPath.toString());
      } catch (FileNotFoundException fnf) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Job conf missing on cluster", fnf);
        }
      }
      return Job.getInstance(this, status, conf);
    }
    return null;
  }

  /**
   * Get all the queues in the cluster.
   *
   * @return array of {@link QueueInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
    return client.getQueues();
  }

  /**
   * Get queue information for the specified name.
   *
   * @param name the queue name
   * @return object of {@link QueueInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo getQueue(String name)
      throws IOException, InterruptedException {
    return client.getQueue(name);
  }

  /**
   * Get log parameters for the specified jobID or taskAttemptID.
   * @param jobID the job id.
   * @param taskAttemptID the task attempt id. Optional.
   * @return the LogParams
   * @throws IOException
   * @throws InterruptedException
   */
  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
      throws IOException, InterruptedException {
    return client.getLogFileParams(jobID, taskAttemptID);
  }

  /**
   * Get current cluster status.
   *
   * @return object of {@link ClusterMetrics}
   * @throws IOException
   * @throws InterruptedException
   */
  public ClusterMetrics getClusterStatus()
      throws IOException, InterruptedException {
    return client.getClusterMetrics();
  }

  /**
   * Get all active trackers in the cluster.
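   * <p>Illustrative sketch that prints the name of every active tracker
   * (it assumes {@code cluster} is an initialized {@link Cluster}):</p>
   * <pre>
   *   for (TaskTrackerInfo tracker : cluster.getActiveTaskTrackers()) {
   *     System.out.println(tracker.getTaskTrackerName());
   *   }
   * </pre>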
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getActiveTaskTrackers()
      throws IOException, InterruptedException {
    return client.getActiveTrackers();
  }

  /**
   * Get blacklisted trackers.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getBlackListedTaskTrackers()
      throws IOException, InterruptedException {
    return client.getBlacklistedTrackers();
  }

  /**
   * Get all the jobs in the cluster.
   *
   * @return array of {@link Job}
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link #getAllJobStatuses()} instead.
   */
  @Deprecated
  public Job[] getAllJobs() throws IOException, InterruptedException {
    return getJobs(client.getAllJobs());
  }

  /**
   * Get job status for all jobs in the cluster.
   * @return job status for all jobs in the cluster
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus[] getAllJobStatuses()
      throws IOException, InterruptedException {
    return client.getAllJobs();
  }

  /**
   * Grab the jobtracker system directory path where
   * job-specific files will be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   * @throws IOException
   * @throws InterruptedException
   */
  public Path getSystemDir() throws IOException, InterruptedException {
    if (sysDir == null) {
      sysDir = new Path(client.getSystemDir());
    }
    return sysDir;
  }

  /**
   * Grab the jobtracker's view of the staging directory path where
   * job-specific files will be placed.
   *
   * @return the staging directory where job-specific files are to be placed.
   * @throws IOException
   * @throws InterruptedException
   */
  public Path getStagingAreaDir() throws IOException, InterruptedException {
    if (stagingAreaDir == null) {
      stagingAreaDir = new Path(client.getStagingAreaDir());
    }
    return stagingAreaDir;
  }

  /**
   * Get the job history file path for a given job id. The job history file at
   * this path may or may not exist, depending on the job's completion state;
   * it is present only for completed jobs.
   * @param jobId the JobID of the job submitted by the current user.
   * @return the file path of the job history file
   * @throws IOException
   * @throws InterruptedException
   */
  public String getJobHistoryUrl(JobID jobId) throws IOException,
      InterruptedException {
    if (jobHistoryDir == null) {
      jobHistoryDir = new Path(client.getJobHistoryDir());
    }
    return new Path(jobHistoryDir, jobId.toString() + "_"
        + ugi.getShortUserName()).toString();
  }

  /**
   * Gets the queue ACLs for the current user.
   * @return array of {@link QueueAclsInfo} for the current user.
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser()
      throws IOException, InterruptedException {
    return client.getQueueAclsForCurrentUser();
  }

  /**
   * Gets the root level queues.
   * @return array of {@link QueueInfo} objects.
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
    return client.getRootQueues();
  }

  /**
   * Returns immediate children of queueName.
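   * <p>Illustrative sketch of walking one level of the queue hierarchy,
   * assuming {@code cluster} is an initialized {@link Cluster}:</p>
   * <pre>
   *   for (QueueInfo root : cluster.getRootQueues()) {
   *     for (QueueInfo child : cluster.getChildQueues(root.getQueueName())) {
   *       System.out.println(child.getQueueName());
   *     }
   *   }
   * </pre>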
   * @param queueName the name of the parent queue.
   * @return array of {@link QueueInfo} which are children of queueName
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getChildQueues(String queueName)
      throws IOException, InterruptedException {
    return client.getChildQueues(queueName);
  }

  /**
   * Get the JobTracker's status.
   *
   * @return {@link JobTrackerStatus} of the JobTracker
   * @throws IOException
   * @throws InterruptedException
   */
  public JobTrackerStatus getJobTrackerStatus() throws IOException,
      InterruptedException {
    return client.getJobTrackerStatus();
  }

  /**
   * Get the tasktracker expiry interval for the cluster.
   * @return the expiry interval in msec
   * @throws IOException
   * @throws InterruptedException
   */
  public long getTaskTrackerExpiryInterval() throws IOException,
      InterruptedException {
    return client.getTaskTrackerExpiryInterval();
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   * @throws InterruptedException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(Text renewer) throws IOException,
      InterruptedException {
    // client has already set the service
    return client.getDelegationToken(renewer);
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time
   * @throws InvalidToken
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link Token#renew} instead
   */
  @Deprecated
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link Token#cancel} instead
   */
  @Deprecated
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws IOException, InterruptedException {
    token.cancel(getConf());
  }

}