/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include namespace folly { template EventBaseAtomicNotificationQueue:: EventBaseAtomicNotificationQueue(Consumer&& consumer) : pid_(get_cached_pid()), notificationQueue_(), consumer_(std::move(consumer)) { #ifdef FOLLY_HAVE_EVENTFD eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (eventfd_ == -1) { if (errno == ENOSYS || errno == EINVAL) { // eventfd not availalble LOG(ERROR) << "failed to create eventfd for AtomicNotificationQueue: " << errno << ", falling back to pipe mode (is your kernel " << "> 2.6.30?)"; } else { // some other error folly::throwSystemError( "Failed to create eventfd for AtomicNotificationQueue", errno); } } #endif if (eventfd_ == -1) { if (pipe(pipeFds_)) { folly::throwSystemError( "Failed to create pipe for AtomicNotificationQueue", errno); } try { // put both ends of the pipe into non-blocking mode if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) { folly::throwSystemError( "failed to put AtomicNotificationQueue pipe read " "endpoint into non-blocking mode", errno); } if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) { folly::throwSystemError( "failed to put AtomicNotificationQueue pipe write " "endpoint into non-blocking mode", errno); } } catch (...) { ::close(pipeFds_[0]); ::close(pipeFds_[1]); throw; } } } template EventBaseAtomicNotificationQueue:: ~EventBaseAtomicNotificationQueue() { // discard pending tasks and disarm the queue while (drive( [](Task&&) { return AtomicNotificationQueueTaskStatus::DISCARD; })) { } // We must unregister before closing the fd. Otherwise the base class // would unregister the fd after it's already closed, which is invalid // (some other thread could've opened something that reused the fd). unregisterHandler(); // Don't drain fd in the child process. if (pid_ == get_cached_pid()) { // Wait till we observe all the writes before closing fds while (writesObserved_ < (successfulArmCount_ - consumerDisarmedCount_) + writesLocal_) { drainFd(); } DCHECK( writesObserved_ == (successfulArmCount_ - consumerDisarmedCount_) + writesLocal_); } if (eventfd_ >= 0) { ::close(eventfd_); eventfd_ = -1; } if (pipeFds_[0] >= 0) { ::close(pipeFds_[0]); pipeFds_[0] = -1; } if (pipeFds_[1] >= 0) { ::close(pipeFds_[1]); pipeFds_[1] = -1; } } template void EventBaseAtomicNotificationQueue::setMaxReadAtOnce( uint32_t maxAtOnce) { notificationQueue_.setMaxReadAtOnce(maxAtOnce); } template size_t EventBaseAtomicNotificationQueue::size() const { return notificationQueue_.size(); } template bool EventBaseAtomicNotificationQueue::empty() const { return notificationQueue_.empty(); } template void EventBaseAtomicNotificationQueue::drain() { while (drive(consumer_)) { } } template template void EventBaseAtomicNotificationQueue::putMessage( Args&&... args) { if (notificationQueue_.push(std::forward(args)...)) { notifyFd(); } } template bool EventBaseAtomicNotificationQueue::tryPutMessage( Task&& task, uint32_t maxSize) { auto result = notificationQueue_.tryPush(std::forward(task), maxSize); if (result == AtomicNotificationQueue::TryPushResult::SUCCESS_AND_ARMED) { notifyFd(); } return result != AtomicNotificationQueue::TryPushResult::FAILED_LIMIT_REACHED; } template void EventBaseAtomicNotificationQueue::stopConsuming() { evb_ = nullptr; cancelLoopCallback(); unregisterHandler(); detachEventBase(); } template void EventBaseAtomicNotificationQueue::startConsuming( EventBase* evb) { startConsumingImpl(evb, false); } template void EventBaseAtomicNotificationQueue::startConsumingInternal( EventBase* evb) { startConsumingImpl(evb, true); } template void EventBaseAtomicNotificationQueue::startConsumingImpl( EventBase* evb, bool internal) { evb_ = evb; initHandler( evb_, folly::NetworkSocket::fromFd(eventfd_ >= 0 ? eventfd_ : pipeFds_[0])); auto registerHandlerResult = internal ? registerInternalHandler(READ | PERSIST) : registerHandler(READ | PERSIST); if (registerHandlerResult) { edgeTriggeredSet_ = eventfd_ >= 0 && setEdgeTriggered(); ++writesLocal_; notifyFd(); } else { edgeTriggeredSet_ = false; } } template void EventBaseAtomicNotificationQueue::notifyFd() { checkPid(); ssize_t bytes_written = 0; size_t bytes_expected = 0; do { if (eventfd_ >= 0) { // eventfd(2) dictates that we must write a 64-bit integer uint64_t signal = 1; bytes_expected = sizeof(signal); bytes_written = ::write(eventfd_, &signal, bytes_expected); } else { uint8_t signal = 1; bytes_expected = sizeof(signal); bytes_written = ::write(pipeFds_[1], &signal, bytes_expected); } } while (bytes_written == -1 && errno == EINTR); if (bytes_written != ssize_t(bytes_expected)) { folly::throwSystemError( "failed to signal AtomicNotificationQueue after " "write", errno); } } template void EventBaseAtomicNotificationQueue::drainFd() { checkPid(); uint64_t message = 0; if (eventfd_ >= 0) { auto result = readNoInt(eventfd_, &message, sizeof(message)); CHECK(result == sizeof(message) || errno == EAGAIN); writesObserved_ += message; } else { ssize_t result; while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != -1) { writesObserved_ += result; } CHECK(result == -1 && errno == EAGAIN); } } template void EventBaseAtomicNotificationQueue:: runLoopCallback() noexcept { DCHECK(!armed_); if (!notificationQueue_.arm()) { activateEvent(); } else { armed_ = true; successfulArmCount_++; } } template template bool EventBaseAtomicNotificationQueue::drive(T&& consumer) { auto wasEmpty = !notificationQueue_.drive(std::forward(consumer)); if (wasEmpty && armed_) { consumerDisarmedCount_++; } armed_ = false; return !wasEmpty; } template void EventBaseAtomicNotificationQueue::handlerReady( uint16_t) noexcept { execute(); } template void EventBaseAtomicNotificationQueue::execute() { if (!edgeTriggeredSet_) { drainFd(); } drive(consumer_); evb_->runInLoop(this, false, nullptr); } template void EventBaseAtomicNotificationQueue::activateEvent() { if (!EventHandler::activateEvent(0)) { // Fallback for EventBase backends that don't support activateEvent ++writesLocal_; notifyFd(); } } template void EventBaseAtomicNotificationQueue::checkPid() const { if (FOLLY_UNLIKELY(pid_ != get_cached_pid())) { checkPidFail(); } } template [[noreturn]] FOLLY_NOINLINE void EventBaseAtomicNotificationQueue::checkPidFail() const { folly::terminate_with( "Pid mismatch. Pid = " + folly::to(get_cached_pid()) + ". Expecting " + folly::to(pid_)); } } // namespace folly