001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.event; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.LinkedBlockingQueue; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.apache.hadoop.classification.InterfaceAudience.Public; 031import org.apache.hadoop.classification.InterfaceStability.Evolving; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.service.AbstractService; 034import org.apache.hadoop.util.ShutdownHookManager; 035import org.apache.hadoop.yarn.conf.YarnConfiguration; 036import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 037 038import com.google.common.annotations.VisibleForTesting; 039 040/** 041 * Dispatches {@link Event}s in a separate thread. Currently only single thread 042 * does that. Potentially there could be multiple channels for each event type 043 * class and a thread pool can be used to dispatch the events. 044 */ 045@SuppressWarnings("rawtypes") 046@Public 047@Evolving 048public class AsyncDispatcher extends AbstractService implements Dispatcher { 049 050 private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class); 051 052 private final BlockingQueue<Event> eventQueue; 053 private volatile int lastEventQueueSizeLogged = 0; 054 private volatile boolean stopped = false; 055 056 // Configuration flag for enabling/disabling draining dispatcher's events on 057 // stop functionality. 058 private volatile boolean drainEventsOnStop = false; 059 060 // Indicates all the remaining dispatcher's events on stop have been drained 061 // and processed. 062 // Race condition happens if dispatcher thread sets drained to true between 063 // handler setting drained to false and enqueueing event. YARN-3878 decided 064 // to ignore it because of its tiny impact. Also see YARN-5436. 065 private volatile boolean drained = true; 066 private final Object waitForDrained = new Object(); 067 068 // For drainEventsOnStop enabled only, block newly coming events into the 069 // queue while stopping. 070 private volatile boolean blockNewEvents = false; 071 private final EventHandler handlerInstance = new GenericEventHandler(); 072 073 private Thread eventHandlingThread; 074 protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; 075 private boolean exitOnDispatchException; 076 077 public AsyncDispatcher() { 078 this(new LinkedBlockingQueue<Event>()); 079 } 080 081 public AsyncDispatcher(BlockingQueue<Event> eventQueue) { 082 super("Dispatcher"); 083 this.eventQueue = eventQueue; 084 this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>(); 085 } 086 087 Runnable createThread() { 088 return new Runnable() { 089 @Override 090 public void run() { 091 while (!stopped && !Thread.currentThread().isInterrupted()) { 092 drained = eventQueue.isEmpty(); 093 // blockNewEvents is only set when dispatcher is draining to stop, 094 // adding this check is to avoid the overhead of acquiring the lock 095 // and calling notify every time in the normal run of the loop. 096 if (blockNewEvents) { 097 synchronized (waitForDrained) { 098 if (drained) { 099 waitForDrained.notify(); 100 } 101 } 102 } 103 Event event; 104 try { 105 event = eventQueue.take(); 106 } catch(InterruptedException ie) { 107 if (!stopped) { 108 LOG.warn("AsyncDispatcher thread interrupted", ie); 109 } 110 return; 111 } 112 if (event != null) { 113 dispatch(event); 114 } 115 } 116 } 117 }; 118 } 119 120 @Override 121 protected void serviceInit(Configuration conf) throws Exception { 122 this.exitOnDispatchException = 123 conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, 124 Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); 125 super.serviceInit(conf); 126 } 127 128 @Override 129 protected void serviceStart() throws Exception { 130 //start all the components 131 super.serviceStart(); 132 eventHandlingThread = new Thread(createThread()); 133 eventHandlingThread.setName("AsyncDispatcher event handler"); 134 eventHandlingThread.start(); 135 } 136 137 public void setDrainEventsOnStop() { 138 drainEventsOnStop = true; 139 } 140 141 @Override 142 protected void serviceStop() throws Exception { 143 if (drainEventsOnStop) { 144 blockNewEvents = true; 145 LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); 146 long endTime = System.currentTimeMillis() + getConfig() 147 .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 148 YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT); 149 150 synchronized (waitForDrained) { 151 while (!isDrained() && eventHandlingThread != null 152 && eventHandlingThread.isAlive() 153 && System.currentTimeMillis() < endTime) { 154 waitForDrained.wait(1000); 155 LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + 156 eventHandlingThread.getState()); 157 } 158 } 159 } 160 stopped = true; 161 if (eventHandlingThread != null) { 162 eventHandlingThread.interrupt(); 163 try { 164 eventHandlingThread.join(); 165 } catch (InterruptedException ie) { 166 LOG.warn("Interrupted Exception while stopping", ie); 167 } 168 } 169 170 // stop all the components 171 super.serviceStop(); 172 } 173 174 @SuppressWarnings("unchecked") 175 protected void dispatch(Event event) { 176 //all events go thru this loop 177 if (LOG.isDebugEnabled()) { 178 LOG.debug("Dispatching the event " + event.getClass().getName() + "." 179 + event.toString()); 180 } 181 182 Class<? extends Enum> type = event.getType().getDeclaringClass(); 183 184 try{ 185 EventHandler handler = eventDispatchers.get(type); 186 if(handler != null) { 187 handler.handle(event); 188 } else { 189 throw new Exception("No handler for registered for " + type); 190 } 191 } catch (Throwable t) { 192 //TODO Maybe log the state of the queue 193 LOG.fatal("Error in dispatcher thread", t); 194 // If serviceStop is called, we should exit this thread gracefully. 195 if (exitOnDispatchException 196 && (ShutdownHookManager.get().isShutdownInProgress()) == false 197 && stopped == false) { 198 Thread shutDownThread = new Thread(createShutDownThread()); 199 shutDownThread.setName("AsyncDispatcher ShutDown handler"); 200 shutDownThread.start(); 201 } 202 } 203 } 204 205 @SuppressWarnings("unchecked") 206 @Override 207 public void register(Class<? extends Enum> eventType, 208 EventHandler handler) { 209 /* check to see if we have a listener registered */ 210 EventHandler<Event> registeredHandler = (EventHandler<Event>) 211 eventDispatchers.get(eventType); 212 LOG.info("Registering " + eventType + " for " + handler.getClass()); 213 if (registeredHandler == null) { 214 eventDispatchers.put(eventType, handler); 215 } else if (!(registeredHandler instanceof MultiListenerHandler)){ 216 /* for multiple listeners of an event add the multiple listener handler */ 217 MultiListenerHandler multiHandler = new MultiListenerHandler(); 218 multiHandler.addHandler(registeredHandler); 219 multiHandler.addHandler(handler); 220 eventDispatchers.put(eventType, multiHandler); 221 } else { 222 /* already a multilistener, just add to it */ 223 MultiListenerHandler multiHandler 224 = (MultiListenerHandler) registeredHandler; 225 multiHandler.addHandler(handler); 226 } 227 } 228 229 @Override 230 public EventHandler getEventHandler() { 231 return handlerInstance; 232 } 233 234 class GenericEventHandler implements EventHandler<Event> { 235 public void handle(Event event) { 236 if (blockNewEvents) { 237 return; 238 } 239 drained = false; 240 241 /* all this method does is enqueue all the events onto the queue */ 242 int qSize = eventQueue.size(); 243 if (qSize != 0 && qSize % 1000 == 0 244 && lastEventQueueSizeLogged != qSize) { 245 lastEventQueueSizeLogged = qSize; 246 LOG.info("Size of event-queue is " + qSize); 247 } 248 int remCapacity = eventQueue.remainingCapacity(); 249 if (remCapacity < 1000) { 250 LOG.warn("Very low remaining capacity in the event-queue: " 251 + remCapacity); 252 } 253 try { 254 eventQueue.put(event); 255 } catch (InterruptedException e) { 256 if (!stopped) { 257 LOG.warn("AsyncDispatcher thread interrupted", e); 258 } 259 // Need to reset drained flag to true if event queue is empty, 260 // otherwise dispatcher will hang on stop. 261 drained = eventQueue.isEmpty(); 262 throw new YarnRuntimeException(e); 263 } 264 }; 265 } 266 267 /** 268 * Multiplexing an event. Sending it to different handlers that 269 * are interested in the event. 270 * @param <T> the type of event these multiple handlers are interested in. 271 */ 272 static class MultiListenerHandler implements EventHandler<Event> { 273 List<EventHandler<Event>> listofHandlers; 274 275 public MultiListenerHandler() { 276 listofHandlers = new ArrayList<EventHandler<Event>>(); 277 } 278 279 @Override 280 public void handle(Event event) { 281 for (EventHandler<Event> handler: listofHandlers) { 282 handler.handle(event); 283 } 284 } 285 286 void addHandler(EventHandler<Event> handler) { 287 listofHandlers.add(handler); 288 } 289 290 } 291 292 Runnable createShutDownThread() { 293 return new Runnable() { 294 @Override 295 public void run() { 296 LOG.info("Exiting, bbye.."); 297 System.exit(-1); 298 } 299 }; 300 } 301 302 @VisibleForTesting 303 protected boolean isEventThreadWaiting() { 304 return eventHandlingThread.getState() == Thread.State.WAITING; 305 } 306 307 protected boolean isDrained() { 308 return drained; 309 } 310}