/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include namespace folly { /** * A producer-consumer queue for passing tasks to consumer thread. * * Users of this class are expected to implement functionality to wakeup * consumer thread. */ template class AtomicNotificationQueue { public: explicit AtomicNotificationQueue(); ~AtomicNotificationQueue(); /* * Set the maximum number of tasks processed in a single round. * Can be called from consumer thread only. */ void setMaxReadAtOnce(uint32_t maxAtOnce); /* * Returns the number of tasks in the queue. * Can be called from any thread. */ size_t size() const; /* * Checks if the queue is empty. * Can be called from consumer thread only. */ bool empty() const; /* * Tries to arm the queue. * 1) If the queue was empty: the queue becomes armed and true is returned. * 2) Otherwise returns false. * Can be called from consumer thread only. */ bool arm(); /* * Executes one round of tasks. Returns true iff tasks were run. * Can be called from consumer thread only. * * `Consumer::operator()` must accept `Task&&` as its first parameter. * It may also optionally accept `std::shared_ptr&&` as * its second parameter, in which case it must manage `folly::RequestContext` * for the consumed task. * * Consumer::operator() can optionally return * AtomicNotificationQueueTaskStatus to indicate if the provided task should * be considered consumed or discarded. Discarded tasks are not counted * towards `maxReadAtOnce`. */ template bool drive(Consumer&& consumer); /* * Adds a task into the queue. * Can be called from any thread. * Returns true iff the queue was armed, in which case * producers are expected to notify consumer thread. */ template bool push(Args&&... args); /* * Same as above * but RequestContext is passed from caller */ template bool push(std::shared_ptr rctx, Args&&... args); /* * Attempts adding a task into the queue. * Can be called from any thread. * Similarly to push(), producers are expected to notify * consumer iff SUCCESS_AND_ARMED is returned. */ enum class TryPushResult { FAILED_LIMIT_REACHED, SUCCESS, SUCCESS_AND_ARMED }; TryPushResult tryPush(Task&& task, uint32_t maxSize); private: struct Node { Task task; std::shared_ptr rctx; private: friend class AtomicNotificationQueue; template explicit Node( std::shared_ptr requestContext, Args&&... args) : task(std::forward(args)...), rctx(std::move(requestContext)) {} Node* next{}; }; class AtomicQueue; class Queue { public: Queue() {} Queue(Queue&& other) noexcept; Queue& operator=(Queue&& other) noexcept; ~Queue(); bool empty() const; ssize_t size() const; Node& front(); void pop(); void clear(); private: friend class AtomicNotificationQueue::AtomicQueue; Queue(Node* head, ssize_t size); static Queue fromReversed(Node* tail); Node* head_{nullptr}; ssize_t size_{0}; }; /** * Lock-free queue implementation. * The queue can be in 3 states: * 1) Empty * 2) Armed * 3) Non-empty (1 or more tasks in it) * * This diagram shows possible state transitions: * * +---------+ successful arm +-------------+ * | | +---------- arm() ----------> | | * | Empty | | Armed | +-+ * | | <------- getTasks() --------+ | | | * +-+--+----+ consumer disarm +-------------+ | * | ^ | * | | | * | | consumer pull armed push v * | | | * | | +-------------------+ | * v +- getTasks() -+ | | | * | | | Non-empty | <---- push()----+ * | ^---- arm() ---+ | | * | +-+--+------------+-+ * | ^ ^ | * | | | | * +------- push() -------^ ^-- push() --+ * disarmed push * * push() can be called in any state. It always transitions the queue into * Non-empty: * When Armed - push() returns true * When Empty/Non-empty - push() returns false * * getTasks() can be called in any state. It always transitions the queue into * Empty. * * arm() can't be called if the queue is already in Armed state: * When Empty - arm() returns an empty queue and transitions into Armed * When Non-Empty: equivalent to getTasks() * */ class AtomicQueue { public: AtomicQueue() {} ~AtomicQueue(); AtomicQueue(const AtomicQueue&) = delete; AtomicQueue& operator=(const AtomicQueue&) = delete; /* * Pushes a task into the queue. Returns true iff the queue was armed. * Can be called from any thread. */ template bool push(Args&&... args); /* * Same as above * but RequestContext is passed from caller */ template bool push(std::shared_ptr rctx, Args&&... args); /* * Returns true if the queue has tasks. * Can be called from any thread. */ bool hasTasks() const; /* * Returns all tasks currently in the queue (in FIFO order). Queue becomes * empty. * Can be called from consumer thread only. */ Queue getTasks(); /* * Tries to arm the queue. * 1) If the queue was empty: the queue becomes armed and an empty queue is * returned. * 2) If the queue wasn't empty: acts as getTasks(). * Can be called from consumer thread only. */ Queue arm(); private: template bool pushImpl(std::shared_ptr rctx, Args&&... args); alignas(folly::cacheline_align_v) std::atomic head_{}; static constexpr intptr_t kQueueArmedTag = 1; }; private: alignas(folly::cacheline_align_v) std::atomic pushCount_{0}; AtomicQueue atomicQueue_; Queue queue_; std::atomic taskExecuteCount_{0}; uint32_t maxReadAtOnce_{10}; }; /** * Consumer::operator() can optionally return AtomicNotificationQueueTaskStatus * to indicate if the provided task should be considered consumed or * discarded. Discarded tasks are not counted towards maxReadAtOnce_. */ enum class AtomicNotificationQueueTaskStatus : bool { // The dequeued task was consumed and should be counted as such CONSUMED = true, // The dequeued task should be discarded and the queue not count it as // consumed DISCARD = false }; } // namespace folly #include