/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// NOTE(review): in this copy every `#include` has no target and most
// `template` declarations have no parameter list (text between angle brackets
// appears to have been stripped). The code is reproduced token-for-token;
// confirm against the upstream file before attempting to compile it.

#pragma once

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace folly {

class Timekeeper;

namespace futures {
namespace detail {
// Alias for the fiber baton type; used as the wake-up primitive of
// WaitExecutor further down in this file.
typedef folly::fibers::Baton FutureBatonType;
} // namespace detail
} // namespace futures

namespace detail {
// Declaration only; the definition is not in this file.
std::shared_ptr getTimekeeperSingleton();
} // namespace detail

namespace futures {
namespace detail {

// InvokeResultWrapper and wrapInvoke enable wrapping a result value in its
// nearest Future-type counterpart capable of also carrying an exception.
// e.g.
//    (semi)Future -> (semi)Future (no change)
//    Try -> Try (no change)
//    void -> Try
//    T -> Try

// Shared implementation: T is the wrapper type to produce.
//  - wrapResult(fn): calls fn() and constructs a T from its result.
//  - wrapException(e): constructs a T directly from the exception_wrapper.
template struct InvokeResultWrapperBase {
  template static T wrapResult(F fn) {
    return T(fn());
  }
  static T wrapException(exception_wrapper&& e) {
    return T(std::move(e));
  }
};

// Primary template and partial specializations, each selecting the wrapper
// type through InvokeResultWrapperBase.
// NOTE(review): the specialization arguments were stripped in this copy;
// presumably they correspond to the mapping table above — confirm upstream.
template struct InvokeResultWrapper : InvokeResultWrapperBase> {};
template struct InvokeResultWrapper> : InvokeResultWrapperBase> {};
template struct InvokeResultWrapper> : InvokeResultWrapperBase> {};
template struct InvokeResultWrapper> : InvokeResultWrapperBase> {};
// Full specialization whose wrapResult discards fn's result: it calls fn()
// for its side effects and returns a Try holding `unit`.
template <> struct InvokeResultWrapper : InvokeResultWrapperBase> {
  template static Try wrapResult(F fn) {
    fn();
    return Try(unit);
  }
};

// Invokes `f` on the value held by `t` and wraps the outcome with the
// InvokeResultWrapper chosen from f's return type. If `t` holds an exception,
// `f` is not called and the exception is forwarded in the wrapper instead.
template auto wrapInvoke(folly::Try&& t, F&& f) {
  // Deferred call: only evaluated by wrapResult on the no-exception path.
  auto fn = [&]() {
    return static_cast(f)(
        t.template get<
            false,
            typename futures::detail::valueCallableResult::FirstArg>());
  };
  using FnResult = decltype(fn());
  using Wrapper = InvokeResultWrapper;
  if (t.hasException()) {
    return Wrapper::wrapException(std::move(t).exception());
  }
  return Wrapper::wrapResult(fn);
}

// Guarantees that the stored functor is destructed before the stored promise
// may be fulfilled. Assumes the stored functor to be noexcept-destructible.
//
// "Before the barrier" below means: promise_ has not been fulfilled yet, which
// is also the only state in which func_ is alive.
template class CoreCallbackState {
  using DF = std::decay_t;

 public:
  // Stores the functor and takes ownership of the promise. The promise must
  // not already be fulfilled (checked by assert).
  CoreCallbackState(Promise&& promise, F&& func) noexcept(
      noexcept(DF(std::declval())))
      : func_(static_cast(func)), promise_(std::move(promise)) {
    assert(before_barrier());
  }

  // Move constructor. State is transferred only if `that` is still before the
  // barrier; otherwise func_ stays unconstructed and promise_ keeps its
  // default member initializer (Promise::makeEmpty()).
  CoreCallbackState(CoreCallbackState&& that) noexcept(
      noexcept(DF(std::declval()))) {
    if (that.before_barrier()) {
      // func_ lives in an anonymous union, so it is constructed by hand.
      new (&func_) DF(static_cast(that.func_));
      // stealPromise() also destroys that.func_.
      promise_ = that.stealPromise();
    }
  }

  CoreCallbackState& operator=(CoreCallbackState&&) = delete;

  // If the promise was never taken, stealPromise() destroys the functor; the
  // returned promise is a temporary that is dropped immediately.
  ~CoreCallbackState() {
    if (before_barrier()) {
      stealPromise();
    }
  }

  // Calls the stored functor with the forwarded arguments. Only valid before
  // the barrier (checked by assert).
  template auto invoke(Args&&... args) noexcept(
      noexcept(std::declval()(std::declval()...))) {
    assert(before_barrier());
    return static_cast(func_)(static_cast(args)...);
  }

  // Same as invoke(), but routed through makeTryWith so the outcome is
  // returned as a Try.
  template auto tryInvoke(Args&&... args) noexcept {
    return makeTryWith([&] { return invoke(static_cast(args)...); });
  }

  // Destroys the functor (via stealPromise) and then fulfils the promise with
  // `t` on the given keep-alive.
  void setTry(Executor::KeepAlive<>&& keepAlive, Try&& t) {
    stealPromise().setTry(std::move(keepAlive), std::move(t));
  }

  // Convenience wrapper: fulfils the promise with a Try built from `ew`.
  void setException(Executor::KeepAlive<>&& keepAlive, exception_wrapper&& ew) {
    setTry(std::move(keepAlive), Try(std::move(ew)));
  }

  // Explicitly runs the functor's destructor and moves the promise out.
  // Must be called at most once and only before the barrier.
  Promise stealPromise() noexcept {
    assert(before_barrier());
    func_.~DF();
    return std::move(promise_);
  }

 private:
  // True while promise_ has not been fulfilled.
  bool before_barrier() const noexcept { return !promise_.isFulfilled(); }

  // Held in an anonymous union so construction and destruction are under
  // manual control (placement-new in the move constructor, explicit
  // destructor call in stealPromise()).
  union {
    DF func_;
  };
  Promise promise_{Promise::makeEmpty()};
};

// Factory for CoreCallbackState from a promise and an arbitrary callable.
template auto makeCoreCallbackState(Promise&& p, F&& f) noexcept(
    noexcept(CoreCallbackState(
        std::declval&&>(), std::declval()))) {
  return CoreCallbackState(std::move(p), static_cast(f));
}

// Overload for a reference to a plain function: the function's address is
// stored instead.
template auto makeCoreCallbackState(Promise&& p, R (&f)(Args...)) noexcept {
  return CoreCallbackState(std::move(p), &f);
}

// Takes over the core pointer of a SemiFuture, leaving `other` without a core.
template FutureBase::FutureBase(SemiFuture&& other) noexcept
    : core_(other.core_) {
  other.core_ = nullptr;
}

// Takes over the core pointer of a Future, leaving `other` without a core.
template FutureBase::FutureBase(Future&& other) noexcept
    : core_(other.core_) {
  other.core_ = nullptr;
}

// Creates a core that already holds a Try constructed from `val`.
template template FutureBase::FutureBase(T2&& val)
    : core_(Core::make(Try(static_cast(val)))) {}

// Creates a core holding a Try constructed from T(). The unnamed pointer
// parameter exists only to carry the enable_if constraint.
template template FutureBase::FutureBase(
    typename std::enable_if::value>::type*)
    : core_(Core::make(Try(T()))) {}

// Detaches from the current core (if any) and takes over other's core.
template void FutureBase::assign(FutureBase&& other) noexcept {
  detach();
  core_ = std::exchange(other.core_, nullptr);
}

template FutureBase::~FutureBase() {
  detach();
}

// value(): the four ref-qualified overloads all forward to result().value();
// the rvalue overloads additionally std::move the returned reference.
template T& FutureBase::value() & {
  return result().value();
}

template T const& FutureBase::value() const& {
  return result().value();
}

template T&& FutureBase::value() && {
  return std::move(result().value());
}

template T const&& FutureBase::value() const&& {
  return std::move(result().value());
}

// result(): the four ref-qualified overloads all forward to
// getCoreTryChecked() (declared outside this file); the rvalue overloads
// std::move the returned reference.
template Try& FutureBase::result() & {
  return getCoreTryChecked();
}

template Try const& FutureBase::result() const& {
  return getCoreTryChecked();
}

template Try&& FutureBase::result() && {
  return std::move(getCoreTryChecked());
}

template Try const&& FutureBase::result() const&& {
  return std::move(getCoreTryChecked());
}

// True once the core holds a result.
template bool FutureBase::isReady() const {
  return getCore().hasResult();
}

// True if the stored result holds a value (goes through result()).
template bool FutureBase::hasValue() const {
  return result().hasValue();
}

// True if the stored result holds an exception (goes through result()).
template bool FutureBase::hasException() const {
  return result().hasException();
}

// Releases this future's reference on the core, if it has one, and clears the
// pointer. Safe to call repeatedly.
template void FutureBase::detach() {
  if (core_) {
    core_->detachFuture();
    core_ = nullptr;
  }
}

// Throws if this future has no core.
// NOTE(review): the thrown exception type was stripped in this copy.
template void FutureBase::throwIfInvalid() const {
  if (!core_) {
    throw_exception();
  }
}

// Throws if this future has no core or a callback is already installed.
// NOTE(review): the thrown exception type was stripped in this copy.
template void FutureBase::throwIfContinued() const {
  if (!core_ || core_->hasCallback()) {
    throw_exception();
  }
}

// Non-blocking check: returns the core's Try (moved out) inside an Optional
// when a result is available, otherwise an empty Optional.
template Optional> FutureBase::poll() {
  auto& core = getCore();
  return core.hasResult() ? Optional>(std::move(core.getTry()))
                          : Optional>();
}

// Forwards `exception` to the core's raise().
template void FutureBase::raise(exception_wrapper exception) {
  getCore().raise(std::move(exception));
}

// Installs `func` as the core's callback, capturing the current
// RequestContext. Throws (via throwIfContinued) when there is no core or a
// callback is already set. `allowInline` is passed through to the core.
template template void FutureBase::setCallback_(
    F&& func, futures::detail::InlineContinuation allowInline) {
  throwIfContinued();
  getCore().setCallback(
      static_cast(func), RequestContext::saveContext(), allowInline);
}

// Constructs a future without a core.
template FutureBase::FutureBase(futures::detail::EmptyConstruct) noexcept
    : core_(nullptr) {}

// MSVC 2017 Update 7 released with a bug that causes issues expanding to an
// empty parameter pack when invoking a templated member function. It should
// be fixed for MSVC 2017 Update 8.
// TODO: Remove.
namespace detail_msvc_15_7_workaround {
// NOTE(review): template parameter lists in this copy were stripped (see the
// `template = 0>` remnants below); presumably each overload is selected on the
// number of arguments the continuation takes — confirm upstream.
template using IfArgsSizeIs = std::enable_if_t;

// invoke overload that calls the continuation with no arguments; the
// keep-alive and the Try are ignored.
template = 0> decltype(auto) invoke(
    R, State& state, Executor::KeepAlive<>&&, Try&& /* t */) {
  return state.invoke();
}

// invoke overload that passes the keep-alive and the Try's content (converted
// via Try::get to the continuation's second argument type).
template = 0> decltype(auto) invoke(
    R, State& state, Executor::KeepAlive<>&& ka, Try&& t) {
  using Arg1 = typename R::Arg::ArgList::Tail::FirstArg;
  return state.invoke(
      std::move(ka), std::move(t).template get());
}

// tryInvoke counterparts of the two overloads above; they go through
// State::tryInvoke instead of State::invoke.
template = 0> decltype(auto) tryInvoke(
    R, State& state, Executor::KeepAlive<>&&, Try&& /* t */) {
  return state.tryInvoke();
}

template = 0> decltype(auto) tryInvoke(
    R, State& state, Executor::KeepAlive<>&& ka, Try&& t) {
  using Arg1 = typename R::Arg::ArgList::Tail::FirstArg;
  return state.tryInvoke(
      std::move(ka), std::move(t).template get());
}
} // namespace detail_msvc_15_7_workaround

// then

// Variant: returns a value
// e.g. f.then([](Try&& t){ return t.value(); });
//
// Creates a new promise/future pair, installs a callback on this future that
// runs `func` and fulfils the new promise with its outcome, and returns the
// new future. `allowInline` is forwarded to setCallback_.
template template typename std::enable_if::type
FutureBase::thenImplementation(
    F&& func, R, futures::detail::InlineContinuation allowInline) {
  static_assert(R::Arg::ArgsSize::value == 2, "Then must take two arguments");
  typedef typename R::ReturnsFuture::Inner B;

  Promise p;
  // The new future inherits this future's interrupt handler.
  p.core_->initCopyInterruptHandlerFrom(this->getCore());

  // grab the Future now before we lose our handle on the Promise
  auto sf = p.getSemiFuture();
  sf.setExecutor(folly::Executor::KeepAlive<>{this->getExecutor()});
  // Hand the core from the SemiFuture to a Future without going through via().
  auto f = Future(sf.core_);
  sf.core_ = nullptr;

  this->setCallback_(
      [state = futures::detail::makeCoreCallbackState(
           std::move(p), static_cast(func))](
          Executor::KeepAlive<>&& ka, Try&& t) mutable {
        if (!R::Arg::isTry() && t.hasException()) {
          // The continuation takes a value, not a Try: skip it and forward
          // the exception.
          state.setException(std::move(ka), std::move(t.exception()));
        } else {
          // Copy the keep-alive first: `ka` itself is moved into the
          // continuation inside the lambda below.
          auto propagateKA = ka.copy();
          state.setTry(std::move(propagateKA), makeTryWith([&] {
                         return detail_msvc_15_7_workaround::invoke(
                             R{}, state, std::move(ka), std::move(t));
                       }));
        }
      },
      allowInline);
  return f;
}

// Pass through a simple future as it needs no deferral adaptation
template Future chainExecutor(Executor::KeepAlive<>, Future&& f) {
  return std::move(f);
}

// Correctly chain a SemiFuture for deferral
// Falls back to the InlineExecutor when `e` is empty, then attaches the
// executor with via().
template Future chainExecutor(Executor::KeepAlive<> e, SemiFuture&& f) {
  if (!e) {
    e = folly::getKeepAliveToken(InlineExecutor::instance());
  }
  return std::move(f).via(e);
}

// Variant: returns a Future
// e.g. f.then([](T&& t){ return makeFuture(t); });
//
// Same set-up as the value-returning variant, but the continuation's result is
// itself a future: on success the new promise's core is proxied to the
// returned future's core instead of being fulfilled directly.
template template typename std::enable_if::type
FutureBase::thenImplementation(
    F&& func, R, futures::detail::InlineContinuation allowInline) {
  static_assert(R::Arg::ArgsSize::value == 2, "Then must take two arguments");
  typedef typename R::ReturnsFuture::Inner B;

  Promise p;
  p.core_->initCopyInterruptHandlerFrom(this->getCore());

  // grab the Future now before we lose our handle on the Promise
  auto sf = p.getSemiFuture();
  auto e = getKeepAliveToken(this->getExecutor());
  sf.setExecutor(std::move(e));
  auto f = Future(sf.core_);
  sf.core_ = nullptr;

  this->setCallback_(
      [state = futures::detail::makeCoreCallbackState(
           std::move(p), static_cast(func))](
          Executor::KeepAlive<>&& ka, Try&& t) mutable {
        if (!R::Arg::isTry() && t.hasException()) {
          state.setException(std::move(ka), std::move(t.exception()));
        } else {
          // Ensure that if function returned a SemiFuture we correctly chain
          // potential deferral.
          auto tf2 = detail_msvc_15_7_workaround::tryInvoke(
              R{}, state, ka.copy(), std::move(t));
          if (tf2.hasException()) {
            state.setException(std::move(ka), std::move(tf2.exception()));
          } else {
            // Take the promise (this destroys the stored functor), attach the
            // executor to the returned future, then link the two cores.
            auto statePromise = state.stealPromise();
            auto tf3 = chainExecutor(std::move(ka), *std::move(tf2));
            std::exchange(statePromise.core_, nullptr)
                ->setProxy(std::exchange(tf3.core_, nullptr));
          }
        }
      },
      allowInline);

  return f;
}

// Executor that queues work and runs it only when drive()/driveUntil() is
// called. Instances are heap-allocated by create() and delete themselves when
// the keep-alive count drops to zero.
class WaitExecutor final : public folly::Executor {
 public:
  // Enqueues `func`; dropped silently if detach() has been called. The baton
  // is posted only when the queue transitions from empty to non-empty.
  void add(Func func) override {
    bool empty;
    {
      auto wQueue = queue_.wlock();
      if (wQueue->detached) {
        return;
      }
      empty = wQueue->funcs.empty();
      wQueue->funcs.push_back(std::move(func));
    }
    if (empty) {
      baton_.post();
    }
  }

  // Blocks until the baton is posted, then (in the main context) resets the
  // baton, takes the whole queue and runs every function in it.
  void drive() {
    baton_.wait();

    fibers::runInMainContext([&]() {
      baton_.reset();
      // Move the queue out so the lock is not held while running callbacks.
      auto funcs = std::move(queue_.wlock()->funcs);
      for (auto& func : funcs) {
        // Clear the slot before invoking so the callable is released as soon
        // as the call finishes.
        std::exchange(func, nullptr)();
      }
    });
  }

  using Clock = std::chrono::steady_clock;

  // Like drive(), but gives up at `deadline`. Returns false if the baton was
  // not posted in time (nothing is run), true after running the queue.
  bool driveUntil(Clock::time_point deadline) {
    if (!baton_.try_wait_until(deadline)) {
      return false;
    }

    return fibers::runInMainContext([&]() {
      baton_.reset();
      auto funcs = std::move(queue_.wlock()->funcs);
      for (auto& func : funcs) {
        std::exchange(func, nullptr)();
      }
      return true;
    });
  }

  // Marks the executor detached (later add() calls are ignored) and discards
  // whatever is still queued.
  void detach() {
    // Make sure we don't hold the lock while destroying funcs.
    [&] {
      auto wQueue = queue_.wlock();
      wQueue->detached = true;
      return std::move(wQueue->funcs);
    }();
  }

  // Allocates a WaitExecutor and returns it wrapped in a keep-alive token.
  static KeepAlive create() {
    return makeKeepAlive(new WaitExecutor());
  }

 private:
  WaitExecutor() {}

  bool keepAliveAcquire() noexcept override {
    auto keepAliveCount =
        keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
    DCHECK(keepAliveCount > 0);
    return true;
  }

  // fetch_sub returns the previous value, so 1 means this was the last
  // reference and the object deletes itself.
  void keepAliveRelease() noexcept override {
    auto keepAliveCount =
        keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
    DCHECK(keepAliveCount > 0);
    if (keepAliveCount == 1) {
      delete this;
    }
  }

  struct Queue {
    std::vector funcs;
    bool detached{false};
  };

  folly::Synchronized queue_;
  FutureBatonType baton_;

  // Starts at 1: the token returned by create() owns the initial reference.
  std::atomic keepAliveCount_{1};
};

// Vector-like structure to play with window,
// which otherwise expects a vector of size `times`,
// which would be expensive with large `times` sizes.
// operator[] returns the index itself, so no storage is needed.
struct WindowFakeVector {
  using iterator = std::vector::iterator;

  WindowFakeVector(size_t size) : size_(size) {}

  size_t operator[](const size_t index) const { return index; }
  size_t size() const { return size_; }

 private:
  size_t size_;
};
} // namespace detail
} // namespace futures

// Makes a completed SemiFuture holding `t` (wrapped in a Try).
template SemiFuture::type> makeSemiFuture(T&& t) {
  return makeSemiFuture(Try::type>(static_cast(t)));
}

// makeSemiFutureWith(SemiFuture()) -> SemiFuture
// Calls `func` and returns its future; if the call throws, returns a
// SemiFuture holding the thrown exception instead.
template typename std::enable_if<
    isFutureOrSemiFuture>::value,
    SemiFuture::value_type>>::type
makeSemiFutureWith(F&& func) {
  using InnerType = typename isFutureOrSemiFuture>::Inner;
  try {
    return static_cast(func)();
  } catch (...) {
    return makeSemiFuture(
        exception_wrapper(std::current_exception()));
  }
}

// makeSemiFutureWith(T()) -> SemiFuture
// makeSemiFutureWith(void()) -> SemiFuture
// Runs `func` through makeTryWith and wraps the resulting Try.
template typename std::enable_if<
    !(isFutureOrSemiFuture>::value),
    SemiFuture>>>::type
makeSemiFutureWith(F&& func) {
  using LiftedResult = lift_unit_t>;
  return makeSemiFuture(
      makeTryWith([&func]() mutable { return static_cast(func)(); }));
}

// Completed SemiFuture holding the exception referred to by `e`.
template SemiFuture makeSemiFuture(std::exception_ptr const& e) {
  return makeSemiFuture(Try(e));
}

// Completed SemiFuture holding `ew`.
template SemiFuture makeSemiFuture(exception_wrapper ew) {
  return makeSemiFuture(Try(std::move(ew)));
}

// Completed SemiFuture holding exception object `e` (copied into an
// exception_wrapper).
template typename std::
    enable_if::value, SemiFuture>::type
    makeSemiFuture(E const& e) {
  return makeSemiFuture(Try(make_exception_wrapper(e)));
}

// Completed SemiFuture whose core is created directly from `t`.
template SemiFuture makeSemiFuture(Try t) {
  return SemiFuture(SemiFuture::Core::make(std::move(t)));
}

// This must be defined after the constructors to avoid a bug in MSVC
// https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
inline SemiFuture makeSemiFuture() {
  return makeSemiFuture(Unit{});
}

// SemiFuture without a core.
template SemiFuture SemiFuture::makeEmpty() {
  return SemiFuture(futures::detail::EmptyConstruct{});
}

// Forwards to the core's stealDeferredExecutor().
template futures::detail::DeferredWrapper
SemiFuture::stealDeferredExecutor() {
  return this->getCore().stealDeferredExecutor();
}

// Called when a SemiFuture lets go of `core` without having been consumed.
// Does nothing when there is no core or a callback is already attached;
// otherwise takes the deferred executor (if any), logs the discard and
// detaches that executor.
template void SemiFuture::releaseDeferredExecutor(Core* core) {
  if (!core || core->hasCallback()) {
    return;
  }
  auto executor = core->stealDeferredExecutor();
  async_tracing::logSemiFutureDiscard(
      executor.get() ? async_tracing::DiscardHasDeferred::DEFERRED_EXECUTOR
                     : async_tracing::DiscardHasDeferred::NO_EXECUTOR);
  if (executor) {
    executor.get()->detach();
  }
}

template SemiFuture::~SemiFuture() {
  releaseDeferredExecutor(this->core_);
}

template SemiFuture::SemiFuture(SemiFuture&& other) noexcept
    : futures::detail::FutureBase(std::move(other)) {}

template SemiFuture::SemiFuture(Future&& other) noexcept
    : futures::detail::FutureBase(std::move(other)) {
  // SemiFuture should not have an executor on construction
  if (this->core_) {
    this->setExecutor(futures::detail::KeepAliveOrDeferred{});
  }
}

// Move assignment: releases any deferred executor on the current core before
// taking over other's core.
template SemiFuture& SemiFuture::operator=(SemiFuture&& other) noexcept {
  releaseDeferredExecutor(this->core_);
  this->assign(std::move(other));
  return *this;
}

// Move assignment from a Future: same as above, then clears the executor that
// came with the Future's core.
template SemiFuture& SemiFuture::operator=(Future&& other) noexcept {
  releaseDeferredExecutor(this->core_);
  this->assign(std::move(other));
  // SemiFuture should not have an executor on construction
  if (this->core_) {
    this->setExecutor(Executor::KeepAlive<>{});
  }
  return *this;
}

// Converts this SemiFuture into a Future bound to `executor`. Throws when
// `executor` is empty. If deferred work is pending, the deferred executor is
// pointed at `executor` first.
// NOTE(review): the thrown exception type was stripped in this copy.
template Future SemiFuture::via(Executor::KeepAlive<> executor) && {
  folly::async_tracing::logSemiFutureVia(this->getExecutor(), executor.get());

  if (!executor) {
    throw_exception();
  }

  if (auto deferredExecutor = this->getDeferredExecutor()) {
    deferredExecutor->setExecutor(executor.copy());
  }

  // Transfer the core to the new Future; this SemiFuture is left without one.
  auto newFuture = Future(this->core_);
  this->core_ = nullptr;
  newFuture.setExecutor(std::move(executor));

  return newFuture;
}

// via() with a priority: wraps `executor` in an ExecutorWithPriority.
template Future SemiFuture::via(
    Executor::KeepAlive<> executor, int8_t priority) && {
  return std::move(*this).via(
      ExecutorWithPriority::create(std::move(executor), priority));
}

// Equivalent to via(&InlineExecutor::instance()).
template Future SemiFuture::toUnsafeFuture() && {
  return std::move(*this).via(&InlineExecutor::instance());
}

// Attaches `func` (taking a Try) as deferred work. Reuses the existing
// deferred executor when there is one, otherwise creates one and installs it
// on this future. The continuation is added inline-permitted on a temporary
// Future over the same core, and the result is converted back to a SemiFuture
// that carries the deferred executor.
template template SemiFuture::value_type>
SemiFuture::defer(F&& func) && {
  auto deferredExecutorPtr = this->getDeferredExecutor();
  futures::detail::KeepAliveOrDeferred deferredExecutor = [&]() {
    if (deferredExecutorPtr) {
      return futures::detail::KeepAliveOrDeferred{deferredExecutorPtr->copy()};
    } else {
      auto newDeferredExecutor = futures::detail::KeepAliveOrDeferred(
          futures::detail::DeferredExecutor::create());
      this->setExecutor(newDeferredExecutor.copy());
      return newDeferredExecutor;
    }
  }();

  auto sf = Future(this->core_).thenTryInline(static_cast(func)).semi();
  this->core_ = nullptr;
  // Carry deferred executor through chain as constructor from Future will
  // nullify it
  sf.setExecutor(std::move(deferredExecutor));
  return sf;
}

// Like defer(), but `func` also receives the executor keep-alive as its first
// argument.
template template SemiFuture<
    typename futures::detail::tryExecutorCallableResult::value_type>
SemiFuture::deferExTry(F&& func) && {
  auto deferredExecutorPtr = this->getDeferredExecutor();
  futures::detail::DeferredWrapper deferredExecutor = [&]() mutable {
    if (deferredExecutorPtr) {
      return deferredExecutorPtr->copy();
    } else {
      auto newDeferredExecutor = futures::detail::DeferredExecutor::create();
      this->setExecutor(
          futures::detail::KeepAliveOrDeferred{newDeferredExecutor->copy()});
      return newDeferredExecutor;
    }
  }();

  auto sf = Future(this->core_)
                .thenExTryInline([func = static_cast(func)](
                                     folly::Executor::KeepAlive<>&& keepAlive,
                                     folly::Try&& val) mutable {
                  return static_cast(func)(
                      std::move(keepAlive), static_cast(val));
                })
                .semi();
  this->core_ = nullptr;
  // Carry deferred executor through chain as constructor from Future will
  // nullify it
  sf.setExecutor(
      futures::detail::KeepAliveOrDeferred{std::move(deferredExecutor)});
  return sf;
}

// Deferred continuation on the value: `func` is run through wrapInvoke, so it
// is skipped and the exception forwarded when the Try holds an exception.
template template SemiFuture::value_type>
SemiFuture::deferValue(F&& func) && {
  return std::move(*this).defer(
      [f = static_cast(func)](folly::Try&& t) mutable {
        return futures::detail::wrapInvoke(std::move(t), static_cast(f));
      });
}

// Deferred continuation receiving the executor keep-alive and the value
// extracted from the Try with Try::get.
template template SemiFuture<
    typename futures::detail::valueExecutorCallableResult::value_type>
SemiFuture::deferExValue(F&& func) && {
  return std::move(*this).deferExTry(
      [f = static_cast(func)](
          folly::Executor::KeepAlive<> ka, folly::Try&& t) mutable {
        return static_cast(f)(
            ka,
            t.template get<
                false,
                typename futures::detail::valueExecutorCallableResult::
                    ValueArg>());
      });
}

// Deferred error handler for one exception type (selected by the tag): if the
// Try holds an exception object of that type, `func` is called with it;
// otherwise the Try is passed through unchanged.
template template SemiFuture SemiFuture::deferError(tag_t, F&& func) && {
  return std::move(*this).defer(
      [func = static_cast(func)](Try&& t) mutable {
        if (auto e = t.template tryGetExceptionObject()) {
          return makeSemiFutureWith(
              [&]() mutable { return static_cast(func)(*e); });
        } else {
          return makeSemiFuture(std::move(t));
        }
      });
}

// Deferred error handler for any exception: `func` receives the
// exception_wrapper; a Try without an exception is passed through unchanged.
template template SemiFuture SemiFuture::deferError(F&& func) && {
  return std::move(*this).defer(
      [func = static_cast(func)](Try t) mutable {
        if (t.hasException()) {
          return makeSemiFutureWith([&]() mutable {
            return static_cast(func)(std::move(t.exception()));
          });
        } else {
          return makeSemiFuture(std::move(t));
        }
      });
}

// Discards the value via a deferred no-op continuation.
template SemiFuture SemiFuture::unit() && {
  return std::move(*this).deferValue([](T&&) {});
}

// Completes with this future's result once both this future and a sleep of
// `dur` (on timekeeper `tk`) have completed.
template SemiFuture SemiFuture::delayed(HighResDuration dur, Timekeeper* tk) && {
  return collectAll(*this, futures::sleep(dur, tk))
      .deferValue([](std::tuple, Try> tup) {
        // Only the first element (this future's result) is forwarded.
        Try& t = std::get<0>(tup);
        return makeFuture(std::move(t));
      });
}

// Future without a core.
template Future Future::makeEmpty() {
  return Future(futures::detail::EmptyConstruct{});
}

template Future::Future(Future&& other) noexcept
    : futures::detail::FutureBase(std::move(other)) {}

template Future& Future::operator=(Future&& other) noexcept {
  this->assign(std::move(other));
  return *this;
}

// unwrap
// Flattens a future-of-future by returning the inner future from a thenValue
// continuation.
template template typename std::
    enable_if::value, Future::Inner>>::type
    Future::unwrap() && {
  return std::move(*this).thenValue(
      [](Future::Inner> internal_future) {
        return internal_future;
      });
}

// Rebinds this future to `executor` and returns a new Future owning the same
// core; this future is left without a core.
template Future Future::via(Executor::KeepAlive<> executor) && {
  folly::async_tracing::logFutureVia(this->getExecutor(), executor.get());

  this->setExecutor(std::move(executor));

  auto newFuture = Future(this->core_);
  this->core_ = nullptr;
  return newFuture;
}

// via() with a priority: wraps `executor` in an ExecutorWithPriority.
template Future Future::via(Executor::KeepAlive<> executor, int8_t priority) && {
  return std::move(*this).via(
      ExecutorWithPriority::create(std::move(executor), priority));
}

// Lvalue overload: installs a continuation on this future that forwards its
// result into a fresh promise, and returns that promise's future bound to
// `executor`. Throws if this future has no core.
template Future Future::via(Executor::KeepAlive<> executor) & {
  folly::async_tracing::logFutureVia(this->getExecutor(), executor.get());

  this->throwIfInvalid();
  Promise p;
  auto sf = p.getSemiFuture();
  auto func = [p = std::move(p)](Executor::KeepAlive<>&&, Try&& t) mutable {
    p.setTry(std::move(t));
  };
  using R = futures::detail::tryExecutorCallableResult;
  this->thenImplementation(
      std::move(func), R{}, futures::detail::InlineContinuation::forbid);
  // Construct future from semifuture manually because this may not have
  // an executor set due to legacy code. This means we can bypass the executor
  // check in SemiFuture::via
  auto f = Future(sf.core_);
  sf.core_ = nullptr;
  return std::move(f).via(std::move(executor));
}

// Lvalue via() with a priority.
template Future Future::via(Executor::KeepAlive<> executor, int8_t priority) & {
  return this->via(ExecutorWithPriority::create(std::move(executor), priority));
}

// Continuation given as a member-function pointer plus the object to call it
// on. `instance` is captured as a raw pointer, so it must stay alive until
// the continuation has run.
template template Future::Inner> Future::then(
    R (Caller::*func)(Args...), Caller* instance) && {
  using FirstArg = remove_cvref_t::FirstArg>;

  return std::move(*this).thenTry([instance, func](Try&& t) {
    return (instance->*func)(t.template get::value, Args>()...);
  });
}

// The then* family below all follow the same pattern: wrap `func` in a lambda
// with the (KeepAlive&&, Try&&) signature that thenImplementation expects and
// forward to it. The plain variants pass InlineContinuation::forbid, the
// *Inline variants pass InlineContinuation::permit.

// `func` receives the Try; the keep-alive is ignored.
template template Future::value_type> Future::thenTry(F&& func) && {
  auto lambdaFunc = [f = static_cast(func)](
                        folly::Executor::KeepAlive<>&&,
                        folly::Try&& t) mutable {
    return static_cast(f)(std::move(t));
  };
  using R = futures::detail::tryExecutorCallableResult;
  return this->thenImplementation(
      std::move(lambdaFunc), R{}, futures::detail::InlineContinuation::forbid);
}

template template Future::value_type> Future::thenTryInline(F&& func) && {
  auto lambdaFunc = [f = static_cast(func)](
                        folly::Executor::KeepAlive<>&&,
                        folly::Try&& t) mutable {
    return static_cast(f)(std::move(t));
  };
  using R = futures::detail::tryExecutorCallableResult;
  return this->thenImplementation(
      std::move(lambdaFunc), R{}, futures::detail::InlineContinuation::permit);
}

// `func` receives the executor keep-alive and the Try.
template template Future::value_type> Future::thenExTry(F&& func) && {
  auto lambdaFunc = [f = static_cast(func)](
                        Executor::KeepAlive<>&& ka, folly::Try&& t) mutable {
    // Enforce that executor cannot be null
    DCHECK(ka);
    return static_cast(f)(std::move(ka), std::move(t));
  };
  using R = futures::detail::tryExecutorCallableResult;
  return this->thenImplementation(
      std::move(lambdaFunc), R{}, futures::detail::InlineContinuation::forbid);
}

template template Future::value_type> Future::thenExTryInline(F&& func) && {
  auto lambdaFunc = [f = static_cast(func)](
                        Executor::KeepAlive<>&& ka, folly::Try&& t) mutable {
    // Enforce that executor cannot be null
    DCHECK(ka);
    return static_cast(f)(std::move(ka), std::move(t));
  };
  using R = futures::detail::tryExecutorCallableResult;
  return this->thenImplementation(
      std::move(lambdaFunc), R{}, futures::detail::InlineContinuation::permit);
}

// `func` receives the value; wrapInvoke forwards an exception without calling
// `func`.
template template Future::value_type> Future::thenValue(F&& func) && {
  auto lambdaFunc = [f = static_cast(func)](
                        Executor::KeepAlive<>&&, folly::Try&& t) mutable {
    return futures::detail::wrapInvoke(std::move(t), static_cast(f));
  };
  using R = futures::detail::tryExecutorCallableResult;
  return this->thenImplementation(
      std::move(lambdaFunc), R{}, futures::detail::InlineContinuation::forbid);
}

template template Future::value_type> Future::thenValueInline(F&& func) && {
  auto lambdaFunc = [f = static_cast(func)](
                        Executor::KeepAlive<>&&, folly::Try&& t) mutable {
    return futures::detail::wrapInvoke(std::move(t), static_cast(f));
  };
  using R = futures::detail::tryExecutorCallableResult;
  return this->thenImplementation(
      std::move(lambdaFunc), R{}, futures::detail::InlineContinuation::permit);
}

// `func` receives the executor keep-alive and the value extracted from the
// Try with Try::get.
template template Future::value_type> Future::thenExValue(F&& func) && {
  auto lambdaFunc = [f = static_cast(func)](
                        Executor::KeepAlive<>&& ka, folly::Try&& t) mutable {
    // Enforce that executor cannot be null
    DCHECK(ka);
    return static_cast(f)(
        std::move(ka),
        t.template get<
            false,
            typename futures::detail::valueExecutorCallableResult::
                ValueArg>());
  };
  using R = futures::detail::tryExecutorCallableResult;
  return this->thenImplementation(
      std::move(lambdaFunc), R{}, futures::detail::InlineContinuation::forbid);
}

template template Future::value_type> Future::thenExValueInline(F&& func) && {
  auto lambdaFunc = [f = static_cast(func)](
                        Executor::KeepAlive<>&& ka, folly::Try&& t) mutable {
    // Enforce that executor cannot be null
    DCHECK(ka);
    return static_cast(f)(
        std::move(ka),
        t.template get<
            false,
            typename futures::detail::valueExecutorCallableResult::
                ValueArg>());
  };
  using R = futures::detail::tryExecutorCallableResult;
  return this->thenImplementation(
      std::move(lambdaFunc), R{}, futures::detail::InlineContinuation::permit);
}

// thenError, typed, handler returns a future.
// If the Try holds an exception object of the tagged type, `func` is called
// with it and the new promise is fulfilled from the future it returns;
// otherwise the Try is forwarded unchanged. The returned future runs on this
// future's executor, or on the InlineExecutor when none is set.
template template typename std::enable_if<
    isFutureOrSemiFuture>::value,
    Future>::type
Future::thenError(tag_t, F&& func) && {
  Promise p;
  p.core_->initCopyInterruptHandlerFrom(this->getCore());
  auto sf = p.getSemiFuture();
  auto* ePtr = this->getExecutor();
  auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance());

  this->setCallback_([state = futures::detail::makeCoreCallbackState(
                          std::move(p), static_cast(func))](
                         Executor::KeepAlive<>&& ka, Try&& t) mutable {
    if (auto ex = t.template tryGetExceptionObject<
                  std::remove_reference_t>()) {
      auto tf2 = state.tryInvoke(std::move(*ex));
      if (tf2.hasException()) {
        // The handler itself threw.
        state.setException(std::move(ka), std::move(tf2.exception()));
      } else {
        // Forward the handler's future into our promise when it completes.
        tf2->setCallback_(
            [p = state.stealPromise()](
                Executor::KeepAlive<>&& innerKA, Try&& t3) mutable {
              p.setTry(std::move(innerKA), std::move(t3));
            });
      }
    } else {
      state.setTry(std::move(ka), std::move(t));
    }
  });

  return std::move(sf).via(std::move(e));
}

