/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include namespace folly { namespace channels { template class MultiplexChannel; /** * Creates a new multiplex channel that multiplexes updates from a single input * receiver to one or more keyed subscriptions. * * The creator of a multiplex channel must pass a Multiplexer class that * implements the following functions: * * folly::Executor::KeepAlive getExecutor(); * * std::shared_ptr getRateLimiter(); // Can return nullptr * * // This function is called for each call to subscribe on the multiplex * // channel object. It returns a vector of output values that should be sent * // directly to the new output receiver for that subscription. * folly::coro::Task> onNewSubscription( * KeyType key, * KeyContextType& keyContext, * SubscriptionArgType subscriptionArg); * * // This function is called with each value fromm the given input receiver. * // This function sends any corresponding values to the relevant output * // receivers, using the subscriptions parameter. * folly::coro::Task onInputValue( * folly::Try inputValue, * MultiplexedSubscriptions& subscriptions); * * Example: * * struct InputValue { * std::string key; * int64_t value; * }; * * struct NoContext {} * struct NoSubscriptionArg {}; * * class Multiplexer { * public: * explicit Multiplexer( * folly::Executor::KeepAlive executor) * : executor_(std::move(executor)) {} * * folly::Executor::KeepAlive getExecutor() { * return executor_; * } * * std::shared_ptr getRateLimiter() { * return nullptr; // No rate limiting * } * * folly::coro::Task> onNewSubscription( * std::string key, * NoContext&, * NoSubscriptionArg&) { * co_return std::vector(); // No initial values * } * * folly::coro::Task onInputValue( * folly::Try inputValue, * MultiplexedSubscriptions& subscriptions) { * if (subscriptions.hasSubscription(inputValue->key)) { * subscriptions.write(inputValue->key, inputValue->value); * } * co_return; * } * * private: * folly::Executor::KeepAlive executor_; * } * * // Function that returns a receiver: * Receiver getInputReceiver(); * * // Function that returns an executor * folly::Executor::KeepAlive getExecutor(); * * auto multiplexChannel = createMultiplexChannel( * Multiplexer(getExecutor()), * getInputReceiver()); * * auto receiver1a = multiplexChannel.subscribe("one"); * auto receiver1b = multiplexChannel.subscribe("one"); * auto receiver2a = multiplexChannel.subscribe("two"); */ template MultiplexChannel createMultiplexChannel( MultiplexerType multiplexer, InputReceiverType inputReceiver); namespace detail { template class MultiplexChannelProcessor; } // namespace detail /** * A multiplex channel allows multiplexing updates from a single input receiver * to one or more keyed subscriptions. */ template class MultiplexChannel { using TProcessor = detail::MultiplexChannelProcessor; using KeyType = typename detail::MultiplexerTraits::KeyType; using KeyContextType = typename detail::MultiplexerTraits::KeyContextType; using SubscriptionArgType = typename detail::MultiplexerTraits::SubscriptionArgType; using OutputValueType = typename detail::MultiplexerTraits::OutputValueType; public: MultiplexChannel(MultiplexChannel&& other) noexcept; MultiplexChannel& operator=(MultiplexChannel&& other) noexcept; ~MultiplexChannel(); /** * Returns whether this MultiplexChannel is a valid object. This will return * false if the object was moved from. */ explicit operator bool() const; /** * Returns a new output receiver for the given key. */ Receiver subscribe( KeyType key, SubscriptionArgType subscriptionArg); /** * Removes keys with no subscribers, and returns their contexts. */ folly::coro::Task>> clearUnusedSubscriptions(); /** * Returns whether this multiplex channel has any subscribers for any keys. * Note that if any output receivers returned from subscribe have been * destroyed, such subscriptions will still be considered to exist until the * clearUnusedSubscriptions function is called. */ bool anySubscribers() const; /** * Closes the multiplex channel. */ void close(folly::exception_wrapper ex = folly::exception_wrapper()) &&; private: template friend MultiplexChannel createMultiplexChannel( Multiplexer, InputValueType); explicit MultiplexChannel(TProcessor* processor); TProcessor* processor_; }; /** * A class that allows one to see which keys are subscribed, and to write * values for particular subscriptions. This is passed to the onInputValue * function of the user-provided Multiplexer class. */ template class MultiplexedSubscriptions { public: using KeyType = typename detail::MultiplexerTraits::KeyType; using KeyContextType = typename detail::MultiplexerTraits::KeyContextType; using OutputValueType = typename detail::MultiplexerTraits::OutputValueType; friend class detail::MultiplexChannelProcessor; /** * Returns whether or not a subscription exists for the given key. */ bool hasSubscription(const KeyType& key); /** * Returns a reference to the context object for the given key. */ KeyContextType& getKeyContext(const KeyType& key); /** * Sends a value to all subscribers of a given key. */ template void write(const KeyType& key, U&& value); /** * Closes all subscribers for the given key. */ void close(const KeyType& key, folly::exception_wrapper ex); /** * Returns a view containing a list of subscribed keys. */ auto getAllSubscriptionKeys() { return subscriptions_ | ranges::views::keys; } private: using SubscriptionMap = folly::F14FastMap< KeyType, std::tuple, KeyContextType>>&; explicit MultiplexedSubscriptions(SubscriptionMap& subscriptions); void ensureKeyExists(const KeyType& key); SubscriptionMap& subscriptions_; folly::F14FastSet closedSubscriptionKeys_; }; } // namespace channels } // namespace folly #include