/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <memory>
#include <stdexcept>
#include <vector>

#include <folly/CPortability.h>
#include <folly/Function.h>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>

namespace folly {
namespace fibers {

/**
 * An exception class that gets thrown when the AtomicBatchDispatcher is used
 * incorrectly. This is indicative of a bug in the user code.
 * Examples are multiple dispatch calls on the same token, trying to get more
 * tokens from the dispatcher after commit has been called, etc.
 */
class FOLLY_EXPORT ABDUsageException : public std::logic_error {
  using std::logic_error::logic_error;
};

/**
 * An exception class that gets set on the promise for dispatched tokens when
 * the AtomicBatchDispatcher was destroyed before commit was called on it.
 */
class FOLLY_EXPORT ABDCommitNotCalledException : public std::runtime_error {
 public:
  ABDCommitNotCalledException()
      : std::runtime_error(
            "AtomicBatchDispatcher destroyed before commit() was called") {}
};

/**
 * An exception class that gets set on the promise for dispatched tokens when
 * one or more other tokens in the batch were destroyed before dispatch was
 * called on them.
 * It exists only so that the caller can distinguish the real failure cause
 * from these subsequently thrown exceptions.
 */
class FOLLY_EXPORT ABDTokenNotDispatchedException : public std::runtime_error {
  using std::runtime_error::runtime_error;
};
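/*
 * A minimal sketch (illustrative only, not part of this header) of how code
 * that waits on a dispatched future might tell these exception types apart.
 * It assumes `future` was returned by Token::dispatch and the batch has
 * already been resolved:
 *
 *   try {
 *     ResultT result = future.value();
 *     // use result
 *   } catch (const ABDCommitNotCalledException&) {
 *     // the dispatcher was destroyed before commit() was called
 *   } catch (const ABDTokenNotDispatchedException&) {
 *     // another token in the batch was destroyed without being dispatched
 *   } catch (...) {
 *     // an exception thrown by the user-provided dispatch function
 *   }
 */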
/**
 * AtomicBatchDispatcher should be used if you want to process fiber tasks in
 * parallel, but need to synchronize them at some point. The canonical example
 * is a database transaction dispatch round. This API enforces that all tasks
 * in the batch have reached the synchronization point before the
 * user-provided dispatch function is called with all the inputs in a single
 * function call. It also guarantees that the inputs in the vector passed to
 * the user-provided dispatch function are in the same order in which their
 * tokens were issued.
 *
 * Use this when you want all the inputs in the batch to be processed by a
 * single call to the user-provided dispatch function.
 * The user-provided dispatch function takes a vector of InputT as input and
 * returns a vector of ResultT.
 * To use an AtomicBatchDispatcher, create it by providing a dispatch function
 * EITHER to the constructor of the AtomicBatchDispatcher class
 * (you can then call the reserve method on the dispatcher to reserve space
 * for the number of inputs expected),
 * OR to the createAtomicBatchDispatcher function in the folly::fibers
 * namespace (optionally specifying an initial capacity, i.e. the number of
 * inputs expected).
 * The AtomicBatchDispatcher object created by this call (the dispatcher) is
 * the only object that can issue tokens (Token objects) that are used to add
 * an input to the batch. A single Token is issued when the user calls the
 * getToken function on the dispatcher.
 * Token objects cannot be copied (they can only be moved). The user can call
 * the public dispatch function on a Token, providing a single input value.
 * The dispatch function returns a folly::Future<ResultT> value that the user
 * can then wait on to obtain a ResultT value. The ResultT value will only be
 * available once the dispatch function has been called on all the Tokens in
 * the batch and the user has called dispatcher.commit() to indicate that no
 * more inputs are to be added to the batch.
 * User code pertaining to a task can run between the point where a token for
 * the task has been issued and the call to the dispatch function on that
 * token. Since this code can potentially throw, the token issued for a task
 * should be moved into this processing code in such a way that, if an
 * exception is thrown and then handled, the token object for the task is
 * destroyed. The batch dispatcher waits until all tokens have either been
 * destroyed or have had the dispatch function called on them; leaking an
 * issued token will therefore cause the batch dispatch to wait forever.
 *
 * The AtomicBatchDispatcher object is referred to as the dispatcher below.
 *
 * POSSIBLE ERRORS:
 * 1) The dispatcher is destroyed before commit was called on it, for example
 *    because the user forgot to call commit OR an exception was thrown in
 *    user code before the call to commit:
 *    - The future ResultT has an exception of type ABDCommitNotCalledException
 *      set for all tokens that were issued by the dispatcher (once all tokens
 *      are either destroyed or have called dispatch).
 * 2) The dispatch function is called more than once on the same Token object
 *    (or a moved version of the same Token):
 *    - Subsequent calls to dispatch (after the first one) throw an
 *      ABDUsageException (the batch itself will not have any errors and will
 *      get processed).
 * 3) One or more of the issued Tokens are destroyed before dispatch is called
 *    on them:
 *    - The future ResultT has an ABDTokenNotDispatchedException set for all
 *      tokens that were issued by the dispatcher (once all tokens are either
 *      destroyed or have called dispatch).
 * 4) dispatcher.getToken() is called after calling dispatcher.commit():
 *    - The call to getToken() throws an ABDUsageException (the batch itself
 *      will not have any errors and will get processed).
 * 5) All tokens were issued and dispatched, the user-provided batch dispatch
 *    function is called, but that function throws:
 *    - The future ResultT has an exception set for all tokens that were
 *      issued by the dispatcher; the result will contain the wrapped user
 *      exception.
 *
 * EXAMPLE (there are other ways to achieve this, but this is one example):
 * - The user creates an AtomicBatchDispatcher on the stack:
 *     auto dispatcher =
 *         folly::fibers::createAtomicBatchDispatcher(dispatchFunc, count);
 * - The user creates "count" token objects by calling getToken "count" times:
 *     std::vector<Job> jobs;
 *     for (size_t i = 0; i < count; ++i) {
 *       auto token = dispatcher.getToken();
 *       jobs.push_back(Job(std::move(token), singleInputValueToProcess));
 *     }
 * - The user calls commit() on the dispatcher to indicate that no new tokens
 *   will be issued for this batch:
 *     dispatcher.commit();
 * - Use any single-threaded executor to process the jobs.
 * - On each execution (fiber), preprocess a single Job that has been moved in
 *   from the original vector "jobs". This way, if the preprocessing throws,
 *   the Job object being processed is destroyed and so is its token.
 * - On each execution (fiber), call dispatch on the token:
 *     auto future = job.token.dispatch(job.input);
 * - Save the returned future so that you can eventually wait on the result:
 *     ResultT result;
 *     try {
 *       result = future.value();
 *       // future.hasValue() is true
 *     } catch (...) {
 *       // future.hasException() is true
 *     }
 *
 * NOTES:
 * - AtomicBatchDispatcher is not thread safe.
 * - Works for executors that run tasks on a single thread.
 */
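/*
 * A minimal sketch (illustrative only, not part of this header) of what a
 * user-provided dispatch function might look like; Query, Result, and
 * runBatchQuery are assumptions made for the example. The dispatch function
 * is presumably expected to return one result per input, in the same order
 * as the inputs it receives:
 *
 *   folly::Function<std::vector<Result>(std::vector<Query>&&)> dispatchFunc =
 *       [](std::vector<Query>&& queries) {
 *         // One call handles the entire batch, e.g. a single DB transaction.
 *         return runBatchQuery(std::move(queries));
 *       };
 *   auto dispatcher = folly::fibers::createAtomicBatchDispatcher(
 *       std::move(dispatchFunc), expectedCount);
 */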
template <typename InputT, typename ResultT>
class AtomicBatchDispatcher {
 private:
  struct DispatchBaton;
  friend struct DispatchBaton;

 public:
  using DispatchFunctionT =
      folly::Function<std::vector<ResultT>(std::vector<InputT>&&)>;

  class Token {
   public:
    explicit Token(
        std::shared_ptr<DispatchBaton> baton, size_t sequenceNumber);

    Future<ResultT> dispatch(InputT input);

    // Allow moving a Token object
    Token(Token&&) = default;
    Token& operator=(Token&&) = default;

    size_t sequenceNumber() const;

   private:
    // Disallow copying a Token object
    Token(const Token&) = delete;
    Token& operator=(const Token&) = delete;

    std::shared_ptr<DispatchBaton> baton_;
    size_t sequenceNumber_;
  };

  explicit AtomicBatchDispatcher(DispatchFunctionT&& dispatchFunc);

  ~AtomicBatchDispatcher();

  // numEntries is a *hint* about the number of inputs to expect:
  // - It is used purely to reserve space for storing the vector of inputs
  //   etc., so that reallocations and moves are reduced or avoided.
  // - It is provided purely for performance reasons.
  void reserve(size_t numEntries);

  Token getToken();

  void commit();

  // Allow moving an AtomicBatchDispatcher object
  AtomicBatchDispatcher(AtomicBatchDispatcher&&) = default;
  AtomicBatchDispatcher& operator=(AtomicBatchDispatcher&&) = default;

 private:
  // Disallow copying an AtomicBatchDispatcher object
  AtomicBatchDispatcher(const AtomicBatchDispatcher&) = delete;
  AtomicBatchDispatcher& operator=(const AtomicBatchDispatcher&) = delete;

  size_t numTokensIssued_;
  std::shared_ptr<DispatchBaton> baton_;
};

// initialCapacity is a *hint* about the number of inputs to expect:
// - It is used purely to reserve space for storing the vector of inputs etc.,
//   so that reallocations and moves are reduced or avoided.
// - It is provided purely for performance reasons.
template <typename InputT, typename ResultT>
AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
    folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
    size_t initialCapacity = 0);

} // namespace fibers
} // namespace folly

#include <folly/fibers/AtomicBatchDispatcher-inl.h>