/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Writer;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

/**
 * Reader and writer for the aggregated container-log file format. The file
 * is a TFile of key/value entries: three reserved entries (format version,
 * application owner, application ACLs) plus one entry per container whose
 * value is a sequence of (file name, file length, file content) records.
 */
@Public
@Evolving
public class AggregatedLogFormat {

  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
  private static final LogKey VERSION_KEY = new LogKey("VERSION");
  private static final Map<String, LogKey> RESERVED_KEYS;
  //Maybe write out the retention policy.
  //Maybe write out a list of containerLogs skipped by the retention policy.
  private static final int VERSION = 1;

  /** Size of the scratch buffer used when copying log content. */
  private static final int COPY_BUFFER_SIZE = 65535;

  /**
   * Umask for the log file.
   */
  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0640 ^ 0777));

  static {
    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
  }

  /**
   * Key of an entry in the aggregated log file: either the string form of a
   * container id or one of the reserved meta keys. Equality and hashing are
   * based solely on the key string.
   */
  @Public
  public static class LogKey implements Writable {

    private String keyString;

    /** Creates an empty key, to be populated by {@link #readFields}. */
    public LogKey() {

    }

    public LogKey(ContainerId containerId) {
      this.keyString = containerId.toString();
    }

    public LogKey(String keyString) {
      this.keyString = keyString;
    }

    @Override
    public int hashCode() {
      return keyString == null ? 0 : keyString.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof LogKey) {
        LogKey other = (LogKey) obj;
        if (this.keyString == null) {
          return other.keyString == null;
        }
        return this.keyString.equals(other.keyString);
      }
      return false;
    }

    @Private
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(this.keyString);
    }

    @Private
    @Override
    public void readFields(DataInput in) throws IOException {
      this.keyString = in.readUTF();
    }

    @Override
    public String toString() {
      return this.keyString;
    }
  }

  /**
   * Value side of a container entry: selects the container's local log files
   * that should be uploaded and serializes them to the output stream.
   */
  @Private
  public static class LogValue {

    private final List<String> rootLogDirs;
    private final ContainerId containerId;
    private final String user;
    private final LogAggregationContext logAggregationContext;
    private final Set<File> uploadedFiles = new HashSet<File>();
    private final Set<String> alreadyUploadedLogFiles;
    private final Set<String> allExistingFileMeta = new HashSet<String>();
    private final boolean appFinished;
    private final boolean containerFinished;

    /**
     * The retention context to determine if log files are older than
     * the retention policy configured.
     */
    private final LogRetentionContext logRetentionContext;
    /**
     * The set of log files that are older than retention policy that will
     * not be uploaded but ready for deletion.
     */
    private final Set<File> obsoleteRetentionLogFiles = new HashSet<File>();

    // TODO Maybe add a version string here. Instead of changing the version of
    // the entire k-v format

    /**
     * Creates a value with no aggregation context, no previously uploaded
     * files, no retention context, and both the application and the
     * container marked as finished.
     */
    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user) {
      this(rootLogDirs, containerId, user, null, new HashSet<String>(),
          null, true, true);
    }

    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user, LogAggregationContext logAggregationContext,
        Set<String> alreadyUploadedLogFiles,
        LogRetentionContext retentionContext, boolean appFinished,
        boolean containerFinished) {
      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
      this.containerId = containerId;
      this.user = user;

      // Ensure logs are processed in lexical order
      Collections.sort(this.rootLogDirs);
      this.logAggregationContext = logAggregationContext;
      this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
      this.appFinished = appFinished;
      this.containerFinished = containerFinished;
      this.logRetentionContext = retentionContext;
    }

    /**
     * Collects, across all root log dirs, the files of this container that
     * are still pending upload. Root dirs in which the container directory
     * does not exist are skipped.
     *
     * @return the set of files to upload; possibly empty, never null.
     */
    @VisibleForTesting
    public Set<File> getPendingLogFilesToUploadForThisContainer() {
      Set<File> pendingUploadFiles = new HashSet<File>();
      for (String rootLogDir : this.rootLogDirs) {
        File appLogDir = new File(rootLogDir,
            this.containerId.getApplicationAttemptId().
                getApplicationId().toString());
        File containerLogDir =
            new File(appLogDir, this.containerId.toString());

        if (!containerLogDir.isDirectory()) {
          continue; // ContainerDir may have been deleted by the user.
        }

        pendingUploadFiles
            .addAll(getPendingLogFilesToUpload(containerLogDir));
      }
      return pendingUploadFiles;
    }

    /**
     * Serializes the given files, in sorted order, as a sequence of records:
     * file name (UTF), file length as a decimal string (UTF), then at most
     * that many bytes of content. Directories are skipped with a warning and
     * files that cannot be opened are skipped after logging the error. If an
     * {@link IOException} occurs while copying a file, the error message is
     * written to the stream instead and the file is not recorded as
     * uploaded.
     *
     * @param out the value stream to write to.
     * @param pendingUploadFiles the files to serialize.
     * @throws IOException if writing to {@code out} fails.
     */
    public void write(DataOutputStream out, Set<File> pendingUploadFiles)
        throws IOException {
      List<File> fileList = new ArrayList<File>(pendingUploadFiles);
      Collections.sort(fileList);

      for (File logFile : fileList) {
        // We only aggregate top level files.
        // Ignore anything inside sub-folders.
        if (logFile.isDirectory()) {
          LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
          continue;
        }

        FileInputStream in = null;
        try {
          in = secureOpenFile(logFile);
        } catch (IOException e) {
          // Nothing was opened, so there is nothing to clean up.
          logErrorMessage(logFile, e);
          continue;
        }

        final long fileLength = logFile.length();
        // Write the logFile Type
        out.writeUTF(logFile.getName());

        // Write the log length as UTF so that it is printable
        out.writeUTF(String.valueOf(fileLength));

        // Write the log itself
        try {
          byte[] buf = new byte[COPY_BUFFER_SIZE];
          int len = 0;
          long bytesLeft = fileLength;
          while ((len = in.read(buf)) != -1) {
            //If buffer contents within fileLength, write
            if (len < bytesLeft) {
              out.write(buf, 0, len);
              bytesLeft -= len;
            } else {
              //else only write contents within fileLength, then exit early
              out.write(buf, 0, (int) bytesLeft);
              break;
            }
          }
          // The file may have grown while it was being copied; only the
          // length sampled before the copy is written.
          long newLength = logFile.length();
          if (fileLength < newLength) {
            LOG.warn("Aggregated logs truncated by approximately "+
                (newLength-fileLength) +" bytes.");
          }
          this.uploadedFiles.add(logFile);
        } catch (IOException e) {
          String message = logErrorMessage(logFile, e);
          out.write(message.getBytes(Charset.forName("UTF-8")));
        } finally {
          IOUtils.cleanup(LOG, in);
        }
      }
    }

    /**
     * Opens the log file for reading via {@link SecureIOUtils}, passing the
     * user of this value as the expected owner and no expected group.
     */
    @VisibleForTesting
    public FileInputStream secureOpenFile(File logFile) throws IOException {
      return SecureIOUtils.openForRead(logFile, getUser(), null);
    }

    /** Logs an aggregation failure for the file and returns the message. */
    private static String logErrorMessage(File logFile, Exception e) {
      String message = "Error aggregating log file. Log file : "
          + logFile.getAbsolutePath() + ". " + e.getMessage();
      LOG.error(message, e);
      return message;
    }

    // Added for testing purpose.
    public String getUser() {
      return user;
    }

    /**
     * Determines which files in the container log directory should be
     * uploaded. Metadata of every listed file is recorded; if the retention
     * context says logs should not be retained, all files are recorded as
     * obsolete and nothing is returned for upload; otherwise the
     * include/exclude patterns of the aggregation context (if any) are
     * applied.
     */
    private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
      // listFiles() returns null when the path is no longer a directory or
      // an I/O error occurs, e.g. the directory was deleted after the
      // caller's isDirectory() check.
      File[] listedFiles = containerLogDir.listFiles();
      if (listedFiles == null) {
        return new HashSet<File>();
      }
      Set<File> candidates = new HashSet<File>(Arrays.asList(listedFiles));
      for (File logFile : candidates) {
        this.allExistingFileMeta.add(getLogFileMetaData(logFile));
      }

      // if log files are older than retention policy, do not upload them.
      // but schedule them for deletion.
      if (logRetentionContext != null
          && !logRetentionContext.shouldRetainLog()) {
        obsoleteRetentionLogFiles.addAll(candidates);
        candidates.clear();
        return candidates;
      }

      Set<File> fileCandidates = new HashSet<File>(candidates);
      if (this.logAggregationContext != null && candidates.size() > 0) {
        fileCandidates = getFileCandidates(fileCandidates, this.appFinished);
        if (!this.appFinished && this.containerFinished) {
          // The container is done although the app is not: additionally
          // select files matching the regular (non-rolled) patterns.
          Set<File> addition = new HashSet<File>(candidates);
          addition = getFileCandidates(addition, true);
          fileCandidates.addAll(addition);
        }
      }

      return fileCandidates;
    }

    /**
     * Applies the include and exclude patterns to {@code candidates} (which
     * is modified in place) and returns the remaining files that are not in
     * the already-uploaded set.
     *
     * @param useRegularPattern true to use the regular include/exclude
     *          patterns, false to use the rolled-logs patterns.
     */
    private Set<File> getFileCandidates(Set<File> candidates,
        boolean useRegularPattern) {
      filterFiles(
          useRegularPattern ? this.logAggregationContext.getIncludePattern()
              : this.logAggregationContext.getRolledLogsIncludePattern(),
          candidates, false);

      filterFiles(
          useRegularPattern ? this.logAggregationContext.getExcludePattern()
              : this.logAggregationContext.getRolledLogsExcludePattern(),
          candidates, true);

      Iterable<File> mask =
          Iterables.filter(candidates, new Predicate<File>() {
            @Override
            public boolean apply(File next) {
              return !alreadyUploadedLogFiles
                  .contains(getLogFileMetaData(next));
            }
          });
      return Sets.newHashSet(mask);
    }

    /**
     * Removes files from {@code candidates} based on a regex searched for in
     * the file name. A null or empty pattern leaves the set untouched.
     *
     * @param exclusion if true, matching files are removed; if false,
     *          non-matching files are removed.
     */
    private void filterFiles(String pattern, Set<File> candidates,
        boolean exclusion) {
      if (pattern != null && !pattern.isEmpty()) {
        Pattern filterPattern = Pattern.compile(pattern);
        for (Iterator<File> candidatesItr = candidates.iterator();
            candidatesItr.hasNext();) {
          File candidate = candidatesItr.next();
          boolean match = filterPattern.matcher(candidate.getName()).find();
          // Drop the file when its match status equals the exclusion flag.
          if (match == exclusion) {
            candidatesItr.remove();
          }
        }
      }
    }

    /** @return absolute paths of the files written so far by this value. */
    public Set<Path> getCurrentUpLoadedFilesPath() {
      Set<Path> path = new HashSet<Path>();
      for (File file : this.uploadedFiles) {
        path.add(new Path(file.getAbsolutePath()));
      }
      return path;
    }

    /** @return metadata strings of the files written so far by this value. */
    public Set<String> getCurrentUpLoadedFileMeta() {
      Set<String> info = new HashSet<String>();
      for (File file : this.uploadedFiles) {
        info.add(getLogFileMetaData(file));
      }
      return info;
    }

    /**
     * @return absolute paths of the files that were excluded from upload by
     *         the retention policy. (The misspelling in the method name is
     *         kept for API compatibility.)
     */
    public Set<Path> getObseleteRetentionLogFiles() {
      Set<Path> path = new HashSet<Path>();
      for (File file : this.obsoleteRetentionLogFiles) {
        path.add(new Path(file.getAbsolutePath()));
      }
      return path;
    }

    /** @return metadata strings of every file seen in the container dirs. */
    public Set<String> getAllExistingFilesMeta() {
      return this.allExistingFileMeta;
    }

    /**
     * Builds the identity string of a log file:
     * {@code <containerId>_<fileName>_<lastModified>}.
     */
    private String getLogFileMetaData(File file) {
      return containerId.toString() + "_" + file.getName() + "_"
          + file.lastModified();
    }
  }

  /**
   * A context for log retention to determine if files are older than
   * the retention policy configured in YarnConfiguration.
   */
  public static class LogRetentionContext {
    /**
     * The time used with logRetentionMillis, to determine ages of
     * log files and if files need to be uploaded.
     */
    private final long logInitedTimeMillis;
    /**
     * The numbers of milli seconds since a log file is created to determine
     * if we should upload it. -1 if disabled.
     * see YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS for details.
     */
    private final long logRetentionMillis;

    public LogRetentionContext(long logInitedTimeMillis, long
        logRetentionMillis) {
      this.logInitedTimeMillis = logInitedTimeMillis;
      this.logRetentionMillis = logRetentionMillis;
    }

    /** @return true if either configured value is negative. */
    public boolean isDisabled() {
      return logInitedTimeMillis < 0 || logRetentionMillis < 0;
    }

    /**
     * @return true if retention is disabled or the time elapsed since the
     *         log-inited time is still below the retention period.
     */
    public boolean shouldRetainLog() {
      return isDisabled() ||
          System.currentTimeMillis() - logInitedTimeMillis < logRetentionMillis;
    }
  }

  /**
   * The writer that writes out the aggregated logs.
   */
  @Private
  public static class LogWriter {

    private final FSDataOutputStream fsDataOStream;
    private final TFile.Writer writer;
    private FileContext fc;

    /**
     * Creates (or overwrites) the remote log file as the given user, wraps
     * it in a TFile writer and writes the version entry.
     *
     * @param conf configuration used for the file context, the compression
     *          type and the TFile writer.
     * @param remoteAppLogFile the aggregated log file to create.
     * @param userUgi the user to create the file as.
     * @throws IOException if the file cannot be created or written, or if
     *           the privileged action is interrupted.
     */
    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
        UserGroupInformation userUgi) throws IOException {
      try {
        this.fsDataOStream =
            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
              @Override
              public FSDataOutputStream run() throws Exception {
                fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
                fc.setUMask(APP_LOG_FILE_UMASK);
                return fc.create(
                    remoteAppLogFile,
                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                    new Options.CreateOpts[] {});
              }
            });
      } catch (InterruptedException e) {
        throw new IOException(e);
      }

      // Keys are not sorted: null arg
      // 256KB minBlockSize : Expected log size for each container too
      this.writer =
          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
      //Write the version string
      writeVersion();
    }

    @VisibleForTesting
    public TFile.Writer getWriter() {
      return this.writer;
    }

    /** Appends the reserved version entry (key, then int value). */
    private void writeVersion() throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      VERSION_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeInt(VERSION);
      out.close();
    }

    /** Appends the reserved application-owner entry. */
    public void writeApplicationOwner(String user) throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_OWNER_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeUTF(user);
      out.close();
    }

    /**
     * Appends the reserved ACL entry; the value is a sequence of
     * (access type, ACL string) UTF pairs.
     */
    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
        throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_ACL_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
        out.writeUTF(entry.getKey().toString());
        out.writeUTF(entry.getValue());
      }
      out.close();
    }

    /**
     * Appends one container entry. Nothing is written when the value has no
     * pending files to upload.
     */
    public void append(LogKey logKey, LogValue logValue) throws IOException {
      Set<File> pendingUploadFiles =
          logValue.getPendingLogFilesToUploadForThisContainer();
      if (pendingUploadFiles.size() == 0) {
        return;
      }
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      logKey.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      logValue.write(out, pendingUploadFiles);
      out.close();
    }

    /**
     * Closes the TFile writer (logging, not propagating, any failure) and
     * then the underlying output stream.
     */
    public void close() {
      try {
        this.writer.close();
      } catch (IOException e) {
        LOG.warn("Exception closing writer", e);
      }
      IOUtils.closeStream(fsDataOStream);
    }
  }

  /**
   * Reader for an aggregated log file.
   */
  @Public
  @Evolving
  public static class LogReader {

    private final FSDataInputStream fsDataIStream;
    private final TFile.Reader.Scanner scanner;
    private final TFile.Reader reader;

    /** True until the first call to {@link #next(LogKey)}. */
    private boolean atBeginning = true;

    public LogReader(Configuration conf, Path remoteAppLogFile)
        throws IOException {
      FileContext fileContext =
          FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
      this.fsDataIStream = fileContext.open(remoteAppLogFile);
      reader =
          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
              remoteAppLogFile).getLen(), conf);
      this.scanner = reader.createScanner();
    }

    /**
     * Returns the owner of the application.
     *
     * @return the application owner, or null if the file has no owner entry.
     * @throws IOException
     */
    public String getApplicationOwner() throws IOException {
      TFile.Reader.Scanner ownerScanner = null;
      try {
        // Uses its own scanner so the position of next() is not disturbed.
        ownerScanner = reader.createScanner();
        LogKey key = new LogKey();
        while (!ownerScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            return valueStream.readUTF();
          }
          ownerScanner.advance();
        }
        return null;
      } finally {
        IOUtils.cleanup(LOG, ownerScanner);
      }
    }

    /**
     * Returns ACLs for the application. An empty map is returned if no ACLs are
     * found.
     *
     * @return a map of the Application ACLs.
     * @throws IOException
     */
    public Map<ApplicationAccessType, String> getApplicationAcls()
        throws IOException {
      // TODO Seek directly to the key once a comparator is specified.
      TFile.Reader.Scanner aclScanner = null;
      try {
        aclScanner = reader.createScanner();
        LogKey key = new LogKey();
        Map<ApplicationAccessType, String> acls =
            new HashMap<ApplicationAccessType, String>();
        while (!aclScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = aclScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            while (true) {
              String appAccessOp = null;
              String aclString = null;
              try {
                appAccessOp = valueStream.readUTF();
              } catch (EOFException e) {
                // Valid end of stream.
                break;
              }
              try {
                aclString = valueStream.readUTF();
              } catch (EOFException e) {
                // An access type without its ACL string: the entry is bad.
                throw new YarnRuntimeException("Error reading ACLs", e);
              }
              acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
            }
          }
          aclScanner.advance();
        }
        return acls;
      } finally {
        IOUtils.cleanup(LOG, aclScanner);
      }
    }

    /**
     * Read the next key and return the value-stream. Reserved meta entries
     * are skipped.
     *
     * @param key populated with the key of the returned entry.
     * @return the valueStream if there are more keys or null otherwise.
     * @throws IOException
     */
    public DataInputStream next(LogKey key) throws IOException {
      if (!this.atBeginning) {
        this.scanner.advance();
      } else {
        this.atBeginning = false;
      }
      if (this.scanner.atEnd()) {
        return null;
      }
      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
      key.readFields(entry.getKeyStream());
      // Skip META keys
      if (RESERVED_KEYS.containsKey(key.toString())) {
        return next(key);
      }
      DataInputStream valueStream = entry.getValueStream();
      return valueStream;
    }

    /**
     * Get a ContainerLogsReader to read the logs for
     * the specified container. Scanning continues from the reader's current
     * position.
     *
     * @param containerId
     * @return object to read the container's logs or null if the
     *         logs could not be found
     * @throws IOException
     */
    @Private
    public ContainerLogsReader getContainerLogsReader(
        ContainerId containerId) throws IOException {
      ContainerLogsReader logReader = null;

      final LogKey containerKey = new LogKey(containerId);
      LogKey key = new LogKey();
      DataInputStream valueStream = next(key);
      while (valueStream != null && !key.equals(containerKey)) {
        valueStream = next(key);
      }

      if (valueStream != null) {
        logReader = new ContainerLogsReader(valueStream);
      }

      return logReader;
    }

    //TODO Change Log format and interfaces to be containerId specific.
    // Avoid returning completeValueStreams, e.g. by offering
    // getTypesForContainer(DataInputStream) and
    // getLogStreamForType(DataInputStream, String fileType) returning a
    // reader bounded to the requested log type (or null if not found).

    /**
     * Writes all logs for a single container to the provided writer.
     * @param valueStream
     * @param writer
     * @param logUploadedTime
     * @throws IOException
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer, long logUploadedTime) throws IOException {
      OutputStream os = null;
      PrintStream ps = null;
      try {
        os = new WriterOutputStream(writer, Charset.forName("UTF-8"));
        ps = new PrintStream(os);
        while (true) {
          try {
            readContainerLogs(valueStream, ps, logUploadedTime, Long.MAX_VALUE);
          } catch (EOFException e) {
            // EndOfFile
            return;
          }
        }
      } finally {
        IOUtils.cleanup(LOG, ps);
        IOUtils.cleanup(LOG, os);
      }
    }

    /**
     * Writes all logs for a single container to the provided writer.
     * @param valueStream
     * @param writer
     * @throws IOException
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer) throws IOException {
      readAcontainerLogs(valueStream, writer, -1);
    }

    /**
     * Reads the header of the next log record and prints the record.
     *
     * @throws EOFException if the stream has no further record.
     */
    private static void readContainerLogs(DataInputStream valueStream,
        PrintStream out, long logUploadedTime, long bytes)
        throws IOException {
      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      printLogRecord(valueStream, out, fileType, fileLengthStr, fileLength,
          logUploadedTime, bytes);
    }

    /**
     * Prints one log record whose header has already been consumed: the
     * textual header lines, the selected part of the content and the
     * trailer. The stream is left positioned after the record's content.
     *
     * @param bytes how much content to print: a non-negative value prints
     *          at most that many bytes from the start of the log, a negative
     *          value prints at most {@code |bytes|} bytes from its end.
     */
    private static void printLogRecord(DataInputStream valueStream,
        PrintStream out, String fileType, String fileLengthStr,
        long fileLength, long logUploadedTime, long bytes)
        throws IOException {
      out.print("LogType:");
      out.println(fileType);
      if (logUploadedTime != -1) {
        out.print("Log Upload Time:");
        out.println(Times.format(logUploadedTime));
      }
      out.print("LogLength:");
      out.println(fileLengthStr);
      out.println("Log Contents:");

      long toSkip = 0;
      long totalBytesToRead = fileLength;
      long skipAfterRead = 0;
      if (bytes < 0) {
        // Math.abs(Long.MIN_VALUE) is negative, so clamp that one value.
        long absBytes = bytes == Long.MIN_VALUE ? Long.MAX_VALUE : -bytes;
        if (absBytes < fileLength) {
          toSkip = fileLength - absBytes;
          totalBytesToRead = absBytes;
        }
        IOUtils.skipFully(valueStream, toSkip);
      } else {
        if (bytes < fileLength) {
          totalBytesToRead = bytes;
          skipAfterRead = fileLength - bytes;
        }
      }

      byte[] buf = new byte[COPY_BUFFER_SIZE];
      long curRead = 0;
      long pendingRead = totalBytesToRead - curRead;
      int toRead =
          pendingRead > buf.length ? buf.length : (int) pendingRead;
      int len = valueStream.read(buf, 0, toRead);
      while (len != -1 && curRead < totalBytesToRead) {
        out.write(buf, 0, len);
        curRead += len;

        pendingRead = totalBytesToRead - curRead;
        toRead =
            pendingRead > buf.length ? buf.length : (int) pendingRead;
        len = valueStream.read(buf, 0, toRead);
      }
      IOUtils.skipFully(valueStream, skipAfterRead);
      out.println("\nEnd of LogType:" + fileType);
      out.println("");
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container.
     *
     * @param valueStream
     * @param out
     * @param logUploadedTime
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime)
        throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime, Long.MAX_VALUE);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container for the specific bytes.
     *
     * @param valueStream
     * @param out
     * @param logUploadedTime
     * @param bytes
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        long bytes) throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime, bytes);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container.
     *
     * @param valueStream
     * @param out
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out)
        throws IOException {
      readAContainerLogsForALogType(valueStream, out, -1);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * the specific types for a single container.
     * @param valueStream
     * @param out
     * @param logUploadedTime
     * @param logType
     * @return 0 if the record was printed, -1 if it was skipped.
     * @throws IOException
     */
    public static int readContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        List<String> logType) throws IOException {
      return readContainerLogsForALogType(valueStream, out, logUploadedTime,
          logType, Long.MAX_VALUE);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * the specific types for a single container.
     * @param valueStream
     * @param out
     * @param logUploadedTime
     * @param logType
     * @param bytes
     * @return 0 if the record's type is in {@code logType} and it was
     *         printed, -1 if the record was skipped.
     * @throws IOException
     */
    public static int readContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        List<String> logType, long bytes) throws IOException {
      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      if (logType.contains(fileType)) {
        printLogRecord(valueStream, out, fileType, fileLengthStr, fileLength,
            logUploadedTime, bytes);
        return 0;
      } else {
        // InputStream.skip never returns -1 and may return 0 at end of
        // stream, so a hand-rolled skip loop can spin forever on a
        // truncated file; skipFully raises EOFException instead.
        IOUtils.skipFully(valueStream, fileLength);
        return -1;
      }
    }

    /**
     * Reads the header of the next log record, skips over its content and
     * returns the header.
     *
     * @param valueStream the container's value stream.
     * @param out unused.
     * @return a pair of (log file name, log length as string).
     * @throws IOException on read failure; {@link EOFException} when the
     *           stream ends.
     */
    @Private
    public static Pair<String, String> readContainerMetaDataAndSkipData(
        DataInputStream valueStream, PrintStream out) throws IOException {

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      Pair<String, String> logMeta = new Pair<String, String>(
          fileType, fileLengthStr);
      // See readContainerLogsForALogType for why skipFully is used.
      IOUtils.skipFully(valueStream, fileLength);
      return logMeta;
    }

    public void close() {
      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
    }
  }

  /**
   * Iterates over the log records inside one container's value stream,
   * exposing the current record's content as a bounded stream.
   */
  @Private
  public static class ContainerLogsReader {
    private DataInputStream valueStream;
    private String currentLogType = null;
    private long currentLogLength = 0;
    private BoundedInputStream currentLogData = null;
    private InputStreamReader currentLogISR;

    public ContainerLogsReader(DataInputStream stream) {
      valueStream = stream;
    }

    /**
     * Advances to the next log record, discarding any unread content of the
     * current one.
     *
     * @return the type (file name) of the next log, or null when the stream
     *         has no further record.
     * @throws IOException on read failure other than end of stream.
     */
    public String nextLog() throws IOException {
      if (currentLogData != null && currentLogLength > 0) {
        // seek to the end of the current log, relying on BoundedInputStream
        // to prevent seeking past the end of the current log
        do {
          if (currentLogData.skip(currentLogLength) < 0) {
            break;
          }
        } while (currentLogData.read() != -1);
      }

      currentLogType = null;
      currentLogLength = 0;
      currentLogData = null;
      currentLogISR = null;

      try {
        String logType = valueStream.readUTF();
        String logLengthStr = valueStream.readUTF();
        currentLogLength = Long.parseLong(logLengthStr);
        currentLogData =
            new BoundedInputStream(valueStream, currentLogLength);
        currentLogData.setPropagateClose(false);
        currentLogISR = new InputStreamReader(currentLogData,
            Charset.forName("UTF-8"));
        currentLogType = logType;
      } catch (EOFException ignored) {
        // Deliberately ignored: end of stream means there is no next log,
        // which is reported by returning a null log type.
      }

      return currentLogType;
    }

    public String getCurrentLogType() {
      return currentLogType;
    }

    public long getCurrentLogLength() {
      return currentLogLength;
    }

    public long skip(long n) throws IOException {
      return currentLogData.skip(n);
    }

    public int read() throws IOException {
      return currentLogData.read();
    }

    public int read(byte[] buf, int off, int len) throws IOException {
      return currentLogData.read(buf, off, len);
    }

    public int read(char[] buf, int off, int len) throws IOException {
      return currentLogISR.read(buf, off, len);
    }
  }
}