001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.event;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Public;
031import org.apache.hadoop.classification.InterfaceStability.Evolving;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.service.AbstractService;
034import org.apache.hadoop.util.ShutdownHookManager;
035import org.apache.hadoop.yarn.conf.YarnConfiguration;
036import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
037
038import com.google.common.annotations.VisibleForTesting;
039
040/**
041 * Dispatches {@link Event}s in a separate thread. Currently only single thread
042 * does that. Potentially there could be multiple channels for each event type
043 * class and a thread pool can be used to dispatch the events.
044 */
045@SuppressWarnings("rawtypes")
046@Public
047@Evolving
048public class AsyncDispatcher extends AbstractService implements Dispatcher {
049
050  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
051
052  private final BlockingQueue<Event> eventQueue;
053  private volatile int lastEventQueueSizeLogged = 0;
054  private volatile boolean stopped = false;
055
056  // Configuration flag for enabling/disabling draining dispatcher's events on
057  // stop functionality.
058  private volatile boolean drainEventsOnStop = false;
059
060  // Indicates all the remaining dispatcher's events on stop have been drained
061  // and processed.
062  // Race condition happens if dispatcher thread sets drained to true between
063  // handler setting drained to false and enqueueing event. YARN-3878 decided
064  // to ignore it because of its tiny impact. Also see YARN-5436.
065  private volatile boolean drained = true;
066  private final Object waitForDrained = new Object();
067
068  // For drainEventsOnStop enabled only, block newly coming events into the
069  // queue while stopping.
070  private volatile boolean blockNewEvents = false;
071  private final EventHandler handlerInstance = new GenericEventHandler();
072
073  private Thread eventHandlingThread;
074  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
075  private boolean exitOnDispatchException;
076
077  public AsyncDispatcher() {
078    this(new LinkedBlockingQueue<Event>());
079  }
080
081  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
082    super("Dispatcher");
083    this.eventQueue = eventQueue;
084    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
085  }
086
087  Runnable createThread() {
088    return new Runnable() {
089      @Override
090      public void run() {
091        while (!stopped && !Thread.currentThread().isInterrupted()) {
092          drained = eventQueue.isEmpty();
093          // blockNewEvents is only set when dispatcher is draining to stop,
094          // adding this check is to avoid the overhead of acquiring the lock
095          // and calling notify every time in the normal run of the loop.
096          if (blockNewEvents) {
097            synchronized (waitForDrained) {
098              if (drained) {
099                waitForDrained.notify();
100              }
101            }
102          }
103          Event event;
104          try {
105            event = eventQueue.take();
106          } catch(InterruptedException ie) {
107            if (!stopped) {
108              LOG.warn("AsyncDispatcher thread interrupted", ie);
109            }
110            return;
111          }
112          if (event != null) {
113            dispatch(event);
114          }
115        }
116      }
117    };
118  }
119
120  @Override
121  protected void serviceInit(Configuration conf) throws Exception {
122    this.exitOnDispatchException =
123        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
124          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
125    super.serviceInit(conf);
126  }
127
128  @Override
129  protected void serviceStart() throws Exception {
130    //start all the components
131    super.serviceStart();
132    eventHandlingThread = new Thread(createThread());
133    eventHandlingThread.setName("AsyncDispatcher event handler");
134    eventHandlingThread.start();
135  }
136
137  public void setDrainEventsOnStop() {
138    drainEventsOnStop = true;
139  }
140
141  @Override
142  protected void serviceStop() throws Exception {
143    if (drainEventsOnStop) {
144      blockNewEvents = true;
145      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
146      long endTime = System.currentTimeMillis() + getConfig()
147          .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
148              YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);
149
150      synchronized (waitForDrained) {
151        while (!isDrained() && eventHandlingThread != null
152            && eventHandlingThread.isAlive()
153            && System.currentTimeMillis() < endTime) {
154          waitForDrained.wait(1000);
155          LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
156              eventHandlingThread.getState());
157        }
158      }
159    }
160    stopped = true;
161    if (eventHandlingThread != null) {
162      eventHandlingThread.interrupt();
163      try {
164        eventHandlingThread.join();
165      } catch (InterruptedException ie) {
166        LOG.warn("Interrupted Exception while stopping", ie);
167      }
168    }
169
170    // stop all the components
171    super.serviceStop();
172  }
173
174  @SuppressWarnings("unchecked")
175  protected void dispatch(Event event) {
176    //all events go thru this loop
177    if (LOG.isDebugEnabled()) {
178      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
179          + event.toString());
180    }
181
182    Class<? extends Enum> type = event.getType().getDeclaringClass();
183
184    try{
185      EventHandler handler = eventDispatchers.get(type);
186      if(handler != null) {
187        handler.handle(event);
188      } else {
189        throw new Exception("No handler for registered for " + type);
190      }
191    } catch (Throwable t) {
192      //TODO Maybe log the state of the queue
193      LOG.fatal("Error in dispatcher thread", t);
194      // If serviceStop is called, we should exit this thread gracefully.
195      if (exitOnDispatchException
196          && (ShutdownHookManager.get().isShutdownInProgress()) == false
197          && stopped == false) {
198        Thread shutDownThread = new Thread(createShutDownThread());
199        shutDownThread.setName("AsyncDispatcher ShutDown handler");
200        shutDownThread.start();
201      }
202    }
203  }
204
205  @SuppressWarnings("unchecked")
206  @Override
207  public void register(Class<? extends Enum> eventType,
208      EventHandler handler) {
209    /* check to see if we have a listener registered */
210    EventHandler<Event> registeredHandler = (EventHandler<Event>)
211    eventDispatchers.get(eventType);
212    LOG.info("Registering " + eventType + " for " + handler.getClass());
213    if (registeredHandler == null) {
214      eventDispatchers.put(eventType, handler);
215    } else if (!(registeredHandler instanceof MultiListenerHandler)){
216      /* for multiple listeners of an event add the multiple listener handler */
217      MultiListenerHandler multiHandler = new MultiListenerHandler();
218      multiHandler.addHandler(registeredHandler);
219      multiHandler.addHandler(handler);
220      eventDispatchers.put(eventType, multiHandler);
221    } else {
222      /* already a multilistener, just add to it */
223      MultiListenerHandler multiHandler
224      = (MultiListenerHandler) registeredHandler;
225      multiHandler.addHandler(handler);
226    }
227  }
228
229  @Override
230  public EventHandler getEventHandler() {
231    return handlerInstance;
232  }
233
234  class GenericEventHandler implements EventHandler<Event> {
235    public void handle(Event event) {
236      if (blockNewEvents) {
237        return;
238      }
239      drained = false;
240
241      /* all this method does is enqueue all the events onto the queue */
242      int qSize = eventQueue.size();
243      if (qSize != 0 && qSize % 1000 == 0
244          && lastEventQueueSizeLogged != qSize) {
245        lastEventQueueSizeLogged = qSize;
246        LOG.info("Size of event-queue is " + qSize);
247      }
248      int remCapacity = eventQueue.remainingCapacity();
249      if (remCapacity < 1000) {
250        LOG.warn("Very low remaining capacity in the event-queue: "
251            + remCapacity);
252      }
253      try {
254        eventQueue.put(event);
255      } catch (InterruptedException e) {
256        if (!stopped) {
257          LOG.warn("AsyncDispatcher thread interrupted", e);
258        }
259        // Need to reset drained flag to true if event queue is empty,
260        // otherwise dispatcher will hang on stop.
261        drained = eventQueue.isEmpty();
262        throw new YarnRuntimeException(e);
263      }
264    };
265  }
266
267  /**
268   * Multiplexing an event. Sending it to different handlers that
269   * are interested in the event.
270   * @param <T> the type of event these multiple handlers are interested in.
271   */
272  static class MultiListenerHandler implements EventHandler<Event> {
273    List<EventHandler<Event>> listofHandlers;
274
275    public MultiListenerHandler() {
276      listofHandlers = new ArrayList<EventHandler<Event>>();
277    }
278
279    @Override
280    public void handle(Event event) {
281      for (EventHandler<Event> handler: listofHandlers) {
282        handler.handle(event);
283      }
284    }
285
286    void addHandler(EventHandler<Event> handler) {
287      listofHandlers.add(handler);
288    }
289
290  }
291
292  Runnable createShutDownThread() {
293    return new Runnable() {
294      @Override
295      public void run() {
296        LOG.info("Exiting, bbye..");
297        System.exit(-1);
298      }
299    };
300  }
301
302  @VisibleForTesting
303  protected boolean isEventThreadWaiting() {
304    return eventHandlingThread.getState() == Thread.State.WAITING;
305  }
306
307  protected boolean isDrained() {
308    return drained;
309  }
310}