/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include namespace folly { namespace channels { namespace detail { class ChannelBridgeBase {}; class IChannelCallback { public: virtual ~IChannelCallback() = default; virtual void consume(ChannelBridgeBase* bridge) = 0; virtual void canceled(ChannelBridgeBase* bridge) = 0; }; using SenderQueue = typename folly::channels::detail::Queue; template using ReceiverQueue = typename folly::channels::detail::Queue>; template class ChannelBridge : public ChannelBridgeBase { public: struct Deleter { void operator()(ChannelBridge* ptr) { ptr->decref(); } }; using Ptr = std::unique_ptr, Deleter>; static Ptr create() { return Ptr(new ChannelBridge()); } Ptr copy() { auto refCount = refCount_.fetch_add(1, std::memory_order_relaxed); DCHECK(refCount > 0); return Ptr(this); } // These should only be called from the sender thread template void senderPush(U&& value) { receiverQueue_.push( folly::Try(std::forward(value)), static_cast(this)); } bool senderWait(IChannelCallback* callback) { return senderQueue_.wait(callback, static_cast(this)); } IChannelCallback* cancelSenderWait() { return senderQueue_.cancelCallback(); } void senderClose() { if (!isSenderClosed()) { receiverQueue_.push( folly::Try(), static_cast(this)); senderQueue_.close(static_cast(this)); } } void senderClose(folly::exception_wrapper ex) { if (!isSenderClosed()) { receiverQueue_.push( folly::Try(std::move(ex)), static_cast(this)); senderQueue_.close(static_cast(this)); } } bool isSenderClosed() { return senderQueue_.isClosed(); } SenderQueue senderGetValues() { return senderQueue_.getMessages(static_cast(this)); } // These should only be called from the receiver thread void receiverCancel() { if (!isReceiverCancelled()) { senderQueue_.push(folly::Unit(), static_cast(this)); receiverQueue_.close(static_cast(this)); } } bool isReceiverCancelled() { return receiverQueue_.isClosed(); } bool receiverWait(IChannelCallback* callback) { return receiverQueue_.wait(callback, static_cast(this)); } IChannelCallback* cancelReceiverWait() { return receiverQueue_.cancelCallback(); } ReceiverQueue receiverGetValues() { return receiverQueue_.getMessages(static_cast(this)); } private: using ReceiverAtomicQueue = typename folly::channels::detail:: AtomicQueue>; using SenderAtomicQueue = typename folly::channels::detail:: AtomicQueue; void decref() { if (refCount_.fetch_sub(1, std::memory_order_acq_rel) == 1) { delete this; } } ReceiverAtomicQueue receiverQueue_; SenderAtomicQueue senderQueue_; std::atomic refCount_{1}; }; template using ChannelBridgePtr = typename ChannelBridge::Ptr; } // namespace detail } // namespace channels } // namespace folly