/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.sink;

import com.google.common.base.Strings;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * A metrics sink that writes to a Kafka broker. This requires you to configure
 * a broker_list and a topic in the metrics2 configuration file. The
 * broker_list must contain a comma-separated list of Kafka broker host:port
 * pairs. The topic option must name a single Kafka topic to which all records
 * are sent.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class KafkaSink implements MetricsSink, Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
  public static final String BROKER_LIST = "broker_list";
  public static final String TOPIC = "topic";

  private String hostname = null;
  private String brokerList = null;
  private String topic = null;
  private Producer<Integer, byte[]> producer = null;

  private final DateTimeFormatter dateFormat =
      DateTimeFormatter.ofPattern("yyyy-MM-dd");
  private final DateTimeFormatter timeFormat =
      DateTimeFormatter.ofPattern("HH:mm:ss");
  private final ZoneId zoneId = ZoneId.systemDefault();

  public void setProducer(Producer<Integer, byte[]> p) {
    this.producer = p;
  }

  @Override
  public void init(SubsetConfiguration conf) {
    // Get the Kafka broker configuration.
    Properties props = new Properties();
    brokerList = conf.getString(BROKER_LIST);
    if (Strings.isNullOrEmpty(brokerList)) {
      // Properties rejects null values, so fail with a clear message instead.
      throw new MetricsException("Kafka broker_list can not be null");
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Kafka brokers: " + brokerList);
    }
    props.put("bootstrap.servers", brokerList);

    // Get the Kafka topic configuration.
    topic = conf.getString(TOPIC);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Kafka topic: " + topic);
    }
    if (Strings.isNullOrEmpty(topic)) {
      throw new MetricsException("Kafka topic can not be null");
    }

    // Set the rest of the Kafka configuration. Records are sent without a
    // key, so a byte-array serializer suffices for both key and value.
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    // "acks" is the config key the new KafkaProducer understands; the old
    // Scala producer's "request.required.acks" key is ignored by it.
    props.put("acks", "0");

    // Resolve the hostname once and use it in every message, falling back to
    // the literal string "null" if resolution fails.
    hostname = "null";
    try {
      hostname = InetAddress.getLocalHost().getHostName();
    } catch (Exception e) {
      LOG.warn("Error getting hostname, going to continue");
    }

    try {
      // Create the producer object.
      producer = new KafkaProducer<Integer, byte[]>(props);
    } catch (Exception e) {
      throw new MetricsException("Error creating Producer, " + brokerList, e);
    }
  }
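
  /*
   * A minimal sketch of wiring this sink into hadoop-metrics2.properties.
   * The "namenode" prefix and "kafka" instance name are illustrative; any
   * metrics2 prefix and sink instance name work. The broker_list and topic
   * options map to the BROKER_LIST and TOPIC keys read in init() above.
   *
   *   namenode.sink.kafka.class=org.apache.hadoop.metrics2.sink.KafkaSink
   *   namenode.sink.kafka.broker_list=broker1:9092,broker2:9092
   *   namenode.sink.kafka.topic=hadoop-metrics
   */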

  @Override
  public void putMetrics(MetricsRecord record) {
    if (producer == null) {
      throw new MetricsException("Producer in KafkaSink is null!");
    }

    // Build the JSON message.
    StringBuilder jsonLines = new StringBuilder();

    long timestamp = record.timestamp();
    Instant instant = Instant.ofEpochMilli(timestamp);
    LocalDateTime ldt = LocalDateTime.ofInstant(instant, zoneId);
    String date = ldt.format(dateFormat);
    String time = ldt.format(timeFormat);

    // Collect datapoints and populate the JSON object.
    jsonLines.append("{\"hostname\": \"").append(hostname);
    jsonLines.append("\", \"timestamp\": ").append(timestamp);
    jsonLines.append(", \"date\": \"").append(date);
    jsonLines.append("\",\"time\": \"").append(time);
    jsonLines.append("\",\"name\": \"").append(record.name()).append("\" ");
    for (MetricsTag tag : record.tags()) {
      jsonLines.append(
          ", \"" + tag.name().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + tag.value() + "\"");
    }
    for (AbstractMetric metric : record.metrics()) {
      jsonLines.append(", \""
          + metric.name().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + metric.value().toString() + "\"");
    }
    jsonLines.append("}");
    LOG.debug("kafka message: " + jsonLines.toString());
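
    // Note: control characters are stripped from tag and metric names above,
    // but quotes and backslashes inside values are not escaped, so a value
    // containing '"' would produce invalid JSON. Strict consumers may need
    // to sanitize such values upstream.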

    // Create the record to be sent from the JSON.
    ProducerRecord<Integer, byte[]> data = new ProducerRecord<Integer, byte[]>(
        topic, jsonLines.toString().getBytes(StandardCharsets.UTF_8));

    // Send the data to the Kafka broker. Here is an example of this data:
    // {"hostname": "...", "timestamp": 1436913651516,
    // "date": "2015-07-14","time": "22:40:51","context": "yarn","name":
    // "QueueMetrics", "running_0": "1", "running_60": "0", "running_300": "0",
    // "running_1440": "0", "AppsSubmitted": "1", "AppsRunning": "1",
    // "AppsPending": "0", "AppsCompleted": "0", "AppsKilled": "0",
    // "AppsFailed": "0", "AllocatedMB": "134656", "AllocatedVCores": "132",
    // "AllocatedContainers": "132", "AggregateContainersAllocated": "132",
    // "AggregateContainersReleased": "0", "AvailableMB": "0",
    // "AvailableVCores": "0", "PendingMB": "275456", "PendingVCores": "269",
    // "PendingContainers": "269", "ReservedMB": "0", "ReservedVCores": "0",
    // "ReservedContainers": "0", "ActiveUsers": "1", "ActiveApplications": "1"}
    Future<RecordMetadata> future = producer.send(data);
    jsonLines.setLength(0);
    try {
      future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag before surfacing the failure.
      Thread.currentThread().interrupt();
      throw new MetricsException("Error sending data", e);
    } catch (ExecutionException e) {
      throw new MetricsException("Error sending data", e);
    }
  }

  @Override
  public void flush() {
    // Each record is sent and waited on synchronously in putMetrics(), so
    // there is nothing left to flush here.
    LOG.debug("Kafka sink does not buffer; nothing to flush.");
  }

  @Override
  public void close() throws IOException {
    // Close the producer and set it to null.
    try {
      if (producer != null) {
        producer.close();
      }
    } catch (RuntimeException e) {
      throw new MetricsException("Error closing producer", e);
    } finally {
      producer = null;
    }
  }
}
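
// A minimal consumption sketch for the JSON messages this sink emits,
// assuming kafka-clients 0.9+ on the classpath. The broker address, group id
// and topic name below are illustrative, not taken from this file.
//
//   Properties props = new Properties();
//   props.put("bootstrap.servers", "broker1:9092");
//   props.put("group.id", "metrics-reader");
//   props.put("key.deserializer",
//       "org.apache.kafka.common.serialization.IntegerDeserializer");
//   props.put("value.deserializer",
//       "org.apache.kafka.common.serialization.ByteArrayDeserializer");
//   try (KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props)) {
//     consumer.subscribe(Collections.singletonList("hadoop-metrics"));
//     for (ConsumerRecord<Integer, byte[]> rec : consumer.poll(1000)) {
//       System.out.println(new String(rec.value(), StandardCharsets.UTF_8));
//     }
//   }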