/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.logaggregation;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Writer;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

@Public
@Evolving
public class AggregatedLogFormat {

  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
  private static final LogKey VERSION_KEY = new LogKey("VERSION");
  private static final Map<String, LogKey> RESERVED_KEYS;
  //Maybe write out the retention policy.
  //Maybe write out a list of containerLogs skipped by the retention policy.
  private static final int VERSION = 1;

  /**
   * Umask for the log file.
   */
  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0640 ^ 0777));


  static {
    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
  }
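
  // Layout note (descriptive only): the aggregated log file written by
  // LogWriter below is a TFile whose first entry is the reserved VERSION key
  // (written by the LogWriter constructor). The reserved APPLICATION_ACL and
  // APPLICATION_OWNER keys and one entry per container follow, in the order
  // the caller appends them. Each container value is a sequence of
  // (file name as UTF, file length as UTF, raw log bytes) records, one per
  // aggregated log file; LogReader and ContainerLogsReader below parse this
  // layout.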

  @Public
  public static class LogKey implements Writable {

    private String keyString;

    public LogKey() {

    }

    public LogKey(ContainerId containerId) {
      this.keyString = containerId.toString();
    }

    public LogKey(String keyString) {
      this.keyString = keyString;
    }

    @Override
    public int hashCode() {
      return keyString == null ? 0 : keyString.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof LogKey) {
        LogKey other = (LogKey) obj;
        if (this.keyString == null) {
          return other.keyString == null;
        }
        return this.keyString.equals(other.keyString);
      }
      return false;
    }

    @Private
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(this.keyString);
    }

    @Private
    @Override
    public void readFields(DataInput in) throws IOException {
      this.keyString = in.readUTF();
    }

    @Override
    public String toString() {
      return this.keyString;
    }
  }

  @Private
  public static class LogValue {

    private final List<String> rootLogDirs;
    private final ContainerId containerId;
    private final String user;
    private final LogAggregationContext logAggregationContext;
    private Set<File> uploadedFiles = new HashSet<File>();
    private final Set<String> alreadyUploadedLogFiles;
    private Set<String> allExistingFileMeta = new HashSet<String>();
    private final boolean appFinished;
    private final boolean containerFinished;

    /**
     * The retention context used to determine whether log files are older
     * than the configured retention policy.
     */
    private final LogRetentionContext logRetentionContext;
    /**
     * The set of log files that are older than the retention policy; they
     * will not be uploaded, but are scheduled for deletion.
     */
    private final Set<File> obseleteRetentionLogFiles = new HashSet<File>();

    // TODO Maybe add a version string here. Instead of changing the version of
    // the entire k-v format

    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user) {
      this(rootLogDirs, containerId, user, null, new HashSet<String>(),
          null, true, true);
    }

    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user, LogAggregationContext logAggregationContext,
        Set<String> alreadyUploadedLogFiles,
        LogRetentionContext retentionContext, boolean appFinished,
        boolean containerFinished) {
      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
      this.containerId = containerId;
      this.user = user;

      // Ensure logs are processed in lexical order
      Collections.sort(this.rootLogDirs);
      this.logAggregationContext = logAggregationContext;
      this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
      this.appFinished = appFinished;
      this.containerFinished = containerFinished;
      this.logRetentionContext = retentionContext;
    }
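
    // A minimal construction sketch (the root log dir path and the user are
    // illustrative values normally supplied by the node manager's log
    // aggregation service, and containerId is an existing ContainerId):
    //
    //   LogValue logValue = new LogValue(
    //       Arrays.asList("/var/log/hadoop-yarn/containers"), // root log dirs
    //       containerId, "someUser");
    //   Set<File> pending =
    //       logValue.getPendingLogFilesToUploadForThisContainer();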

    @VisibleForTesting
    public Set<File> getPendingLogFilesToUploadForThisContainer() {
      Set<File> pendingUploadFiles = new HashSet<File>();
      for (String rootLogDir : this.rootLogDirs) {
        File appLogDir = new File(rootLogDir,
            this.containerId.getApplicationAttemptId().
                getApplicationId().toString());
        File containerLogDir =
            new File(appLogDir, this.containerId.toString());

        if (!containerLogDir.isDirectory()) {
          continue; // ContainerDir may have been deleted by the user.
        }

        pendingUploadFiles
          .addAll(getPendingLogFilesToUpload(containerLogDir));
      }
      return pendingUploadFiles;
    }

    public void write(DataOutputStream out, Set<File> pendingUploadFiles)
        throws IOException {
      List<File> fileList = new ArrayList<File>(pendingUploadFiles);
      Collections.sort(fileList);

      for (File logFile : fileList) {
        // We only aggregate top level files.
        // Ignore anything inside sub-folders.
        if (logFile.isDirectory()) {
          LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
          continue;
        }

        FileInputStream in = null;
        try {
          in = secureOpenFile(logFile);
        } catch (IOException e) {
          logErrorMessage(logFile, e);
          IOUtils.cleanup(LOG, in);
          continue;
        }

        final long fileLength = logFile.length();
        // Write the logFile Type
        out.writeUTF(logFile.getName());

        // Write the log length as UTF so that it is printable
        out.writeUTF(String.valueOf(fileLength));

        // Write the log itself
        try {
          byte[] buf = new byte[65535];
          int len = 0;
          long bytesLeft = fileLength;
          while ((len = in.read(buf)) != -1) {
            // If the buffer contents fit within fileLength, write them all
            if (len < bytesLeft) {
              out.write(buf, 0, len);
              bytesLeft -= len;
            }
            // otherwise write only the bytes within fileLength, then exit early
            else {
              out.write(buf, 0, (int)bytesLeft);
              break;
            }
          }
          long newLength = logFile.length();
          if (fileLength < newLength) {
            LOG.warn("Aggregated logs truncated by approximately " +
                (newLength - fileLength) + " bytes.");
          }
          this.uploadedFiles.add(logFile);
        } catch (IOException e) {
          String message = logErrorMessage(logFile, e);
          out.write(message.getBytes(Charset.forName("UTF-8")));
        } finally {
          IOUtils.cleanup(LOG, in);
        }
      }
    }

    @VisibleForTesting
    public FileInputStream secureOpenFile(File logFile) throws IOException {
      return SecureIOUtils.openForRead(logFile, getUser(), null);
    }

    private static String logErrorMessage(File logFile, Exception e) {
      String message = "Error aggregating log file. Log file : "
          + logFile.getAbsolutePath() + ". " + e.getMessage();
      LOG.error(message, e);
      return message;
    }

    // Added for testing purposes.
    public String getUser() {
      return user;
    }

    private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
      Set<File> candidates =
          new HashSet<File>(Arrays.asList(containerLogDir.listFiles()));
      for (File logFile : candidates) {
        this.allExistingFileMeta.add(getLogFileMetaData(logFile));
      }

      // If the log files are older than the retention policy, do not upload
      // them; schedule them for deletion instead.
      if (logRetentionContext != null
          && !logRetentionContext.shouldRetainLog()) {
        obseleteRetentionLogFiles.addAll(candidates);
        candidates.clear();
        return candidates;
      }

      Set<File> fileCandidates = new HashSet<File>(candidates);
      if (this.logAggregationContext != null && candidates.size() > 0) {
        fileCandidates = getFileCandidates(fileCandidates, this.appFinished);
        if (!this.appFinished && this.containerFinished) {
          Set<File> addition = new HashSet<File>(candidates);
          addition = getFileCandidates(addition, true);
          fileCandidates.addAll(addition);
        }
      }

      return fileCandidates;
    }

    private Set<File> getFileCandidates(Set<File> candidates,
        boolean useRegularPattern) {
      filterFiles(
          useRegularPattern ? this.logAggregationContext.getIncludePattern()
              : this.logAggregationContext.getRolledLogsIncludePattern(),
          candidates, false);

      filterFiles(
          useRegularPattern ? this.logAggregationContext.getExcludePattern()
              : this.logAggregationContext.getRolledLogsExcludePattern(),
          candidates, true);

      Iterable<File> mask =
          Iterables.filter(candidates, new Predicate<File>() {
            @Override
            public boolean apply(File next) {
              return !alreadyUploadedLogFiles
                  .contains(getLogFileMetaData(next));
            }
          });
      return Sets.newHashSet(mask);
    }

    private void filterFiles(String pattern, Set<File> candidates,
        boolean exclusion) {
      if (pattern != null && !pattern.isEmpty()) {
        Pattern filterPattern = Pattern.compile(pattern);
        for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
          .hasNext();) {
          File candidate = candidatesItr.next();
          boolean match = filterPattern.matcher(candidate.getName()).find();
          if ((!match && !exclusion) || (match && exclusion)) {
            candidatesItr.remove();
          }
        }
      }
    }
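
    // For illustration (the patterns are hypothetical): with an include
    // pattern of "std.*" and an exclude pattern of "stderr", getFileCandidates
    // above first drops every candidate whose name does not match "std.*" and
    // then drops the names matching "stderr", leaving e.g. "stdout" to be
    // uploaded. Matching uses Matcher.find(), so a pattern only needs to match
    // a substring of the file name.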

    public Set<Path> getCurrentUpLoadedFilesPath() {
      Set<Path> path = new HashSet<Path>();
      for (File file : this.uploadedFiles) {
        path.add(new Path(file.getAbsolutePath()));
      }
      return path;
    }

    public Set<String> getCurrentUpLoadedFileMeta() {
      Set<String> info = new HashSet<String>();
      for (File file : this.uploadedFiles) {
        info.add(getLogFileMetaData(file));
      }
      return info;
    }

    public Set<Path> getObseleteRetentionLogFiles() {
      Set<Path> path = new HashSet<Path>();
      for (File file : this.obseleteRetentionLogFiles) {
        path.add(new Path(file.getAbsolutePath()));
      }
      return path;
    }

    public Set<String> getAllExistingFilesMeta() {
      return this.allExistingFileMeta;
    }

    private String getLogFileMetaData(File file) {
      return containerId.toString() + "_" + file.getName() + "_"
          + file.lastModified();
    }
  }

  /**
   * A context for log retention, used to determine whether files are older
   * than the retention policy configured in YarnConfiguration.
   */
  public static class LogRetentionContext {
    /**
     * The time used, together with logRetentionMillis, to determine the age
     * of log files and whether they still need to be uploaded.
     */
    private final long logInitedTimeMillis;
    /**
     * The number of milliseconds after a log file is created during which it
     * is still uploaded; -1 if disabled.
     * See YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS for details.
     */
    private final long logRetentionMillis;

    public LogRetentionContext(long logInitedTimeMillis,
        long logRetentionMillis) {
      this.logInitedTimeMillis = logInitedTimeMillis;
      this.logRetentionMillis = logRetentionMillis;
    }

    public boolean isDisabled() {
      return logInitedTimeMillis < 0 || logRetentionMillis < 0;
    }

    public boolean shouldRetainLog() {
      return isDisabled() ||
          System.currentTimeMillis() - logInitedTimeMillis < logRetentionMillis;
    }
  }
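
  // A minimal sketch of building a retention context (assumption: the
  // retention window comes from LOG_AGGREGATION_RETAIN_SECONDS with -1 as the
  // "disabled" default, and "now" is used as the init time):
  //
  //   long retainSeconds =
  //       conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, -1);
  //   LogRetentionContext retentionContext = new LogRetentionContext(
  //       System.currentTimeMillis(), retainSeconds * 1000);
  //   boolean upload = retentionContext.shouldRetainLog();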

  /**
   * The writer that writes out the aggregated logs.
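   *
   * <p>A minimal usage sketch (the configuration, path, UGI, ids and ACL map
   * are illustrative values supplied by the caller):
   * <pre>{@code
   *   LogWriter writer = new LogWriter(conf, remoteAppLogFile, userUgi);
   *   try {
   *     writer.writeApplicationOwner("someUser");
   *     writer.writeApplicationACLs(appAcls);
   *     writer.append(new LogKey(containerId),
   *         new LogValue(rootLogDirs, containerId, "someUser"));
   *   } finally {
   *     writer.close();
   *   }
   * }</pre>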
   */
  @Private
  public static class LogWriter {

    private final FSDataOutputStream fsDataOStream;
    private final TFile.Writer writer;
    private FileContext fc;

    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
        UserGroupInformation userUgi) throws IOException {
      try {
        this.fsDataOStream =
            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
              @Override
              public FSDataOutputStream run() throws Exception {
                fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
                fc.setUMask(APP_LOG_FILE_UMASK);
                return fc.create(
                    remoteAppLogFile,
                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                    new Options.CreateOpts[] {});
              }
            });
      } catch (InterruptedException e) {
        throw new IOException(e);
      }

      // Keys are not sorted: null arg
      // 256KB minBlockSize : this is also the expected log size for each container
      this.writer =
          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
      // Write the version string
      writeVersion();
    }

    @VisibleForTesting
    public TFile.Writer getWriter() {
      return this.writer;
    }

    private void writeVersion() throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      VERSION_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeInt(VERSION);
      out.close();
    }

    public void writeApplicationOwner(String user) throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_OWNER_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeUTF(user);
      out.close();
    }

    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
        throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_ACL_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
        out.writeUTF(entry.getKey().toString());
        out.writeUTF(entry.getValue());
      }
      out.close();
    }
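
    // Note (descriptive): the ACL entries above are serialized as alternating
    // UTF strings (access type, then ACL); LogReader#getApplicationAcls()
    // reads such pairs back until it reaches the end of the value stream.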

    public void append(LogKey logKey, LogValue logValue) throws IOException {
      Set<File> pendingUploadFiles =
          logValue.getPendingLogFilesToUploadForThisContainer();
      if (pendingUploadFiles.size() == 0) {
        return;
      }
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      logKey.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      logValue.write(out, pendingUploadFiles);
      out.close();
    }

    public void close() {
      try {
        this.writer.close();
      } catch (IOException e) {
        LOG.warn("Exception closing writer", e);
      }
      IOUtils.closeStream(fsDataOStream);
    }
  }

  @Public
  @Evolving
  public static class LogReader {

    private final FSDataInputStream fsDataIStream;
    private final TFile.Reader.Scanner scanner;
    private final TFile.Reader reader;

    public LogReader(Configuration conf, Path remoteAppLogFile)
        throws IOException {
      FileContext fileContext =
          FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
      this.fsDataIStream = fileContext.open(remoteAppLogFile);
      reader =
          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
              remoteAppLogFile).getLen(), conf);
      this.scanner = reader.createScanner();
    }
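
    // A minimal read-side sketch (the configuration, path and output stream
    // are illustrative; the inner loop mirrors the javadoc of
    // readAContainerLogsForALogType below):
    //
    //   LogReader reader = new LogReader(conf, remoteAppLogFile);
    //   try {
    //     LogKey key = new LogKey();
    //     DataInputStream valueStream = reader.next(key);
    //     while (valueStream != null) {
    //       System.out.println("Container: " + key.toString());
    //       try {
    //         while (true) {
    //           LogReader.readAContainerLogsForALogType(valueStream, System.out);
    //         }
    //       } catch (EOFException eof) {
    //         // end of this container's logs
    //       }
    //       valueStream = reader.next(key);
    //     }
    //   } finally {
    //     reader.close();
    //   }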

    private boolean atBeginning = true;

    /**
     * Returns the owner of the application.
     *
     * @return the application owner.
     * @throws IOException
     */
    public String getApplicationOwner() throws IOException {
      TFile.Reader.Scanner ownerScanner = null;
      try {
        ownerScanner = reader.createScanner();
        LogKey key = new LogKey();
        while (!ownerScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            return valueStream.readUTF();
          }
          ownerScanner.advance();
        }
        return null;
      } finally {
        IOUtils.cleanup(LOG, ownerScanner);
      }
    }

    /**
     * Returns ACLs for the application. An empty map is returned if no ACLs are
     * found.
     *
     * @return a map of the Application ACLs.
     * @throws IOException
     */
    public Map<ApplicationAccessType, String> getApplicationAcls()
        throws IOException {
      // TODO Seek directly to the key once a comparator is specified.
      TFile.Reader.Scanner aclScanner = null;
      try {
        aclScanner = reader.createScanner();
        LogKey key = new LogKey();
        Map<ApplicationAccessType, String> acls =
            new HashMap<ApplicationAccessType, String>();
        while (!aclScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = aclScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            while (true) {
              String appAccessOp = null;
              String aclString = null;
              try {
                appAccessOp = valueStream.readUTF();
              } catch (EOFException e) {
                // Valid end of stream.
                break;
              }
              try {
                aclString = valueStream.readUTF();
              } catch (EOFException e) {
                throw new YarnRuntimeException("Error reading ACLs", e);
              }
              acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
            }
          }
          aclScanner.advance();
        }
        return acls;
      } finally {
        IOUtils.cleanup(LOG, aclScanner);
      }
    }

    /**
     * Read the next key and return the value-stream.
     *
     * @param key the log key to be filled in with the next key in the file.
     * @return the valueStream if there are more keys or null otherwise.
     * @throws IOException
     */
    public DataInputStream next(LogKey key) throws IOException {
      if (!this.atBeginning) {
        this.scanner.advance();
      } else {
        this.atBeginning = false;
      }
      if (this.scanner.atEnd()) {
        return null;
      }
      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
      key.readFields(entry.getKeyStream());
      // Skip META keys
      if (RESERVED_KEYS.containsKey(key.toString())) {
        return next(key);
      }
      DataInputStream valueStream = entry.getValueStream();
      return valueStream;
    }

    /**
     * Get a ContainerLogsReader to read the logs for
     * the specified container.
     *
     * @param containerId the id of the container whose logs should be read.
     * @return object to read the container's logs or null if the
     *         logs could not be found
     * @throws IOException
     */
    @Private
    public ContainerLogsReader getContainerLogsReader(
        ContainerId containerId) throws IOException {
      ContainerLogsReader logReader = null;

      final LogKey containerKey = new LogKey(containerId);
      LogKey key = new LogKey();
      DataInputStream valueStream = next(key);
      while (valueStream != null && !key.equals(containerKey)) {
        valueStream = next(key);
      }

      if (valueStream != null) {
        logReader = new ContainerLogsReader(valueStream);
      }

      return logReader;
    }

    //TODO  Change Log format and interfaces to be containerId specific.
    // Avoid returning completeValueStreams.
//    public List<String> getTypesForContainer(DataInputStream valueStream){}
//
//    /**
//     * @param valueStream
//     *          The Log stream for the container.
//     * @param fileType
//     *          the log type required.
//     * @return An InputStreamReader for the required log type or null if the
//     *         type is not found.
//     * @throws IOException
//     */
//    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
//        String fileType) throws IOException {
//      valueStream.reset();
//      try {
//        while (true) {
//          String ft = valueStream.readUTF();
//          String fileLengthStr = valueStream.readUTF();
//          long fileLength = Long.parseLong(fileLengthStr);
//          if (ft.equals(fileType)) {
//            BoundedInputStream bis =
//                new BoundedInputStream(valueStream, fileLength);
//            return new InputStreamReader(bis);
//          } else {
//            long totalSkipped = 0;
//            long currSkipped = 0;
//            while (currSkipped != -1 && totalSkipped < fileLength) {
//              currSkipped = valueStream.skip(fileLength - totalSkipped);
//              totalSkipped += currSkipped;
//            }
//            // TODO Verify skip behaviour.
//            if (currSkipped == -1) {
//              return null;
//            }
//          }
//        }
//      } catch (EOFException e) {
//        return null;
//      }
//    }

    /**
     * Writes all logs for a single container to the provided writer.
     * @param valueStream the value stream for the container, as returned by
     *          {@link #next(LogKey)}.
     * @param writer the writer to which the logs are written.
     * @param logUploadedTime the time when the logs were uploaded.
     * @throws IOException
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer, long logUploadedTime) throws IOException {
      OutputStream os = null;
      PrintStream ps = null;
      try {
        os = new WriterOutputStream(writer, Charset.forName("UTF-8"));
        ps = new PrintStream(os);
        while (true) {
          try {
            readContainerLogs(valueStream, ps, logUploadedTime, Long.MAX_VALUE);
          } catch (EOFException e) {
            // EndOfFile
            return;
          }
        }
      } finally {
        IOUtils.cleanup(LOG, ps);
        IOUtils.cleanup(LOG, os);
      }
    }

    /**
     * Writes all logs for a single container to the provided writer.
     * @param valueStream the value stream for the container, as returned by
     *          {@link #next(LogKey)}.
     * @param writer the writer to which the logs are written.
     * @throws IOException
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer) throws IOException {
      readAcontainerLogs(valueStream, writer, -1);
    }

    private static void readContainerLogs(DataInputStream valueStream,
        PrintStream out, long logUploadedTime, long bytes)
        throws IOException {
      byte[] buf = new byte[65535];

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      out.print("LogType:");
      out.println(fileType);
      if (logUploadedTime != -1) {
        out.print("Log Upload Time:");
        out.println(Times.format(logUploadedTime));
      }
      out.print("LogLength:");
      out.println(fileLengthStr);
      out.println("Log Contents:");

      long toSkip = 0;
      long totalBytesToRead = fileLength;
      long skipAfterRead = 0;
      if (bytes < 0) {
        long absBytes = Math.abs(bytes);
        if (absBytes < fileLength) {
          toSkip = fileLength - absBytes;
          totalBytesToRead = absBytes;
        }
        org.apache.hadoop.io.IOUtils.skipFully(
            valueStream, toSkip);
      } else {
        if (bytes < fileLength) {
          totalBytesToRead = bytes;
          skipAfterRead = fileLength - bytes;
        }
      }

      long curRead = 0;
      long pendingRead = totalBytesToRead - curRead;
      int toRead =
          pendingRead > buf.length ? buf.length : (int) pendingRead;
      int len = valueStream.read(buf, 0, toRead);
      while (len != -1 && curRead < totalBytesToRead) {
        out.write(buf, 0, len);
        curRead += len;

        pendingRead = totalBytesToRead - curRead;
        toRead =
            pendingRead > buf.length ? buf.length : (int) pendingRead;
        len = valueStream.read(buf, 0, toRead);
      }
      org.apache.hadoop.io.IOUtils.skipFully(
          valueStream, skipAfterRead);
      out.println("\nEnd of LogType:" + fileType);
      out.println("");
    }
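
    // Byte-window semantics of readContainerLogs above, for illustration:
    // bytes == Long.MAX_VALUE prints the whole file; a positive value prints
    // at most that many bytes from the head of the file and skips the rest;
    // a negative value skips ahead first and prints at most |bytes| bytes
    // from the tail of the file.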

    /**
     * Keep calling this till you get an {@link EOFException} to get the logs
     * of all types for a single container.
     *
     * @param valueStream the value stream for the container, as returned by
     *          {@link #next(LogKey)}.
     * @param out the print stream the logs are written to.
     * @param logUploadedTime the time when the logs were uploaded.
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime)
          throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime, Long.MAX_VALUE);
    }

    /**
     * Keep calling this till you get an {@link EOFException} to get the logs
     * of all types for a single container for the specific bytes.
     *
     * @param valueStream the value stream for the container, as returned by
     *          {@link #next(LogKey)}.
     * @param out the print stream the logs are written to.
     * @param logUploadedTime the time when the logs were uploaded.
     * @param bytes the number of bytes to print from the head of each log,
     *          or from its tail if negative.
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        long bytes) throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime, bytes);
    }

    /**
     * Keep calling this till you get an {@link EOFException} to get the logs
     * of all types for a single container.
     *
     * @param valueStream the value stream for the container, as returned by
     *          {@link #next(LogKey)}.
     * @param out the print stream the logs are written to.
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out)
          throws IOException {
      readAContainerLogsForALogType(valueStream, out, -1);
    }

    /**
     * Keep calling this till you get an {@link EOFException} to get the logs
     * of the specific types for a single container.
     *
     * @param valueStream the value stream for the container, as returned by
     *          {@link #next(LogKey)}.
     * @param out the print stream the logs are written to.
     * @param logUploadedTime the time when the logs were uploaded.
     * @param logType the log type names that should be printed.
     * @throws IOException
     */
    public static int readContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        List<String> logType) throws IOException {
      return readContainerLogsForALogType(valueStream, out, logUploadedTime,
          logType, Long.MAX_VALUE);
    }

    /**
     * Keep calling this till you get an {@link EOFException} to get the logs
     * of the specific types for a single container.
     *
     * @param valueStream the value stream for the container, as returned by
     *          {@link #next(LogKey)}.
     * @param out the print stream the logs are written to.
     * @param logUploadedTime the time when the logs were uploaded.
     * @param logType the log type names that should be printed.
     * @param bytes the number of bytes to print from the head of each log,
     *          or from its tail if negative.
     * @throws IOException
     */
    public static int readContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        List<String> logType, long bytes) throws IOException {
      byte[] buf = new byte[65535];

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      if (logType.contains(fileType)) {
        out.print("LogType:");
        out.println(fileType);
        if (logUploadedTime != -1) {
          out.print("Log Upload Time:");
          out.println(Times.format(logUploadedTime));
        }
        out.print("LogLength:");
        out.println(fileLengthStr);
        out.println("Log Contents:");

        long toSkip = 0;
        long totalBytesToRead = fileLength;
        long skipAfterRead = 0;
        if (bytes < 0) {
          long absBytes = Math.abs(bytes);
          if (absBytes < fileLength) {
            toSkip = fileLength - absBytes;
            totalBytesToRead = absBytes;
          }
          org.apache.hadoop.io.IOUtils.skipFully(
              valueStream, toSkip);
        } else {
          if (bytes < fileLength) {
            totalBytesToRead = bytes;
            skipAfterRead = fileLength - bytes;
          }
        }

        long curRead = 0;
        long pendingRead = totalBytesToRead - curRead;
        int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
        int len = valueStream.read(buf, 0, toRead);
        while (len != -1 && curRead < totalBytesToRead) {
          out.write(buf, 0, len);
          curRead += len;

          pendingRead = totalBytesToRead - curRead;
          toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
          len = valueStream.read(buf, 0, toRead);
        }
        org.apache.hadoop.io.IOUtils.skipFully(
            valueStream, skipAfterRead);
        out.println("\nEnd of LogType:" + fileType);
        out.println("");
        return 0;
      } else {
        long totalSkipped = 0;
        long currSkipped = 0;
        while (currSkipped != -1 && totalSkipped < fileLength) {
          currSkipped = valueStream.skip(fileLength - totalSkipped);
          totalSkipped += currSkipped;
        }
        return -1;
      }
    }

    @Private
    public static Pair<String, String> readContainerMetaDataAndSkipData(
        DataInputStream valueStream, PrintStream out) throws IOException {

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      Pair<String, String> logMeta = new Pair<String, String>(
          fileType, fileLengthStr);
      long totalSkipped = 0;
      long currSkipped = 0;
      while (currSkipped != -1 && totalSkipped < fileLength) {
        currSkipped = valueStream.skip(fileLength - totalSkipped);
        totalSkipped += currSkipped;
      }
      return logMeta;
    }
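
    // A minimal sketch for listing a container's log file names and sizes
    // without printing their contents (valueStream would come from
    // next(LogKey); the loop ends with the usual EOFException; the out
    // argument is unused by the method above, so null is passed here):
    //
    //   try {
    //     while (true) {
    //       Pair<String, String> logMeta =
    //           LogReader.readContainerMetaDataAndSkipData(valueStream, null);
    //       System.out.println(logMeta.getFirst() + ": "
    //           + logMeta.getSecond() + " bytes");
    //     }
    //   } catch (EOFException eof) {
    //     // no more log files for this container
    //   }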

    public void close() {
      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
    }
  }

  @Private
  public static class ContainerLogsReader {
    private DataInputStream valueStream;
    private String currentLogType = null;
    private long currentLogLength = 0;
    private BoundedInputStream currentLogData = null;
    private InputStreamReader currentLogISR;

    public ContainerLogsReader(DataInputStream stream) {
      valueStream = stream;
    }

    public String nextLog() throws IOException {
      if (currentLogData != null && currentLogLength > 0) {
        // seek to the end of the current log, relying on BoundedInputStream
        // to prevent seeking past the end of the current log
        do {
          if (currentLogData.skip(currentLogLength) < 0) {
            break;
          }
        } while (currentLogData.read() != -1);
      }

      currentLogType = null;
      currentLogLength = 0;
      currentLogData = null;
      currentLogISR = null;

      try {
        String logType = valueStream.readUTF();
        String logLengthStr = valueStream.readUTF();
        currentLogLength = Long.parseLong(logLengthStr);
        currentLogData =
            new BoundedInputStream(valueStream, currentLogLength);
        currentLogData.setPropagateClose(false);
        currentLogISR = new InputStreamReader(currentLogData,
            Charset.forName("UTF-8"));
        currentLogType = logType;
      } catch (EOFException e) {
        // End of the container's logs: leave currentLogType as null.
      }

      return currentLogType;
    }
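
    // A minimal consumption sketch (logReader would come from
    // LogReader#getContainerLogsReader(containerId); someWriter is an
    // illustrative java.io.Writer):
    //
    //   String logType = logReader.nextLog();
    //   while (logType != null) {
    //     someWriter.write("LogType: " + logType
    //         + ", LogLength: " + logReader.getCurrentLogLength() + "\n");
    //     char[] cbuf = new char[8192];
    //     int n;
    //     while ((n = logReader.read(cbuf, 0, cbuf.length)) != -1) {
    //       someWriter.write(cbuf, 0, n);
    //     }
    //     logType = logReader.nextLog();
    //   }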

    public String getCurrentLogType() {
      return currentLogType;
    }

    public long getCurrentLogLength() {
      return currentLogLength;
    }

    public long skip(long n) throws IOException {
      return currentLogData.skip(n);
    }

    public int read() throws IOException {
      return currentLogData.read();
    }

    public int read(byte[] buf, int off, int len) throws IOException {
      return currentLogData.read(buf, off, len);
    }

    public int read(char[] buf, int off, int len) throws IOException {
      return currentLogISR.read(buf, off, len);
    }
  }
}