001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.metrics2.lib;
020
021import static org.apache.hadoop.metrics2.lib.Interns.info;
022
023import java.util.Map;
024import java.util.concurrent.Executors;
025import java.util.concurrent.ScheduledExecutorService;
026import java.util.concurrent.ScheduledFuture;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.commons.lang.StringUtils;
030import org.apache.hadoop.classification.InterfaceAudience;
031import org.apache.hadoop.classification.InterfaceStability;
032import org.apache.hadoop.metrics2.MetricsInfo;
033import org.apache.hadoop.metrics2.MetricsRecordBuilder;
034import org.apache.hadoop.metrics2.util.Quantile;
035import org.apache.hadoop.metrics2.util.QuantileEstimator;
036import org.apache.hadoop.metrics2.util.SampleQuantiles;
037
038import com.google.common.annotations.VisibleForTesting;
039import com.google.common.util.concurrent.ThreadFactoryBuilder;
040
041/**
042 * Watches a stream of long values, maintaining online estimates of specific
043 * quantiles with provably low error bounds. This is particularly useful for
044 * accurate high-percentile (e.g. 95th, 99th) latency metrics.
045 */
046@InterfaceAudience.Public
047@InterfaceStability.Evolving
048public class MutableQuantiles extends MutableMetric {
049
050  @VisibleForTesting
051  public static final Quantile[] quantiles = { new Quantile(0.50, 0.050),
052      new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
053      new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
054
055  private final MetricsInfo numInfo;
056  private final MetricsInfo[] quantileInfos;
057  private final int interval;
058
059  private QuantileEstimator estimator;
060  private long previousCount = 0;
061  private ScheduledFuture<?> scheduledTask = null;
062
063  @VisibleForTesting
064  protected Map<Quantile, Long> previousSnapshot = null;
065
066  private static final ScheduledExecutorService scheduler = Executors
067      .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
068          .setNameFormat("MutableQuantiles-%d").build());
069
070  /**
071   * Instantiates a new {@link MutableQuantiles} for a metric that rolls itself
072   * over on the specified time interval.
073   * 
074   * @param name
075   *          of the metric
076   * @param description
077   *          long-form textual description of the metric
078   * @param sampleName
079   *          type of items in the stream (e.g., "Ops")
080   * @param valueName
081   *          type of the values
082   * @param interval
083   *          rollover interval (in seconds) of the estimator
084   */
085  public MutableQuantiles(String name, String description, String sampleName,
086      String valueName, int interval) {
087    String ucName = StringUtils.capitalize(name);
088    String usName = StringUtils.capitalize(sampleName);
089    String uvName = StringUtils.capitalize(valueName);
090    String desc = StringUtils.uncapitalize(description);
091    String lsName = StringUtils.uncapitalize(sampleName);
092    String lvName = StringUtils.uncapitalize(valueName);
093
094    numInfo = info(ucName + "Num" + usName, String.format(
095        "Number of %s for %s with %ds interval", lsName, desc, interval));
096    // Construct the MetricsInfos for the quantiles, converting to percentiles
097    quantileInfos = new MetricsInfo[quantiles.length];
098    String nameTemplate = ucName + "%dthPercentile" + uvName;
099    String descTemplate = "%d percentile " + lvName + " with " + interval
100        + " second interval for " + desc;
101    for (int i = 0; i < quantiles.length; i++) {
102      int percentile = (int) (100 * quantiles[i].quantile);
103      quantileInfos[i] = info(String.format(nameTemplate, percentile),
104          String.format(descTemplate, percentile));
105    }
106
107    estimator = new SampleQuantiles(quantiles);
108
109    this.interval = interval;
110    scheduledTask = scheduler.scheduleAtFixedRate(new RolloverSample(this),
111        interval, interval, TimeUnit.SECONDS);
112  }
113
114  @Override
115  public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
116    if (all || changed()) {
117      builder.addGauge(numInfo, previousCount);
118      for (int i = 0; i < quantiles.length; i++) {
119        long newValue = 0;
120        // If snapshot is null, we failed to update since the window was empty
121        if (previousSnapshot != null) {
122          newValue = previousSnapshot.get(quantiles[i]);
123        }
124        builder.addGauge(quantileInfos[i], newValue);
125      }
126      if (changed()) {
127        clearChanged();
128      }
129    }
130  }
131
132  public synchronized void add(long value) {
133    estimator.insert(value);
134  }
135
136  public int getInterval() {
137    return interval;
138  }
139
140  public void stop() {
141    if (scheduledTask != null) {
142      scheduledTask.cancel(false);
143    }
144    scheduledTask = null;
145  }
146
147  public synchronized void setEstimator(QuantileEstimator quantileEstimator) {
148    this.estimator = quantileEstimator;
149  }
150
151  /**
152   * Runnable used to periodically roll over the internal
153   * {@link SampleQuantiles} every interval.
154   */
155  private static class RolloverSample implements Runnable {
156
157    MutableQuantiles parent;
158
159    public RolloverSample(MutableQuantiles parent) {
160      this.parent = parent;
161    }
162
163    @Override
164    public void run() {
165      synchronized (parent) {
166        parent.previousCount = parent.estimator.getCount();
167        parent.previousSnapshot = parent.estimator.snapshot();
168        parent.estimator.clear();
169      }
170      parent.setChanged();
171    }
172
173  }
174}