/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include namespace folly { template AtomicNotificationQueue::Queue::Queue(Queue&& other) noexcept : head_(std::exchange(other.head_, nullptr)), size_(std::exchange(other.size_, 0)) {} template typename AtomicNotificationQueue::Queue& AtomicNotificationQueue::Queue::operator=(Queue&& other) noexcept { clear(); std::swap(head_, other.head_); std::swap(size_, other.size_); return *this; } template AtomicNotificationQueue::Queue::~Queue() { clear(); } template bool AtomicNotificationQueue::Queue::empty() const { return !head_; } template ssize_t AtomicNotificationQueue::Queue::size() const { return size_; } template typename AtomicNotificationQueue::Node& AtomicNotificationQueue::Queue::front() { return *head_; } template void AtomicNotificationQueue::Queue::pop() { std::unique_ptr(std::exchange(head_, head_->next)); --size_; } template void AtomicNotificationQueue::Queue::clear() { while (!empty()) { pop(); } } template AtomicNotificationQueue::Queue::Queue(Node* head, ssize_t size) : head_(head), size_(size) {} template typename AtomicNotificationQueue::Queue AtomicNotificationQueue::Queue::fromReversed(Node* tail) { // Reverse a linked list. Node* head{nullptr}; ssize_t size = 0; while (tail) { head = std::exchange(tail, std::exchange(tail->next, head)); ++size; } return Queue(head, size); } template AtomicNotificationQueue::AtomicQueue::~AtomicQueue() { DCHECK(!head_); if (reinterpret_cast(head_.load(std::memory_order_relaxed)) == kQueueArmedTag) { return; } if (auto head = head_.load(std::memory_order_acquire)) { auto queueContentsToDrop = Queue::fromReversed(head); } } template template bool AtomicNotificationQueue::AtomicQueue::push(Args&&... args) { std::unique_ptr node(new Node(std::forward(args)...)); auto head = head_.load(std::memory_order_relaxed); while (true) { node->next = reinterpret_cast(head) == kQueueArmedTag ? nullptr : head; if (head_.compare_exchange_weak( head, node.get(), std::memory_order_acq_rel, std::memory_order_relaxed)) { node.release(); return reinterpret_cast(head) == kQueueArmedTag; } } } template bool AtomicNotificationQueue::AtomicQueue::hasTasks() const { auto head = head_.load(std::memory_order_relaxed); return head && reinterpret_cast(head) != kQueueArmedTag; } template typename AtomicNotificationQueue::Queue AtomicNotificationQueue::AtomicQueue::getTasks() { auto head = head_.exchange(nullptr, std::memory_order_acquire); if (head && reinterpret_cast(head) != kQueueArmedTag) { return Queue::fromReversed(head); } return {}; } template typename AtomicNotificationQueue::Queue AtomicNotificationQueue::AtomicQueue::arm() { auto head = head_.load(std::memory_order_relaxed); if (!head && head_.compare_exchange_strong( head, reinterpret_cast(kQueueArmedTag), std::memory_order_release, std::memory_order_relaxed)) { return {}; } DCHECK(reinterpret_cast(head) != kQueueArmedTag); return getTasks(); } template AtomicNotificationQueue::AtomicNotificationQueue() {} template AtomicNotificationQueue::~AtomicNotificationQueue() { // Empty the queue atomicQueue_.getTasks(); } template void AtomicNotificationQueue::setMaxReadAtOnce(uint32_t maxAtOnce) { maxReadAtOnce_ = maxAtOnce; } template size_t AtomicNotificationQueue::size() const { auto queueSize = pushCount_.load(std::memory_order_relaxed) - taskExecuteCount_.load(std::memory_order_relaxed); return queueSize >= 0 ? queueSize : 0; } template bool AtomicNotificationQueue::empty() const { return queue_.empty() && !atomicQueue_.hasTasks(); } template bool AtomicNotificationQueue::arm() { if (!queue_.empty()) { return false; } auto queue = atomicQueue_.arm(); if (queue.empty()) { return true; } else { queue_ = std::move(queue); return false; } } template template bool AtomicNotificationQueue::push(Args&&... args) { pushCount_.fetch_add(1, std::memory_order_relaxed); return atomicQueue_.push(std::forward(args)...); } template typename AtomicNotificationQueue::TryPushResult AtomicNotificationQueue::tryPush(Task&& task, uint32_t maxSize) { auto pushed = pushCount_.load(std::memory_order_relaxed); while (true) { auto executed = taskExecuteCount_.load(std::memory_order_relaxed); if (pushed - executed >= maxSize) { return TryPushResult::FAILED_LIMIT_REACHED; } if (pushCount_.compare_exchange_weak( pushed, pushed + 1, std::memory_order_relaxed, std::memory_order_relaxed)) { break; } } return atomicQueue_.push(std::move(task)) ? TryPushResult::SUCCESS_AND_ARMED : TryPushResult::SUCCESS; } namespace detail { template < typename Task, typename Consumer, typename = std::enable_if_t, AtomicNotificationQueueTaskStatus>::value>> AtomicNotificationQueueTaskStatus invokeConsumerWithTask( Consumer&& consumer, Task&& task, std::shared_ptr&& rctx) { RequestContextScopeGuard rcsg(std::move(rctx)); return consumer(std::forward(task)); } template < typename Task, typename Consumer, typename = std::enable_if_t&&>, AtomicNotificationQueueTaskStatus>::value>, typename = void> AtomicNotificationQueueTaskStatus invokeConsumerWithTask( Consumer&& consumer, Task&& task, std::shared_ptr&& rctx) { return consumer( std::forward(task), std::forward>(rctx)); } template < typename Task, typename Consumer, typename = std::enable_if_t< std::is_same, void>::value>, typename = void, typename = void> AtomicNotificationQueueTaskStatus invokeConsumerWithTask( Consumer&& consumer, Task&& task, std::shared_ptr&& rctx) { RequestContextScopeGuard rcsg(std::move(rctx)); consumer(std::forward(task)); return AtomicNotificationQueueTaskStatus::CONSUMED; } template < typename Task, typename Consumer, typename = std::enable_if_t&&>, void>::value>, typename = void, typename = void, typename = void> AtomicNotificationQueueTaskStatus invokeConsumerWithTask( Consumer&& consumer, Task&& task, std::shared_ptr&& rctx) { consumer( std::forward(task), std::forward>(rctx)); return AtomicNotificationQueueTaskStatus::CONSUMED; } } // namespace detail template template bool AtomicNotificationQueue::drive(Consumer&& consumer) { Queue nextQueue; // Since we cannot know if a task will be discarded before trying to execute // it, this check may cause this function to return early. That is, even // though: // 1. numConsumed < maxReadAtOnce_ // 2. atomicQueue_ is not empty // This is not an issue in practice because these tasks will be executed in // the next round. // // In short, if `size() > maxReadAtOnce_`: // * at least maxReadAtOnce_ tasks will be "processed" // * at most maxReadAtOnce_ tasks will be "executed" (rest being discarded) if (maxReadAtOnce_ == 0 || queue_.size() < maxReadAtOnce_) { nextQueue = atomicQueue_.getTasks(); } const bool wasAnyProcessed = !queue_.empty() || !nextQueue.empty(); for (int32_t numConsumed = 0; maxReadAtOnce_ == 0 || numConsumed < maxReadAtOnce_;) { if (queue_.empty()) { queue_ = std::move(nextQueue); if (queue_.empty()) { break; } } // This is faster than fetch_add and is safe because only consumer thread // writes to taskExecuteCount_. taskExecuteCount_.store( taskExecuteCount_.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed); { auto& curNode = queue_.front(); AtomicNotificationQueueTaskStatus consumeTaskStatus = detail::invokeConsumerWithTask( std::forward(consumer), std::move(curNode.task), std::move(curNode.rctx)); if (consumeTaskStatus == AtomicNotificationQueueTaskStatus::CONSUMED) { ++numConsumed; } } queue_.pop(); } return wasAnyProcessed; } } // namespace folly