/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include namespace folly { namespace channels { template MergeChannel::MergeChannel(TProcessor* processor) : processor_(processor) {} template MergeChannel::MergeChannel(MergeChannel&& other) noexcept : processor_(std::exchange(other.processor_, nullptr)) {} template MergeChannel& MergeChannel::operator=( MergeChannel&& other) noexcept { if (&other == this) { return *this; } if (processor_) { std::move(*this).close(); } processor_ = std::exchange(other.processor_, nullptr); return *this; } template MergeChannel::~MergeChannel() { if (processor_) { std::move(*this).close(std::nullopt /* ex */); } } template MergeChannel::operator bool() const { return processor_; } template template void MergeChannel::addNewReceiver( KeyType key, TReceiver receiver) { processor_->addNewReceiver(key, std::move(receiver)); } template void MergeChannel::removeReceiver(KeyType key) { processor_->removeReceiver(key); } template folly::F14FastSet MergeChannel::getReceiverKeys() { return processor_->getReceiverKeys(); } template void MergeChannel::close( std::optional ex) && { processor_->destroyHandle( ex.has_value() ? detail::CloseResult(std::move(ex.value())) : detail::CloseResult()); processor_ = nullptr; } namespace detail { template class IMergeChannelProcessor : public IChannelCallback { public: virtual void addNewReceiver(KeyType key, Receiver receiver) = 0; virtual void removeReceiver(KeyType key) = 0; virtual folly::F14FastSet getReceiverKeys() = 0; virtual void destroyHandle(CloseResult closeResult) = 0; }; /** * This object does the merging of values from the input receivers to the output * receiver. The lifetime of this object is described by the following state * machine. * * The sender and all active receivers can be in one of three conceptual states: * Active, CancellationTriggered, or CancellationProcessed (removed). When the * sender and all receivers reach the CancellationProcessed state AND the user's * MergeChannel object is deleted, this object is deleted. * * When an input receiver receives a value indicating that the channel has * been closed, the state of that receiver transitions from Active directly to * CancellationProcessed and the receiver is removed. * * If the receiver closed with an exception, the state of the sender and all * other receivers transitions from Active to CancellationTriggered. In that * case, once we receive callbacks indicating the cancellation signal has been * received for all other receivers and the sender, the state of the sender and * all other receivers transitions to CancellationProcessed (and this object * will be deleted once the user destroys their MergeChannel object). * * When the sender receives notification that the consumer of the output * receiver has stopped consuming, the state of the sender transitions from * Active directly to CancellationProcessed, and the state of all remaining * input receivers transitions from Active to CancellationTriggered. Once we * receive callbacks for all input receivers indicating that the cancellation * signal has been received, each such receiver is transitioned to the * CancellationProcessed state (and this object will be deleted once the user * destroys their MergeChannel object). * * When the user destroys their MergeChannel object, the state of the sender and * all remaining receivers transition from Active to CancellationTriggered. This * object will then be deleted once the sender and each remaining input receiver * transitions to the CancellationProcessed state (after we receive each * cancelled callback). */ template class MergeChannelProcessor : public IMergeChannelProcessor { private: struct State { explicit State( ChannelBridgePtr> _sender) : sender(std::move(_sender)) {} ChannelState getSenderState() { return detail::getSenderState(sender.get()); } // The output sender for the merge channel. ChannelBridgePtr> sender; // A non-owning map from key to receiver. folly::F14NodeMap*> receiversByKey; // The set of receivers that feed into this MergeChannel. This map "owns" // its receivers. MergeChannelProcessor must free any receiver removed from // this map. folly::F14NodeMap*, const KeyType*> receivers; // Whether or not the handle to the MergeChannel has been destroyed. bool handleDestroyed{false}; }; using WLockedStatePtr = typename folly::Synchronized::WLockedPtr; public: MergeChannelProcessor( Sender> sender, folly::Executor::KeepAlive executor) : executor_(std::move(executor)), state_(State(std::move(detail::senderGetBridge(sender)))) { auto state = state_.wlock(); CHECK(state->sender->senderWait(this)); } /** * Adds a new receiver to be merged, along with a key to allow for later * removal. */ void addNewReceiver(KeyType key, Receiver receiver) { auto state = state_.wlock(); if (state->getSenderState() != ChannelState::Active) { return; } auto [unbufferedReceiver, buffer] = detail::receiverUnbuffer(std::move(receiver)); auto existingReceiverIt = state->receiversByKey.find(key); if (existingReceiverIt != state->receiversByKey.end()) { CHECK(state->receivers.contains(existingReceiverIt->second)); if (!existingReceiverIt->second->isReceiverCancelled()) { // We already have a receiver with the given key. Trigger cancellation // on that previous receiver. existingReceiverIt->second->receiverCancel(); } auto keyToRemove = existingReceiverIt->first; state->receivers[existingReceiverIt->second] = nullptr; state->receiversByKey.erase(existingReceiverIt); state->sender->senderPush(MergeChannelEvent{ keyToRemove, MergeChannelReceiverRemoved{}}); } auto [it, _] = state->receiversByKey.insert( std::make_pair(key, unbufferedReceiver.get())); auto* receiverPtr = unbufferedReceiver.get(); state->receivers.insert( std::make_pair(unbufferedReceiver.release(), &it->first)); state->sender->senderPush(MergeChannelEvent{ key, MergeChannelReceiverAdded{}}); processAllAvailableValues(state, receiverPtr, std::move(buffer)); } /** * Removes the receiver with the given key. */ void removeReceiver(KeyType key) { auto state = state_.wlock(); if (state->getSenderState() != ChannelState::Active) { return; } auto receiverIt = state->receiversByKey.find(key); if (receiverIt == state->receiversByKey.end()) { return; } CHECK(state->receivers.contains(receiverIt->second)); if (!receiverIt->second->isReceiverCancelled()) { receiverIt->second->receiverCancel(); } auto keyToRemove = receiverIt->first; state->receivers[receiverIt->second] = nullptr; state->receiversByKey.erase(receiverIt); state->sender->senderPush(MergeChannelEvent{ keyToRemove, MergeChannelReceiverRemoved{}}); } folly::F14FastSet getReceiverKeys() { auto state = state_.rlock(); auto receiverKeys = folly::F14FastSet(); receiverKeys.reserve(state->receiversByKey.size()); for (const auto& [key, _] : state->receiversByKey) { receiverKeys.insert(key); } return receiverKeys; } /** * Called when the user's MergeChannel object is destroyed. */ void destroyHandle(CloseResult closeResult) { auto state = state_.wlock(); processHandleDestroyed(state, std::move(closeResult)); } /** * Called when one of the channels we are listening to has an update (either * a value from an input receiver or a cancellation from the output receiver). */ void consume(ChannelBridgeBase* bridge) override { executor_->add([=]() { auto state = state_.wlock(); if (bridge == state->sender.get()) { // The consumer of the output receiver has stopped consuming. CHECK(state->getSenderState() != ChannelState::CancellationProcessed); state->sender->senderClose(); processSenderCancelled(state); } else { // One or more values are now available from an input receiver. auto* receiver = static_cast*>(bridge); CHECK( getReceiverState(receiver) != ChannelState::CancellationProcessed); processAllAvailableValues(state, receiver); } }); } /** * Called after we cancelled one of the channels we were listening to (either * the sender or an input receiver). */ void canceled(ChannelBridgeBase* bridge) override { executor_->add([=]() { auto state = state_.wlock(); if (bridge == state->sender.get()) { // We previously cancelled the sender due to an input receiver closure // with an exception (or the closure of all input receivers without an // exception). Process the cancellation for the sender. CHECK(state->getSenderState() == ChannelState::CancellationTriggered); processSenderCancelled(state); } else { // We previously cancelled this input receiver, either because the // consumer of the output receiver stopped consuming or because another // input receiver received an exception. Process the cancellation for // this input receiver. auto* receiver = static_cast*>(bridge); processReceiverCancelled(state, receiver, CloseResult()); } }); } protected: /** * Processes all available values from the given input receiver channel * (starting from the provided buffer, if present). * * If an value was received indicating that the input channel has been closed * (or if the transform function indicated that channel should be closed), we * will process cancellation for the input receiver. */ void processAllAvailableValues( WLockedStatePtr& state, ChannelBridge* receiver, std::optional> buffer = std::nullopt) { CHECK(state->receivers.contains(receiver)); const auto* key = state->receivers.at(receiver); auto closeResult = receiver->isReceiverCancelled() ? CloseResult() : (buffer.has_value() ? processValues(state, std::move(buffer.value()), key) : std::nullopt); while (!closeResult.has_value()) { if (receiver->receiverWait(this)) { // There are no more values available right now. We will stop processing // until the channel fires the consume() callback (indicating that more // values are available). break; } auto values = receiver->receiverGetValues(); CHECK(!values.empty()); closeResult = processValues(state, std::move(values), key); } if (closeResult.has_value()) { // The receiver received a value indicating channel closure. receiver->receiverCancel(); processReceiverCancelled(state, receiver, std::move(closeResult.value())); } } /** * Processes the given set of values for an input receiver. Returns a * CloseResult if the given channel was closed, so the caller can stop * attempting to process values from it. */ std::optional processValues( WLockedStatePtr& state, ReceiverQueue values, const KeyType* key) { while (!values.empty()) { auto inputResult = std::move(values.front()); values.pop(); if (inputResult.hasValue()) { // We have received a normal value from an input receiver. Write it to // the output receiver. state->sender->senderPush(MergeChannelEvent{ *key, std::move(inputResult.value())}); } else { // The input receiver was closed. return inputResult.hasException() ? CloseResult(std::move(inputResult.exception())) : CloseResult(); } } return std::nullopt; } /** * Processes the cancellation of an input receiver. */ void processReceiverCancelled( WLockedStatePtr& state, ChannelBridge* receiver, CloseResult closeResult) { CHECK(getReceiverState(receiver) == ChannelState::CancellationTriggered); auto* key = state->receivers.at(receiver); if (key != nullptr) { auto keyToRemove = *key; CHECK_EQ(state->receiversByKey.erase(keyToRemove), 1); if (state->getSenderState() == ChannelState::Active) { state->sender->senderPush(MergeChannelEvent{ keyToRemove, MergeChannelReceiverClosed{ closeResult.exception.has_value() ? std::move(closeResult.exception.value()) : folly::exception_wrapper()}}); } } state->receivers.erase(receiver); (ChannelBridgePtr(receiver)); maybeDelete(state); } /** * Processes the cancellation of the sender (indicating that the consumer of * the output receiver has stopped consuming). We will trigger cancellation * for all input receivers not already cancelled. */ void processSenderCancelled(WLockedStatePtr& state) { CHECK(state->getSenderState() == ChannelState::CancellationTriggered); state->sender = nullptr; for (auto [receiver, _] : state->receivers) { if (getReceiverState(receiver) == ChannelState::Active) { receiver->receiverCancel(); } } maybeDelete(state); } /** * Processes the destruction of the user's MergeChannel object. We will * close the sender and trigger cancellation for all input receivers not * already cancelled. */ void processHandleDestroyed(WLockedStatePtr& state, CloseResult closeResult) { CHECK(!state->handleDestroyed); state->handleDestroyed = true; if (state->getSenderState() == ChannelState::Active) { for (auto [key, receiver] : state->receiversByKey) { state->receivers[receiver] = nullptr; state->sender->senderPush(MergeChannelEvent{ key, MergeChannelReceiverRemoved{}}); } if (closeResult.exception.has_value()) { state->sender->senderClose(std::move(closeResult.exception.value())); } else { state->sender->senderClose(); } } for (auto [receiver, _] : state->receivers) { if (getReceiverState(receiver) == ChannelState::Active) { receiver->receiverCancel(); } } maybeDelete(state); } /** * Deletes this object if we have already processed cancellation for the * sender and all input receivers, and if the user's MergeChannel object was * destroyed. */ void maybeDelete(WLockedStatePtr& state) { if (state->getSenderState() == ChannelState::CancellationProcessed && state->receivers.empty() && state->handleDestroyed) { state.unlock(); delete this; } } ChannelState getReceiverState(ChannelBridge* receiver) { return detail::getReceiverState(receiver); } folly::Executor::KeepAlive executor_; folly::Synchronized state_; }; } // namespace detail template std::pair< Receiver>, MergeChannel> createMergeChannel( folly::Executor::KeepAlive executor) { auto [receiver, sender] = Channel>::create(); auto* processor = new detail::MergeChannelProcessor( std::move(sender), std::move(executor)); return std::make_pair( std::move(receiver), MergeChannel(processor)); } } // namespace channels } // namespace folly