// thenError, typed, handler returns a plain value: the handler's result (or
// the exception it throws, via makeTryWith) fulfils the promise directly.
template template typename std::enable_if<
    !isFutureOrSemiFuture>::value,
    Future>::type
Future::thenError(tag_t, F&& func) && {
  Promise p;
  p.core_->initCopyInterruptHandlerFrom(this->getCore());
  auto sf = p.getSemiFuture();
  auto* ePtr = this->getExecutor();
  auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance());

  this->setCallback_([state = futures::detail::makeCoreCallbackState(
                          std::move(p), static_cast(func))](
                         Executor::KeepAlive<>&& ka, Try&& t) mutable {
    if (auto ex = t.template tryGetExceptionObject<
                  std::remove_reference_t>()) {
      state.setTry(std::move(ka), makeTryWith([&] {
                     return state.invoke(std::move(*ex));
                   }));
    } else {
      state.setTry(std::move(ka), std::move(t));
    }
  });

  return std::move(sf).via(std::move(e));
}

// thenError, untyped (any exception), handler returns a future. The handler
// receives the exception_wrapper.
template template typename std::enable_if<
    isFutureOrSemiFuture>::value,
    Future>::type
Future::thenError(F&& func) && {
  auto* ePtr = this->getExecutor();
  auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance());

  Promise p;
  p.core_->initCopyInterruptHandlerFrom(this->getCore());
  auto sf = p.getSemiFuture();

  this->setCallback_([state = futures::detail::makeCoreCallbackState(
                          std::move(p), static_cast(func))](
                         Executor::KeepAlive<>&& ka, Try t) mutable {
    if (t.hasException()) {
      auto tf2 = state.tryInvoke(std::move(t.exception()));
      if (tf2.hasException()) {
        state.setException(std::move(ka), std::move(tf2.exception()));
      } else {
        tf2->setCallback_(
            [p = state.stealPromise()](
                Executor::KeepAlive<>&& innerKA, Try&& t3) mutable {
              p.setTry(std::move(innerKA), std::move(t3));
            });
      }
    } else {
      state.setTry(std::move(ka), std::move(t));
    }
  });

  return std::move(sf).via(std::move(e));
}

