/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include namespace folly { namespace channels { /** * A Producer is a base class for an object that produces a channel. The * subclass can call write to write a new value to the channel, and close to * close the channel. It is a useful way to generate output values for a * receiver, without having to keep alive an extraneous object that produces * those values. * * When the consumer of the channel stops consuming, the onClosed function will * be called. The subclass should cancel any ongoing work in this function. * After onCancelled is called, the object will be deleted once the last * outstanding KeepAlive is destroyed. * * Example: * // Function that returns an executor * folly::Executor::KeepAlive getExecutor(); * * // Function that returns output values * std::vector getLatestOutputValues(); * * // Example producer implementation * class PollingProducer : public Producer { * public: * PollingProducer( * Sender sender, * folly::Executor::KeepAlive executor) * : Producer(std::move(sender), std::move(executor)) { * // Start polling for values. * folly::coro::co_withCancellation( * cancelSource_.getToken(), * [=, keepAlive = getKeepAlive()]() { * return pollForOutputValues(); * }) * .scheduleOn(getExecutor()) * .start(); * } * * folly::coro::Task onClosed() override { * // The consumer has stopped consuming our values. Stop polling. * cancelSource_.requestCancellation(); * } * * private: * folly::coro::Task pollForOutputValues() { * auto cancelToken = co_await folly::coro::co_current_cancellation_token; * while (!cancelToken.isCancellationRequested()) { * auto outputValues = getLatestOutputValues(); * for (auto& outputValue : outputValues) { * write(std::move(outputValue)); * } * } * co_await folly::coro::sleep(std::chrono::seconds(1)); * } * * folly::CancellationSource cancelSource_; * }; * * // Producer usage * Receiver receiver = makeProducer(getExecutor()); */ template class Producer : public detail::IChannelCallback { public: using ValueType = TValue; protected: /** * This object will ensure that the corresponding Producer that created it * will not be destroyed. */ class KeepAlive { public: ~KeepAlive(); KeepAlive(KeepAlive&&) noexcept; KeepAlive& operator=(KeepAlive&&) noexcept; private: friend class Producer; explicit KeepAlive(Producer* ptr); Producer* ptr_; }; Producer( Sender sender, folly::Executor::KeepAlive executor); virtual ~Producer() override = default; /** * Writes a value into the channel. */ void write(TValue value); /** * Closes the channel. */ void close(std::optional ex = std::nullopt); /** * Returns whether or not this producer is closed or cancelled. */ bool isClosed(); /** * Returns the executor used for this producer. */ folly::Executor::KeepAlive getExecutor(); /** * Returns a KeepAlive object. This object will not be destroyed before all * KeepAlive objects are destroyed. */ KeepAlive getKeepAlive(); /** * Called when the corresponding receiver is cancelled, or the sender is * closed. */ virtual folly::coro::Task onClosed() { co_return; } /** * If you get an error that this function is not implemented, do not * implement it. Instead, create your object with makeProducer * below. */ virtual void ensureMakeProducerUsedForCreation() = 0; private: template friend Receiver makeProducer( folly::Executor::KeepAlive executor, Args&&... args); void consume(detail::ChannelBridgeBase* bridge) override; void canceled(detail::ChannelBridgeBase* bridge) override; detail::ChannelBridgePtr sender_; folly::Executor::KeepAlive executor_; std::atomic refCount_{1}; }; /** * Creates a new object that extends the Producer class, and returns a receiver. * The receiver will receive any values produced by the producer. See the * description of the Producer class for information on how to implement a * producer. */ template Receiver makeProducer( folly::Executor::KeepAlive executor, Args&&... args); } // namespace channels } // namespace folly #include