/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include namespace folly { namespace channels { template Producer::KeepAlive::KeepAlive(Producer* ptr) : ptr_(ptr) {} template Producer::KeepAlive::~KeepAlive() { if (ptr_ && --ptr_->refCount_ == 0) { auto deleteTask = folly::coro::co_invoke([ptr = ptr_]() -> folly::coro::Task { delete ptr; co_return; }); std::move(deleteTask).scheduleOn(ptr_->getExecutor()).start(); } } template Producer::KeepAlive::KeepAlive( Producer::KeepAlive&& other) noexcept : ptr_(std::exchange(other.ptr_, nullptr)) {} template typename Producer::KeepAlive& Producer::KeepAlive::operator=( Producer::KeepAlive&& other) noexcept { if (&other == this) { return *this; } ptr_ = std::exchange(other.ptr_, nullptr); return *this; } template Producer::Producer( Sender sender, folly::Executor::KeepAlive executor) : sender_(std::move(detail::senderGetBridge(sender))), executor_(std::move(executor)) { CHECK(sender_->senderWait(this)); } template void Producer::write(TValue value) { executor_->add([this, value = std::move(value)]() mutable { sender_->senderPush(std::move(value)); }); } template void Producer::close(std::optional ex) { executor_->add([this, ex = std::move(ex)]() mutable { if (ex.has_value()) { sender_->senderClose(std::move(ex.value())); } else { sender_->senderClose(); } }); } template bool Producer::isClosed() { return sender_->isSenderClosed(); } template folly::Executor::KeepAlive Producer::getExecutor() { return executor_; } template typename Producer::KeepAlive Producer::getKeepAlive() { refCount_.fetch_add(1, std::memory_order_relaxed); return KeepAlive(this); } template void Producer::consume(detail::ChannelBridgeBase*) { onClosed().scheduleOn(getExecutor()).start([=](auto) { // Decrement ref count KeepAlive(this); }); } template void Producer::canceled(detail::ChannelBridgeBase* bridge) { consume(bridge); } namespace detail { template class ProducerImpl : public TProducer { template friend Receiver makeProducer( folly::Executor::KeepAlive executor, Args&&... args); public: using TProducer::TProducer; private: void ensureMakeProducerUsedForCreation() override {} }; } // namespace detail template Receiver makeProducer( folly::Executor::KeepAlive executor, Args&&... args) { using TValue = typename TProducer::ValueType; auto [receiver, sender] = Channel::create(); new detail::ProducerImpl( std::move(sender), std::move(executor), std::forward(args)...); return std::move(receiver); } } // namespace channels } // namespace folly