// thenError, untyped, handler returns a plain value.
template template typename std::enable_if<
    !isFutureOrSemiFuture>::value,
    Future>::type
Future::thenError(F&& func) && {
  auto* ePtr = this->getExecutor();
  auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance());

  Promise p;
  p.core_->initCopyInterruptHandlerFrom(this->getCore());
  auto sf = p.getSemiFuture();
  this->setCallback_([state = futures::detail::makeCoreCallbackState(
                          std::move(p), static_cast(func))](
                         Executor::KeepAlive<>&& ka, Try&& t) mutable {
    if (t.hasException()) {
      state.setTry(std::move(ka), makeTryWith([&] {
                     return state.invoke(std::move(t.exception()));
                   }));
    } else {
      state.setTry(std::move(ka), std::move(t));
    }
  });

  return std::move(sf).via(std::move(e));
}

// Discards the value via a no-op thenValue continuation.
template Future Future::then() && {
  return std::move(*this).thenValue([](T&&) {});
}

// Runs `func` once this future completes (with a value or an exception) and
// then forwards the original result unchanged.
template template Future Future::ensure(F&& func) && {
  return std::move(*this).thenTry(
      [funcw = static_cast(func)](Try&& t) mutable {
        static_cast(funcw)();
        return makeFuture(std::move(t));
      });
}

