/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.hs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownThreadsHelper;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;

/**
 * This class provides a way to interact with history files in a thread-safe
 * manner.
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class HistoryFileManager extends AbstractService {
  private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);

  private enum HistoryInfoState {
    IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
  }

  private static final String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
      .doneSubdirsBeforeSerialTail();

  /**
   * Maps between a serial number (generated based on jobId) and the timestamp
   * component(s) to which it belongs. Facilitates jobId-based searches. If a
   * jobId's serial number is not present in this index, the job will not be
   * found.
   */
  private static class SerialNumberIndex {
    private SortedMap<String, Set<String>> cache;
    private int maxSize;

    public SerialNumberIndex(int maxSize) {
      this.cache = new TreeMap<String, Set<String>>();
      this.maxSize = maxSize;
    }

    public synchronized void add(String serialPart, String timestampPart) {
      if (!cache.containsKey(serialPart)) {
        cache.put(serialPart, new HashSet<String>());
        if (cache.size() > maxSize) {
          String key = cache.firstKey();
          LOG.error("Dropping " + key
              + " from the SerialNumberIndex. We will no "
              + "longer be able to see jobs that are in that serial index for "
              + cache.get(key));
          cache.remove(key);
        }
      }
      Set<String> datePartSet = cache.get(serialPart);
      datePartSet.add(timestampPart);
    }

    public synchronized void remove(String serialPart, String timeStampPart) {
      if (cache.containsKey(serialPart)) {
        Set<String> set = cache.get(serialPart);
        set.remove(timeStampPart);
        if (set.isEmpty()) {
          cache.remove(serialPart);
        }
      }
    }

    public synchronized Set<String> get(String serialPart) {
      Set<String> found = cache.get(serialPart);
      if (found != null) {
        return new HashSet<String>(found);
      }
      return null;
    }
  }
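
  // Illustrative mapping (hypothetical values): for a done-directory path
  // such as <done-prefix>/2015/07/21/000123, the serial part is "000123"
  // and the timestamp part is "2015/07/21", so the index records
  //   serialNumberIndex.add("000123", "2015/07/21");
  // which later lets scanOldDirsForJob() map a job ID whose serial
  // component is 000123 back to the dated directories that may contain it.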

  /**
   * Wrapper around {@link ConcurrentSkipListMap} that maintains the size
   * alongside the map for an O(1) size() implementation, for use in
   * JobListCache.
   *
   * Note: The size is not updated atomically with additions/removals. This
   * race can lead to size() returning an incorrect size at times.
   */
  static class JobIdHistoryFileInfoMap {
    private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
    private AtomicInteger mapSize;

    JobIdHistoryFileInfoMap() {
      cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
      mapSize = new AtomicInteger();
    }

    public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
      HistoryFileInfo ret = cache.putIfAbsent(key, value);
      if (ret == null) {
        mapSize.incrementAndGet();
      }
      return ret;
    }

    public HistoryFileInfo remove(JobId key) {
      HistoryFileInfo ret = cache.remove(key);
      if (ret != null) {
        mapSize.decrementAndGet();
      }
      return ret;
    }

    /**
     * Returns the recorded size of the internal map. Note that this could be
     * out of sync with the actual size of the map.
     * @return the "recorded" size
     */
    public int size() {
      return mapSize.get();
    }

    public HistoryFileInfo get(JobId key) {
      return cache.get(key);
    }

    public NavigableSet<JobId> navigableKeySet() {
      return cache.navigableKeySet();
    }

    public Collection<HistoryFileInfo> values() {
      return cache.values();
    }
  }
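
  // Worked example of the documented size race (illustrative): thread A
  // removes a job from the skip list, and before A decrements mapSize,
  // thread B calls size() and observes a count one higher than the true
  // size. JobListCache below tolerates this because it only needs an
  // approximate size for its eviction threshold.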

  static class JobListCache {
    private JobIdHistoryFileInfoMap cache;
    private int maxSize;
    private long maxAge;

    public JobListCache(int maxSize, long maxAge) {
      this.maxSize = maxSize;
      this.maxAge = maxAge;
      this.cache = new JobIdHistoryFileInfoMap();
    }

    public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
      JobId jobId = fileInfo.getJobId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding " + jobId + " to job list cache with "
            + fileInfo.getJobIndexInfo());
      }
      HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
      if (cache.size() > maxSize) {
        // There is a race here, where more than one thread could be trying
        // to remove entries. This could result in too many entries being
        // removed from the cache. This is considered OK as the size of the
        // cache should be rather large, and we would rather have performance
        // over keeping the cache size exactly at the maximum.
        Iterator<JobId> keys = cache.navigableKeySet().iterator();
        long cutoff = System.currentTimeMillis() - maxAge;

        // MAPREDUCE-6436: In order to reduce the number of logs written
        // in case of a lot of move-pending histories.
        JobId firstInIntermediateKey = null;
        int inIntermediateCount = 0;
        JobId firstMoveFailedKey = null;
        int moveFailedCount = 0;

        while (cache.size() > maxSize && keys.hasNext()) {
          JobId key = keys.next();
          HistoryFileInfo firstValue = cache.get(key);
          if (firstValue != null) {
            synchronized (firstValue) {
              if (firstValue.isMovePending()) {
                if (firstValue.didMoveFail() &&
                    firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
                  cache.remove(key);
                  // Now let's try to delete it
                  try {
                    firstValue.delete();
                  } catch (IOException e) {
                    LOG.error("Error while trying to delete history files" +
                        " that could not be moved to done.", e);
                  }
                } else {
                  if (firstValue.didMoveFail()) {
                    if (moveFailedCount == 0) {
                      firstMoveFailedKey = key;
                    }
                    moveFailedCount += 1;
                  } else {
                    if (inIntermediateCount == 0) {
                      firstInIntermediateKey = key;
                    }
                    inIntermediateCount += 1;
                  }
                }
              } else {
                cache.remove(key);
              }
            }
          }
        }
        // Log only the first job history among the pending ones to limit
        // the total number of log messages.
        if (inIntermediateCount > 0) {
          LOG.warn("Waiting to remove IN_INTERMEDIATE state histories " +
              "(e.g. " + firstInIntermediateKey + ") from JobListCache " +
              "because they have not been moved to done yet. Total count is " +
              inIntermediateCount + ".");
        }
        if (moveFailedCount > 0) {
          LOG.warn("Waiting to remove MOVE_FAILED state histories " +
              "(e.g. " + firstMoveFailedKey + ") from JobListCache " +
              "because they have not been moved to done yet. Total count is " +
              moveFailedCount + ".");
        }
      }
      return old;
    }

    public void delete(HistoryFileInfo fileInfo) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Removing from cache " + fileInfo);
      }
      cache.remove(fileInfo.getJobId());
    }

    public Collection<HistoryFileInfo> values() {
      return new ArrayList<HistoryFileInfo>(cache.values());
    }

    public HistoryFileInfo get(JobId jobId) {
      return cache.get(jobId);
    }

    public boolean isFull() {
      return cache.size() >= maxSize;
    }
  }

  /**
   * This class represents a user dir in the intermediate done directory. This
   * is mostly for locking purposes.
   */
  private class UserLogDir {
    long modTime = 0;
    private long scanTime = 0;

    public synchronized void scanIfNeeded(FileStatus fs) {
      long newModTime = fs.getModificationTime();
      // MAPREDUCE-6680: In some cloud FileSystems, such as Azure FS or S3, a
      // file's modification time is truncated to seconds. In that case,
      // modTime == newModTime doesn't mean there was no file update in the
      // directory, so we need an additional check.
      // Note: modTime (X second Y millisecond) could be truncated to X
      // second or rounded up to X+1 second.
      if (modTime != newModTime
          || (scanTime / 1000) == (modTime / 1000)
          || (scanTime / 1000 + 1) == (modTime / 1000)) {
        // reset scanTime before scanning happens
        scanTime = System.currentTimeMillis();
        Path p = fs.getPath();
        try {
          scanIntermediateDirectory(p);
          // If scanning fails, we will scan again. We assume the failure is
          // temporary.
          modTime = newModTime;
        } catch (IOException e) {
          LOG.error("Error while trying to scan the directory " + p, e);
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Scan not needed of " + fs.getPath());
        }
        // reset scanTime
        scanTime = System.currentTimeMillis();
      }
    }
  }
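
  // State transitions for HistoryFileInfo, as implemented below:
  //   IN_INTERMEDIATE --moveToDone() succeeds--> IN_DONE
  //   IN_INTERMEDIATE --moveToDone() fails----> MOVE_FAILED
  //   MOVE_FAILED ------moveToDone() retried--> IN_DONE
  //   any state --------delete()--------------> DELETED
  // isMovePending() is true for IN_INTERMEDIATE and MOVE_FAILED, which is
  // what allows a failed move to be retried on a later intermediate scan.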

  public class HistoryFileInfo {
    private Path historyFile;
    private Path confFile;
    private Path summaryFile;
    private JobIndexInfo jobIndexInfo;
    private volatile HistoryInfoState state;

    @VisibleForTesting
    protected HistoryFileInfo(Path historyFile, Path confFile,
        Path summaryFile, JobIndexInfo jobIndexInfo, boolean isInDone) {
      this.historyFile = historyFile;
      this.confFile = confFile;
      this.summaryFile = summaryFile;
      this.jobIndexInfo = jobIndexInfo;
      state = isInDone ? HistoryInfoState.IN_DONE
          : HistoryInfoState.IN_INTERMEDIATE;
    }

    @VisibleForTesting
    boolean isMovePending() {
      return state == HistoryInfoState.IN_INTERMEDIATE
          || state == HistoryInfoState.MOVE_FAILED;
    }

    @VisibleForTesting
    boolean didMoveFail() {
      return state == HistoryInfoState.MOVE_FAILED;
    }

    /**
     * @return true if the files backed by this were deleted.
     */
    public boolean isDeleted() {
      return state == HistoryInfoState.DELETED;
    }

    @Override
    public String toString() {
      return "HistoryFileInfo jobID " + getJobId()
          + " historyFile = " + historyFile;
    }

    @VisibleForTesting
    synchronized void moveToDone() throws IOException {
      if (LOG.isDebugEnabled()) {
        LOG.debug("moveToDone: " + historyFile);
      }
      if (!isMovePending()) {
        // It was either deleted or is already in done. Either way, do
        // nothing.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Move no longer pending");
        }
        return;
      }
      try {
        long completeTime = jobIndexInfo.getFinishTime();
        if (completeTime == 0) {
          completeTime = System.currentTimeMillis();
        }
        JobId jobId = jobIndexInfo.getJobId();

        List<Path> paths = new ArrayList<Path>(2);
        if (historyFile == null) {
          LOG.info("No file for job-history with " + jobId
              + " found in cache!");
        } else {
          paths.add(historyFile);
        }

        if (confFile == null) {
          LOG.info("No file for jobConf with " + jobId + " found in cache!");
        } else {
          paths.add(confFile);
        }

        if (summaryFile == null || !intermediateDoneDirFc.util().exists(
            summaryFile)) {
          LOG.info("No summary file for job: " + jobId);
        } else {
          String jobSummaryString = getJobSummary(intermediateDoneDirFc,
              summaryFile);
          SUMMARY_LOG.info(jobSummaryString);
          LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
          intermediateDoneDirFc.delete(summaryFile, false);
          summaryFile = null;
        }

        Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
        addDirectoryToSerialNumberIndex(targetDir);
        makeDoneSubdir(targetDir);
        if (historyFile != null) {
          Path toPath = doneDirFc.makeQualified(new Path(targetDir,
              historyFile.getName()));
          if (!toPath.equals(historyFile)) {
            moveToDoneNow(historyFile, toPath);
            historyFile = toPath;
          }
        }
        if (confFile != null) {
          Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
              .getName()));
          if (!toPath.equals(confFile)) {
            moveToDoneNow(confFile, toPath);
            confFile = toPath;
          }
        }
        state = HistoryInfoState.IN_DONE;
      } catch (Throwable t) {
        LOG.error("Error while trying to move a job to done", t);
        this.state = HistoryInfoState.MOVE_FAILED;
      }
    }

    /**
     * Parse a job from the JobHistoryFile, if the underlying file is not
     * going to be deleted and the number of tasks associated with the job is
     * not greater than maxTasksForLoadedJob.
     *
     * @return null if the underlying job history file was deleted, or
     *         an {@link UnparsedJob} object representing a partially parsed
     *         job if the job's task count exceeds the configured maximum, or
     *         a {@link CompletedJob} representing a fully parsed job.
     * @throws IOException
     *           if there is an error trying to read the file.
     */
    public synchronized Job loadJob() throws IOException {
      if (isOversized()) {
        return new UnparsedJob(maxTasksForLoadedJob, jobIndexInfo, this);
      } else {
        return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
            false, jobIndexInfo.getUser(), this, aclsMgr);
      }
    }
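
    // Illustrative call sequence (hypothetical caller code):
    //   HistoryFileInfo info = historyFileManager.getFileInfo(jobId);
    //   if (info != null && !info.isDeleted()) {
    //     Job job = info.loadJob(); // a CompletedJob, or an UnparsedJob
    //   }                           // when the job is oversized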

    /**
     * Return the history file.
     * @return the history file.
     */
    public synchronized Path getHistoryFile() {
      return historyFile;
    }

    protected synchronized void delete() throws IOException {
      if (LOG.isDebugEnabled()) {
        LOG.debug("deleting " + historyFile + " and " + confFile);
      }
      state = HistoryInfoState.DELETED;
      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
    }

    public JobIndexInfo getJobIndexInfo() {
      return jobIndexInfo;
    }

    public JobId getJobId() {
      return jobIndexInfo.getJobId();
    }

    public synchronized Path getConfFile() {
      return confFile;
    }

    public synchronized Configuration loadConfFile() throws IOException {
      FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
      Configuration jobConf = new Configuration(false);
      jobConf.addResource(fc.open(confFile), confFile.toString());
      return jobConf;
    }

    private boolean isOversized() {
      final int totalTasks = jobIndexInfo.getNumReduces() +
          jobIndexInfo.getNumMaps();
      return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob);
    }
  }

  private SerialNumberIndex serialNumberIndex = null;
  protected JobListCache jobListCache = null;

  // Maintains a list of known done subdirectories.
  private final Set<Path> existingDoneSubdirs = Collections
      .synchronizedSet(new HashSet<Path>());

  /**
   * Maintains a mapping between intermediate user directories and the last
   * known modification time.
   */
  private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap =
      new ConcurrentHashMap<String, UserLogDir>();

  private JobACLsManager aclsMgr;

  @VisibleForTesting
  Configuration conf;

  private String serialNumberFormat;

  private Path doneDirPrefixPath = null; // folder for completed jobs
  private FileContext doneDirFc; // done Dir FileContext

  private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
  private FileContext intermediateDoneDirFc; // Intermediate Done Dir
                                             // FileContext
  @VisibleForTesting
  protected ThreadPoolExecutor moveToDoneExecutor = null;
  private long maxHistoryAge = 0;

  /**
   * The maximum number of tasks allowed for a job to be loaded.
   */
  private int maxTasksForLoadedJob = -1;
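
  // Lifecycle sketch (illustrative, not part of this class): a caller such
  // as the JobHistory service typically drives this manager through the
  // AbstractService lifecycle. The configuration keys below are the
  // constants actually read in serviceInit().
  //   Configuration conf = new Configuration();
  //   conf.setInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, 3);
  //   HistoryFileManager hfm = new HistoryFileManager();
  //   hfm.init(conf);      // creates dirs, caches, and the move thread pool
  //   hfm.initExisting();  // indexes histories already in the done dir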

  public HistoryFileManager() {
    super(HistoryFileManager.class.getName());
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    this.conf = conf;

    int serialNumberLowDigits = 3;
    serialNumberFormat = ("%0"
        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS
            + serialNumberLowDigits) + "d");

    long maxFSWaitTime = conf.getLong(
        JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
    createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);

    maxTasksForLoadedJob = conf.getInt(
        JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
        JHAdminConfig.DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX);

    this.aclsMgr = new JobACLsManager(conf);

    maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);

    jobListCache = createJobListCache();

    serialNumberIndex = new SerialNumberIndex(conf.getInt(
        JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));

    int numMoveThreads = conf.getInt(
        JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
    moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads);
    super.serviceInit(conf);
  }

  protected ThreadPoolExecutor createMoveToDoneThreadPool(int numMoveThreads) {
    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
        "MoveIntermediateToDone Thread #%d").build();
    return new HadoopThreadPoolExecutor(numMoveThreads, numMoveThreads,
        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
  }

  @VisibleForTesting
  void createHistoryDirs(Clock clock, long intervalCheckMillis,
      long timeOutMillis) throws IOException {
    long start = clock.getTime();
    boolean done = false;
    int counter = 0;
    while (!done &&
        ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
      done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
      try {
        Thread.sleep(intervalCheckMillis);
      } catch (InterruptedException ex) {
        throw new YarnRuntimeException(ex);
      }
    }
    if (!done) {
      throw new YarnRuntimeException("Timed out after " + timeOutMillis
          + "ms waiting for FileSystem to become available");
    }
  }

  /**
   * Checks whether the NameNode has not started yet, as indicated by the
   * exception type and message. DistributedFileSystem returns a
   * RemoteException whose message contains "SafeModeException", so string
   * matching is the only way to determine that the cause is safe mode. In
   * addition, the NameNode may not have started yet, in which case the
   * message contains "NameNode still not started".
   */
  private boolean isNameNodeStillNotStarted(Exception ex) {
    String nameNodeNotStartedMsg = NameNode.composeNotStartedMessage(
        HdfsServerConstants.NamenodeRole.NAMENODE);
    return ex.toString().contains("SafeModeException") ||
        (ex instanceof RetriableException && ex.getMessage().contains(
            nameNodeNotStartedMsg));
  }

  /**
   * Returns TRUE if the history dirs were created, FALSE if they could not
   * be created because the FileSystem is not reachable or in safe mode, and
   * throws an exception otherwise.
   */
  @VisibleForTesting
  boolean tryCreatingHistoryDirs(boolean logWait) throws IOException {
    boolean succeeded = true;
    String doneDirPrefix = JobHistoryUtils.
        getConfiguredHistoryServerDoneDirPrefix(conf);
    try {
      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
          new Path(doneDirPrefix));
      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
    } catch (ConnectException ex) {
      if (logWait) {
        LOG.info("Waiting for FileSystem at " +
            doneDirPrefixPath.toUri().getAuthority() + " to be available");
      }
      succeeded = false;
    } catch (IOException e) {
      if (isNameNodeStillNotStarted(e)) {
        succeeded = false;
        if (logWait) {
          LOG.info("Waiting for FileSystem at " +
              doneDirPrefixPath.toUri().getAuthority() +
              " to be out of safe mode");
        }
      } else {
        throw new YarnRuntimeException("Error creating done directory: ["
            + doneDirPrefixPath + "]", e);
      }
    }
    if (succeeded) {
      String intermediateDoneDirPrefix = JobHistoryUtils.
          getConfiguredHistoryIntermediateDoneDirPrefix(conf);
      try {
        intermediateDoneDirPath = FileContext.getFileContext(conf)
            .makeQualified(new Path(intermediateDoneDirPrefix));
        intermediateDoneDirFc = FileContext.getFileContext(
            intermediateDoneDirPath.toUri(), conf);
        mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
            JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS
                .toShort()));
      } catch (ConnectException ex) {
        succeeded = false;
        if (logWait) {
          LOG.info("Waiting for FileSystem at " +
              intermediateDoneDirPath.toUri().getAuthority() +
              " to be available");
        }
      } catch (IOException e) {
        if (isNameNodeStillNotStarted(e)) {
          succeeded = false;
          if (logWait) {
            LOG.info("Waiting for FileSystem at " +
                intermediateDoneDirPath.toUri().getAuthority() +
                " to be out of safe mode");
          }
        } else {
          throw new YarnRuntimeException(
              "Error creating intermediate done directory: ["
                  + intermediateDoneDirPath + "]", e);
        }
      }
    }
    return succeeded;
  }

  @Override
  public void serviceStop() throws Exception {
    ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
    super.serviceStop();
  }

  protected JobListCache createJobListCache() {
    return new JobListCache(conf.getInt(
        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
  }

  private void mkdir(FileContext fc, Path path, FsPermission fsp)
      throws IOException {
    if (!fc.util().exists(path)) {
      try {
        fc.mkdir(path, fsp, true);

        // The FileContext umask may have altered the permissions that mkdir
        // actually applied, so verify them and set them explicitly if needed.
        FileStatus fsStatus = fc.getFileStatus(path);
        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
            + ", Expected: " + fsp.toShort());
        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
              + ", " + fsp);
          fc.setPermission(path, fsp);
        }
      } catch (FileAlreadyExistsException e) {
        LOG.info("Directory: [" + path + "] already exists.");
      }
    }
  }

  protected HistoryFileInfo createHistoryFileInfo(Path historyFile,
      Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo,
      boolean isInDone) {
    return new HistoryFileInfo(
        historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
  }

  /**
   * Populates index data structures. Should only be called at initialization
   * time.
   */
  @SuppressWarnings("unchecked")
  void initExisting() throws IOException {
    LOG.info("Initializing Existing Jobs...");
    List<FileStatus> timestampedDirList = findTimestampedDirectories();
    // Sort first just so insertion is in a consistent order
    Collections.sort(timestampedDirList);
    for (FileStatus fs : timestampedDirList) {
      // TODO Could verify the correct format for these directories.
      addDirectoryToSerialNumberIndex(fs.getPath());
    }
    for (int i = timestampedDirList.size() - 1;
        i >= 0 && !jobListCache.isFull(); i--) {
      FileStatus fs = timestampedDirList.get(i);
      addDirectoryToJobListCache(fs.getPath());
    }
  }

  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
    String serialPart = serialDirPath.getName();
    String timeStampPart = JobHistoryUtils
        .getTimestampPartFromPath(serialDirPath.toString());
    if (timeStampPart == null) {
      LOG.warn("Could not find timestamp portion from path: "
          + serialDirPath.toString() + ". Continuing with next");
      return;
    }
    if (serialPart == null) {
      LOG.warn("Could not find serial portion from path: "
          + serialDirPath.toString() + ". Continuing with next");
      return;
    }
    serialNumberIndex.remove(serialPart, timeStampPart);
  }

  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding " + serialDirPath + " to serial index");
    }
    String serialPart = serialDirPath.getName();
    String timestampPart = JobHistoryUtils
        .getTimestampPartFromPath(serialDirPath.toString());
    if (timestampPart == null) {
      LOG.warn("Could not find timestamp portion from path: " + serialDirPath
          + ". Continuing with next");
      return;
    }
    if (serialPart == null) {
      LOG.warn("Could not find serial portion from path: "
          + serialDirPath.toString() + ". Continuing with next");
    } else {
      serialNumberIndex.add(serialPart, timestampPart);
    }
  }

  private void addDirectoryToJobListCache(Path path) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding " + path + " to job list cache.");
    }
    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
        doneDirFc);
    for (FileStatus fs : historyFileList) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding in history for " + fs.getPath());
      }
      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
          .getName());
      String confFileName = JobHistoryUtils
          .getIntermediateConfFileName(jobIndexInfo.getJobId());
      String summaryFileName = JobHistoryUtils
          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
      HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(
          fs.getPath().getParent(), confFileName), new Path(fs.getPath()
          .getParent(), summaryFileName), jobIndexInfo, true);
      jobListCache.addIfAbsent(fileInfo);
    }
  }

  @VisibleForTesting
  protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
      PathFilter pathFilter) throws IOException {
    path = fc.makeQualified(path);
    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
    try {
      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
      while (fileStatusIter.hasNext()) {
        FileStatus fileStatus = fileStatusIter.next();
        Path filePath = fileStatus.getPath();
        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
          jhStatusList.add(fileStatus);
        }
      }
    } catch (FileNotFoundException fe) {
      LOG.error("Error while scanning directory " + path, fe);
    }
    return jhStatusList;
  }

  protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
      FileContext fc) throws IOException {
    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
  }

  /**
   * Finds all history directories with a timestamp component by scanning the
   * filesystem. Used when the JobHistory server is started.
   *
   * @return list of history directories
   */
  protected List<FileStatus> findTimestampedDirectories() throws IOException {
    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
    return fsList;
  }
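
  // Layout sketch (illustrative names; exact file suffixes come from
  // JobHistoryUtils): the intermediate done directory holds one
  // subdirectory per user, each containing the history, conf, and summary
  // files for that user's finished jobs, e.g.
  //   <intermediate-done>/alice/job_1412345678901_0001-....jhist
  //   <intermediate-done>/alice/job_1412345678901_0001_conf.xml
  //   <intermediate-done>/alice/job_1412345678901_0001.summary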

  /**
   * Scans the intermediate directory to find user directories. Scans these
   * for history files if the modification time for the directory has
   * changed. Once it finds history files it starts the process of moving
   * them to the done directory.
   *
   * @throws IOException
   *           if there was an error while scanning
   */
  void scanIntermediateDirectory() throws IOException {
    // TODO it would be great to limit how often this happens, except in the
    // case where we are looking for a particular job.
    List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
        intermediateDoneDirFc, intermediateDoneDirPath, "");
    LOG.debug("Scanning intermediate dirs");
    for (FileStatus userDir : userDirList) {
      String name = userDir.getPath().getName();
      UserLogDir dir = userDirModificationTimeMap.get(name);
      if (dir == null) {
        dir = new UserLogDir();
        UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
        if (old != null) {
          dir = old;
        }
      }
      dir.scanIfNeeded(userDir);
    }
  }

  /**
   * Scans the specified path and populates the intermediate cache.
   *
   * @param absPath
   * @throws IOException
   */
  private void scanIntermediateDirectory(final Path absPath)
      throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Scanning intermediate dir " + absPath);
    }
    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
        intermediateDoneDirFc);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Found " + fileStatusList.size() + " files");
    }
    for (FileStatus fs : fileStatusList) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("scanning file: " + fs.getPath());
      }
      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
          .getName());
      String confFileName = JobHistoryUtils
          .getIntermediateConfFileName(jobIndexInfo.getJobId());
      String summaryFileName = JobHistoryUtils
          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
      HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(
          fs.getPath().getParent(), confFileName), new Path(fs.getPath()
          .getParent(), summaryFileName), jobIndexInfo, false);

      final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
      if (old == null || old.didMoveFail()) {
        final HistoryFileInfo found = (old == null) ? fileInfo : old;
        long cutoff = System.currentTimeMillis() - maxHistoryAge;
        if (found.getJobIndexInfo().getFinishTime() <= cutoff) {
          try {
            found.delete();
          } catch (IOException e) {
            LOG.warn("Error cleaning up a HistoryFile that is out of date.",
                e);
          }
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Scheduling move to done of " + found);
          }
          moveToDoneExecutor.execute(new Runnable() {
            @Override
            public void run() {
              try {
                found.moveToDone();
              } catch (IOException e) {
                LOG.info("Failed to process fileInfo for job: " +
                    found.getJobId(), e);
              }
            }
          });
        }
      } else if (!old.isMovePending()) {
        // This is a duplicate, so just delete it
        if (LOG.isDebugEnabled()) {
          LOG.debug("Duplicate: deleting");
        }
        fileInfo.delete();
      }
    }
  }

  /**
   * Searches the job history file FileStatus list for the specified JobId.
   *
   * @param fileStatusList
   *          fileStatus list of Job History Files.
   * @param jobId
   *          The JobId to find.
   * @return A FileInfo object for the jobId, null if not found.
   * @throws IOException
   */
  private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
      JobId jobId) throws IOException {
    for (FileStatus fs : fileStatusList) {
      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
          .getName());
      if (jobIndexInfo.getJobId().equals(jobId)) {
        String confFileName = JobHistoryUtils
            .getIntermediateConfFileName(jobIndexInfo.getJobId());
        String summaryFileName = JobHistoryUtils
            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
        HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(),
            new Path(fs.getPath().getParent(), confFileName), new Path(fs
                .getPath().getParent(), summaryFileName), jobIndexInfo, true);
        return fileInfo;
      }
    }
    return null;
  }

  /**
   * Scans old directories known by the serialNumberIndex for the specified
   * jobId. If the number of directories is higher than the supported size of
   * the serial number cache, the jobId will not be found.
   *
   * @param jobId
   *          the jobId.
   * @return the matching FileInfo, or null if not found.
   * @throws IOException
   */
  private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
    String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
        jobId, serialNumberFormat);
    Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
    if (dateStringSet == null) {
      return null;
    }
    for (String timestampPart : dateStringSet) {
      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
          doneDirFc);
      HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
      if (fileInfo != null) {
        return fileInfo;
      }
    }
    return null;
  }

  public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
    scanIntermediateDirectory();
    return jobListCache.values();
  }

  public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
    // FileInfo available in cache.
    HistoryFileInfo fileInfo = jobListCache.get(jobId);
    if (fileInfo != null) {
      return fileInfo;
    }
    // OK, so scan the intermediate directory to be sure we did not lose it
    // that way.
    scanIntermediateDirectory();
    fileInfo = jobListCache.get(jobId);
    if (fileInfo != null) {
      return fileInfo;
    }

    // Intermediate directory does not contain job. Search through older ones.
    fileInfo = scanOldDirsForJob(jobId);
    if (fileInfo != null) {
      return fileInfo;
    }
    return null;
  }

  private void moveToDoneNow(final Path src, final Path target)
      throws IOException {
    LOG.info("Moving " + src.toString() + " to " + target.toString());
    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
  }

  private String getJobSummary(FileContext fc, Path path) throws IOException {
    Path qPath = fc.makeQualified(path);
    FSDataInputStream in = null;
    String jobSummaryString = null;
    try {
      in = fc.open(qPath);
      jobSummaryString = in.readUTF();
    } finally {
      if (in != null) {
        in.close();
      }
    }
    return jobSummaryString;
  }

  private void makeDoneSubdir(Path path) throws IOException {
    try {
      doneDirFc.getFileStatus(path);
      existingDoneSubdirs.add(path);
    } catch (FileNotFoundException fnfE) {
      try {
        FsPermission fsp = new FsPermission(
            JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
        doneDirFc.mkdir(path, fsp, true);
        FileStatus fsStatus = doneDirFc.getFileStatus(path);
        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
            + ", Expected: " + fsp.toShort());
        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
              + ", " + fsp);
          doneDirFc.setPermission(path, fsp);
        }
        existingDoneSubdirs.add(path);
      } catch (FileAlreadyExistsException faeE) { // Nothing to do.
      }
    }
  }

  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
        id, timestampComponent, serialNumberFormat));
  }

  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
    String timestampComponent = JobHistoryUtils
        .timestampDirectoryComponent(millisecondTime);
    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
        id, timestampComponent, serialNumberFormat));
  }

  private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
    if (finishTime == 0) {
      return fileStatus.getModificationTime();
    }
    return finishTime;
  }

  private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
    jobListCache.delete(fileInfo);
    fileInfo.delete();
  }

  List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException {
    return JobHistoryUtils.
        getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff);
  }
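
  // Worked retention example (assuming the default maxHistoryAge of one
  // week): a clean() pass at time T removes every history whose effective
  // timestamp (the finish time, or the file's modification time when the
  // finish time is 0) is <= T - 7 days, and then prunes any serial
  // directory it has fully emptied.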

  /**
   * Clean up older history files.
   *
   * @throws IOException
   *           on any error trying to remove the entries.
   */
  @SuppressWarnings("unchecked")
  void clean() throws IOException {
    long cutoff = System.currentTimeMillis() - maxHistoryAge;
    boolean halted = false;
    List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
    // Sort in ascending order. Relies on YYYY/MM/DD/Serial
    Collections.sort(serialDirList);
    for (FileStatus serialDir : serialDirList) {
      List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
          serialDir.getPath(), doneDirFc);
      for (FileStatus historyFile : historyFileList) {
        JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(
            historyFile.getPath().getName());
        long effectiveTimestamp = getEffectiveTimestamp(
            jobIndexInfo.getFinishTime(), historyFile);
        if (effectiveTimestamp <= cutoff) {
          HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
              .getJobId());
          if (fileInfo == null) {
            String confFileName = JobHistoryUtils
                .getIntermediateConfFileName(jobIndexInfo.getJobId());

            fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path(
                historyFile.getPath().getParent(), confFileName), null,
                jobIndexInfo, true);
          }
          deleteJobFromDone(fileInfo);
        } else {
          halted = true;
          break;
        }
      }
      if (!halted) {
        deleteDir(serialDir);
        removeDirectoryFromSerialNumberIndex(serialDir.getPath());
        existingDoneSubdirs.remove(serialDir.getPath());
      } else {
        break; // Don't scan any more directories.
      }
    }
  }

  protected boolean deleteDir(FileStatus serialDir)
      throws AccessControlException, FileNotFoundException,
      UnsupportedFileSystemException, IOException {
    return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
  }

  // for test
  @VisibleForTesting
  void setMaxHistoryAge(long newValue) {
    maxHistoryAge = newValue;
  }
}