001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.metrics2.sink;
020
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
056
057/**
058 * <p>This class is a metrics sink that uses
059 * {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs.  Every
060 * roll interval a new directory will be created under the path specified by the
061 * <code>basepath</code> property. All metrics will be logged to a file in the
062 * current interval's directory in a file named &lt;hostname&gt;.log, where
063 * &lt;hostname&gt; is the name of the host on which the metrics logging
064 * process is running. The base path is set by the
065 * <code>&lt;prefix&gt;.sink.&lt;instance&gt;.basepath</code> property.  The
066 * time zone used to create the current interval's directory name is GMT.  If
067 * the <code>basepath</code> property isn't specified, it will default to
068 * &quot;/tmp&quot;, which is the temp directory on whatever default file
069 * system is configured for the cluster.</p>
070 *
071 * <p>The <code>&lt;prefix&gt;.sink.&lt;instance&gt;.ignore-error</code>
072 * property controls whether an exception is thrown when an error is encountered
 * writing a log file.  The default value is <code>false</code>, meaning that
 * an exception is thrown.  When set to <code>true</code>, file errors are
 * quietly swallowed.</p>
075 *
076 * <p>The <code>roll-interval</code> property sets the amount of time before
077 * rolling the directory. The default value is 1 hour. The roll interval may
078 * not be less than 1 minute. The property's value should be given as
079 * <i>number unit</i>, where <i>number</i> is an integer value, and
080 * <i>unit</i> is a valid unit.  Valid units are <i>minute</i>, <i>hour</i>,
081 * and <i>day</i>.  The units are case insensitive and may be abbreviated or
082 * plural. If no units are specified, hours are assumed. For example,
083 * &quot;2&quot;, &quot;2h&quot;, &quot;2 hour&quot;, and
084 * &quot;2 hours&quot; are all valid ways to specify two hours.</p>
085 *
086 * <p>The <code>roll-offset-interval-millis</code> property sets the upper
087 * bound on a random time interval (in milliseconds) that is used to delay
088 * before the initial roll.  All subsequent rolls will happen an integer
089 * number of roll intervals after the initial roll, hence retaining the original
090 * offset. The purpose of this property is to insert some variance in the roll
091 * times so that large clusters using this sink on every node don't cause a
092 * performance impact on HDFS by rolling simultaneously.  The default value is
093 * 30000 (30s).  When writing to HDFS, as a rule of thumb, the roll offset in
 * millis should be no less than the number of sink instances times 5.</p>
095 *
096 * <p>The primary use of this class is for logging to HDFS.  As it uses
097 * {@link org.apache.hadoop.fs.FileSystem} to access the target file system,
098 * however, it can be used to write to the local file system, Amazon S3, or any
099 * other supported file system.  The base path for the sink will determine the
100 * file system used.  An unqualified path will write to the default file system
101 * set by the configuration.</p>
102 *
103 * <p>Not all file systems support the ability to append to files.  In file
104 * systems without the ability to append to files, only one writer can write to
105 * a file at a time.  To allow for concurrent writes from multiple daemons on a
106 * single host, the <code>source</code> property is used to set unique headers
107 * for the log files.  The property should be set to the name of
108 * the source daemon, e.g. <i>namenode</i>.  The value of the
109 * <code>source</code> property should typically be the same as the property's
110 * prefix.  If this property is not set, the source is taken to be
111 * <i>unknown</i>.</p>
112 *
113 * <p>Instead of appending to an existing file, by default the sink
 * will create a new file with a suffix of &quot;.&lt;n&gt;&quot;, where
115 * <i>n</i> is the next lowest integer that isn't already used in a file name,
116 * similar to the Hadoop daemon logs.  NOTE: the file with the <b>highest</b>
117 * sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.</p>
118 *
119 * <p>For file systems that allow append, the sink supports appending to the
120 * existing file instead. If the <code>allow-append</code> property is set to
121 * true, the sink will instead append to the existing file on file systems that
122 * support appends. By default, the <code>allow-append</code> property is
123 * false.</p>
124 *
125 * <p>Note that when writing to HDFS with <code>allow-append</code> set to true,
126 * there is a minimum acceptable number of data nodes.  If the number of data
127 * nodes drops below that minimum, the append will succeed, but reading the
128 * data will fail with an IOException in the DataStreamer class.  The minimum
129 * number of data nodes required for a successful append is generally 2 or
130 * 3.</p>
131 *
132 * <p>Note also that when writing to HDFS, the file size information is not
133 * updated until the file is closed (at the end of the interval) even though
134 * the data is being written successfully. This is a known HDFS limitation that
135 * exists because of the performance cost of updating the metadata.  See
136 * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
137 *
138 * <p>When using this sink in a secure (Kerberos) environment, two additional
139 * properties must be set: <code>keytab-key</code> and
140 * <code>principal-key</code>. <code>keytab-key</code> should contain the key by
141 * which the keytab file can be found in the configuration, for example,
142 * <code>yarn.nodemanager.keytab</code>. <code>principal-key</code> should
143 * contain the key by which the principal can be found in the configuration,
 * for example, <code>yarn.nodemanager.principal</code>.</p>
145 */
146@InterfaceAudience.Public
147@InterfaceStability.Evolving
148public class RollingFileSystemSink implements MetricsSink, Closeable {
  // Names of the sink properties read in init(), and the defaults used when a
  // property is not set.
  private static final String BASEPATH_KEY = "basepath";
  private static final String SOURCE_KEY = "source";
  private static final String IGNORE_ERROR_KEY = "ignore-error";
  private static final boolean DEFAULT_IGNORE_ERROR = false;
  private static final String ALLOW_APPEND_KEY = "allow-append";
  private static final boolean DEFAULT_ALLOW_APPEND = false;
  // These two properties do not hold the keytab/principal themselves; they
  // hold the names of the Configuration keys under which those are stored.
  private static final String KEYTAB_PROPERTY_KEY = "keytab-key";
  private static final String USERNAME_PROPERTY_KEY = "principal-key";
  private static final String ROLL_INTERVAL_KEY = "roll-interval";
  private static final String DEFAULT_ROLL_INTERVAL = "1h";
  private static final String ROLL_OFFSET_INTERVAL_MILLIS_KEY =
      "roll-offset-interval-millis";
  private static final int DEFAULT_ROLL_OFFSET_INTERVAL_MILLIS = 30000;
  private static final String SOURCE_DEFAULT = "unknown";
  private static final String BASEPATH_DEFAULT = "/tmp";
  // Formats the name of each interval's directory (minute resolution, GMT).
  private static final FastDateFormat DATE_FORMAT =
      FastDateFormat.getInstance("yyyyMMddHHmm", TimeZone.getTimeZone("GMT"));
  // Held by putMetrics() and by the flusher task while they touch the stream.
  private final Object lock = new Object();
  // Set once initFs() has succeeded.
  private boolean initialized = false;
  // The metrics2 properties handed to init()
  private SubsetConfiguration properties;
  // The Hadoop configuration returned by loadConf()
  private Configuration conf;
  @VisibleForTesting
  protected String source;
  @VisibleForTesting
  protected boolean ignoreError;
  @VisibleForTesting
  protected boolean allowAppend;
  @VisibleForTesting
  protected Path basePath;
  private FileSystem fileSystem;
  // The current directory path into which we're writing files
  private Path currentDirPath;
  // The path to the current file into which we're writing data
  private Path currentFilePath;
  // The stream to which we're currently writing.
  private PrintStream currentOutStream;
  // We keep this only to be able to call hsync() on it.
  private FSDataOutputStream currentFSOutStream;
  // Runs the tasks that close the output stream at the end of an interval.
  private Timer flushTimer;
  // The amount of time between rolls
  @VisibleForTesting
  protected long rollIntervalMillis;
  // The maximum amount of random time to add to the initial roll
  @VisibleForTesting
  protected long rollOffsetIntervalMillis;
  // The time for the nextFlush
  @VisibleForTesting
  protected Calendar nextFlush = null;
  // This flag when true causes a metrics write to schedule a flush thread to
  // run immediately, but only if a flush thread is already scheduled. (It's a
  // timing thing.  If the first write forces the flush, it will strand the
  // second write.)
  @VisibleForTesting
  protected static boolean forceFlush = false;
  // This flag is used by the flusher thread to indicate that it has run. Used
  // only for testing purposes.
  @VisibleForTesting
  protected static volatile boolean hasFlushed = false;
  // Use this configuration instead of loading a new one.
  @VisibleForTesting
  protected static Configuration suppliedConf = null;
  // Use this file system instead of getting a new one.
  @VisibleForTesting
  protected static FileSystem suppliedFilesystem = null;
213
  /**
   * Create an empty instance.  Required for reflection.  No settings are read
   * here; they are loaded when {@link #init(SubsetConfiguration)} is called.
   */
  public RollingFileSystemSink() {
  }
219
  /**
   * Create an instance for testing with preset roll timing.  Note that a
   * later call to {@link #init(SubsetConfiguration)} overwrites both values
   * with the ones taken from the sink properties.
   *
   * @param flushIntervalMillis the roll interval in millis
   * @param flushOffsetIntervalMillis the roll offset interval in millis
   */
  @VisibleForTesting
  protected RollingFileSystemSink(long flushIntervalMillis,
      long flushOffsetIntervalMillis) {
    this.rollIntervalMillis = flushIntervalMillis;
    this.rollOffsetIntervalMillis = flushOffsetIntervalMillis;
  }
232
233  @Override
234  public void init(SubsetConfiguration metrics2Properties) {
235    properties = metrics2Properties;
236    basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
237    source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT);
238    ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, DEFAULT_IGNORE_ERROR);
239    allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, DEFAULT_ALLOW_APPEND);
240    rollOffsetIntervalMillis =
241        getNonNegative(ROLL_OFFSET_INTERVAL_MILLIS_KEY,
242          DEFAULT_ROLL_OFFSET_INTERVAL_MILLIS);
243    rollIntervalMillis = getRollInterval();
244
245    conf = loadConf();
246    UserGroupInformation.setConfiguration(conf);
247
248    // Don't do secure setup if it's not needed.
249    if (UserGroupInformation.isSecurityEnabled()) {
250      // Validate config so that we don't get an NPE
251      checkIfPropertyExists(KEYTAB_PROPERTY_KEY);
252      checkIfPropertyExists(USERNAME_PROPERTY_KEY);
253
254
255      try {
256        // Login as whoever we're supposed to be and let the hostname be pulled
257        // from localhost. If security isn't enabled, this does nothing.
258        SecurityUtil.login(conf, properties.getString(KEYTAB_PROPERTY_KEY),
259            properties.getString(USERNAME_PROPERTY_KEY));
260      } catch (IOException ex) {
261        throw new MetricsException("Error logging in securely: ["
262            + ex.toString() + "]", ex);
263      }
264    }
265  }
266
267  /**
268   * Initialize the connection to HDFS and create the base directory. Also
269   * launch the flush thread.
270   */
271  private boolean initFs() {
272    boolean success = false;
273
274    fileSystem = getFileSystem();
275
276    // This step isn't strictly necessary, but it makes debugging issues much
277    // easier. We try to create the base directory eagerly and fail with
278    // copious debug info if it fails.
279    try {
280      fileSystem.mkdirs(basePath);
281      success = true;
282    } catch (Exception ex) {
283      if (!ignoreError) {
284        throw new MetricsException("Failed to create " + basePath + "["
285            + SOURCE_KEY + "=" + source + ", "
286            + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
287            + stringifySecurityProperty(KEYTAB_PROPERTY_KEY) + ", "
288            + stringifySecurityProperty(USERNAME_PROPERTY_KEY)
289            + "] -- " + ex.toString(), ex);
290      }
291    }
292
293    if (success) {
294      // If we're permitted to append, check if we actually can
295      if (allowAppend) {
296        allowAppend = checkAppend(fileSystem);
297      }
298
299      flushTimer = new Timer("RollingFileSystemSink Flusher", true);
300      setInitialFlushTime(new Date());
301    }
302
303    return success;
304  }
305
306  /**
307   * Turn a security property into a nicely formatted set of <i>name=value</i>
308   * strings, allowing for either the property or the configuration not to be
309   * set.
310   *
311   * @param property the property to stringify
312   * @return the stringified property
313   */
314  private String stringifySecurityProperty(String property) {
315    String securityProperty;
316
317    if (properties.containsKey(property)) {
318      String propertyValue = properties.getString(property);
319      String confValue = conf.get(properties.getString(property));
320
321      if (confValue != null) {
322        securityProperty = property + "=" + propertyValue
323            + ", " + properties.getString(property) + "=" + confValue;
324      } else {
325        securityProperty = property + "=" + propertyValue
326            + ", " + properties.getString(property) + "=<NOT SET>";
327      }
328    } else {
329      securityProperty = property + "=<NOT SET>";
330    }
331
332    return securityProperty;
333  }
334
335  /**
336   * Extract the roll interval from the configuration and return it in
337   * milliseconds.
338   *
339   * @return the roll interval in millis
340   */
341  @VisibleForTesting
342  protected long getRollInterval() {
343    String rollInterval =
344        properties.getString(ROLL_INTERVAL_KEY, DEFAULT_ROLL_INTERVAL);
345    Pattern pattern = Pattern.compile("^\\s*(\\d+)\\s*([A-Za-z]*)\\s*$");
346    Matcher match = pattern.matcher(rollInterval);
347    long millis;
348
349    if (match.matches()) {
350      String flushUnit = match.group(2);
351      int rollIntervalInt;
352
353      try {
354        rollIntervalInt = Integer.parseInt(match.group(1));
355      } catch (NumberFormatException ex) {
356        throw new MetricsException("Unrecognized flush interval: "
357            + rollInterval + ". Must be a number followed by an optional "
358            + "unit. The unit must be one of: minute, hour, day", ex);
359      }
360
361      if ("".equals(flushUnit)) {
362        millis = TimeUnit.HOURS.toMillis(rollIntervalInt);
363      } else {
364        switch (flushUnit.toLowerCase()) {
365        case "m":
366        case "min":
367        case "minute":
368        case "minutes":
369          millis = TimeUnit.MINUTES.toMillis(rollIntervalInt);
370          break;
371        case "h":
372        case "hr":
373        case "hour":
374        case "hours":
375          millis = TimeUnit.HOURS.toMillis(rollIntervalInt);
376          break;
377        case "d":
378        case "day":
379        case "days":
380          millis = TimeUnit.DAYS.toMillis(rollIntervalInt);
381          break;
382        default:
383          throw new MetricsException("Unrecognized unit for flush interval: "
384              + flushUnit + ". Must be one of: minute, hour, day");
385        }
386      }
387    } else {
388      throw new MetricsException("Unrecognized flush interval: "
389          + rollInterval + ". Must be a number followed by an optional unit."
390          + " The unit must be one of: minute, hour, day");
391    }
392
393    if (millis < 60000) {
394      throw new MetricsException("The flush interval property must be "
395          + "at least 1 minute. Value was " + rollInterval);
396    }
397
398    return millis;
399  }
400
401  /**
402   * Return the property value if it's non-negative and throw an exception if
403   * it's not.
404   *
405   * @param key the property key
406   * @param defaultValue the default value
407   */
408  private long getNonNegative(String key, int defaultValue) {
409    int flushOffsetIntervalMillis = properties.getInt(key, defaultValue);
410
411    if (flushOffsetIntervalMillis < 0) {
412      throw new MetricsException("The " + key + " property must be "
413          + "non-negative. Value was " + flushOffsetIntervalMillis);
414    }
415
416    return flushOffsetIntervalMillis;
417  }
418
419  /**
420   * Throw a {@link MetricsException} if the given property is not set.
421   *
422   * @param key the key to validate
423   */
424  private void checkIfPropertyExists(String key) {
425    if (!properties.containsKey(key)) {
426      throw new MetricsException("Metrics2 configuration is missing " + key
427          + " property");
428    }
429  }
430
431  /**
432   * Return the supplied configuration for testing or otherwise load a new
433   * configuration.
434   *
435   * @return the configuration to use
436   */
437  private Configuration loadConf() {
438    Configuration c;
439
440    if (suppliedConf != null) {
441      c = suppliedConf;
442    } else {
443      // The config we're handed in init() isn't the one we want here, so we
444      // create a new one to pick up the full settings.
445      c = new Configuration();
446    }
447
448    return c;
449  }
450
451  /**
452   * Return the supplied file system for testing or otherwise get a new file
453   * system.
454   *
455   * @return the file system to use
456   * @throws MetricsException thrown if the file system could not be retrieved
457   */
458  private FileSystem getFileSystem() throws MetricsException {
459    FileSystem fs = null;
460
461    if (suppliedFilesystem != null) {
462      fs = suppliedFilesystem;
463    } else {
464      try {
465        fs = FileSystem.get(new URI(basePath.toString()), conf);
466      } catch (URISyntaxException ex) {
467        throw new MetricsException("The supplied filesystem base path URI"
468            + " is not a valid URI: " + basePath.toString(), ex);
469      } catch (IOException ex) {
470        throw new MetricsException("Error connecting to file system: "
471            + basePath + " [" + ex.toString() + "]", ex);
472      }
473    }
474
475    return fs;
476  }
477
478  /**
479   * Test whether the file system supports append and return the answer.
480   *
481   * @param fs the target file system
482   */
483  private boolean checkAppend(FileSystem fs) {
484    boolean canAppend = true;
485
486    try {
487      fs.append(basePath);
488    } catch (UnsupportedOperationException ex) {
489      canAppend = false;
490    } catch (IOException ex) {
491      // Ignore. The operation is supported.
492    }
493
494    return canAppend;
495  }
496
497  /**
498   * Check the current directory against the time stamp.  If they're not
499   * the same, create a new directory and a new log file in that directory.
500   *
501   * @throws MetricsException thrown if an error occurs while creating the
502   * new directory or new log file
503   */
504  private void rollLogDirIfNeeded() throws MetricsException {
505    // Because we're working relative to the clock, we use a Date instead
506    // of Time.monotonicNow().
507    Date now = new Date();
508
509    // We check whether currentOutStream is null instead of currentDirPath,
510    // because if currentDirPath is null, then currentOutStream is null, but
511    // currentOutStream can be null for other reasons.  Same for nextFlush.
512    if ((currentOutStream == null) || now.after(nextFlush.getTime())) {
513      // If we're not yet connected to HDFS, create the connection
514      if (!initialized) {
515        initialized = initFs();
516      }
517
518      if (initialized) {
519        // Close the stream. This step could have been handled already by the
520        // flusher thread, but if it has, the PrintStream will just swallow the
521        // exception, which is fine.
522        if (currentOutStream != null) {
523          currentOutStream.close();
524        }
525
526        currentDirPath = findCurrentDirectory(now);
527
528        try {
529          rollLogDir();
530        } catch (IOException ex) {
531          throwMetricsException("Failed to create new log file", ex);
532        }
533
534        // Update the time of the next flush
535        updateFlushTime(now);
536        // Schedule the next flush at that time
537        scheduleFlush(nextFlush.getTime());
538      }
539    } else if (forceFlush) {
540      scheduleFlush(new Date());
541    }
542  }
543
544  /**
545   * Use the given time to determine the current directory. The current
546   * directory will be based on the {@link #rollIntervalMinutes}.
547   *
548   * @param now the current time
549   * @return the current directory
550   */
551  private Path findCurrentDirectory(Date now) {
552    long offset = ((now.getTime() - nextFlush.getTimeInMillis())
553        / rollIntervalMillis) * rollIntervalMillis;
554    String currentDir =
555        DATE_FORMAT.format(new Date(nextFlush.getTimeInMillis() + offset));
556
557    return new Path(basePath, currentDir);
558  }
559
560  /**
561   * Schedule the current interval's directory to be flushed. If this ends up
562   * running after the top of the next interval, it will execute immediately.
563   *
564   * @param when the time the thread should run
565   */
566  private void scheduleFlush(Date when) {
567    // Store the current currentDirPath to close later
568    final PrintStream toClose = currentOutStream;
569
570    flushTimer.schedule(new TimerTask() {
571      @Override
572      public void run() {
573        synchronized (lock) {
574          // This close may have already been done by a putMetrics() call. If it
575          // has, the PrintStream will swallow the exception, which is fine.
576          toClose.close();
577        }
578
579        hasFlushed = true;
580      }
581    }, when);
582  }
583
584  /**
585   * Update the {@link #nextFlush} variable to the next flush time. Add
586   * an integer number of flush intervals, preserving the initial random offset.
587   *
588   * @param now the current time
589   */
590  @VisibleForTesting
591  protected void updateFlushTime(Date now) {
592    // In non-initial rounds, add an integer number of intervals to the last
593    // flush until a time in the future is achieved, thus preserving the
594    // original random offset.
595    int millis =
596        (int) (((now.getTime() - nextFlush.getTimeInMillis())
597        / rollIntervalMillis + 1) * rollIntervalMillis);
598
599    nextFlush.add(Calendar.MILLISECOND, millis);
600  }
601
602  /**
603   * Set the {@link #nextFlush} variable to the initial flush time. The initial
604   * flush will be an integer number of flush intervals past the beginning of
605   * the current hour and will have a random offset added, up to
606   * {@link #rollOffsetIntervalMillis}. The initial flush will be a time in
607   * past that can be used from which to calculate future flush times.
608   *
609   * @param now the current time
610   */
611  @VisibleForTesting
612  protected void setInitialFlushTime(Date now) {
613    // Start with the beginning of the current hour
614    nextFlush = Calendar.getInstance();
615    nextFlush.setTime(now);
616    nextFlush.set(Calendar.MILLISECOND, 0);
617    nextFlush.set(Calendar.SECOND, 0);
618    nextFlush.set(Calendar.MINUTE, 0);
619
620    // In the first round, calculate the first flush as the largest number of
621    // intervals from the beginning of the current hour that's not in the
622    // future by:
623    // 1. Subtract the beginning of the hour from the current time
624    // 2. Divide by the roll interval and round down to get the number of whole
625    //    intervals that have passed since the beginning of the hour
626    // 3. Multiply by the roll interval to get the number of millis between
627    //    the beginning of the current hour and the beginning of the current
628    //    interval.
629    int millis = (int) (((now.getTime() - nextFlush.getTimeInMillis())
630        / rollIntervalMillis) * rollIntervalMillis);
631
632    // Then add some noise to help prevent all the nodes from
633    // closing their files at the same time.
634    if (rollOffsetIntervalMillis > 0) {
635      millis += ThreadLocalRandom.current().nextLong(rollOffsetIntervalMillis);
636
637      // If the added time puts us into the future, step back one roll interval
638      // because the code to increment nextFlush to the next flush expects that
639      // nextFlush is the next flush from the previous interval.  There wasn't
640      // a previous interval, so we just fake it with the time in the past that
641      // would have been the previous interval if there had been one.
642      //
643      // It's OK if millis comes out negative.
644      while (nextFlush.getTimeInMillis() + millis > now.getTime()) {
645        millis -= rollIntervalMillis;
646      }
647    }
648
649    // Adjust the next flush time by millis to get the time of our ficticious
650    // previous next flush
651    nextFlush.add(Calendar.MILLISECOND, millis);
652  }
653
654  /**
655   * Create a new directory based on the current interval and a new log file in
656   * that directory.
657   *
658   * @throws IOException thrown if an error occurs while creating the
659   * new directory or new log file
660   */
661  private void rollLogDir() throws IOException {
662    String fileName =
663        source + "-" + InetAddress.getLocalHost().getHostName() + ".log";
664
665    Path targetFile = new Path(currentDirPath, fileName);
666    fileSystem.mkdirs(currentDirPath);
667
668    if (allowAppend) {
669      createOrAppendLogFile(targetFile);
670    } else {
671      createLogFile(targetFile);
672    }
673  }
674
675  /**
676   * Create a new log file and return the {@link FSDataOutputStream}. If a
677   * file with the specified path already exists, add a suffix, starting with 1
678   * and try again. Keep incrementing the suffix until a nonexistent target
679   * path is found.
680   *
681   * Once the file is open, update {@link #currentFSOutStream},
682   * {@link #currentOutStream}, and {@#link #currentFilePath} are set
683   * appropriately.
684   *
685   * @param initial the target path
686   * @throws IOException thrown if the call to see if the exists fails
687   */
688  private void createLogFile(Path initial) throws IOException {
689    Path currentAttempt = initial;
690    // Start at 0 so that if the base filname exists, we start with the suffix
691    // ".1".
692    int id = 0;
693
694    while (true) {
695      // First try blindly creating the file. If we fail, it either means
696      // the file exists, or the operation actually failed.  We do it this way
697      // because if we check whether the file exists, it might still be created
698      // by the time we try to create it. Creating first works like a
699      // test-and-set.
700      try {
701        currentFSOutStream = fileSystem.create(currentAttempt, false);
702        currentOutStream = new PrintStream(currentFSOutStream, true,
703            StandardCharsets.UTF_8.name());
704        currentFilePath = currentAttempt;
705        break;
706      } catch (IOException ex) {
707        // Now we can check to see if the file exists to know why we failed
708        if (fileSystem.exists(currentAttempt)) {
709          id = getNextIdToTry(initial, id);
710          currentAttempt = new Path(initial.toString() + "." + id);
711        } else {
712          throw ex;
713        }
714      }
715    }
716  }
717
718  /**
719   * Return the next ID suffix to use when creating the log file. This method
720   * will look at the files in the directory, find the one with the highest
721   * ID suffix, and 1 to that suffix, and return it. This approach saves a full
722   * linear probe, which matters in the case where there are a large number of
723   * log files.
724   *
725   * @param initial the base file path
726   * @param lastId the last ID value that was used
727   * @return the next ID to try
728   * @throws IOException thrown if there's an issue querying the files in the
729   * directory
730   */
731  private int getNextIdToTry(Path initial, int lastId)
732      throws IOException {
733    RemoteIterator<LocatedFileStatus> files =
734        fileSystem.listFiles(currentDirPath, true);
735    String base = initial.toString();
736    int id = lastId;
737
738    while (files.hasNext()) {
739      String file = files.next().getPath().getName();
740
741      if (file.startsWith(base)) {
742        int fileId = extractId(file);
743
744        if (fileId > id) {
745          id = fileId;
746        }
747      }
748    }
749
750    // Return either 1 more than the highest we found or 1 more than the last
751    // ID used (if no ID was found).
752    return id + 1;
753  }
754
755  /**
756   * Extract the ID from the suffix of the given file name.
757   *
758   * @param file the file name
759   * @return the ID or -1 if no ID could be extracted
760   */
761  private int extractId(String file) {
762    int index = file.lastIndexOf(".");
763    int id = -1;
764
765    // A hostname has to have at least 1 character
766    if (index > 0) {
767      try {
768        id = Integer.parseInt(file.substring(index + 1));
769      } catch (NumberFormatException ex) {
770        // This can happen if there's no suffix, but there is a dot in the
771        // hostname.  Just ignore it.
772      }
773    }
774
775    return id;
776  }
777
778  /**
779   * Create a new log file and return the {@link FSDataOutputStream}. If a
780   * file with the specified path already exists, open the file for append
781   * instead.
782   *
783   * Once the file is open, update {@link #currentFSOutStream},
784   * {@link #currentOutStream}, and {@#link #currentFilePath}.
785   *
786   * @param initial the target path
787   * @throws IOException thrown if the call to see the append operation fails.
788   */
789  private void createOrAppendLogFile(Path targetFile) throws IOException {
790    // First try blindly creating the file. If we fail, it either means
791    // the file exists, or the operation actually failed.  We do it this way
792    // because if we check whether the file exists, it might still be created
793    // by the time we try to create it. Creating first works like a
794    // test-and-set.
795    try {
796      currentFSOutStream = fileSystem.create(targetFile, false);
797      currentOutStream = new PrintStream(currentFSOutStream, true,
798          StandardCharsets.UTF_8.name());
799    } catch (IOException ex) {
800      // Try appending instead.  If we fail, if means the file doesn't
801      // actually exist yet or the operation actually failed.
802      try {
803        currentFSOutStream = fileSystem.append(targetFile);
804        currentOutStream = new PrintStream(currentFSOutStream, true,
805            StandardCharsets.UTF_8.name());
806      } catch (IOException ex2) {
807        // If the original create failed for a legit but transitory
808        // reason, the append will fail because the file now doesn't exist,
809        // resulting in a confusing stack trace.  To avoid that, we set
810        // the cause of the second exception to be the first exception.
811        // It's still a tiny bit confusing, but it's enough
812        // information that someone should be able to figure it out.
813        ex2.initCause(ex);
814
815        throw ex2;
816      }
817    }
818
819    currentFilePath = targetFile;
820  }
821
822  @Override
823  public void putMetrics(MetricsRecord record) {
824    synchronized (lock) {
825      rollLogDirIfNeeded();
826
827      if (currentOutStream != null) {
828        currentOutStream.printf("%d %s.%s", record.timestamp(),
829            record.context(), record.name());
830
831        String separator = ": ";
832
833        for (MetricsTag tag : record.tags()) {
834          currentOutStream.printf("%s%s=%s", separator, tag.name(),
835              tag.value());
836          separator = ", ";
837        }
838
839        for (AbstractMetric metric : record.metrics()) {
840          currentOutStream.printf("%s%s=%s", separator, metric.name(),
841              metric.value());
842        }
843
844        currentOutStream.println();
845
846        // If we don't hflush(), the data may not be written until the file is
847        // closed. The file won't be closed until the end of the interval *AND*
848        // another record is received. Calling hflush() makes sure that the data
849        // is complete at the end of the interval.
850        try {
851          currentFSOutStream.hflush();
852        } catch (IOException ex) {
853          throwMetricsException("Failed flushing the stream", ex);
854        }
855
856        checkForErrors("Unable to write to log file");
857      } else if (!ignoreError) {
858        throwMetricsException("Unable to write to log file");
859      }
860    }
861  }
862
863  @Override
864  public void flush() {
865    synchronized (lock) {
866      // currentOutStream is null if currentFSOutStream is null
867      if (currentFSOutStream != null) {
868        try {
869          currentFSOutStream.hflush();
870        } catch (IOException ex) {
871          throwMetricsException("Unable to flush log file", ex);
872        }
873      }
874    }
875  }
876
877  @Override
878  public void close() {
879    synchronized (lock) {
880      if (currentOutStream != null) {
881        currentOutStream.close();
882
883        try {
884          checkForErrors("Unable to close log file");
885        } finally {
886          // Null out the streams just in case someone tries to reuse us.
887          currentOutStream = null;
888          currentFSOutStream = null;
889        }
890      }
891    }
892  }
893
894  /**
895   * If the sink isn't set to ignore errors, throw a {@link MetricsException}
896   * if the stream encountered an exception.  The message parameter will be used
897   * as the new exception's message with the current file name
898   * ({@link #currentFilePath}) appended to it.
899   *
900   * @param message the exception message. The message will have a colon and
901   * the current file name ({@link #currentFilePath}) appended to it.
902   * @throws MetricsException thrown if there was an error and the sink isn't
903   * ignoring errors
904   */
905  private void checkForErrors(String message)
906      throws MetricsException {
907    if (!ignoreError && currentOutStream.checkError()) {
908      throw new MetricsException(message + ": " + currentFilePath);
909    }
910  }
911
912  /**
913   * If the sink isn't set to ignore errors, wrap the Throwable in a
914   * {@link MetricsException} and throw it.  The message parameter will be used
915   * as the new exception's message with the current file name
916   * ({@link #currentFilePath}) and the Throwable's string representation
917   * appended to it.
918   *
919   * @param message the exception message. The message will have a colon, the
920   * current file name ({@link #currentFilePath}), and the Throwable's string
921   * representation (wrapped in square brackets) appended to it.
922   * @param t the Throwable to wrap
923   */
924  private void throwMetricsException(String message, Throwable t) {
925    if (!ignoreError) {
926      throw new MetricsException(message + ": " + currentFilePath + " ["
927          + t.toString() + "]", t);
928    }
929  }
930
931  /**
932   * If the sink isn't set to ignore errors, throw a new
933   * {@link MetricsException}.  The message parameter will be used  as the
934   * new exception's message with the current file name
935   * ({@link #currentFilePath}) appended to it.
936   *
937   * @param message the exception message. The message will have a colon and
938   * the current file name ({@link #currentFilePath}) appended to it.
939   */
940  private void throwMetricsException(String message) {
941    if (!ignoreError) {
942      throw new MetricsException(message + ": " + currentFilePath);
943    }
944  }
945}