// Applies within(dur, tk) and installs a typed thenError handler that calls
// `func` (with no arguments) to produce the replacement result.
// NOTE(review): the tag's exception type was stripped in this copy.
template template Future Future::onTimeout(
    HighResDuration dur, F&& func, Timekeeper* tk) && {
  return std::move(*this).within(dur, tk).thenError(
      tag_t{}, [funcw = static_cast(func)](auto const&) mutable {
        return static_cast(funcw)();
      });
}

// Schedules `func` on executor `x`: starts from via(x) and runs `func` as a
// thenValue continuation, ignoring the incoming value.
template auto via(Executor::KeepAlive<> x, Func&& func) -> Future<
    typename isFutureOrSemiFuture()())>::Inner> {
  return via(std::move(x))
      .thenValue([f = static_cast(func)](auto&&) mutable {
        return static_cast(f)();
      });
}

// makeFuture

// Completed Future holding `t` (wrapped in a Try).
template Future::type> makeFuture(T&& t) {
  return makeFuture(Try::type>(static_cast(t)));
}

// Completed Future holding Unit.
inline Future makeFuture() {
  return makeFuture(Unit{});
}

// makeFutureWith(Future()) -> Future
// Calls `func` and returns its future; if the call throws, returns a Future
// holding the thrown exception instead.
template typename std::
    enable_if>::value, invoke_result_t>::type
    makeFutureWith(F&& func) {
  using InnerType = typename isFuture>::Inner;
  try {
    return static_cast(func)();
  } catch (...) {
    return makeFuture(exception_wrapper(std::current_exception()));
  }
}

