/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include namespace folly { namespace channels { template MultiplexedSubscriptions::MultiplexedSubscriptions( SubscriptionMap& subscriptions) : subscriptions_(subscriptions) {} template bool MultiplexedSubscriptions::hasSubscription( const MultiplexedSubscriptions::KeyType& key) { return subscriptions_.contains(key) && !closedSubscriptionKeys_.contains(key); } template typename MultiplexedSubscriptions::KeyContextType& MultiplexedSubscriptions::getKeyContext( const MultiplexedSubscriptions::KeyType& key) { ensureKeyExists(key); return std::get(subscriptions_.at(key)); } template template void MultiplexedSubscriptions::write( const MultiplexedSubscriptions::KeyType& key, U&& value) { ensureKeyExists(key); auto& sender = std::get>(subscriptions_.at(key)); sender.write(std::forward(value)); } template void MultiplexedSubscriptions::close( const MultiplexedSubscriptions::KeyType& key, folly::exception_wrapper ex) { ensureKeyExists(key); auto& sender = std::get>(subscriptions_.at(key)); if (ex) { std::move(sender).close(std::move(ex)); } else { std::move(sender).close(); } // We do not erase from the subscriptions_ map yet, because we do not want // to invalidate the view returned by getSubscriptionKeys. closedSubscriptionKeys_.insert(key); } template void MultiplexedSubscriptions::ensureKeyExists( const KeyType& key) { if (!subscriptions_.contains(key) || closedSubscriptionKeys_.contains(key)) { throw std::runtime_error("Subscription with the given key does not exist."); } } template MultiplexChannel::MultiplexChannel(TProcessor* processor) : processor_(processor) {} template MultiplexChannel::MultiplexChannel( MultiplexChannel&& other) noexcept : processor_(std::exchange(other.processor_, nullptr)) {} template MultiplexChannel& MultiplexChannel::operator=( MultiplexChannel&& other) noexcept { if (&other == this) { return *this; } if (processor_) { std::move(*this).close(); } processor_ = std::exchange(other.processor_, nullptr); return *this; } template MultiplexChannel::~MultiplexChannel() { if (processor_ != nullptr) { std::move(*this).close(folly::exception_wrapper()); } } template MultiplexChannel::operator bool() const { return processor_; } template Receiver::OutputValueType> MultiplexChannel::subscribe( KeyType key, SubscriptionArgType subscriptionArg) { return processor_->subscribe(std::move(key), std::move(subscriptionArg)); } template folly::coro::Task::KeyType, typename MultiplexChannel::KeyContextType>>> MultiplexChannel::clearUnusedSubscriptions() { co_return co_await processor_->clearUnusedSubscriptions(); } template bool MultiplexChannel::anySubscribers() const { return processor_->anySubscribers(); } template void MultiplexChannel::close(folly::exception_wrapper ex) && { processor_->destroyHandle( ex ? detail::CloseResult(std::move(ex)) : detail::CloseResult()); processor_ = nullptr; } namespace detail { /** * This object fans out values from the input receiver to all output receivers. * The lifetime of this object is described by the following state machine. * * The input receiver can be in one of three conceptual states: Active, * CancellationTriggered, or CancellationProcessed (removed). When the input * receiver reaches the CancellationProcessed state AND the user's * MultiplexChannel object is deleted, this object is deleted. * * When an input receiver receives a value indicating that the channel has * been closed, the state of the input receiver transitions from Active directly * to CancellationProcessed (and this object will be deleted once the user * destroys their MultiplexChannel object). * * When the user destroys their MultiplexChannel object, the state of the input * receiver transitions from Active to CancellationTriggered. This object will * then be deleted once the input receiver transitions to the * CancellationProcessed state. */ template class MultiplexChannelProcessor : public IChannelCallback { private: using KeyType = typename detail::MultiplexerTraits::KeyType; using KeyContextType = typename detail::MultiplexerTraits::KeyContextType; using SubscriptionArgType = typename detail::MultiplexerTraits::SubscriptionArgType; using InputValueType = typename detail::MultiplexerTraits::InputValueType; using OutputValueType = typename detail::MultiplexerTraits::OutputValueType; public: explicit MultiplexChannelProcessor(MultiplexerType multiplexer) : multiplexer_(std::move(multiplexer)), totalSubscriptions_(0), pendingAsyncCalls_(0) {} /** * Starts multiplexing values from the input receiver to to one or more keyed * subscriptions. */ void start(Receiver inputReceiver) { executeWithMutexWhenReady( [this, inputReceiver = std::move(inputReceiver)]() mutable -> folly::coro::Task { co_await processStart(std::move(inputReceiver)); }); } Receiver subscribe( KeyType key, SubscriptionArgType subscriptionArg) { auto [receiver, sender] = Channel::create(); totalSubscriptions_.fetch_add(1); executeWithMutexWhenReady( [this, key = std::move(key), subscriptionArg = std::move(subscriptionArg), sender = std::move(sender)]() mutable -> folly::coro::Task { co_await processNewSubscription( std::move(key), std::move(subscriptionArg), std::move(sender)); }); return std::move(receiver); } folly::coro::Task>> clearUnusedSubscriptions() { auto [promise, future] = folly::coro::makePromiseContract< std::vector>>(); executeWithMutexWhenReady( [this, promise = std::move(promise)]() mutable -> folly::coro::Task { co_await processClearUnusedSubscriptions(std::move(promise)); }); return folly::coro::toTask(std::move(future)); } bool anySubscribers() { return totalSubscriptions_.load() > 0; } /** * This is called when the user's MultiplexChannel object has been destroyed. */ void destroyHandle(CloseResult closeResult) { executeWithMutexWhenReady( [this, closeResult = std::move(closeResult)]() mutable -> folly::coro::Task { co_await processHandleDestroyed(std::move(closeResult)); }); } private: /** * Called when the input receiver has an update. */ void consume(ChannelBridgeBase*) override { executeWithMutexWhenReady([this]() -> folly::coro::Task { co_await processAllAvailableValues(); }); } /** * Called after we cancelled this input receiver, due to the destruction of * the handle. */ void canceled(ChannelBridgeBase*) override { executeWithMutexWhenReady([this]() -> folly::coro::Task { auto closeResult = CloseResult(); // Declaring first due to GCC bug co_await processReceiverCancelled(std::move(closeResult)); }); } folly::coro::Task processStart(Receiver inputReceiver) { auto [unbufferedInputReceiver, buffer] = detail::receiverUnbuffer(std::move(inputReceiver)); receiver_ = std::move(unbufferedInputReceiver); // Start processing new values that come in from the input receiver. co_await processAllAvailableValues(std::move(buffer)); } /** * Processes all available values from the input receiver (starting from the * provided buffer, if present). * * If an value was received indicating that the input channel has been closed * (or if the transform function indicated that channel should be closed), we * will process cancellation for the input receiver. */ folly::coro::Task processAllAvailableValues( std::optional> buffer = std::nullopt) { CHECK_NE(getReceiverState(), ChannelState::CancellationProcessed); auto closeResult = receiver_->isReceiverCancelled() ? CloseResult() : (buffer.has_value() ? co_await processValues(std::move(buffer.value())) : std::nullopt); while (!closeResult.has_value()) { if (receiver_->receiverWait(this)) { // There are no more values available right now. We will stop processing // until the channel fires the consume() callback (indicating that more // values are available). break; } auto values = receiver_->receiverGetValues(); CHECK(!values.empty()); closeResult = co_await processValues(std::move(values)); } if (closeResult.has_value()) { // The receiver received a value indicating channel closure. receiver_->receiverCancel(); co_await processReceiverCancelled(std::move(closeResult.value())); } } /** * Processes the given set of values for the input receiver. Returns a * CloseResult if channel was closed, so the caller can stop attempting to * process values from it. */ folly::coro::Task> processValues( ReceiverQueue values) { while (!values.empty()) { auto inputResult = std::move(values.front()); values.pop(); bool inputClosed = !inputResult.hasValue(); auto subscriptions = MultiplexedSubscriptions(subscriptions_); if (inputClosed && !inputResult.hasException()) { // The input channel was closed. We will send an OnClosedException to // onInputValue. inputResult = folly::Try( folly::make_exception_wrapper()); } // Process the input value by calling onInputValue on the user's // multiplexer. auto onInputValueResult = co_await folly::coro::co_awaitTry( multiplexer_.onInputValue(std::move(inputResult), subscriptions)); // If the user closed any subscriptions, erase them from the subscriptions // map. for (const auto& key : subscriptions.closedSubscriptionKeys_) { subscriptions_.erase(key); } if (!subscriptions.closedSubscriptionKeys_.empty()) { totalSubscriptions_.fetch_sub( subscriptions.closedSubscriptionKeys_.size()); subscriptions.closedSubscriptionKeys_.clear(); } if (inputClosed && onInputValueResult.hasValue()) { // The input channel was closed, but the onInputValue function did not // throw. We need to close all output receivers. onInputValueResult = folly::Try( folly::make_exception_wrapper()); } if (!onInputValueResult.hasValue()) { co_return onInputValueResult.template hasException() ? CloseResult() : CloseResult(std::move(onInputValueResult.exception())); } } co_return std::nullopt; } /** * Processes the cancellation of the input receiver. We will close all * senders with the exception received from the input receiver (if any). */ folly::coro::Task processReceiverCancelled(CloseResult closeResult) { CHECK_EQ(getReceiverState(), ChannelState::CancellationTriggered); receiver_ = nullptr; closeAllSubscriptions(std::move(closeResult)); co_return; } folly::coro::Task processNewSubscription( KeyType key, SubscriptionArgType subscriptionArg, Sender newSender) { if (subscriptions_.contains(key)) { // We already had a subscription for this key. totalSubscriptions_.fetch_sub(1); } auto& [sender, context] = subscriptions_[key]; auto initialValues = co_await folly::coro::co_awaitTry(multiplexer_.onNewSubscription( key, context, std::move(subscriptionArg))); if (initialValues.hasException()) { std::move(newSender).close(initialValues.exception()); co_return; } for (auto& initialValue : initialValues.value()) { newSender.write(std::move(initialValue)); } sender.subscribe(std::move(newSender)); } folly::coro::Task processClearUnusedSubscriptions( folly::coro::Promise>> promise) { auto clearedSubscriptions = std::vector>(); size_t subscriptionsToRemove = 0; for (auto it = subscriptions_.begin(); it != subscriptions_.end();) { auto& sender = std::get>(it->second); if (!sender.anySubscribers()) { clearedSubscriptions.push_back(std::make_pair( it->first, std::move(std::get(it->second)))); it = subscriptions_.erase(it); subscriptionsToRemove++; } else { ++it; } } totalSubscriptions_.fetch_sub(subscriptionsToRemove); promise.setValue(std::move(clearedSubscriptions)); co_return; } /** * Processes the destruction of the user's MultiplexChannel object. We will * cancel the receiver and trigger cancellation for all senders not already * cancelled. */ folly::coro::Task processHandleDestroyed(CloseResult closeResult) { handleDeleted_ = true; if (getReceiverState() == ChannelState::Active) { receiver_->receiverCancel(); } closeAllSubscriptions(std::move(closeResult)); co_return; } /** * Deletes this object if we have already processed cancellation for the * receiver and all senders, and if the user's MultiplexChannel object was * destroyed. */ void maybeDelete(std::unique_lock& lock) { if (getReceiverState() == ChannelState::CancellationProcessed && handleDeleted_ && pendingAsyncCalls_ == 0) { lock.unlock(); delete this; } } void executeWithMutexWhenReady( folly::Function()> func) { pendingAsyncCalls_++; auto rateLimiter = multiplexer_.getRateLimiter(); if (rateLimiter != nullptr) { rateLimiter->executeWhenReady( [this, func = std::move(func), executor = multiplexer_.getExecutor()]( RateLimiter::Token token) mutable { folly::coro::co_invoke( [this, token = std::move(token), func = std::move(func)]() mutable -> folly::coro::Task { auto lock = co_await mutex_.co_scoped_lock(); co_await func(); pendingAsyncCalls_--; maybeDelete(lock); }) .scheduleOn(executor) .start(); }, multiplexer_.getExecutor()); } else { folly::coro::co_invoke( [this, func = std::move(func)]() mutable -> folly::coro::Task { auto lock = co_await mutex_.co_scoped_lock(); co_await func(); pendingAsyncCalls_--; maybeDelete(lock); }) .scheduleOn(multiplexer_.getExecutor()) .start(); } } ChannelState getReceiverState() { return detail::getReceiverState(receiver_.get()); } void closeAllSubscriptions(CloseResult closeResult) { for (auto& [key, subscription] : subscriptions_) { auto& sender = std::get>(subscription); std::move(sender).close( closeResult.exception.has_value() ? closeResult.exception.value() : folly::exception_wrapper()); } totalSubscriptions_.fetch_sub(subscriptions_.size()); subscriptions_.clear(); } using SubscriptionMap = folly::F14FastMap< KeyType, std::tuple, KeyContextType>>; coro::Mutex mutex_; // The above coro mutex must be acquired before accessing this state. ChannelBridgePtr receiver_; SubscriptionMap subscriptions_; bool handleDeleted_{false}; // The above coro mutex does not need to be acquired before accessing this // state. MultiplexerType multiplexer_; std::atomic totalSubscriptions_; // Includes pending subscriptions std::atomic pendingAsyncCalls_; }; } // namespace detail template MultiplexChannel createMultiplexChannel( MultiplexerType multiplexer, InputReceiverType inputReceiver) { auto* processor = new detail::MultiplexChannelProcessor( std::move(multiplexer)); processor->start(std::move(inputReceiver)); return MultiplexChannel(processor); } } // namespace channels } // namespace folly