/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include namespace folly { namespace channels { template FanoutChannel::FanoutChannel(TProcessor* processor) : processor_(processor) {} template FanoutChannel::FanoutChannel( FanoutChannel&& other) noexcept : processor_(std::exchange(other.processor_, nullptr)) {} template FanoutChannel& FanoutChannel::operator=( FanoutChannel&& other) noexcept { if (&other == this) { return *this; } if (processor_) { std::move(*this).close(); } processor_ = std::exchange(other.processor_, nullptr); return *this; } template FanoutChannel::~FanoutChannel() { if (processor_ != nullptr) { std::move(*this).close(folly::exception_wrapper()); } } template FanoutChannel::operator bool() const { return processor_ != nullptr; } template Receiver FanoutChannel::subscribe( folly::Function(const ContextType&)> getInitialValues) { return processor_->subscribe(std::move(getInitialValues)); } template bool FanoutChannel::anySubscribers() const { return processor_->anySubscribers(); } template void FanoutChannel::closeSubscribers( folly::exception_wrapper ex) { processor_->closeSubscribers( ex ? detail::CloseResult(std::move(ex)) : detail::CloseResult()); } template void FanoutChannel::close( folly::exception_wrapper ex) && { processor_->destroyHandle( ex ? detail::CloseResult(std::move(ex)) : detail::CloseResult()); processor_ = nullptr; } namespace detail { template class IFanoutChannelProcessor : public IChannelCallback { public: virtual Receiver subscribe( folly::Function(const ContextType&)> getInitialValues) = 0; virtual bool anySubscribers() = 0; virtual void closeSubscribers(CloseResult closeResult) = 0; virtual void destroyHandle(CloseResult closeResult) = 0; }; /** * This object fans out values from the input receiver to all output receivers. * The lifetime of this object is described by the following state machine. * * The input receiver can be in one of three conceptual states: Active, * CancellationTriggered, or CancellationProcessed (removed). When the input * receiver reaches the CancellationProcessed state AND the user's FanoutChannel * object is deleted, this object is deleted. * * When an input receiver receives a value indicating that the channel has * been closed, the state of the input receiver transitions from Active directly * to CancellationProcessed (and this object will be deleted once the user * destroys their FanoutChannel object). * * When the user destroys their FanoutChannel object, the state of the input * receiver transitions from Active to CancellationTriggered. This object will * then be deleted once the input receiver transitions to the * CancellationProcessed state. */ template class FanoutChannelProcessor : public IFanoutChannelProcessor { private: struct State { State(ContextType _context) : context(std::move(_context)) {} ChannelState getReceiverState() { return detail::getReceiverState(receiver.get()); } ChannelBridgePtr receiver; FanoutSender fanoutSender; ContextType context; bool handleDeleted{false}; }; using WLockedStatePtr = typename folly::Synchronized::WLockedPtr; public: explicit FanoutChannelProcessor( folly::Executor::KeepAlive executor, ContextType context) : executor_(std::move(executor)), state_(std::move(context)) {} /** * Starts fanning out values from the input receiver to all output receivers. * * @param inputReceiver: The input receiver to fan out values from. */ void start(Receiver inputReceiver) { auto state = state_.wlock(); auto [unbufferedInputReceiver, buffer] = detail::receiverUnbuffer(std::move(inputReceiver)); state->receiver = std::move(unbufferedInputReceiver); // Start processing new values that come in from the input receiver. processAllAvailableValues(state, std::move(buffer)); } /** * Returns a new output receiver that will receive all values from the input * receiver. If a getInitialValues parameter is provided, it will be executed * to determine the set of initial values that will (only) go to the new input * receiver. */ Receiver subscribe( folly::Function(const ContextType&)> getInitialValues) override { auto state = state_.wlock(); auto initialValues = getInitialValues ? getInitialValues(state->context) : std::vector(); if (!state->receiver) { auto [receiver, sender] = Channel::create(); for (auto&& value : initialValues) { sender.write(std::move(value)); } std::move(sender).close(); return std::move(receiver); } return state->fanoutSender.subscribe(std::move(initialValues)); } /** * Closes all subscribers without closing the fanout channel. */ void closeSubscribers(CloseResult closeResult) { auto state = state_.wlock(); std::move(state->fanoutSender) .close( closeResult.exception.has_value() ? closeResult.exception.value() : folly::exception_wrapper()); } /** * This is called when the user's FanoutChannel object has been destroyed. */ void destroyHandle(CloseResult closeResult) { auto state = state_.wlock(); processHandleDestroyed(state, std::move(closeResult)); } /** * Returns whether this fanout channel has any output receivers. */ bool anySubscribers() override { return state_.wlock()->fanoutSender.anySubscribers(); } private: /** * Called when one of the channels we are listening to has an update (either * a value from the input receiver or a cancellation from an output receiver). */ void consume(ChannelBridgeBase*) override { executor_->add([=]() { // One or more values are now available from the input receiver. auto state = state_.wlock(); CHECK_NE(state->getReceiverState(), ChannelState::CancellationProcessed); processAllAvailableValues(state); }); } void canceled(ChannelBridgeBase*) override { executor_->add([=]() { // We previously cancelled this input receiver, due to the destruction of // the handle. Process the cancellation for this input receiver. auto state = state_.wlock(); processReceiverCancelled(state, CloseResult()); }); } /** * Processes all available values from the input receiver (starting from the * provided buffer, if present). * * If an value was received indicating that the input channel has been closed * (or if the transform function indicated that channel should be closed), we * will process cancellation for the input receiver. */ void processAllAvailableValues( WLockedStatePtr& state, std::optional> buffer = std::nullopt) { auto closeResult = state->receiver->isReceiverCancelled() ? CloseResult() : (buffer.has_value() ? processValues(state, std::move(buffer.value())) : std::nullopt); while (!closeResult.has_value()) { if (state->receiver->receiverWait(this)) { // There are no more values available right now. We will stop processing // until the channel fires the consume() callback (indicating that more // values are available). break; } auto values = state->receiver->receiverGetValues(); CHECK(!values.empty()); closeResult = processValues(state, std::move(values)); } if (closeResult.has_value()) { // The receiver received a value indicating channel closure. state->receiver->receiverCancel(); processReceiverCancelled(state, std::move(closeResult.value())); } } /** * Processes the given set of values for the input receiver. Returns a * CloseResult if channel was closed, so the caller can stop attempting to * process values from it. */ std::optional processValues( WLockedStatePtr& state, ReceiverQueue values) { while (!values.empty()) { auto inputResult = std::move(values.front()); values.pop(); if (inputResult.hasValue()) { // We have received a normal value from the input receiver. Write it to // all output senders. state->context.update(inputResult.value()); state->fanoutSender.write(std::move(inputResult.value())); } else { // The input receiver was closed. return inputResult.hasException() ? CloseResult(std::move(inputResult.exception())) : CloseResult(); } } return std::nullopt; } /** * Processes the cancellation of the input receiver. We will close all senders * with the exception received from the input receiver (if any). */ void processReceiverCancelled( WLockedStatePtr& state, CloseResult closeResult) { CHECK_EQ(state->getReceiverState(), ChannelState::CancellationTriggered); state->receiver = nullptr; std::move(state->fanoutSender) .close( closeResult.exception.has_value() ? closeResult.exception.value() : folly::exception_wrapper()); maybeDelete(state); } /** * Processes the destruction of the user's FanoutChannel object. We will * cancel the receiver and trigger cancellation for all senders not already * cancelled. */ void processHandleDestroyed(WLockedStatePtr& state, CloseResult closeResult) { state->handleDeleted = true; if (state->getReceiverState() == ChannelState::Active) { state->receiver->receiverCancel(); } std::move(state->fanoutSender) .close( closeResult.exception.has_value() ? closeResult.exception.value() : folly::exception_wrapper()); maybeDelete(state); } /** * Deletes this object if we have already processed cancellation for the * receiver and all senders, and if the user's FanoutChannel object was * destroyed. */ void maybeDelete(WLockedStatePtr& state) { if (state->getReceiverState() == ChannelState::CancellationProcessed && state->handleDeleted) { state.unlock(); delete this; } } folly::Executor::KeepAlive executor_; folly::Synchronized state_; }; } // namespace detail template FanoutChannel createFanoutChannel( TReceiver inputReceiver, folly::Executor::KeepAlive executor, ContextType context) { auto* processor = new detail::FanoutChannelProcessor( std::move(executor), std::move(context)); processor->start(std::move(inputReceiver)); return FanoutChannel(processor); } } // namespace channels } // namespace folly