// makeFutureWith(T()) -> Future
// makeFutureWith(void()) -> Future
// Runs `func` through makeTryWith and wraps the resulting Try.
template typename std::enable_if<
    !(isFuture>::value),
    Future>>>::type
makeFutureWith(F&& func) {
  using LiftedResult = lift_unit_t>;
  return makeFuture(
      makeTryWith([&func]() mutable { return static_cast(func)(); }));
}

// Completed Future holding the exception referred to by `e`.
template Future makeFuture(std::exception_ptr const& e) {
  return makeFuture(Try(e));
}

// Completed Future holding `ew`.
template Future makeFuture(exception_wrapper ew) {
  return makeFuture(Try(std::move(ew)));
}

// Completed Future holding exception object `e` (copied into an
// exception_wrapper).
template typename std::enable_if::value, Future>::
    type
    makeFuture(E const& e) {
  return makeFuture(Try(make_exception_wrapper(e)));
}

// Completed Future whose core is created directly from `t`.
template Future makeFuture(Try t) {
  return Future(Future::Core::make(std::move(t)));
}

// via
// A completed Future bound to `executor`.
Future via(Executor::KeepAlive<> executor) {
  return makeFuture().via(std::move(executor));
}

// A completed Future bound to `executor` with the given priority.
Future via(Executor::KeepAlive<> executor, int8_t priority) {
  return makeFuture().via(std::move(executor), priority);
}

namespace futures {
namespace detail {

// Calls v(index_constant, fs_i) once per pack element. The int-array
// initializer forces left-to-right evaluation of the pack expansion.
template FOLLY_ERASE void foreach_(std::index_sequence, V&& v, Fs&&... fs) {
  using _ = int[];
  void(_{0, (void(v(index_constant{}, static_cast(fs))), 0)...});
}

// Entry point: builds the index sequence for the pack and delegates to
// foreach_.
template FOLLY_ERASE void foreach(V&& v, Fs&&... fs) {
  using _ = std::index_sequence_for;
  foreach_(_{}, static_cast(v), static_cast(fs)...);
}

// Free-function accessor for a SemiFuture's deferred executor.
template futures::detail::DeferredExecutor* getDeferredExecutor(SemiFuture& future) {
  return future.getDeferredExecutor();
}

// Takes the deferred executor out of a SemiFuture.
template futures::detail::DeferredWrapper stealDeferredExecutor(SemiFuture& future) {
  return future.stealDeferredExecutor();
}

// Future overload: always returns an empty wrapper.
template futures::detail::DeferredWrapper stealDeferredExecutor(Future&) {
  return {};
}

// Collects the non-empty deferred executors of every future in the pack into
// `executors`.
template void stealDeferredExecutorsVariadic(
    std::vector& executors, Ts&... ts) {
  foreach(
      [&](auto, auto& future) {
        if (auto executor = stealDeferredExecutor(future)) {
          executors.push_back(std::move(executor));
        }
      },
      ts...);
}

// Iterator-range counterpart of stealDeferredExecutorsVariadic.
template void stealDeferredExecutors(
    std::vector& executors, InputIterator first, InputIterator last) {
  for (auto it = first; it != last; ++it) {
    if (auto executor = stealDeferredExecutor(*it)) {
      executors.push_back(std::move(executor));
    }
  }
}
} // namespace detail
} // namespace futures

// collectAll (variadic)
// Returns a SemiFuture of a tuple with one Try per input future. The shared
// Context is kept alive by the per-input callbacks; its destructor fulfils
// the promise, i.e. once the last callback holding it has been destroyed.
template SemiFuture::value_type>...>>
collectAll(Fs&&... fs) {
  using Result = std::tuple::value_type>...>;
  struct Context {
    ~Context() { p.setValue(std::move(results)); }
    Promise p;
    Result results;
  };

  // Deferred executors must be taken before callbacks are attached.
  std::vector executors;
  futures::detail::stealDeferredExecutorsVariadic(executors, fs...);

  auto ctx = std::make_shared();
  futures::detail::foreach(
      [&](auto i, auto&& f) {
        f.setCallback_([i, ctx](auto&&, auto&& t) {
          std::get(ctx->results) = std::move(t);
        });
      },
      static_cast(fs)...);

  auto future = ctx->p.getSemiFuture();
  if (!executors.empty()) {
    // Some inputs had deferred work: add a pass-through deferred step so the
    // result has a deferred executor, and nest the stolen executors under it.
    auto work = [](Try&& t) { return std::move(t).value(); };
    future = std::move(future).defer(work);
    auto deferredExecutor = futures::detail::getDeferredExecutor(future);
    deferredExecutor->setNestedExecutors(std::move(executors));
  }
  return future;
}

// collectAll followed by toUnsafeFuture().
template Future::value_type>...>>
collectAllUnsafe(Fs&&... fs) {
  return collectAll(static_cast(fs)...).toUnsafeFuture();
}

// collectAll (iterator)
// Returns a SemiFuture of a vector with one Try per input future, in input
// order. As in the variadic form, the promise is fulfilled from the Context
// destructor. The keep-alive passed to the callback that decrements `count`
// to zero is stored and used when fulfilling the promise.
template SemiFuture::value_type::value_type>>>
collectAll(InputIterator first, InputIterator last) {
  using F = typename std::iterator_traits::value_type;
  using T = typename F::value_type;

  struct Context {
    explicit Context(size_t n) : results(n), count(n) {}
    ~Context() {
      futures::detail::setTry(
          p, std::move(ka), Try>>(std::move(results)));
    }
    Promise>> p;
    Executor::KeepAlive<> ka;
    std::vector> results;
    std::atomic count;
  };

  std::vector executors;
  futures::detail::stealDeferredExecutors(executors, first, last);

  auto ctx = std::make_shared(size_t(std::distance(first, last)));

  for (size_t i = 0; first != last; ++first, ++i) {
    first->setCallback_(
        [i, ctx](Executor::KeepAlive<>&& ka, Try&& t) {
          ctx->results[i] = std::move(t);
          // fetch_sub returns the previous value: 1 means this was the last
          // outstanding input.
          if (ctx->count.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            ctx->ka = std::move(ka);
          }
        },
        futures::detail::InlineContinuation::permit);
  }

  auto future = ctx->p.getSemiFuture();
  if (!executors.empty()) {
    future = std::move(future).defer(
        [](Try&& t) { return std::move(t).value(); });
    auto deferredExecutor = futures::detail::getDeferredExecutor(future);
    deferredExecutor->setNestedExecutors(std::move(executors));
  }
  return future;
}

