 * Copyright (c) Facebook, Inc. and its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.

#pragma once

#include <sys/types.h>

#include <algorithm>
#include <iterator>
#include <memory>
#include <stdexcept>
#include <utility>

#include <boost/intrusive/slist.hpp>
#include <glog/logging.h>

#include <folly/Exception.h>
#include <folly/FileUtil.h>
#include <folly/Likely.h>
#include <folly/ScopeGuard.h>
#include <folly/SpinLock.h>
#include <folly/io/async/DelayedDestruction.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h>
#include <folly/io/async/Request.h>
#include <folly/portability/Fcntl.h>
#include <folly/portability/Sockets.h>
#include <folly/portability/Unistd.h>
#include <folly/system/Pid.h>

#if defined(__linux__) && !defined(__ANDROID__)
#include <folly/io/async/EventFDWrapper.h>

namespace folly {

 * A producer-consumer queue for passing messages between EventBase threads.
 * Messages can be added to the queue from any thread.  Multiple consumers may
 * listen to the queue from multiple EventBase threads.
 * A NotificationQueue may not be destroyed while there are still consumers
 * registered to receive events from the queue.  It is the user's
 * responsibility to ensure that all consumers are unregistered before the
 * queue is destroyed.
 * MessageT should be MoveConstructible (i.e., must support either a move
 * constructor or a copy constructor, or both).  Ideally its move constructor
 * (or copy constructor if no move constructor is provided) should never throw
 * exceptions.  If the constructor may throw, the consumers could end up
 * spinning trying to move a message off the queue and failing, and then
 * retrying.
template <typename MessageT>
class NotificationQueue {
  // One queued element: the message payload plus the RequestContext handed
  // in when the node was built.  It derives from the boost intrusive slist
  // hook so that nodes can be linked directly into an intrusive slist.
  struct Node : public boost::intrusive::slist_base_hook<
                    boost::intrusive::cache_last<true>> {
    // Perfect-forwards msg into msg_ and takes ownership of ctx.
    template <typename MessageTT>
    Node(MessageTT&& msg, std::shared_ptr<RequestContext> ctx)
        : msg_(std::forward<MessageTT>(msg)), ctx_(std::move(ctx)) {}
    MessageT msg_; // the queued message
    std::shared_ptr<RequestContext> ctx_; // context stored alongside msg_

   * A callback interface for consuming messages from the queue as they arrive.
  class Consumer : public DelayedDestruction, private EventHandler {
    // Initial value of maxReadAtOnce_ (see setMaxReadAtOnce()).
    enum : uint16_t { kDefaultMaxReadAtOnce = 10 };

    // A freshly built consumer is attached to no queue and uses the default
    // per-iteration read limit.
        : queue_(nullptr),
          maxReadAtOnce_(kDefaultMaxReadAtOnce) {}

    // Builds a Consumer around the given callable, so callers do not need to
    // write their own Consumer subclass.  The returned unique_ptr uses
    // DelayedDestruction::Destructor as its deleter.
    template <typename TCallback>
    static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
        TCallback&& callback);

     * messageAvailable() will be invoked whenever a new
     * message is available from the pipe.
    virtual void messageAvailable(MessageT&& message) noexcept = 0;

     * Begin consuming messages from the specified queue.
     * messageAvailable() will be called whenever a message is available.  This
     * consumer will continue to consume messages until stopConsuming() is
     * called.
     * A Consumer may only consume messages from a single NotificationQueue at
     * a time.  startConsuming() should not be called if this consumer is
     * already consuming.
    void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
      // Attach to the queue and event base first; only then register the
      // handler for persistent read events.
      init(eventBase, queue);
      registerHandler(READ | PERSIST);

     * Same as above but registers this event handler as internal so that it
     * doesn't count towards the pending reader count for the IOLoop.
    void startConsumingInternal(
        EventBase* eventBase, NotificationQueue* queue) {
      // Same sequence as startConsuming(), except that registration goes
      // through registerInternalHandler().
      init(eventBase, queue);
      registerInternalHandler(READ | PERSIST);

     * Stop consuming messages.
     * startConsuming() may be called again to resume consumption of messages
     * at a later point in time.
    void stopConsuming();

     * Consume messages off the queue until it is empty. No messages may be
     * added to the queue while it is draining, so that the process is bounded.
     * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
     * and tryPutMessageNoThrow will return false.
     * @returns true if the queue was drained, false otherwise. In practice,
     * this will only fail if someone else is already draining the queue.
    bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;

     * Get the NotificationQueue that this consumer is currently consuming
     * messages from.  Returns nullptr if the consumer is not currently
     * consuming events from any queue.
    NotificationQueue* getCurrentQueue() const {
      // queue_ is nullptr while this consumer is not attached to a queue.
      return queue_;
    }

     * Set a limit on how many messages this consumer will read each iteration
     * around the event loop.
     * This helps rate-limit how much work the Consumer will do each event loop
     * iteration, to prevent it from starving other event handlers.
     * A limit of 0 means no limit will be enforced.  If unset, the limit
     * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
    void setMaxReadAtOnce(uint32_t maxAtOnce) {
      // Stored verbatim; consumeMessages() only enforces the limit when the
      // stored value is greater than zero.
      maxReadAtOnce_ = maxAtOnce;
    }
    uint32_t getMaxReadAtOnce() const {
      // Current per-iteration read limit (0 means the limit is not applied).
      return maxReadAtOnce_;
    }

    EventBase* getEventBase() {
      // base_ is assigned by init(), i.e. when consumption is started.
      return base_;
    }

    void handlerReady(uint16_t events) noexcept override;

    void destroy() override;

    // Nothing to release here beyond what the members and bases clean up.
    ~Consumer() override = default;

     * Consume messages off the queue until
     *   - the queue is empty (1), or
     *   - until the consumer is destroyed, or
     *   - until the consumer is uninstalled, or
     *   - an exception is thrown in the course of dequeueing, or
     *   - unless isDrain is true, until the maxReadAtOnce_ limit is hit
     * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
    void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;

    // Records whether this consumer is currently active.
    //   active     - the new state.
    //   shouldLock - guards the two conditional sections that bracket the
    //                update; NOTE(review): presumably lock/unlock of the
    //                queue spinlock - confirm, the bodies are not visible
    //                in this view.
    void setActive(bool active, bool shouldLock = false) {
      // With no queue attached only the local flag is updated.
      if (!queue_) {
        active_ = active;
      if (shouldLock) {
      // Transition inactive -> active.
      if (!active_ && active) {
      // Transition active -> inactive.
      } else if (active_ && !active) {
      active_ = active;
      if (shouldLock) {
    void init(EventBase* eventBase, NotificationQueue* queue);

    // Queue currently being consumed; nullptr when not consuming.
    NotificationQueue* queue_;
    // Points at a stack flag while a callback is being invoked so that
    // destroy() can report destruction to the caller.  It must start out
    // null: destroy() tests it and consumeMessages() CHECKs that it is null
    // before use, so it is given a default member initializer rather than
    // relying on a constructor to set it.
    bool* destroyedFlagPtr_{nullptr};
    // Maximum messages handled per consumeMessages() call; 0 disables it.
    uint32_t maxReadAtOnce_;
    // Event base passed to init().  Defaults to nullptr so getEventBase()
    // never returns an indeterminate pointer before consumption starts.
    EventBase* base_{nullptr};
    bool active_{false};

  // Consumer that is bound to one NotificationQueue by reference.  It exposes
  // the queue's signalling descriptor through getFd() and handles messages
  // through consume().
  class SimpleConsumer {
    // Binds to the given queue; the reference is kept for the lifetime of
    // this object.
    explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) {

    // Drops this consumer from the queue's consumer count.
    ~SimpleConsumer() { --queue_.numConsumers_; }

    // Returns the descriptor used for signalling: the eventfd when one is
    // open (>= 0), otherwise the read end of the pipe.
    int getFd() const {
      return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0];

    // Defined out of line below.
    template <typename F>
    void consume(F&& f);

    NotificationQueue& queue_;

  enum class FdType {

   * Create a new NotificationQueue.
   * If the maxSize parameter is specified, this sets the maximum queue size
   * that will be enforced by tryPutMessage().  (This size is advisory, and may
   * be exceeded if producers explicitly use putMessage() instead of
   * tryPutMessage().)
   * The fdType parameter determines the type of file descriptor used
   * internally to signal message availability.  The default (eventfd) is
   * preferable for performance and because it won't fail when the queue gets
   * too long.  It is not available on older or non-Linux kernels, however.
   * In that case the code will fall back to using a pipe; the parameter is
   * mostly for testing purposes.
  // NOTE(review): two default-argument lines for fdType appear below; they
  // look like the two arms of a preprocessor conditional whose directives
  // are not visible in this view - confirm.
  // NOTE(review): no initializer for advisoryMaxQueueSize_ is visible in the
  // initializer list below - confirm that maxSize is stored.
  explicit NotificationQueue(
      uint32_t maxSize = 0,
      FdType fdType = FdType::EVENTFD)
      FdType fdType = FdType::PIPE)
      // Both descriptors start out unset (-1); the creating process id is
      // remembered so checkPid() can compare against it later.
      : eventfd_(-1),
        pipeFds_{-1, -1},
        pid_(folly::get_cached_pid()) {

    if (fdType == FdType::EVENTFD) {
      // Non-blocking, close-on-exec eventfd with an initial counter of 0.
      eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
      if (eventfd_ == -1) {
        if (errno == ENOSYS || errno == EINVAL) {
          // eventfd not available: log it and switch fdType so the pipe
          // branch below runs instead.
          LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
                     << errno << ", falling back to pipe mode (is your kernel "
                     << "> 2.6.30?)";
          fdType = FdType::PIPE;
        } else {
          // some other error
              "Failed to create eventfd for "
    if (fdType == FdType::PIPE) {
      // pipe() returns non-zero on failure.
      if (pipe(pipeFds_)) {
            "Failed to create pipe for NotificationQueue", errno);
      try {
        // put both ends of the pipe into non-blocking mode
        if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
              "failed to put NotificationQueue pipe read "
              "endpoint into non-blocking mode",
        if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
              "failed to put NotificationQueue pipe write "
              "endpoint into non-blocking mode",
      } catch (...) {

  // Tears the queue down: loops until queue_ holds no nodes, then resets each
  // descriptor that was opened.
  ~NotificationQueue() {
    std::unique_ptr<Node> data;
    // Runs until the queue holds no more nodes.
    while (!queue_.empty()) {
    // Each descriptor is only touched if it was actually opened (>= 0), and
    // is marked unset afterwards.  NOTE(review): the close calls are not
    // visible in this view - confirm each fd is closed before being reset.
    if (eventfd_ >= 0) {
      eventfd_ = -1;
    if (pipeFds_[0] >= 0) {
      pipeFds_[0] = -1;
    if (pipeFds_[1] >= 0) {
      pipeFds_[1] = -1;

   * Set the advisory maximum queue size.
   * This maximum queue size affects calls to tryPutMessage().  Message
   * producers can still use the putMessage() call to unconditionally put a
   * message on the queue, ignoring the configured maximum queue size.  This
   * can cause the queue size to exceed the configured maximum.
  void setMaxQueueSize(uint32_t max) {
    // This is the limit tryPutMessage() and tryPutMessageNoThrow() pass on
    // to putMessageImpl().
    advisoryMaxQueueSize_ = max;
  }

   * Attempt to put a message on the queue if the queue is not already full.
   * If the queue is full, a std::overflow_error will be thrown.  The
   * setMaxQueueSize() function controls the maximum queue size.
   * If the queue is currently draining, an std::runtime_error will be thrown.
   * This method may contend briefly on a spinlock if many threads are
   * concurrently accessing the queue, but for all intents and purposes it will
   * immediately place the message on the queue and return.
   * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
   * may throw any other exception thrown by the MessageT move/copy
   * constructor.
  template <typename MessageTT>
  void tryPutMessage(MessageTT&& message) {
    // Enqueue subject to the advisory limit.  `throws` keeps its default
    // (true), so a full or draining queue is reported by exception.
    putMessageImpl(std::forward<MessageTT>(message), advisoryMaxQueueSize_);

   * No-throw versions of the above.  Instead returns true on success, false on
   * failure.
   * Only std::overflow_error (the common exception case) and std::runtime_error
   * (which indicates that the queue is being drained) are prevented from being
   * thrown. User code must still catch std::bad_alloc errors.
  template <typename MessageTT>
  bool tryPutMessageNoThrow(MessageTT&& message) {
    // Same path as tryPutMessage(), but throws=false makes the full/draining
    // checks report failure through the return value instead.
    return putMessageImpl(
        std::forward<MessageTT>(message), advisoryMaxQueueSize_, false);

   * Unconditionally put a message on the queue.
   * This method is like tryPutMessage(), but ignores the maximum queue size
   * and always puts the message on the queue, even if the maximum queue size
   * would be exceeded.
   * putMessage() may throw
   *   - std::bad_alloc if memory allocation fails,
   *   - std::runtime_error if the queue is currently draining, or
   *   - any other exception thrown by the MessageT move/copy constructor.
  template <typename MessageTT>
  void putMessage(MessageTT&& message) {
    // A maxSize of 0 makes checkQueueSize() skip the size comparison, so the
    // advisory limit is not applied here.
    putMessageImpl(std::forward<MessageTT>(message), 0);

   * Put several messages on the queue.
  template <typename InputIteratorT>
  void putMessages(InputIteratorT first, InputIteratorT last) {
    // Look up the iterator category and pass an instance of it as a tag so
    // the matching putMessagesImpl() overload is selected.
    typedef typename std::iterator_traits<InputIteratorT>::iterator_category
    putMessagesImpl(first, last, IterCategory());

   * Try to immediately pull a message off of the queue, without blocking.
   * If a message is immediately available, the result parameter will be
   * updated to contain the message contents and true will be returned.
   * If no message is available, false will be returned and result will be left
   * unmodified.
  bool tryConsume(MessageT& result) {
    // On every exit path, re-synchronise the wake-up signal with the queue.
    SCOPE_EXIT { syncSignalAndQueue(); };

    std::unique_ptr<Node> data;

      // The queue is inspected under the spinlock.
      std::unique_lock<SpinLock> g(spinlock_);

      // Nothing queued: report failure without touching `result`.
      if (UNLIKELY(queue_.empty())) {
        return false;


    // Move the payload out of the dequeued node into the caller's object.
    result = std::move(data->msg_);

    return true;

  // Returns the number of queued messages, read while holding spinlock_
  // (the lock is declared mutable, which is what allows this in a const
  // member).
  size_t size() const {
    std::unique_lock<SpinLock> g(spinlock_);
    return queue_.size();

   * Check that the NotificationQueue is being used from the correct process.
   * If you create a NotificationQueue in one process, then fork, and try to
   * send messages to the queue from the child process, you're going to have a
   * bad time.  Unfortunately users have (accidentally) run into this.
   * Because we use an eventfd/pipe, the child process can actually signal the
   * parent process that an event is ready.  However, it can't put anything on
   * the parent's queue, so the parent wakes up and finds an empty queue.  This
   * check ensures that we catch the problem in the misbehaving child process
   * code, and crash before signalling the parent process.
  void checkPid() const {
    // Compare the pid recorded at construction with the current one; a
    // mismatch is marked as the unlikely branch.
    if (FOLLY_UNLIKELY(pid_ != folly::get_cached_pid())) {

  // Forbidden copy constructor and assignment operator
  NotificationQueue(NotificationQueue const&) = delete;
  NotificationQueue& operator=(NotificationQueue const&) = delete;

  // Reports whether another message fits under maxSize.
  //   maxSize - limit to enforce; 0 disables the check.
  //   throws  - when true a full queue raises std::overflow_error, otherwise
  //             it is reported by returning false.
  // Returns true when the message may be added.
  inline bool checkQueueSize(size_t maxSize, bool throws = true) const {
    // The caller is expected to hold spinlock_ already: in debug builds this
    // asserts that try_lock() fails.
    DCHECK(0 == spinlock_.try_lock());
    if (maxSize > 0 && queue_.size() >= maxSize) {
      if (throws) {
        throw std::overflow_error(
            "unable to add message to NotificationQueue: "
            "queue is full");
      return false;
    return true;

  // Returns the current value of draining_.  When the queue is draining and
  // `throws` is true, std::runtime_error is thrown instead of returning.
  inline bool checkDraining(bool throws = true) {
    if (UNLIKELY(draining_ && throws)) {
      throw std::runtime_error("queue is draining, cannot add message");
    return draining_;

  // Makes sure the signalling descriptor is readable by writing one wake-up
  // value to it, and records that in signal_.
  void ensureSignalLocked() const {
    // semantics: empty fd == empty queue <=> !signal_
    if (signal_) {

    ssize_t bytes_written = 0;
    size_t bytes_expected = 0;

    // Retry the write for as long as it fails with EINTR.
    do {
      if (eventfd_ >= 0) {
        // eventfd(2) dictates that we must write a 64-bit integer
        uint64_t signal = 1;
        bytes_expected = sizeof(signal);
        bytes_written = ::write(eventfd_, &signal, bytes_expected);
      } else {
        // Pipe mode: a single byte is written to the write end.
        uint8_t signal = 1;
        bytes_expected = sizeof(signal);
        bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
    } while (bytes_written == -1 && errno == EINTR);

    // Only a full-length write counts as having signalled.
    if (bytes_written == ssize_t(bytes_expected)) {
      signal_ = true;
    } else {
          "failed to signal NotificationQueue after "

  // Reads and discards whatever wake-up data is pending on the signalling
  // descriptor, logs if the amount read disagrees with signal_, and then
  // clears signal_.
  // NOTE(review): the "Locked" suffix suggests the caller must already hold
  // spinlock_ - confirm at the call sites.
  void drainSignalsLocked() {
    ssize_t bytes_read = 0;
    // Descriptor 0 is a valid fd, so test with >= 0 exactly like every other
    // eventfd_ check in this class.  With "> 0" an eventfd that happened to
    // be assigned fd 0 would be skipped and the unopened pipe read instead.
    if (eventfd_ >= 0) {
      // An eventfd read always transfers one 64-bit counter value.
      uint64_t message;
      bytes_read = readNoInt(eventfd_, &message, sizeof(message));
      // The only acceptable failure is "nothing to read" on the
      // non-blocking descriptor.
      CHECK(bytes_read != -1 || errno == EAGAIN);
    } else {
      // There should only be one byte in the pipe. To avoid potential leaks we
      // still drain.
      uint8_t message[32];
      ssize_t result;
      while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) !=
             -1) {
        bytes_read += result;
      }
      // The loop must have ended because the pipe ran dry.
      CHECK(result == -1 && errno == EAGAIN);
      LOG_IF(ERROR, bytes_read > 1)
          << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
          << bytes_read << " bytes, expected <= 1";
    }
    // signal_ and the descriptor contents are expected to agree.
    LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
        << "[NotificationQueue] Unexpected state while draining signals: signal_="
        << signal_ << " bytes_read=" << bytes_read;

    signal_ = false;
  }

  void ensureSignal() const {
    // Take the spinlock (mutable, hence usable from a const member).
    // NOTE(review): presumably followed by ensureSignalLocked() - that call
    // is not visible in this view.
    std::unique_lock<SpinLock> g(spinlock_);

  void syncSignalAndQueue() {
    std::unique_lock<SpinLock> g(spinlock_);

    // Branch on whether any messages remain.  NOTE(review): the branch
    // bodies are not visible here; presumably the signal is drained when the
    // queue is empty and ensured otherwise - confirm.
    if (queue_.empty()) {
    } else {

  // Shared implementation behind the put/tryPut entry points.
  //   message - forwarded into a newly allocated Node.
  //   maxSize - limit handed to checkQueueSize(); 0 means no limit.
  //   throws  - whether a draining/full queue is reported by exception
  //             (true) or by returning false.
  // Returns true when the message was accepted.
  template <typename MessageTT>
  bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) {
    bool signal = false;
      // The node is allocated, and the current RequestContext captured,
      // before the spinlock is taken.
      auto data = std::make_unique<Node>(
          std::forward<MessageTT>(message), RequestContext::saveContext());
      std::unique_lock<SpinLock> g(spinlock_);
      // Reject when draining or when the size limit is reached.
      if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
        return false;
      // We only need to signal an event if not all consumers are
      // awake.
      if (numActiveConsumers_ < numConsumers_) {
        signal = true;
      if (signal) {
    return true;

  // Enqueues every element of [first, last).  Elements are moved out of the
  // input range.  The unnamed std::input_iterator_tag parameter only selects
  // this overload.
  template <typename InputIteratorT>
  void putMessagesImpl(
      InputIteratorT first, InputIteratorT last, std::input_iterator_tag) {
    bool signal = false;
    // Nodes are staged in this local list first, so the shared queue is only
    // modified by the single splice made under the lock.
    boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> q;
    try {
      while (first != last) {
        auto data = std::make_unique<Node>(
            std::move(*first), RequestContext::saveContext());
      std::unique_lock<SpinLock> g(spinlock_);
      // Append the whole staged list to the end of the shared queue.
      queue_.splice(queue_.end(), q);
      // Signalling is only needed when some consumer is not active.
      if (numActiveConsumers_ < numConsumers_) {
        signal = true;
      if (signal) {
    } catch (...) {
      // On failure, walk whatever is still left in the staging list.
      std::unique_ptr<Node> data;
      while (!q.empty()) {

  // Out-of-line (FOLLY_NOINLINE) failure path; the message it assembles
  // includes the current pid.
  FOLLY_NOINLINE void checkPidFail() const {
        "Pid mismatch. Pid = " +
        folly::to<std::string>(folly::get_cached_pid()) + ". Expecting " +

  // Lock taken around queue_ accesses; mutable so const members such as
  // size() can take it.
  mutable folly::SpinLock spinlock_;
  // True once a wake-up value has been written to the descriptor and not yet
  // drained (see ensureSignalLocked() / drainSignalsLocked()).
  mutable bool signal_{false};
  int eventfd_; // -1 when no eventfd is in use
  int pipeFds_[2]; // to fallback to on older/non-linux systems
  // Limit applied by tryPutMessage()/tryPutMessageNoThrow(); changed through
  // setMaxQueueSize().
  uint32_t advisoryMaxQueueSize_;
  pid_t pid_; // pid recorded at construction, compared in checkPid()
  boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> queue_;
  // Producers only signal when numActiveConsumers_ < numConsumers_.
  int numConsumers_{0};
  std::atomic<int> numActiveConsumers_{0};
  // Set for the duration of Consumer::consumeUntilDrained(); while set,
  // checkDraining() rejects new messages.
  bool draining_{false};

// Override of the destroy() hook declared in Consumer.  The part visible here
// reports the destruction to an in-progress callback invocation through
// destroyedFlagPtr_.
template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::destroy() {
  // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
  // will be non-nullptr.  Mark the value that it points to, so that
  // handlerReady() will know the callback is destroyed, and that it cannot
  // access any member variables anymore.
  if (destroyedFlagPtr_) {
    *destroyedFlagPtr_ = true;

// EventHandler callback override.  The events mask is not used (the parameter
// is left unnamed).
template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::handlerReady(
    uint16_t /*events*/) noexcept {

// Core consume loop.
//   isDrain     - when true the maxReadAtOnce_ limit is not applied.
//   numConsumed - optional out-parameter that receives the number of
//                 messages processed.
template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::consumeMessages(
    bool isDrain, size_t* numConsumed) noexcept {
  // Keeps this object alive for the duration of the call.
  DestructorGuard dg(this);
  uint32_t numProcessed = 0;
    if (queue_) {
  // Whatever the exit path, mark this consumer inactive again (taking the
  // lock to do so).
  SCOPE_EXIT { setActive(false, /* shouldLock = */ true); };
    if (numConsumed != nullptr) {
      *numConsumed = numProcessed;
  while (true) {
    // Now pop the message off of the queue.
    // We have to manually acquire and release the spinlock here, rather than
    // using SpinLockHolder since the MessageT has to be constructed while
    // holding the spinlock and available after we release it.  SpinLockHolder
    // unfortunately doesn't provide a release() method.  (We can't construct
    // MessageT first since we have no guarantee that MessageT has a default
    // constructor.)
    bool locked = true;

    try {
      if (UNLIKELY(queue_->queue_.empty())) {
        // If there is no message, we've reached the end of the queue, return.

      // Pull a message off the queue.
      std::unique_ptr<Node> data;

      // Check to see if the queue is empty now.
      // We use this as an optimization to see if we should bother trying to
      // loop again and read another message after invoking this callback.
      bool wasEmpty = queue_->queue_.empty();
      if (wasEmpty) {

      // Now unlock the spinlock before we invoke the callback.
      RequestContextScopeGuard rctx(std::move(data->ctx_));

      locked = false;

      // Call the callback
      // The flag lives on this stack frame; destroy() sets it through
      // destroyedFlagPtr_ if the consumer is destroyed during the callback.
      bool callbackDestroyed = false;
      CHECK(destroyedFlagPtr_ == nullptr);
      destroyedFlagPtr_ = &callbackDestroyed;
      destroyedFlagPtr_ = nullptr;

      // Make sure message destructor is called with the correct RequestContext.

      // If the callback was destroyed before it returned, we are done
      if (callbackDestroyed) {

      // If the callback is no longer installed, we are done.
      if (queue_ == nullptr) {

      // If we have hit maxReadAtOnce_, we are done.
      if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {

      // If the queue was empty before we invoked the callback, it's probable
      // that it is still empty now.  Just go ahead and return, rather than
      // looping again and trying to re-read from the eventfd.  (If a new
      // message had in fact arrived while we were invoking the callback, we
      // will simply be woken up the next time around the event loop and will
      // process the message then.)
      if (wasEmpty) {
    } catch (const std::exception&) {
      // This catch block is really just to handle the case where the MessageT
      // constructor throws.  The messageAvailable() callback itself is
      // declared as noexcept and should never throw.
      // If the MessageT constructor does throw we try to handle it as best as
      // we can, but we can't work miracles.  We will just ignore the error for
      // now and return.  The next time around the event loop we will end up
      // trying to read the message again.  If MessageT continues to throw we
      // will never make forward progress and will keep trying each time around
      // the event loop.
      if (locked) {
        // Unlock the spinlock.



// Attaches this consumer to `queue` on `eventBase` and initialises the event
// handler on the queue's signalling descriptor.
template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::init(
    EventBase* eventBase, NotificationQueue* queue) {
  // A consumer may only be attached to one queue at a time.
  assert(queue_ == nullptr);

  base_ = eventBase;

  queue_ = queue;

    std::unique_lock<SpinLock> g(queue_->spinlock_);

  // Watch the eventfd when the queue has one, otherwise the pipe's read end.
  if (queue_->eventfd_ >= 0) {
    initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->eventfd_));
  } else {
    initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->pipeFds_[0]));

// Detaches this consumer from its queue; afterwards getCurrentQueue()
// returns nullptr.
template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::stopConsuming() {
  // Checked first: the consumer may not be attached to any queue.
  if (queue_ == nullptr) {

    std::unique_lock<SpinLock> g(queue_->spinlock_);

  queue_ = nullptr;

// Marks the queue as draining, consumes messages with isDrain=true, then
// clears the draining flag again.  Returns false without consuming anything
// when the queue is already marked as draining.
//
// NOTE(review): queue_ is dereferenced without a null check, both before and
// after consumeMessages().  consumeMessages() itself allows for a callback
// detaching the consumer (it tests queue_ == nullptr after each callback), in
// which case the second dereference below would be through a null pointer.
// Confirm whether callers guarantee this cannot happen.
template <typename MessageT>
bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
    size_t* numConsumed) noexcept {
  DestructorGuard dg(this);
    std::unique_lock<SpinLock> g(queue_->spinlock_);
    // Only one drain may be in progress at a time.
    if (queue_->draining_) {
      return false;
    queue_->draining_ = true;
  consumeMessages(true, numConsumed);
    std::unique_lock<SpinLock> g(queue_->spinlock_);
    queue_->draining_ = false;
  return true;

// Out-of-line definition of SimpleConsumer::consume().  The parts visible
// here check the queue under its spinlock and install the dequeued node's
// RequestContext.
template <typename MessageT>
template <typename F>
void NotificationQueue<MessageT>::SimpleConsumer::consume(F&& foreach) {
  // On every exit path, re-synchronise the wake-up signal with the queue.
  SCOPE_EXIT { queue_.syncSignalAndQueue(); };


  std::unique_ptr<Node> data;
    std::unique_lock<SpinLock> g(queue_.spinlock_);

    if (UNLIKELY(queue_.queue_.empty())) {


  // Install the RequestContext that was stored with the message.
  RequestContextScopeGuard rctx(std::move(data->ctx_));
  // Make sure message destructor is called with the correct RequestContext.

 * Creates a NotificationQueue::Consumer wrapping a function object
 * Modeled after AsyncTimeout::make

namespace detail {

// Consumer subclass that stores a callable of type TCallback and overrides
// messageAvailable() on its behalf.
template <typename MessageT, typename TCallback>
struct notification_queue_consumer_wrapper
    : public NotificationQueue<MessageT>::Consumer {
  // Perfect-forwards the callable into callback_.
  template <typename UCallback>
  explicit notification_queue_consumer_wrapper(UCallback&& callback)
      : callback_(std::forward<UCallback>(callback)) {}

  // we are being stricter here and requiring noexcept for callback
  void messageAvailable(MessageT&& message) noexcept override {
        "callback must be declared noexcept, e.g.: `[]() noexcept {}`");


  TCallback callback_; // the wrapped callable

} // namespace detail

// Definition of Consumer::make(): allocates a
// detail::notification_queue_consumer_wrapper for the decayed callback type
// and returns it inside a std::unique_ptr.
template <typename MessageT>
template <typename TCallback>
    typename NotificationQueue<MessageT>::Consumer,
NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
  return std::unique_ptr<
      new detail::notification_queue_consumer_wrapper<
          typename std::decay<TCallback>::type>(

} // namespace folly