// Iterator collectAll followed by toUnsafeFuture().
template Future::value_type::value_type>>>
collectAllUnsafe(InputIterator first, InputIterator last) {
  return collectAll(first, last).toUnsafeFuture();
}

// collect (iterator)
// Returns a SemiFuture of a vector of values. The first input that completes
// with an exception fulfils the promise with that exception immediately
// (guarded by `threw`); otherwise the Context destructor assembles the values
// and fulfils the promise.
template SemiFuture::value_type::value_type>>
collect(InputIterator first, InputIterator last) {
  using F = typename std::iterator_traits::value_type;
  using T = typename F::value_type;

  struct Context {
    explicit Context(size_t n) : result(n) { finalResult.reserve(n); }
    ~Context() {
      if (!threw.load(std::memory_order_relaxed)) {
        // map Optional -> T
        for (auto& value : result) {
          // if any of the input futures were off the end of a weakRef(), the
          // logic added in setCallback_ will not execute as an executor
          // weakRef() drops all callbacks added silently without executing them
          if (!value.has_value()) {
            p.setException(BrokenPromise{pretty_name>()});
            return;
          }

          finalResult.push_back(std::move(value.value()));
        }
        p.setValue(std::move(finalResult));
      }
    }
    Promise> p;
    std::vector> result;
    std::vector finalResult;
    std::atomic threw{false};
  };

  std::vector executors;
  futures::detail::stealDeferredExecutors(executors, first, last);

  // NOTE(review): unlike the iterator collectAll above, the distance is passed
  // without an explicit size_t cast here — harmless, but inconsistent.
  auto ctx = std::make_shared(std::distance(first, last));
  for (size_t i = 0; first != last; ++first, ++i) {
    first->setCallback_([i, ctx](Executor::KeepAlive<>&&, Try&& t) {
      if (t.hasException()) {
        // exchange() makes sure only the first failure fulfils the promise.
        if (!ctx->threw.exchange(true, std::memory_order_relaxed)) {
          ctx->p.setException(std::move(t.exception()));
        }
      } else if (!ctx->threw.load(std::memory_order_relaxed)) {
        ctx->result[i] = std::move(t.value());
      }
    });
  }

  auto future = ctx->p.getSemiFuture();
  if (!executors.empty()) {
    auto work = [](Try&& t) { return std::move(t).value(); };
    future = std::move(future).defer(work);
    // NOTE(review): bound as `const auto&` here but as plain `auto` in the
    // collectAll overloads; the call returns a pointer by value either way.
    const auto& deferredExecutor =
        futures::detail::getDeferredExecutor(future);
    deferredExecutor->setNestedExecutors(std::move(executors));
  }
  return future;
}

// Iterator collect followed by toUnsafeFuture().
template Future::value_type::value_type>>
collectUnsafe(InputIterator first, InputIterator last) {
  return collect(first, last).toUnsafeFuture();
}

// collect (variadic)
template SemiFuture::value_type...>> collect(
    Fs&&...
fs) { using Result = std::tuple::value_type...>; struct Context { ~Context() { if (!threw.load(std::memory_order_relaxed)) { // if any of the input futures were off the end of a weakRef(), the // logic added in setCallback_ will not execute as an executor // weakRef() drops all callbacks added silently without executing them auto brokenPromise = false; folly::for_each(results, [&](auto& result) { if (!result.hasValue() && !result.hasException()) { brokenPromise = true; } }); if (brokenPromise) { p.setException(BrokenPromise{pretty_name()}); } else { p.setValue(unwrapTryTuple(std::move(results))); } } } Promise p; std::tuple::value_type>...> results; std::atomic threw{false}; }; std::vector executors; futures::detail::stealDeferredExecutorsVariadic(executors, fs...); auto ctx = std::make_shared(); futures::detail::foreach( [&](auto i, auto&& f) { f.setCallback_([i, ctx](Executor::KeepAlive<>&&, auto&& t) { if (t.hasException()) { if (!ctx->threw.exchange(true, std::memory_order_relaxed)) { ctx->p.setException(std::move(t.exception())); } } else if (!ctx->threw.load(std::memory_order_relaxed)) { std::get(ctx->results) = std::move(t); } }); }, static_cast(fs)...); auto future = ctx->p.getSemiFuture(); if (!executors.empty()) { auto work = [](Try&& t) { return std::move(t).value(); }; future = std::move(future).defer(work); const auto& deferredExecutor = futures::detail::getDeferredExecutor(future); deferredExecutor->setNestedExecutors(std::move(executors)); } return future; } template Future::value_type...>> collectUnsafe( Fs&&... 
fs) { return collect(static_cast(fs)...).toUnsafeFuture(); } // collectAny (iterator) template SemiFuture::value_type::value_type>>> collectAny(InputIterator first, InputIterator last) { using F = typename std::iterator_traits::value_type; using T = typename F::value_type; struct Context { Promise>> p; std::atomic done{false}; }; std::vector executors; futures::detail::stealDeferredExecutors(executors, first, last); auto ctx = std::make_shared(); for (size_t i = 0; first != last; ++first, ++i) { first->setCallback_([i, ctx](Executor::KeepAlive<>&&, Try&& t) { if (!ctx->done.exchange(true, std::memory_order_relaxed)) { ctx->p.setValue(std::make_pair(i, std::move(t))); } }); } auto future = ctx->p.getSemiFuture(); if (!executors.empty()) { future = std::move(future).defer( [](Try&& t) { return std::move(t).value(); }); const auto& deferredExecutor = futures::detail::getDeferredExecutor(future); deferredExecutor->setNestedExecutors(std::move(executors)); } return future; } // collectAnyWithoutException (iterator) template SemiFuture::value_type::value_type>> collectAnyWithoutException(InputIterator first, InputIterator last) { using F = typename std::iterator_traits::value_type; using T = typename F::value_type; struct Context { Context(size_t n) : nTotal(n) {} Promise> p; std::atomic done{false}; std::atomic nFulfilled{0}; size_t nTotal; }; std::vector executors; futures::detail::stealDeferredExecutors(executors, first, last); auto ctx = std::make_shared(size_t(std::distance(first, last))); for (size_t i = 0; first != last; ++first, ++i) { first->setCallback_([i, ctx](Executor::KeepAlive<>&&, Try&& t) { if (!t.hasException() && !ctx->done.exchange(true, std::memory_order_relaxed)) { ctx->p.setValue(std::make_pair(i, std::move(t.value()))); } else if ( ctx->nFulfilled.fetch_add(1, std::memory_order_relaxed) + 1 == ctx->nTotal) { ctx->p.setException(t.exception()); } }); } auto future = ctx->p.getSemiFuture(); if (!executors.empty()) { future = std::move(future).defer( 
[](Try&& t) { return std::move(t).value(); }); const auto& deferredExecutor = futures::detail::getDeferredExecutor(future); deferredExecutor->setNestedExecutors(std::move(executors)); } return future; } // collectN (iterator) template SemiFuture::value_type::value_type>>>> collectN(InputIterator first, InputIterator last, size_t n) { using F = typename std::iterator_traits::value_type; using T = typename F::value_type; using Result = std::vector>>; struct Context { explicit Context(size_t numFutures, size_t min_) : v(numFutures), min(min_) {} std::vector>> v; size_t min; std::atomic completed = {0}; // # input futures completed std::atomic stored = {0}; // # output values stored Promise p; }; assert(n > 0); assert(std::distance(first, last) >= 0); if (size_t(std::distance(first, last)) < n) { return SemiFuture( exception_wrapper(std::runtime_error("Not enough futures"))); } std::vector executors; futures::detail::stealDeferredExecutors(executors, first, last); // for each completed Future, increase count and add to vector, until we // have n completed futures at which point we fulfil our Promise with the // vector auto ctx = std::make_shared(size_t(std::distance(first, last)), n); for (size_t i = 0; first != last; ++first, ++i) { first->setCallback_([i, ctx](Executor::KeepAlive<>&&, Try&& t) { // relaxed because this guards control but does not guard data auto const c = 1 + ctx->completed.fetch_add(1, std::memory_order_relaxed); if (c > ctx->min) { return; } ctx->v[i] = std::move(t); // release because the stored values in all threads must be visible below // acquire because no stored value is permitted to be fetched early auto const s = 1 + ctx->stored.fetch_add(1, std::memory_order_acq_rel); if (s < ctx->min) { return; } Result result; result.reserve(ctx->completed.load()); for (size_t j = 0; j < ctx->v.size(); ++j) { auto& entry = ctx->v[j]; if (entry.hasValue()) { result.emplace_back(j, std::move(entry).value()); } } ctx->p.setTry(Try(std::move(result))); }); } 
auto future = ctx->p.getSemiFuture(); if (!executors.empty()) { future = std::move(future).defer( [](Try&& t) { return std::move(t).value(); }); const auto& deferredExecutor = futures::detail::getDeferredExecutor(future); deferredExecutor->setNestedExecutors(std::move(executors)); } return future; } // reduce (iterator) template Future reduce(It first, It last, T&& initial, F&& func) { if (first == last) { return makeFuture(static_cast(initial)); } typedef typename std::iterator_traits::value_type::value_type ItT; typedef typename std:: conditional&&>::value, Try, ItT>::type Arg; typedef isTry IsTry; auto sfunc = std::make_shared>(static_cast(func)); auto f = std::move(*first).thenTry( [initial = static_cast(initial), sfunc](Try&& head) mutable { return (*sfunc)( std::move(initial), head.template get()); }); for (++first; first != last; ++first) { f = collectAllUnsafe(f, *first).thenValue( [sfunc](std::tuple, Try>&& t) { return (*sfunc)( std::move(std::get<0>(t).value()), // Either return a ItT&& or a Try&& depending // on the type of the argument of func. std::get<1>(t).template get()); }); } return f; } // window (collection) template std::vector> window(Collection input, F func, size_t n) { // Use global QueuedImmediateExecutor singleton to avoid stack overflow. 
auto executor = &QueuedImmediateExecutor::instance(); return window(executor, std::move(input), std::move(func), n); } template auto window(size_t times, F func, size_t n) -> std::vector> { return window(futures::detail::WindowFakeVector(times), std::move(func), n); } template std::vector> window( Executor::KeepAlive<> executor, Collection input, F func, size_t n) { struct WindowContext { WindowContext( Executor::KeepAlive<> executor_, Collection&& input_, F&& func_) : executor(std::move(executor_)), input(std::move(input_)), promises(input.size()), func(std::move(func_)) {} std::atomic i{0}; Executor::KeepAlive<> executor; Collection input; std::vector> promises; F func; static void spawn(std::shared_ptr ctx) { size_t i = ctx->i.fetch_add(1, std::memory_order_relaxed); if (i < ctx->input.size()) { auto fut = makeSemiFutureWith([&] { return ctx->func(std::move(ctx->input[i])); }).via(ctx->executor.get()); fut.setCallback_([ctx = std::move(ctx), i]( Executor::KeepAlive<>&&, Try&& t) mutable { ctx->promises[i].setTry(std::move(t)); // Chain another future onto this one spawn(std::move(ctx)); }); } } }; auto max = std::min(n, input.size()); auto ctx = std::make_shared( executor.copy(), std::move(input), std::move(func)); // Start the first n Futures for (size_t i = 0; i < max; ++i) { executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); }); } std::vector> futures; futures.reserve(ctx->promises.size()); for (auto& promise : ctx->promises) { futures.emplace_back(promise.getSemiFuture().via(executor.copy())); } return futures; } // reduce template template Future Future::reduce(In&& initial, F&& func) && { return std::move(*this).thenValue( [minitial = static_cast(initial), mfunc = static_cast(func)](T&& vals) mutable { auto ret = std::move(minitial); for (auto& val : vals) { ret = mfunc(std::move(ret), std::move(val)); } return ret; }); } // unorderedReduce (iterator) template SemiFuture unorderedReduceSemiFuture(It first, It last, T initial, F func) { 
using ItF = typename std::iterator_traits::value_type; using ItT = typename ItF::value_type; using Arg = MaybeTryArg; if (first == last) { return makeFuture(std::move(initial)); } typedef isTry IsTry; struct Context { Context(T&& memo, F&& fn, size_t n) : lock_(), memo_(makeFuture(std::move(memo))), func_(std::move(fn)), numThens_(0), numFutures_(n), promise_() {} folly::MicroSpinLock lock_; // protects memo_ and numThens_ Future memo_; F func_; size_t numThens_; // how many Futures completed and called .then() size_t numFutures_; // how many Futures in total Promise promise_; }; struct Fulfill { void operator()(Promise&& p, T&& v) const { p.setValue(std::move(v)); } void operator()(Promise&& p, Future&& f) const { f.setCallback_( [p = std::move(p)](Executor::KeepAlive<>&&, Try&& t) mutable { p.setTry(std::move(t)); }); } }; std::vector executors; futures::detail::stealDeferredExecutors(executors, first, last); auto ctx = std::make_shared( std::move(initial), std::move(func), std::distance(first, last)); for (size_t i = 0; first != last; ++first, ++i) { first->setCallback_([i, ctx](Executor::KeepAlive<>&&, Try&& t) { (void)i; // Futures can be completed in any order, simultaneously. // To make this non-blocking, we create a new Future chain in // the order of completion to reduce the values. // The spinlock just protects chaining a new Future, not actually // executing the reduce, which should be really fast. 
Promise p; auto f = p.getFuture(); { folly::MSLGuard lock(ctx->lock_); f = std::exchange(ctx->memo_, std::move(f)); if (++ctx->numThens_ == ctx->numFutures_) { // After reducing the value of the last Future, fulfill the Promise ctx->memo_.setCallback_([ctx](Executor::KeepAlive<>&&, Try&& t2) { ctx->promise_.setValue(std::move(t2)); }); } } f.setCallback_([ctx, mp = std::move(p), mt = std::move(t)]( Executor::KeepAlive<>&&, Try&& v) mutable { if (v.hasValue()) { exception_wrapper ew; try { Fulfill{}( std::move(mp), ctx->func_( std::move(v.value()), mt.template get())); } catch (...) { ew = exception_wrapper{std::current_exception()}; } if (ew) { mp.setException(std::move(ew)); } } else { mp.setTry(std::move(v)); } }); }); } auto future = ctx->promise_.getSemiFuture(); if (!executors.empty()) { future = std::move(future).defer( [](Try&& t) { return std::move(t).value(); }); const auto& deferredExecutor = futures::detail::getDeferredExecutor(future); deferredExecutor->setNestedExecutors(std::move(executors)); } return future; } template Future unorderedReduce(It first, It last, T initial, F func) { return unorderedReduceSemiFuture( first, last, std::move(initial), std::move(func)) .via(&InlineExecutor::instance()); } // within template Future Future::within(HighResDuration dur, Timekeeper* tk) && { return std::move(*this).within(dur, FutureTimeout(), tk); } template template Future Future::within(HighResDuration dur, E e, Timekeeper* tk) && { if (this->isReady()) { return std::move(*this); } auto* ePtr = this->getExecutor(); auto exe = folly::getKeepAliveToken(ePtr ? 
*ePtr : InlineExecutor::instance()); return std::move(*this).semi().within(dur, e, tk).via(std::move(exe)); } template template SemiFuture SemiFuture::within( HighResDuration dur, E e, Timekeeper* tk) && { if (this->isReady()) { return std::move(*this); } struct Context { explicit Context(E ex) : exception(std::move(ex)) {} E exception; SemiFuture thisFuture{SemiFuture::makeEmpty()}; SemiFuture afterFuture{SemiFuture::makeEmpty()}; Promise promise; std::atomic token{false}; }; std::shared_ptr tks; if (LIKELY(!tk)) { tks = folly::detail::getTimekeeperSingleton(); tk = tks.get(); } if (UNLIKELY(!tk)) { return makeSemiFuture(FutureNoTimekeeper()); } auto ctx = std::make_shared(std::move(e)); ctx->thisFuture = std::move(*this).defer([ctx](Try&& t) { if (!ctx->token.exchange(true, std::memory_order_relaxed)) { ctx->promise.setTry(std::move(t)); ctx->afterFuture.cancel(); } }); // Have time keeper use a weak ptr to hold ctx, // so that ctx can be deallocated as soon as the future job finished. ctx->afterFuture = tk->after(dur).defer([weakCtx = to_weak_ptr(ctx)](Try&& t) mutable { if (t.hasException() && t.exception().is_compatible_with()) { // This got cancelled by thisFuture so we can just return. return; } auto lockedCtx = weakCtx.lock(); if (!lockedCtx) { // ctx already released. 
"this" completed first, cancel "after" return; } // "after" completed first, cancel "this" lockedCtx->thisFuture.raise(FutureTimeout()); if (!lockedCtx->token.exchange(true, std::memory_order_relaxed)) { if (t.hasException()) { lockedCtx->promise.setException(std::move(t.exception())); } else { lockedCtx->promise.setException(std::move(lockedCtx->exception)); } } }); // Properly propagate interrupt values through futures chained after within() ctx->promise.setInterruptHandler( [weakCtx = to_weak_ptr(ctx)](const exception_wrapper& ex) { if (auto lockedCtx = weakCtx.lock()) { lockedCtx->thisFuture.raise(ex); } }); // Construct the future to return, create a fresh DeferredExecutor and // nest the other two inside it, in case they already carry nested executors. auto fut = ctx->promise.getSemiFuture(); auto newDeferredExecutor = futures::detail::KeepAliveOrDeferred( futures::detail::DeferredExecutor::create()); fut.setExecutor(std::move(newDeferredExecutor)); std::vector nestedExecutors; nestedExecutors.emplace_back(ctx->thisFuture.stealDeferredExecutor()); nestedExecutors.emplace_back(ctx->afterFuture.stealDeferredExecutor()); // Set trivial callbacks to treat the futures as consumed ctx->thisFuture.setCallback_([](Executor::KeepAlive<>&&, Try&&) {}); ctx->afterFuture.setCallback_([](Executor::KeepAlive<>&&, Try&&) {}); futures::detail::getDeferredExecutor(fut)->setNestedExecutors( std::move(nestedExecutors)); return fut; } // delayed template Future Future::delayed(HighResDuration dur, Timekeeper* tk) && { auto e = this->getExecutor(); return collectAll(*this, futures::sleep(dur, tk)) .via(e ? 
e : &InlineExecutor::instance()) .thenValue([](std::tuple, Try>&& tup) { return makeFuture(std::get<0>(std::move(tup))); }); } namespace futures { namespace detail { template void waitImpl(FutureType& f) { if (std::is_base_of, FutureType>::value) { f = std::move(f).via(&InlineExecutor::instance()); } // short-circuit if there's nothing to do if (f.isReady()) { return; } Promise promise; auto ret = convertFuture(promise.getSemiFuture(), f); FutureBatonType baton; f.setCallback_([&baton, promise = std::move(promise)]( Executor::KeepAlive<>&&, Try&& t) mutable { promise.setTry(std::move(t)); baton.post(); }); f = std::move(ret); baton.wait(); assert(f.isReady()); } template Future convertFuture(SemiFuture&& sf, const Future& f) { // Carry executor from f, inserting an inline executor if it did not have one auto* exe = f.getExecutor(); auto newFut = std::move(sf).via(exe ? exe : &InlineExecutor::instance()); newFut.core_->initCopyInterruptHandlerFrom(*f.core_); return newFut; } template SemiFuture convertFuture(SemiFuture&& sf, const SemiFuture&) { return std::move(sf); } template void waitImpl(FutureType& f, HighResDuration dur) { if (std::is_base_of, FutureType>::value) { f = std::move(f).via(&InlineExecutor::instance()); } // short-circuit if there's nothing to do if (f.isReady()) { return; } Promise promise; auto ret = convertFuture(promise.getSemiFuture(), f); auto baton = std::make_shared(); f.setCallback_([baton, promise = std::move(promise)]( Executor::KeepAlive<>&&, Try&& t) mutable { promise.setTry(std::move(t)); baton->post(); }); f = std::move(ret); if (baton->try_wait_for(dur)) { assert(f.isReady()); } } template void waitViaImpl(Future& f, DrivableExecutor* e) { // Set callback so to ensure that the via executor has something on it // so that once the preceding future triggers this callback, drive will // always have a callback to satisfy it if (f.isReady()) { return; } f = std::move(f).via(e).thenTry([](Try&& t) { return std::move(t); }); while 
(!f.isReady()) { e->drive(); } assert(f.isReady()); f = std::move(f).via(&InlineExecutor::instance()); } template void waitViaImpl( Future& f, TimedDrivableExecutor* e, const std::chrono::duration& timeout) { // Set callback so to ensure that the via executor has something on it // so that once the preceding future triggers this callback, drive will // always have a callback to satisfy it if (f.isReady()) { return; } // Chain operations, ensuring that the executor is kept alive for the duration f = std::move(f).via(e).thenValue( [keepAlive = getKeepAliveToken(e)](T&& t) { return std::move(t); }); auto now = std::chrono::steady_clock::now(); auto deadline = now + timeout; while (!f.isReady() && (now < deadline)) { e->try_drive_until(deadline); now = std::chrono::steady_clock::now(); } assert(f.isReady() || (now >= deadline)); if (f.isReady()) { f = std::move(f).via(&InlineExecutor::instance()); } } } // namespace detail } // namespace futures template SemiFuture& SemiFuture::wait() & { if (auto deferredExecutor = this->getDeferredExecutor()) { // Make sure that the last callback in the future chain will be run on the // WaitExecutor. Promise promise; auto ret = promise.getSemiFuture(); setCallback_( [p = std::move(promise)](Executor::KeepAlive<>&&, auto&& r) mutable { p.setTry(std::move(r)); }); auto waitExecutor = futures::detail::WaitExecutor::create(); deferredExecutor->setExecutor(waitExecutor.copy()); while (!ret.isReady()) { waitExecutor->drive(); } waitExecutor->detach(); this->detach(); *this = std::move(ret); } else { futures::detail::waitImpl(*this); } return *this; } template SemiFuture&& SemiFuture::wait() && { return std::move(wait()); } template SemiFuture& SemiFuture::wait(HighResDuration dur) & { if (auto deferredExecutor = this->getDeferredExecutor()) { // Make sure that the last callback in the future chain will be run on the // WaitExecutor. 
Promise promise; auto ret = promise.getSemiFuture(); setCallback_( [p = std::move(promise)](Executor::KeepAlive<>&&, auto&& r) mutable { p.setTry(std::move(r)); }); auto waitExecutor = futures::detail::WaitExecutor::create(); auto deadline = futures::detail::WaitExecutor::Clock::now() + dur; deferredExecutor->setExecutor(waitExecutor.copy()); while (!ret.isReady()) { if (!waitExecutor->driveUntil(deadline)) { break; } } waitExecutor->detach(); this->detach(); *this = std::move(ret); } else { futures::detail::waitImpl(*this, dur); } return *this; } template bool SemiFuture::wait(HighResDuration dur) && { auto future = std::move(*this); future.wait(dur); return future.isReady(); } template T SemiFuture::get() && { return std::move(*this).getTry().value(); } template T SemiFuture::get(HighResDuration dur) && { return std::move(*this).getTry(dur).value(); } template Try SemiFuture::getTry() && { wait(); auto future = folly::Future(this->core_); this->core_ = nullptr; return std::move(std::move(future).result()); } template Try SemiFuture::getTry(HighResDuration dur) && { wait(dur); auto future = folly::Future(this->core_); this->core_ = nullptr; if (!future.isReady()) { throw_exception(); } return std::move(std::move(future).result()); } template Future& Future::wait() & { futures::detail::waitImpl(*this); return *this; } template Future&& Future::wait() && { futures::detail::waitImpl(*this); return std::move(*this); } template Future& Future::wait(HighResDuration dur) & { futures::detail::waitImpl(*this, dur); return *this; } template Future&& Future::wait(HighResDuration dur) && { futures::detail::waitImpl(*this, dur); return std::move(*this); } template Future& Future::waitVia(DrivableExecutor* e) & { futures::detail::waitViaImpl(*this, e); return *this; } template Future&& Future::waitVia(DrivableExecutor* e) && { futures::detail::waitViaImpl(*this, e); return std::move(*this); } template Future& Future::waitVia(TimedDrivableExecutor* e, HighResDuration dur) & { 
futures::detail::waitViaImpl(*this, e, dur); return *this; } template Future&& Future::waitVia( TimedDrivableExecutor* e, HighResDuration dur) && { futures::detail::waitViaImpl(*this, e, dur); return std::move(*this); } template T Future::get() && { return std::move(*this).getTry().value(); } template T Future::get(HighResDuration dur) && { return std::move(*this).getTry(dur).value(); } template Try Future::getTry() && { return std::move(*this).semi().getTry(); } template Try Future::getTry(HighResDuration dur) && { return std::move(*this).semi().getTry(dur); } template T Future::getVia(DrivableExecutor* e) && { return std::move(waitVia(e).value()); } template T Future::getVia(TimedDrivableExecutor* e, HighResDuration dur) && { waitVia(e, dur); if (!this->isReady()) { throw_exception(); } return std::move(value()); } template Try Future::getTryVia(DrivableExecutor* e) && { return std::move(waitVia(e).result()); } template Try Future::getTryVia(TimedDrivableExecutor* e, HighResDuration dur) && { waitVia(e, dur); if (!this->isReady()) { throw_exception(); } return std::move(result()); } namespace futures { namespace detail { template struct TryEquals { static bool equals(const Try& t1, const Try& t2) { return t1.value() == t2.value(); } }; } // namespace detail } // namespace futures template Future Future::willEqual(Future& f) { return collectAllUnsafe(*this, f).thenValue( [](const std::tuple, Try>& t) { if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) { return futures::detail::TryEquals::equals( std::get<0>(t), std::get<1>(t)); } else { return false; } }); } template template Future Future::filter(F&& predicate) && { return std::move(*this).thenValue([p = static_cast(predicate)](T val) { T const& valConstRef = val; if (!p(valConstRef)) { throw_exception(); } return val; }); } template auto when(bool p, F&& thunk) -> decltype(std::declval>().unit()) { return p ? 
static_cast(thunk)().unit() : makeFuture(); } template typename std:: enable_if>::value, SemiFuture>::type whileDo(P&& predicate, F&& thunk) { if (predicate()) { auto future = thunk(); return std::move(future).deferExValue( [predicate = static_cast(predicate), thunk = static_cast(thunk)](auto&& ex, auto&&) mutable { return whileDo(static_cast(predicate), static_cast(thunk)) .via(std::move(ex)); }); } return makeSemiFuture(); } template typename std::enable_if>::value, Future>::type whileDo(P&& predicate, F&& thunk) { if (predicate()) { auto future = thunk(); return std::move(future).thenValue( [predicate = static_cast(predicate), thunk = static_cast(thunk)](auto&&) mutable { return whileDo(static_cast(predicate), static_cast(thunk)); }); } return makeFuture(); } template auto times(const int n, F&& thunk) { return folly::whileDo( [n, count = std::make_unique>(0)]() mutable { return count->fetch_add(1, std::memory_order_relaxed) < n; }, static_cast(thunk)); } namespace futures { template std::vector> mapValue(It first, It last, F func) { std::vector> results; results.reserve(std::distance(first, last)); for (auto it = first; it != last; it++) { results.push_back(std::move(*it).thenValue(func)); } return results; } template std::vector> mapTry(It first, It last, F func, int) { std::vector> results; results.reserve(std::distance(first, last)); for (auto it = first; it != last; it++) { results.push_back(std::move(*it).thenTry(func)); } return results; } template std::vector> mapValue( Executor& exec, It first, It last, F func) { std::vector> results; results.reserve(std::distance(first, last)); for (auto it = first; it != last; it++) { results.push_back(std::move(*it).via(&exec).thenValue(func)); } return results; } template std::vector> mapTry( Executor& exec, It first, It last, F func, int) { std::vector> results; results.reserve(std::distance(first, last)); for (auto it = first; it != last; it++) { results.push_back(std::move(*it).via(&exec).thenTry(func)); } return 
results; } template auto ensure(F&& f, Ensure&& ensure) { return makeSemiFuture() .deferValue([f = static_cast(f)](auto) mutable { return f(); }) .defer([ensure = static_cast(ensure)](auto resultTry) mutable { ensure(); return std::move(resultTry).value(); }); } template void detachOn(folly::Executor::KeepAlive<> exec, folly::SemiFuture&& fut) { std::move(fut).via(exec).detach(); } template void detachOnGlobalCPUExecutor(folly::SemiFuture&& fut) { detachOn(folly::getGlobalCPUExecutor(), std::move(fut)); } template void maybeDetachOnGlobalExecutorAfter( HighResDuration dur, folly::SemiFuture&& fut) { sleep(dur).toUnsafeFuture().thenValue([fut = std::move(fut)](auto&&) mutable { if (auto ptr = folly::detail::tryGetImmutableCPUPtr()) { detachOn(folly::getKeepAliveToken(ptr.get()), std::move(fut)); } }); } template void detachWithoutExecutor(folly::SemiFuture&& fut) { auto executor = futures::detail::stealDeferredExecutor(fut); // Fail if we try to detach a SemiFuture with deferred work DCHECK(executor.get() == nullptr); if (executor) { executor.get()->detach(); } } } // namespace futures template SemiFuture Timekeeper::at(std::chrono::time_point when) { auto now = Clock::now(); if (when <= now) { return makeSemiFuture(); } return after(std::chrono::duration_cast(when - now)); } #if FOLLY_USE_EXTERN_FUTURE_UNIT // limited to the instances unconditionally forced by the futures library namespace futures { namespace detail { extern template class FutureBase; } // namespace detail } // namespace futures extern template class Future; extern template class SemiFuture; #endif } // namespace folly