/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <array> #include <atomic> #include <cstdint> #include <limits> #include <stdexcept> #include <thread> #include <utility> #include <glog/logging.h> #include <folly/ConstexprMath.h> #include <folly/Likely.h> #include <folly/Portability.h> #include <folly/ScopeGuard.h> #include <folly/Utility.h> #include <folly/chrono/Hardware.h> #include <folly/detail/Futex.h> #include <folly/functional/Invoke.h> #include <folly/lang/Align.h> #include <folly/lang/Bits.h> #include <folly/portability/Asm.h> #include <folly/synchronization/AtomicNotification.h> #include <folly/synchronization/AtomicUtil.h> #include <folly/synchronization/Lock.h> #include <folly/synchronization/detail/InlineFunctionRef.h> #include <folly/synchronization/detail/Sleeper.h> namespace folly { namespace detail { namespace distributed_mutex { // kUnlocked is used to show unlocked state // // When locking threads encounter kUnlocked in the underlying storage, they // can just acquire the lock without any further effort constexpr auto kUnlocked = std::uintptr_t{0b0}; // kLocked is used to show that the mutex is currently locked, and future // attempts to lock the mutex should enqueue on the central storage // // Locking threads find this on central storage only when there is a // contention chain that is undergoing wakeups, in every other case, a locker // will either find kUnlocked or an arbitrary address with the kLocked bit set constexpr auto kLocked = std::uintptr_t{0b1}; // kTimedWaiter is set when there is at least one timed waiter on the mutex // // Timed waiters do not follow the sleeping strategy employed by regular, // non-timed threads. They sleep on the central mutex atomic through an // extended futex() interface that allows sleeping with the same semantics for // non-standard integer widths // // When a regular non-timed thread unlocks or enqueues on the mutex, and sees // a timed waiter, it takes ownership of all the timed waiters. The thread // that has taken ownership of the timed waiter releases the timed waiters // when it gets a chance at the critical section. At which point it issues a // wakeup to single timed waiter, timed waiters always issue wake() calls to // other timed waiters constexpr auto kTimedWaiter = std::uintptr_t{0b10}; // kUninitialized means that the thread has just enqueued, and has not yet // gotten to initializing itself with the address of its successor // // this becomes significant for threads that are trying to wake up the // uninitialized thread, if they see that the thread is not yet initialized, // they can do nothing but spin, and wait for the thread to get initialized // // This also plays a role in the functioning of flat combining as implemented // in DistributedMutex. When a thread owning the lock goes through the // contention chain to either unlock the mutex or combine critical sections // from the other end. 
The presence of kUninitialized means that the // combining thread is not able to make progress after this point. So we // transfer the lock. constexpr auto kUninitialized = std::uint32_t{0b0}; // kWaiting will be set in the waiter's futex structs while they are spinning // while waiting for the mutex constexpr auto kWaiting = std::uint32_t{0b1}; // kWake will be set by threads that are waking up waiters that have enqueued constexpr auto kWake = std::uint32_t{0b10}; // kSkipped will be set by a waker when they see that a waiter has been // preempted away by the kernel, in this case the thread that got skipped will // have to wake up and put itself back on the queue constexpr auto kSkipped = std::uint32_t{0b11}; // kAboutToWait will be set by a waiter that enqueues itself with the purpose // of waiting on a futex constexpr auto kAboutToWait = std::uint32_t{0b100}; // kSleeping will be set by a waiter right before enqueueing on a futex. When // a thread wants to wake up a waiter that has enqueued on a futex, it should // set the futex to contain kWake // // a thread that is unlocking and wants to skip over a sleeping thread also // calls futex_.exchange(kSleeping) on the sleeping thread's futex word. It // does this to 1. detect whether the sleeping thread had actually gone to // sleep on the futex word so it can skip it, and 2. to synchronize with // other non-atomic writes in the sleeping thread's context (such as the write // to track the next waiting thread). // // We reuse kSleeping instead of, say, using another constant kEarlyDelivery to // avoid situations where a thread has to enter kernel mode due to calling // futexWait() twice because of the presence of a waking thread. This // situation can arise when an unlocking thread goes to skip over a sleeping // thread, sees that the thread has slept and moves on, but the sleeping thread // had not yet entered futex(). This interleaving causes the thread calling // futex() to return spuriously, as the futex word is not what it should be constexpr auto kSleeping = std::uint32_t{0b101}; // kCombined is set by the lock holder to let the waiter thread know that its // combine request was successfully completed by the lock holder. A // successful combine means that the thread requesting the combine operation // does not need to unlock the mutex; in fact, doing so would be an error. constexpr auto kCombined = std::uint32_t{0b111}; // kCombineUninitialized is like kUninitialized but is set by a thread when it // enqueues in hopes of getting its critical section combined with the lock // holder constexpr auto kCombineUninitialized = std::uint32_t{0b1000}; // kCombineWaiting is set by a thread when it is ready to have its combine // record fulfilled by the lock holder. In particular, this signals to the // lock holder that the thread has set its next_ pointer in the contention // chain constexpr auto kCombineWaiting = std::uint32_t{0b1001}; // kExceptionOccurred is set on the waiter futex when the remote task throws // an exception. It is the caller's responsibility to retrieve the exception // and rethrow it in their own context. Note that when the caller uses a // noexcept function as their critical section, they can avoid checking for // this value // // This allows us to avoid all cost of exceptions in the memory layout of the // fast path (no errors) as exceptions are stored as an std::exception_ptr in // the same union that stores the return value of the critical section.
We // also avoid all CPU overhead because the combiner uses a try-catch block // without any additional branching to handle exceptions constexpr auto kExceptionOccurred = std::uint32_t{0b1010}; // The number of spins that we are allowed to do before we resort to marking a // thread as having slept // // This is just a magic number from benchmarks constexpr auto kScheduledAwaySpinThreshold = std::chrono::nanoseconds{200}; // The maximum number of spins before a thread starts yielding its processor // in hopes of getting skipped constexpr auto kMaxSpins = 4000; // The maximum number of contention chains we can resolve with flat combining. // After this number of contention chains, the mutex falls back to regular // two-phased mutual exclusion to ensure that we don't starve the combiner // thread constexpr auto kMaxCombineIterations = 2; /** * Write-only data that is available to the thread that is waking up another. * Only the waking thread is allowed to write to this; the thread to be woken * is allowed to read from this after a wakeup has been issued */ template <template <typename> class Atomic> class WakerMetadata { public: // This is the thread that initiated wakeups for the contention chain. // There can only ever be one thread that initiates the wakeup for a // chain in the spin-only version of this mutex. When a thread that just // woke up sees this as the next thread to wake up, it knows that it is the // terminal node in the contention chain. This means that it was the one // that took the thread that had acquired the mutex off the centralized // state. Therefore, the current thread is the last in its contention // chain. It will fall back to centralized storage to pick up the next // waiter or release the mutex // // When we move to a full sleeping implementation, this might need to change // to a small_vector<> to account for failed wakeups, or we can put threads // to sleep on the central futex, which is an easier implementation // strategy. Although, since this is allocated on the stack, we can set a // prohibitively large threshold to avoid heap allocations; this strategy, // however, might cause increased cache misses on wakeup signalling std::uintptr_t waker_{0}; // the list of threads that the waker had previously seen to be sleeping on // a futex(), // // this is given to the current thread as a means to pass on // information. When the current thread goes to unlock the mutex and does // not see contention, it should go and wake up the head of this list. If // the current thread sees a contention chain on the mutex, it should pass // on this list to the next thread that gets woken up std::uintptr_t waiters_{0}; // The futex that this waiter will sleep on // // how can we reuse futex_ from above for futex management? Futex<Atomic> sleeper_{kUninitialized}; }; /** * Type of the type-erased callable that is used for combining from the lock * holder's end. This has 48 bytes of inline storage that can be used to * minimize cache misses when combining */ using CombineFunction = detail::InlineFunctionRef<void(), 48>; /** * Waiter encapsulates the state required for waiting on the mutex; this * contains potentially heavy state and is intended to be allocated on the * stack as part of a lock() function call * * To ensure that synchronization does not cause unintended side effects on * the rest of the thread stack (eg.
metadata in lockImplementation(), or any * other data in the user's thread), we aggressively pad this struct and use * custom alignment internally to ensure that the relevant data fits within a * single cacheline. The added alignment here also gives us some room to * wiggle in the bottom few bits of the mutex, where we store extra metadata */ template <template <typename> class Atomic> class Waiter { public: Waiter() {} Waiter(Waiter&&) = delete; Waiter(const Waiter&) = delete; Waiter& operator=(Waiter&&) = delete; Waiter& operator=(const Waiter&) = delete; void initialize(std::uint64_t futex, CombineFunction task) { // we only initialize the function if we were actually given a non-null // task, otherwise we initialize the waker metadata if (task) { DCHECK_EQ(futex, kCombineUninitialized); new (&function_) CombineFunction{task}; } else { DCHECK((futex == kUninitialized) || (futex == kAboutToWait)); new (&metadata_) WakerMetadata<Atomic>{}; } // this pedantic store is needed to ensure that the waking thread // synchronizes with the state in the waiter struct when it loads the // value of the futex word // // on x86, this gets optimized away to just a regular store, but it might be // needed on platforms where explicit acquire-release barriers are // required for synchronization // // note that we release here at the end of initialization because // construction is complete here; any thread that acquires this release // will see a well-constructed wait node futex_.store(futex, std::memory_order_release); } std::array<std::uint8_t, hardware_destructive_interference_size> padding1; // the atomic that this thread will spin on while waiting for the mutex to // be unlocked alignas(hardware_destructive_interference_size) Atomic<std::uint64_t> futex_{ kUninitialized}; // The successor of this node. This will be the thread that had its address // on the mutex previously // // We can do without making this atomic since the remote thread synchronizes // on the futex variable above. If this were not atomic, the remote thread // would only be allowed to read from it after the waiter has moved into the // waiting state to avoid risk of a load racing with a write. However, it // helps to make this atomic because we can use an unconditional load and make // full use of the load buffer to coalesce both reads into a single clock // cycle after the line arrives in the combiner core. This is a heavily // contended line, so an RFO from the enqueueing thread is highly likely and // has the potential to cause an immediate invalidation, blocking the combiner // thread from making progress until the line is pulled back to read this // value // // Further, making this atomic prevents the compiler from making an incorrect // optimization where it does not load the value as written in the code, but // rather dereferences it through a pointer whenever needed (since the value // of the pointer to this is readily available on the stack). Doing this // causes multiple invalidation requests from the enqueueing thread, blocking // remote progress // // Note that we use relaxed loads and stores, so this should not have any // additional overhead compared to a regular load on most architectures std::atomic<std::uintptr_t> next_{0}; // We use an anonymous union for the combined critical section request and // the metadata that will be filled in from the leader's end. Only one is // active at a time - if a leader decides to combine the requested critical // section into its execution, it will not touch the metadata field.
If a // leader decides to migrate the lock to the waiter, it will not touch the // function // // this allows us to transfer more state when combining a critical section // and reduce the cache misses originating from executing an arbitrary // lambda // // note that this is an anonymous union, not an unnamed union, the members // leak into the surrounding scope union { // metadata for the waker WakerMetadata<Atomic> metadata_; // The critical section that can potentially be combined into the critical // section of the locking thread // // This is kept as a FunctionRef because the original function is preserved // until the lock_combine() function returns. A consequence of using // FunctionRef here is that we don't need to do any allocations and can // allow users to capture unbounded state into the critical section. Flat // combining means that the user does not have access to the thread // executing the critical section, so assumptions about thread local // references can be invalidated. Being able to capture arbitrary state // allows the user to do thread local accesses right before the critical // section and pass them as state to the callable being referenced here CombineFunction function_; // The user is allowed to use a combined critical section that returns a // value. This buffer is used to implement the value transfer to the // waiting thread. We reuse the same union because this helps us combine // one synchronization operation with a material value transfer. // // The waker thread needs to synchronize on this cacheline to issue a // wakeup to the waiter, meaning that the entire line needs to be pulled // into the remote core in exclusive mode. So we reuse the coherence // operation to transfer the return value in addition to the // synchronization signal. In the case that the user's data item is // small, the data is transferred all inline as part of the same line, // which pretty much arrives into the CPU cache in the same clock cycle or // two after a read-for-ownership request. This gives us a high chance of // coalescing the entire transitive store buffer together into one cache // coherence operation from the waker's end. This allows us to make use // of the CPU bus bandwidth which would have otherwise gone to waste. // Benchmarks prove this theory under a wide range of contention, value // sizes, NUMA interactions and processor models // // The current version of the Intel optimization manual confirms this // theory somewhat as well in section 2.3.5.1 (Load and Store Operation // Overview) // // When an instruction writes data to a memory location [...], the // processor ensures that it has the line containing this memory location // is in its L1d cache [...]. If the cache line is not there, it fetches // from the next levels using a RFO request [...] RFO and storing the // data happens after instruction retirement. Therefore, the store // latency usually does not affect the store instruction itself // // This gives the user the ability to input up to 48 bytes into the // combined critical section through an InlineFunctionRef and output 48 // bytes from it basically without any cost. The type of the entity // stored in the buffer has to be matched by the type erased callable that // the caller has used. 
At this point, the caller is still in the // template instantiation leading to the combine request, so it has // knowledge of the return type and can apply the appropriate // reinterpret_cast and launder operation to safely retrieve the data from // this buffer std::aligned_storage_t<48, 8> storage_; }; std::array<std::uint8_t, hardware_destructive_interference_size> padding2; }; /** * A template that helps us differentiate between the different ways to return * a value from a combined critical section. A return value of type void * cannot be stored anywhere, so we use specializations and pick the right one * switched through std::conditional_t * * This is then used by CoalescedTask and its family of functions to implement * efficient return value transfers to the waiting threads */ template <typename Func> class RequestWithReturn { public: using F = Func; using ReturnType = folly::invoke_result_t<const Func&>; explicit RequestWithReturn(Func func) : func_{std::move(func)} {} /** * We need to define the destructor here because C++ requires (with good * reason) that a union with non-default destructor be explicitly destroyed * from the surrounding class, as neither the runtime nor compiler have the * knowledge of what to do with a union at the time of destruction * * Each request that has a valid return value set will have the value * retrieved from the get() method, where the value is destroyed. So we * don't need to destroy it here */ ~RequestWithReturn() {} /** * This method can be used to return a value from the request. This returns * the underlying value because return type of the function we were * instantiated with is not void */ ReturnType get() && { // when the return value has been processed, we destroy the value // contained in this request. 
Using a scope_exit means that we don't have // to worry about storing the value somewhere and potentially causing an // extra move // // note that the invariant here is that this function is only called if the // requesting thread had its critical section combined, and the value_ // member was constructed through detach() SCOPE_EXIT { value_.~ReturnType(); }; return std::move(value_); } // this contains a copy of the function the waiter had requested to be // executed as a combined critical section Func func_; // this stores the return value used in the request; we use a union here to // avoid laundering and allow return types that are not default // constructible to be propagated through the execution of the critical // section // // note that this is an anonymous union, the member leaks into the // surrounding scope as a member variable union { ReturnType value_; }; }; template <typename Func> class RequestWithoutReturn { public: using F = Func; using ReturnType = void; explicit RequestWithoutReturn(Func func) : func_{std::move(func)} {} /** * In this version of the request class, get() returns nothing as there is * no stored value */ void get() && {} // this contains a copy of the function the waiter had requested to be // executed as a combined critical section Func func_; }; // we need to use std::integral_constant::value here as opposed to // std::integral_constant::operator T() because MSVC errors out with the // implicit conversion template <typename Func> using Request = std::conditional_t< std::is_void<folly::invoke_result_t<const Func&>>::value, RequestWithoutReturn<Func>, RequestWithReturn<Func>>; /** * A template that helps us to transform a callable returning a value to one * that returns void so it can be type erased and passed on to the waker. If * the return value is small enough, it gets coalesced into the wait struct * for optimal data transfer. When it's not small enough to fit in the waiter * storage buffer, we place it on its own cacheline with isolation to prevent * false-sharing with the on-stack metadata of the waiter thread * * This helps a combined critical section feel more normal in the case where * the user wants to return a value, for example * * auto value = mutex_.lock_combine([&]() { * return data_.value(); * }); * * Without this, the user would typically create a dummy object that they * would then assign to from within the lambda. With return value chaining, * this pattern feels more natural * * Note that it is important to copy the entire callable into this class.
* Storing something like a reference instead is not desirable because it does * not allow InlineFunctionRef to use inline storage to represent the user's * callable without extra indirections * * We use std::conditional_t and switch to the right type of task with the * CoalescedTask type alias */ template <typename Func, typename Waiter> class TaskWithCoalesce { public: using ReturnType = folly::invoke_result_t<const Func&>; using StorageType = folly::Unit; explicit TaskWithCoalesce(Func func, Waiter& waiter) : func_{std::move(func)}, waiter_{waiter} {} void operator()() const { auto value = func_(); new (&waiter_.storage_) ReturnType{std::move(value)}; } private: Func func_; Waiter& waiter_; static_assert(!std::is_void<ReturnType>{}, ""); static_assert(alignof(decltype(waiter_.storage_)) >= alignof(ReturnType), ""); static_assert(sizeof(decltype(waiter_.storage_)) >= sizeof(ReturnType), ""); }; template <typename Func, typename Waiter> class TaskWithoutCoalesce { public: using ReturnType = void; using StorageType = folly::Unit; explicit TaskWithoutCoalesce(Func func, Waiter&) : func_{std::move(func)} {} void operator()() const { func_(); } private: Func func_; }; template <typename Func, typename Waiter> class TaskWithBigReturnValue { public: // Using storage that is aligned on the cacheline boundary helps us avoid a // situation where the data ends up being allocated on two separate // cachelines. This would require the remote thread to pull in both lines // to issue a write. // // We also isolate the storage by appending some padding to the end to // ensure we avoid false-sharing with the metadata used while the waiter // waits using ReturnType = folly::invoke_result_t<const Func&>; static const auto kReturnValueAlignment = folly::constexpr_max( alignof(ReturnType), folly::hardware_destructive_interference_size); using StorageType = std::aligned_storage_t< sizeof(std::aligned_storage_t<sizeof(ReturnType), kReturnValueAlignment>), kReturnValueAlignment>; explicit TaskWithBigReturnValue(Func func, Waiter&) : func_{std::move(func)} {} void operator()() const { DCHECK(storage_); auto value = func_(); new (storage_) ReturnType{std::move(value)}; } void attach(StorageType* storage) { DCHECK(!storage_); storage_ = storage; } private: Func func_; StorageType* storage_{nullptr}; static_assert(!std::is_void<ReturnType>{}, ""); static_assert(sizeof(Waiter::storage_) < sizeof(ReturnType), ""); }; template <typename T, bool> struct Sizeof_; template <typename T> struct Sizeof_<T, false> : index_constant<sizeof(T)> {}; template <typename T> struct Sizeof_<T, true> : index_constant<0> {}; template <typename T> struct Sizeof : Sizeof_<T, std::is_void<T>::value> {}; // we need to use std::integral_constant::value here as opposed to // std::integral_constant::operator T() because MSVC errors out with the // implicit conversion template <typename Func, typename Waiter> using CoalescedTask = std::conditional_t< std::is_void<folly::invoke_result_t<const Func&>>::value, TaskWithoutCoalesce<Func, Waiter>, std::conditional_t< Sizeof<folly::invoke_result_t<const Func&>>::value <= sizeof(Waiter::storage_), TaskWithCoalesce<Func, Waiter>, TaskWithBigReturnValue<Func, Waiter>>>; /** * Given a request and a wait node, coalesce them into a CoalescedTask that * coalesces the return value into the wait node when invoked from a remote * thread * * When given a null request through nullptr_t, coalesce() returns null as well */ template <typename Waiter> std::nullptr_t coalesce(std::nullptr_t&, Waiter&) { return nullptr; 
} template < typename Request, typename Waiter, typename Func = typename Request::F> CoalescedTask<Func, Waiter> coalesce(Request& request, Waiter& waiter) { static_assert(!std::is_same<Request, std::nullptr_t>{}, ""); return CoalescedTask<Func, Waiter>{request.func_, waiter}; } /** * Given a task, create storage for the return value. When we get a type * of CoalescedTask, this returns an instance of CoalescedTask::StorageType. * std::nullptr_t otherwise */ inline std::nullptr_t makeReturnValueStorageFor(std::nullptr_t&) { return {}; } template < typename CoalescedTask, typename StorageType = typename CoalescedTask::StorageType> StorageType makeReturnValueStorageFor(CoalescedTask&) { return {}; } /** * Given a task and storage, attach them together if needed. This only helps * when we have a task that returns a value bigger than can be coalesced. In * that case, we need to attach the storage with the task so the return value * can be transferred to this thread from the remote thread */ template <typename Task, typename Storage> void attach(Task&, Storage&) { static_assert( std::is_same<Storage, std::nullptr_t>{} || std::is_same<Storage, folly::Unit>{}, ""); } template < typename R, typename W, typename StorageType = typename TaskWithBigReturnValue<R, W>::StorageType> void attach(TaskWithBigReturnValue<R, W>& task, StorageType& storage) { task.attach(&storage); } template <typename Request, typename Waiter> void throwIfExceptionOccurred(Request&, Waiter& waiter, bool exception) { using Storage = decltype(waiter.storage_); using F = typename Request::F; static_assert(sizeof(Storage) >= sizeof(std::exception_ptr), ""); static_assert(alignof(Storage) >= alignof(std::exception_ptr), ""); // we only need to check for an exception in the waiter struct if the passed // callable is not noexcept // // we need to make another instance of the exception with automatic storage // duration and destroy the exception held in the storage *before throwing* to // avoid leaks. If we don't destroy the exception_ptr in storage, the // refcount for the internal exception will never hit zero, thereby leaking // memory if (UNLIKELY(!folly::is_nothrow_invocable_v<const F&> && exception)) { auto storage = &waiter.storage_; auto exc = folly::launder(reinterpret_cast<std::exception_ptr*>(storage)); auto copy = std::move(*exc); exc->std::exception_ptr::~exception_ptr(); std::rethrow_exception(std::move(copy)); } } /** * Given a CoalescedTask, a wait node and a request. Detach the return value * into the request from the wait node and task. 
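 *
 * As a rough, illustrative sketch only (the real call sites live in
 * lockImplementation() and lock_combine() below), the request, task and
 * storage pieces fit together like this, assuming a user callable func and a
 * bool exceptionOccurred derived from the wakeup signal:
 *
 *   auto&& request = Request<Func>{func};             // wraps the callable
 *   Waiter<Atomic> state{};                           // on-stack wait node
 *   auto&& task = coalesce(request, state);           // type-erased task
 *   auto&& storage = makeReturnValueStorageFor(task); // big-return storage
 *   attach(task, storage);
 *   // ... enqueue, wait, and observe kCombined or kExceptionOccurred ...
 *   detach(request, state, exceptionOccurred, storage);
 *   auto value = std::move(request).get();            // retrieve the result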
*/ template <typename Waiter> void detach(std::nullptr_t&, Waiter&, bool exception, std::nullptr_t&) { DCHECK(!exception); } template <typename Waiter, typename F> void detach( RequestWithoutReturn<F>& request, Waiter& waiter, bool exception, folly::Unit&) { throwIfExceptionOccurred(request, waiter, exception); } template <typename Waiter, typename F> void detach( RequestWithReturn<F>& request, Waiter& waiter, bool exception, folly::Unit&) { throwIfExceptionOccurred(request, waiter, exception); using ReturnType = typename RequestWithReturn<F>::ReturnType; static_assert(!std::is_same<ReturnType, void>{}, ""); static_assert(sizeof(waiter.storage_) >= sizeof(ReturnType), ""); auto& val = *folly::launder(reinterpret_cast<ReturnType*>(&waiter.storage_)); new (&request.value_) ReturnType{std::move(val)}; val.~ReturnType(); } template <typename Waiter, typename F, typename Storage> void detach( RequestWithReturn<F>& request, Waiter& waiter, bool exception, Storage& storage) { throwIfExceptionOccurred(request, waiter, exception); using ReturnType = typename RequestWithReturn<F>::ReturnType; static_assert(!std::is_same<ReturnType, void>{}, ""); static_assert(sizeof(storage) >= sizeof(ReturnType), ""); auto& val = *folly::launder(reinterpret_cast<ReturnType*>(&storage)); new (&request.value_) ReturnType{std::move(val)}; val.~ReturnType(); } /** * Get the time since epoch in nanoseconds * * This is faster than std::chrono::steady_clock because it avoids a VDSO * access to get the timestamp counter * * Note that the hardware timestamp counter on x86, like * std::chrono::steady_clock, is guaranteed to be monotonically increasing - * https://c9x.me/x86/html/file_module_x86_id_278.html */ inline std::chrono::nanoseconds time() { return std::chrono::nanoseconds{hardware_timestamp()}; } /** * Zero out the other bits used by the implementation and return just an * address from a uintptr_t */ template <typename Type> Type* extractPtr(std::uintptr_t from) { // shift one bit off the end, to get all 1s followed by a single 0 auto mask = std::numeric_limits<std::uintptr_t>::max(); mask >>= 1; mask <<= 1; CHECK(!(mask & 0b1)); return folly::bit_cast<Type*>(from & mask); } /** * Strips the given nanoseconds down to their least significant 56 bits by * shifting them left by 8, leaving the bottom 8 bits zeroed out to be used as * a medium of information transfer for the thread wait nodes */ inline std::uint64_t strip(std::chrono::nanoseconds t) { auto time = t.count(); return static_cast<std::uint64_t>(time) << 8; } /** * Recover the timestamp value from an integer that has the timestamp encoded * in it */ inline std::uint64_t recover(std::uint64_t from) { return from >> 8; } template <template <typename> class Atomic, bool TimePublishing> class DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy { public: DistributedMutexStateProxy() = default; DistributedMutexStateProxy(DistributedMutexStateProxy const&) = default; DistributedMutexStateProxy& operator=(DistributedMutexStateProxy const&) = default; // The proxy is valid when a mutex acquisition attempt was successful; // lock() is guaranteed to return a valid proxy, try_lock() is not explicit operator bool() const { return expected_; } // private: // friend the mutex class, since that will be accessing state private to // this class friend class DistributedMutex<Atomic, TimePublishing>; DistributedMutexStateProxy( Waiter<Atomic>* next, std::uintptr_t expected, bool timedWaiter = false, bool combined = false, std::uintptr_t waker = 0,
Waiter<Atomic>* waiters = nullptr, Waiter<Atomic>* ready = nullptr) : next_{next}, expected_{expected}, timedWaiters_{timedWaiter}, combined_{combined}, waker_{waker}, waiters_{waiters}, ready_{ready} {} // the next thread that is to be woken up; this being null at the time of // unlock() shows that the current thread acquired the mutex without // contention or it was the terminal thread in the queue of threads waking up Waiter<Atomic>* next_{nullptr}; // this is the value that the current thread should expect to find on // unlock, and if this value is not there on unlock, the current thread // should assume that other threads are enqueued waiting for the mutex // // note that if the mutex has the same state set at unlock time, and this is // set to an address (and not, say, kLocked in the case of a terminal waker) // then it must have been the case that no other thread had enqueued itself, // since threads in the domain of this mutex do not share stack space // // if we want to support stack sharing, we can solve the problem by looping // at lock time, and setting a variable that says whether we have acquired // the lock or not perhaps std::uintptr_t expected_{0}; // a boolean that will be set when the mutex has timed waiters that the // current thread is responsible for waking; in such a case, the current // thread will issue an atomic_notify_one() call after unlocking the mutex // // note that a timed waiter will itself always have this flag set. This is // done so we can avoid having to issue an atomic_notify_all() call (and // subsequently a thundering herd) when waking up timed-wait threads bool timedWaiters_{false}; // a boolean that contains true if the state proxy is not meant to be passed // to the unlock() function. This is set only when there is contention and // a thread had asked for its critical section to be combined bool combined_{false}; // metadata passed along from the thread that woke this thread up std::uintptr_t waker_{0}; // the list of threads that are waiting on a futex // // the current thread is meant to wake up this list of waiters if it is // able to commit an unlock() on the mutex without seeing a contention chain Waiter<Atomic>* waiters_{nullptr}; // after a thread has woken up from a futex() call, it will have the rest of // the threads that were waiting behind it in this list; a thread that // unlocks has to wake up threads from this list if it has any, before it // goes to sleep to prevent pathological unfairness Waiter<Atomic>* ready_{nullptr}; }; template <template <typename> class Atomic, bool TimePublishing> DistributedMutex<Atomic, TimePublishing>::DistributedMutex() : state_{kUnlocked} {} template <typename Waiter> std::uint64_t publish( std::uint64_t spins, bool& shouldPublish, std::chrono::nanoseconds& previous, Waiter& waiter, std::uint32_t waitMode) { // time publishing has some overhead because it executes an atomic exchange on // the futex word. If this line is in a remote thread (eg. the combiner), // then each time we publish a timestamp, this thread has to submit an RFO to // the remote core for the cacheline, blocking progress for both threads. // // the remote core uses a store in the fast path - why then does an RFO make a // difference?
The only educated guess we have here is that the added // roundtrip delays draining of the store buffer, which essentially exerts // backpressure on future stores, preventing parallelization // // if we have requested a combine, time publishing is less important as it // only comes into play when the combiner has exhausted its max combine // passes. So we defer time publishing to the point when the current thread // gets preempted auto current = time(); if (previous != decltype(time())::zero() && (current - previous) >= kScheduledAwaySpinThreshold) { shouldPublish = true; } previous = current; // if we have requested a combine, and this is the first iteration of the // wait-loop, we publish a max timestamp to optimistically convey that we have // not yet been preempted (the remote knows the meaning of max timestamps) // // then if we are under the maximum number of spins allowed before sleeping, // we publish the exact timestamp, otherwise we publish the minimum possible // timestamp to force the waking thread to skip us auto now = ((waitMode == kCombineWaiting) && !spins) ? decltype(time())::max() : (spins < kMaxSpins) ? previous : decltype(time())::zero(); // the wait mode information is published in the bottom 8 bits of the futex // word, the rest contains time information as computed above. Overflows are // not really a correctness concern because time publishing is only a // heuristic. This leaves us 56 bits of nanoseconds (2 years) before we hit // two consecutive wraparounds, so the lack of bits to represent time is // neither a performance nor correctness concern auto data = strip(now) | waitMode; auto signal = (shouldPublish || !spins || (waitMode != kCombineWaiting)) ? waiter.futex_.exchange(data, std::memory_order_acq_rel) : waiter.futex_.load(std::memory_order_acquire); return signal & std::numeric_limits<std::uint8_t>::max(); } template <typename Waiter> bool spin(Waiter& waiter, std::uint32_t& sig, std::uint32_t mode) { auto spins = std::uint64_t{0}; auto waitMode = (mode == kCombineUninitialized) ? kCombineWaiting : kWaiting; auto previous = decltype(time())::zero(); auto shouldPublish = false; while (true) { auto signal = publish(spins++, shouldPublish, previous, waiter, waitMode); // if we got skipped, make a note of it and return if we got a skipped // signal or a signal to wake up auto skipped = (signal == kSkipped); auto combined = (signal == kCombined); auto exceptionOccurred = (signal == kExceptionOccurred); auto woken = (signal == kWake); if (skipped || woken || combined || exceptionOccurred) { sig = static_cast<std::uint32_t>(signal); return !skipped; } // if we are under the spin threshold, pause to allow the other // hyperthread to run. If not, then sleep if (spins < kMaxSpins) { asm_volatile_pause(); } else { std::this_thread::sleep_for(folly::detail::Sleeper::kMinYieldingSleep); } } } template <typename Waiter> void doFutexWake(Waiter* waiter) { if (waiter) { // We can use a simple store operation here and not worry about checking // to see if the thread had actually started waiting on the futex; that is // already done in tryWake() when a sleeping thread is collected // // We now do not know whether the waiter had already enqueued on the futex // or whether it had just stored kSleeping in its futex and was about to // call futexWait(). We treat both these scenarios the same // // the below can theoretically cause a problem if we set the // wake signal and the waiter was in between setting kSleeping in its // futex and enqueueing on the futex.
In this case the waiter will just // return from futexWait() immediately. This leaves the address that the // waiter was using for futexWait() possibly dangling, and the thread that // we woke in the exchange above might have used that address for some // other object // // however, even if the thread had indeed woken up simply because of the // above exchange(), the futexWake() below is not incorrect. It is not // incorrect because futexWake() does not actually change the memory of // the futex word. It just uses the address to do a lookup in the kernel // futex table. And even if we call futexWake() on some other address, // and that address was being used to wait on futex(), that thread will // protect itself from spurious wakeups, check the value in the futex word // and enqueue itself back on the futex // // this dangling pointer possibility is why we use a pointer to the futex // word, and avoid dereferencing after the store() operation auto sleeper = &waiter->metadata_.sleeper_; sleeper->store(kWake, std::memory_order_release); futexWake(sleeper, 1); } } template <typename Waiter> bool doFutexWait(Waiter* waiter, Waiter*& next) { // first we get ready to sleep by calling exchange() on the futex with a // kSleeping value DCHECK(waiter->futex_.load(std::memory_order_relaxed) == kAboutToWait); // note the semantics of using a futex here: when we exchange the sleeper_ // with kSleeping, we are getting ready to sleep, and we return from // futexWait() when the value of sleeper_ might have changed. We can also // wake up because of a spurious // wakeup, so we always check against the value in sleeper_ after returning // from futexWait(); if the value is not kWake, then we continue auto pre = waiter->metadata_.sleeper_.exchange(kSleeping, std::memory_order_acq_rel); // Seeing a kSleeping on a futex word before we set it ourselves means only // one thing - an unlocking thread caught us before we went to futex(), and // we now have the lock, so we abort // // if we were given an early delivery, we can return from this function with // a true, meaning that we now have the lock if (pre == kSleeping) { return true; } // if we reach here then we were not given an early delivery, and any // thread that goes to wake us up will see a consistent view of the rest of // the contention chain (since the next_ variable is set before the // kSleeping exchange above) while (pre != kWake) { // before enqueueing on the futex, we wake any waiters that we were // possibly responsible for doFutexWake(std::exchange(next, nullptr)); // then we wait on the futex // // note that we have to protect ourselves against spurious wakeups here, // because the corresponding futexWake() above does not synchronize // wakeups around the futex word;
doing so would be // inefficient futexWait(&waiter->metadata_.sleeper_, kSleeping); pre = waiter->metadata_.sleeper_.load(std::memory_order_acquire); DCHECK((pre == kSleeping) || (pre == kWake)); } // when coming out of a futex, we might have some other sleeping threads // that we were supposed to wake up; assign that to the next pointer DCHECK(next == nullptr); next = extractPtr<Waiter>(waiter->next_.load(std::memory_order_relaxed)); return false; } template <typename Waiter> bool wait(Waiter* waiter, std::uint32_t mode, Waiter*& next, uint32_t& signal) { if (mode == kAboutToWait) { return doFutexWait(waiter, next); } return spin(*waiter, signal, mode); } inline void recordTimedWaiterAndClearTimedBit( bool& timedWaiter, std::uintptr_t& previous) { // the previous value in the mutex can never be kTimedWaiter; timed waiters // always set (kTimedWaiter | kLocked) in the mutex word when they try and // acquire the mutex DCHECK(previous != kTimedWaiter); if (UNLIKELY(previous & kTimedWaiter)) { // record whether there was a timed waiter in the previous mutex state, and // clear the timed bit from the previous state timedWaiter = true; previous = previous & (~kTimedWaiter); } } template <typename Atomic> void wakeTimedWaiters(Atomic* state, bool timedWaiters) { if (UNLIKELY(timedWaiters)) { folly::atomic_notify_one(state); // evade ADL } } template <template <typename> class Atomic, bool TimePublishing> template <typename Func> auto DistributedMutex<Atomic, TimePublishing>::lock_combine(Func func) -> folly::invoke_result_t<const Func&> { // invoke the lock implementation function and check whether we came out of // it with our task executed as a combined critical section. This usually // happens when the mutex is contended. // // In the absence of contention, we just return from the try_lock() function // with the lock acquired. So we need to invoke the task and unlock // the mutex before returning auto&& task = Request<Func>{func}; auto&& state = lockImplementation(*this, state_, task); if (!state.combined_) { // to avoid having to play a return-value dance when the combinable // returns void, we use a scope exit to perform the unlock after the // function return has been processed SCOPE_EXIT { unlock(std::move(state)); }; return func(); } // if we are here, that means we were able to get our request combined, so we // can return the value that was transferred to us // // each thread that enqueues as a part of a contention chain takes up the // responsibility of any timed waiter that had come immediately before it, // so we wake up timed waiters before exiting the lock function. Another // strategy might be to add the timed waiter information to the metadata and // let a single leader wake up a timed waiter for better concurrency.
But // this has proven not to be useful in benchmarks beyond a small 5% delta, // so we avoid taking the complexity hit and branch to wake up timed waiters // from each thread wakeTimedWaiters(&state_, state.timedWaiters_); return std::move(task).get(); } template <template <typename> class Atomic, bool TimePublishing> typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy DistributedMutex<Atomic, TimePublishing>::lock() { auto null = nullptr; return lockImplementation(*this, state_, null); } template <template <typename> class Atomic, bool TimePublishing> template <typename Rep, typename Period, typename Func> folly::Optional<invoke_result_t<Func&>> DistributedMutex<Atomic, TimePublishing>::try_lock_combine_for( const std::chrono::duration<Rep, Period>& duration, Func func) { auto state = try_lock_for(duration); if (state) { SCOPE_EXIT { unlock(std::move(state)); }; return func(); } return folly::none; } template <template <typename> class Atomic, bool TimePublishing> template <typename Clock, typename Duration, typename Func> folly::Optional<invoke_result_t<Func&>> DistributedMutex<Atomic, TimePublishing>::try_lock_combine_until( const std::chrono::time_point<Clock, Duration>& deadline, Func func) { auto state = try_lock_until(deadline); if (state) { SCOPE_EXIT { unlock(std::move(state)); }; return func(); } return folly::none; } template <typename Atomic, template <typename> class A, bool T> auto tryLockNoLoad(Atomic& atomic, DistributedMutex<A, T>&) { // Try and set the least significant bit of the centralized lock state to 1, // if this succeeds, it must have been the case that we had a kUnlocked (or // 0) in the central storage before, since that is the only case where a 0 // can be found in the least significant bit // // If this fails, then it is a no-op using Proxy = typename DistributedMutex<A, T>::DistributedMutexStateProxy; auto previous = atomic_fetch_set(atomic, 0, std::memory_order_acquire); return Proxy{nullptr, previous ? 0 : kLocked}; } template <template <typename> class Atomic, bool TimePublishing> typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy DistributedMutex<Atomic, TimePublishing>::try_lock() { // The lock attempt below requires an expensive atomic fetch-and-mutate or // an even more expensive atomic compare-and-swap loop depending on the // platform. These operations require pulling the lock cacheline into the // current core in exclusive mode and are therefore hard to parallelize // // This probabilistically avoids the expense by first checking whether the // mutex is currently locked if (state_.load(std::memory_order_relaxed) != kUnlocked) { return DistributedMutexStateProxy{nullptr, 0}; } return tryLockNoLoad(state_, *this); } template < template <typename> class Atomic, bool TimePublishing, typename State, typename Request> typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy lockImplementation( DistributedMutex<Atomic, TimePublishing>& mutex, State& atomic, Request& request) { // first try and acquire the lock as a fast path, the underlying // implementation is slightly faster than using std::atomic::exchange() as // is used in this function. So we get a small perf boost in the // uncontended case // // We only go through this fast path for the lock/unlock usage and avoid this // for combined critical sections. 
This check adds unnecessary overhead in // that case as it causes an extra cacheline bounce constexpr auto combineRequested = !std::is_same<Request, std::nullptr_t>{}; if (!combineRequested) { if (auto state = tryLockNoLoad(atomic, mutex)) { return state; } } auto previous = std::uintptr_t{0}; auto waitMode = combineRequested ? kCombineUninitialized : kUninitialized; auto nextWaitMode = kAboutToWait; auto timedWaiter = false; Waiter<Atomic>* nextSleeper = nullptr; while (true) { // construct the state needed to wait // // We can't use auto here because MSVC errors out due to a missing copy // constructor Waiter<Atomic> state{}; auto&& task = coalesce(request, state); auto&& storage = makeReturnValueStorageFor(task); auto&& address = folly::bit_cast<std::uintptr_t>(&state); attach(task, storage); state.initialize(waitMode, std::move(task)); DCHECK(!(address & 0b1)); // set the locked bit in the address we will be persisting in the mutex address |= kLocked; // attempt to acquire the mutex; mutex acquisition is successful if the // previous value is zeroed out // // we use memory_order_acq_rel here because we want the read-modify-write // operation to be both acquire and release. Acquire because if this is a // successful lock acquisition, we want to acquire state any other thread // has released from a prior unlock. We want release semantics because // other threads that read the address of this value should see the full // well-initialized node we are going to wait on if the mutex acquisition // was unsuccessful previous = atomic.exchange(address, std::memory_order_acq_rel); recordTimedWaiterAndClearTimedBit(timedWaiter, previous); state.next_.store(previous, std::memory_order_relaxed); if (previous == kUnlocked) { return { /* next */ nullptr, /* expected */ address, /* timedWaiter */ timedWaiter, /* combined */ false, /* waker */ 0, /* waiters */ nullptr, /* ready */ nextSleeper}; } DCHECK(previous & kLocked); // wait until we get a signal from another thread; if this returns false, // we got skipped and had probably been scheduled out, so try again auto signal = kUninitialized; if (!wait(&state, waitMode, nextSleeper, signal)) { std::swap(waitMode, nextWaitMode); continue; } // at this point it is safe to access the other fields in the waiter state, // since the thread that woke us up is gone and nobody will be touching this // state again; note that this requires memory ordering, and this is why we // use memory_order_acquire (among other reasons) in the above wait // // first we see if the value we took off the mutex state was the thread that // initiated the wakeups; if so, we are the terminal node of the current // contention chain. If we are the terminal node, then we should expect to // see a kLocked in the mutex state when we unlock; if we see that, we can // commit the unlock to the centralized mutex state. If not, we need to // continue wakeups // // a nice consequence of passing kLocked as the current address if we are // the terminal node is that it naturally just works with the algorithm.
If // we get a contention chain when coming out of a contention chain, the tail // of the new contention chain will have kLocked set as the previous, which, // as it happens "just works", since we have now established a recursive // relationship until broken auto next = previous; auto expected = address; if (previous == state.metadata_.waker_) { next = 0; expected = kLocked; } // if we were given a combine signal, detach the return value from the // wait struct into the request, so the current thread can access it // outside this function auto combined = (signal == kCombined); auto exceptionOccurred = (signal == kExceptionOccurred); if (combined || exceptionOccurred) { detach(request, state, exceptionOccurred, storage); } // if we are just coming out of a futex call, then it means that the next // waiter we are responsible for is also a waiter waiting on a futex, so // we return that list as the list of ready threads. We will be waking up // the ready threads on unlock no matter what return { /* next */ extractPtr<Waiter<Atomic>>(next), /* expected */ expected, /* timedWaiter */ timedWaiter, /* combined */ combineRequested && (combined || exceptionOccurred), /* waker */ state.metadata_.waker_, /* waiters */ extractPtr<Waiter<Atomic>>(state.metadata_.waiters_), /* ready */ nextSleeper}; } } inline bool preempted(std::uint64_t value, std::chrono::nanoseconds now) { auto currentTime = recover(strip(now)); auto nodeTime = recover(value); auto preempted = (currentTime > nodeTime + kScheduledAwaySpinThreshold.count()) && (nodeTime != recover(strip(std::chrono::nanoseconds::max()))); // we say that the thread has been preempted if its timestamp says so, and // also if it is neither uninitialized nor skipped DCHECK(value != kSkipped); return (preempted) && (value != kUninitialized) && (value != kCombineUninitialized); } inline bool isSleeper(std::uintptr_t value) { return (value == kAboutToWait); } inline bool isInitialized(std::uintptr_t value) { return (value != kUninitialized) && (value != kCombineUninitialized); } inline bool isCombiner(std::uintptr_t value) { auto mode = (value & 0xff); return (mode == kCombineWaiting) || (mode == kCombineUninitialized); } inline bool isWaitingCombiner(std::uintptr_t value) { return (value & 0xff) == kCombineWaiting; } template <typename Waiter> CombineFunction loadTask(Waiter* current, std::uintptr_t value) { // if we know that the waiter is a combiner of some sort, it is safe to read // and copy the value of the function in the waiter struct, since we know // that a waiter would have set it before enqueueing if (isCombiner(value)) { return current->function_; } return nullptr; } template <typename Waiter> FOLLY_COLD void transferCurrentException(Waiter* waiter) { DCHECK(std::current_exception()); new (&waiter->storage_) std::exception_ptr{std::current_exception()}; waiter->futex_.store(kExceptionOccurred, std::memory_order_release); } template <template <typename> class Atomic> FOLLY_ALWAYS_INLINE std::uintptr_t tryCombine( Waiter<Atomic>* waiter, std::uintptr_t value, std::uintptr_t next, std::uint64_t iteration, std::chrono::nanoseconds now, CombineFunction task) { // if the waiter has asked for a combine operation, we should combine its // critical section and move on to the next waiter // // the waiter is combinable if the following conditions are satisfied // // 1) the state in the futex word is not uninitialized (kUninitialized) // 2) it has a valid combine function // 3) we are not past the limit of the number of combines we can perform // or the 
waiter thread has been preempted. If the waiter gets preempted, // it's better to just execute its critical section before moving on, // as it will have to re-queue itself after preemption anyway, // leading to further delays in critical section completion // // if all the above are satisfied, then we can combine the critical section. // Note that if the waiter is in a combinable state, that means that it had // finished its writes to both the task and the next_ value. And observing // a waiting state also means that we have acquired the writes to the other // members of the waiter struct, so it's fine to use those values here if (isWaitingCombiner(value) && (iteration <= kMaxCombineIterations || preempted(value, now))) { try { task(); waiter->futex_.store(kCombined, std::memory_order_release); } catch (...) { transferCurrentException(waiter); } return next; } return 0; } template <typename Waiter> FOLLY_ALWAYS_INLINE std::uintptr_t tryWake( bool publishing, Waiter* waiter, std::uintptr_t value, std::uintptr_t next, std::uintptr_t waker, Waiter*& sleepers, std::uint64_t iteration, CombineFunction task) { // try and combine the waiter's request first; if that succeeds, that means // we have successfully executed their critical section and can move on to // the rest of the chain auto now = time(); if (tryCombine(waiter, value, next, iteration, now, task)) { return next; } // first we see if we can wake the current thread that is spinning if ((!publishing || !preempted(value, now)) && !isSleeper(value)) { // the Metadata class should be trivially destructible as we use placement // new to set the relevant metadata without calling any destructor. We // need to use placement new because the class contains a futex, which is // non-movable and non-copyable using Metadata = std::decay_t<decltype(waiter->metadata_)>; static_assert(std::is_trivially_destructible<Metadata>{}, ""); // we need release here because of the write to waker_ and also because we // are unlocking the mutex; the thread we do the handoff to here should // see the modified data new (&waiter->metadata_) Metadata{waker, bit_cast<uintptr_t>(sleepers)}; waiter->futex_.store(kWake, std::memory_order_release); return 0; } // if the thread is not a sleeper, and we were not able to catch it before // preemption, we can just return a false; it is safe to read next_ because // the thread was preempted. Preemption signals can only come after the // thread has set the next_ pointer, since the timestamp writes only start // occurring after that point // // if a thread was preempted it must have stored next_ in the waiter struct, // as the store to futex_ that resets the value from kUninitialized happens // after the write to next CHECK(publishing); if (!isSleeper(value)) { // go on to the next one // // Also, we need a memory_order_release here to prevent missed wakeups. A // missed wakeup here can happen when we see that a thread had been // preempted and skip it. We then go on to release the lock, and the // thread which got skipped does an exchange on the central storage, // still sees the locked bit, and never gets woken up // // Can we relax this? DCHECK(preempted(value, now)); DCHECK(!isCombiner(value)); next = waiter->next_.load(std::memory_order_relaxed); waiter->futex_.store(kSkipped, std::memory_order_release); return next; } // if we are here the thread is a sleeper // // we attempt to catch the thread before it goes to futex().
template <typename Waiter>
FOLLY_ALWAYS_INLINE std::uintptr_t tryWake(
    bool publishing,
    Waiter* waiter,
    std::uintptr_t value,
    std::uintptr_t next,
    std::uintptr_t waker,
    Waiter*& sleepers,
    std::uint64_t iteration,
    CombineFunction task) {
  // try and combine the waiter's request first, if that succeeds, that means
  // we have successfully executed its critical section and can move on to
  // the rest of the chain
  auto now = time();
  if (tryCombine(waiter, value, next, iteration, now, task)) {
    return next;
  }

  // first we see if we can wake the current thread that is spinning
  if ((!publishing || !preempted(value, now)) && !isSleeper(value)) {
    // the Metadata class should be trivially destructible as we use placement
    // new to set the relevant metadata without calling any destructor. We
    // need to use placement new because the class contains a futex, which is
    // non-movable and non-copyable
    using Metadata = std::decay_t<decltype(waiter->metadata_)>;
    static_assert(std::is_trivially_destructible<Metadata>{}, "");

    // we need release here because of the write to waker_ and also because we
    // are unlocking the mutex, the thread we do the handoff to here should
    // see the modified data
    new (&waiter->metadata_) Metadata{waker, bit_cast<uintptr_t>(sleepers)};
    waiter->futex_.store(kWake, std::memory_order_release);
    return 0;
  }

  // if the thread is not a sleeper, and we were not able to catch it before
  // preemption, we can just skip it; it is safe to read next_ because the
  // thread was preempted. Preemption signals can only come after the thread
  // has set the next_ pointer, since the timestamp writes only start
  // occurring after that point
  //
  // if a thread was preempted it must have stored next_ in the waiter struct,
  // as the store to futex_ that resets the value from kUninitialized happens
  // after the write to next
  CHECK(publishing);
  if (!isSleeper(value)) {
    // go on to the next one
    //
    // Also, we need a memory_order_release here to prevent missed wakeups. A
    // missed wakeup here can happen when we see that a thread had been
    // preempted and skip it. Then go on to release the lock, and then when
    // the thread which got skipped does an exchange on the central storage,
    // still sees the locked bit, and never gets woken up
    //
    // Can we relax this?
    DCHECK(preempted(value, now));
    DCHECK(!isCombiner(value));
    next = waiter->next_.load(std::memory_order_relaxed);
    waiter->futex_.store(kSkipped, std::memory_order_release);
    return next;
  }

  // if we are here the thread is a sleeper
  //
  // we attempt to catch the thread before it goes to futex(). If we are able
  // to catch the thread before it sleeps on a futex, we are done, and don't
  // need to go any further
  //
  // if we are not able to catch the thread before it goes to futex, we
  // collect the current thread in the list of sleeping threads represented by
  // sleepers, and return the next thread in the list so the caller can
  // continue down the chain
  //
  // it is safe to read the next_ pointer in the waiter struct if we were
  // unable to catch the thread before it went to futex() because we use
  // acquire-release ordering for the exchange operation below. And if we see
  // that the thread was already sleeping, we have synchronized with the write
  // to next_ in the context of the sleeping thread
  //
  // Also we need to set the value of waiters_ and waker_ in the thread before
  // doing the exchange because we need to pass on the list of sleepers in the
  // event that we were able to catch the thread before it went to futex().
  // If we were unable to catch the thread before it slept, these fields will
  // be ignored when the thread wakes up anyway
  DCHECK(isSleeper(value));
  waiter->metadata_.waker_ = waker;
  waiter->metadata_.waiters_ = folly::bit_cast<std::uintptr_t>(sleepers);
  auto pre =
      waiter->metadata_.sleeper_.exchange(kSleeping, std::memory_order_acq_rel);

  // we were able to catch the thread before it went to sleep, so the handoff
  // is complete and there is no next node to return
  if (pre != kSleeping) {
    return 0;
  }

  // otherwise return the value of next_; it is safe to read next because of
  // the same logic as when a thread was preempted
  //
  // we also need to collect this sleeper in the list of sleepers being built
  // up
  next = waiter->next_.load(std::memory_order_relaxed);
  auto head = folly::bit_cast<std::uintptr_t>(sleepers);
  waiter->next_.store(head, std::memory_order_relaxed);
  sleepers = waiter;
  return next;
}
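// The exchange on sleeper_ above races against a matching exchange performed
// by the sleeping thread itself right before it blocks in futex(). As a
// simplified illustration only (the real wait-side code elsewhere in this
// file differs in detail), the sleeper's side looks roughly like this:
//
//   auto prev =
//       metadata_.sleeper_.exchange(kSleeping, std::memory_order_acq_rel);
//   if (prev == kSleeping) {
//     // a waker got here first and "caught" this thread; waker_ and
//     // waiters_ were published before that exchange, so proceed without
//     // blocking
//   } else {
//     // this thread marked itself as sleeping first, so it goes on to block
//     // in futex() and relies on a waker to deliver the wakeup later
//   }
//
// Whoever performs the exchange second observes kSleeping, which is what
// makes the "catch the thread before it goes to futex()" test above precise.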
template <typename Waiter>
bool wake(
    bool publishing,
    Waiter& waiter,
    std::uintptr_t waker,
    Waiter*& sleepers,
    std::uint64_t iter) {
  // loop until we find a node that is either at the end of the list (as
  // specified by waker) or one that is active (as specified by the last
  // published timestamp of the node)
  auto current = &waiter;
  while (current) {
    // it is important that we load the value of function and next_ after the
    // initial acquire load. This is required because we need to synchronize
    // with the construction of the waiter struct before reading from it
    //
    // the load from the next_ variable is an optimistic load that assumes
    // that the waiting thread has probably gone to the waiting state. If the
    // waiting thread is in the waiting state (as revealed by the acquire load
    // from the futex word), we will see a well formed next_ value because it
    // happens-before the release store to the futex word. The atomic load
    // from next_ is an optimization to avoid branching before loading and
    // prevent the compiler from eliding the load altogether (and using a
    // pointer dereference when needed)
    auto value = current->futex_.load(std::memory_order_acquire);
    auto next = current->next_.load(std::memory_order_relaxed);
    auto task = loadTask(current, value);
    next =
        tryWake(publishing, current, value, next, waker, sleepers, iter, task);

    // if there is no next node, we have managed to wake someone up and have
    // successfully migrated the lock to another thread
    if (!next) {
      return true;
    }

    // we need to read the value of the next node in the list before skipping
    // it, this is because after we skip it the node might wake up and enqueue
    // itself, and thereby gain a new next node
    CHECK(publishing);
    current = (next == waker) ? nullptr : extractPtr<Waiter>(next);
  }
  return false;
}

template <typename Atomic, typename Proxy, typename Sleepers>
bool tryUnlockClean(Atomic& state, Proxy& proxy, Sleepers sleepers) {
  auto expected = proxy.expected_;
  while (true) {
    if (state.compare_exchange_strong(
            expected,
            kUnlocked,
            std::memory_order_release,
            std::memory_order_relaxed)) {
      // if we were able to commit a kUnlocked, we need to wake up the futex
      // waiters, if any
      doFutexWake(sleepers);
      return true;
    }

    // if we failed the compare_exchange_strong() above, we check to see if
    // the failure was because of the presence of a timed waiter. If that
    // was the case then we try one more time with the kTimedWaiter bit set
    if (UNLIKELY(expected == (proxy.expected_ | kTimedWaiter))) {
      proxy.timedWaiters_ = true;
      continue;
    }

    // otherwise bail out, we have a contention chain
    return false;
  }
}
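// Putting tryUnlockClean() in context: a clean unlock only expects to find in
// the central word the value it recorded in the proxy, possibly with the
// timed-waiter bit added. A rough, illustrative decision table for the
// compare-exchange above:
//
//   state == proxy.expected_                  -> CAS succeeds, kUnlocked is
//                                                committed, wake futex sleepers
//   state == proxy.expected_ | kTimedWaiter   -> a timed waiter parked in the
//                                                meantime; remember it, retry
//   anything else                             -> a contention chain formed;
//                                                fall back to the wakeup path
//                                                in unlock() below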
template <template <typename> class Atomic, bool Publish>
void DistributedMutex<Atomic, Publish>::unlock(
    DistributedMutex::DistributedMutexStateProxy const& proxy_) {
  auto proxy = proxy_;
  // we always wake up ready threads and timed waiters if we saw either
  DCHECK(proxy) << "Invalid proxy passed to DistributedMutex::unlock()";
  DCHECK(!proxy.combined_) << "Cannot unlock mutex after a successful combine";
  SCOPE_EXIT {
    doFutexWake(proxy.ready_);
    wakeTimedWaiters(&state_, proxy.timedWaiters_);
  };

  // if there is a wait queue we are responsible for, try and start wakeups,
  // don't bother with the mutex state
  auto sleepers = proxy.waiters_;
  if (proxy.next_) {
    if (wake(Publish, *proxy.next_, proxy.waker_, sleepers, 0)) {
      return;
    }

    // At this point, if we are still in the if statement, we were not the
    // terminal node of the wakeup chain. Terminal nodes have the next_
    // pointer set to null in lock()
    //
    // So we need to pretend we were the end of the contention chain. Coming
    // out of a contention chain always has the kLocked state set in the
    // mutex. Unless there is another contention chain lined up, which does
    // not matter since we are the terminal node anyway
    proxy.expected_ = kLocked;
  }

  for (std::uint64_t i = 0; true; ++i) {
    // otherwise, since we don't have anyone we need to wake up, we try and
    // release the mutex just as is
    //
    // if this is successful, we can return, the unlock was successful, we
    // have committed a nice kUnlocked to the central storage, yay
    if (tryUnlockClean(state_, proxy, sleepers)) {
      return;
    }

    // here we have a contention chain built up on the mutex. We grab the
    // wait queue and start executing wakeups. We leave a locked bit on the
    // centralized storage and hand off control to the head of the queue
    //
    // we use memory_order_acq_rel here because we want to see the
    // full well-initialized node that the other thread is waiting on
    //
    // If we are unable to wake the contention chain, it is possible that when
    // we come back to looping here, a new contention chain will form. In
    // that case we need to use kLocked as the waker_ value because the
    // terminal node of the new chain will see kLocked in the central storage
    auto head = state_.exchange(kLocked, std::memory_order_acq_rel);
    recordTimedWaiterAndClearTimedBit(proxy.timedWaiters_, head);
    auto next = extractPtr<Waiter<Atomic>>(head);
    auto expected = std::exchange(proxy.expected_, kLocked);
    DCHECK((head & kLocked) && (head != kLocked)) << "incorrect state " << head;
    if (wake(Publish, *next, expected, sleepers, i)) {
      break;
    }
  }
}
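// timedLock() below is shared by try_lock_until() and try_lock_for(). The
// MakeProxy parameter keeps it decoupled from DistributedMutexStateProxy;
// both callers pass a small factory lambda, roughly:
//
//   timedLock(state_, deadline, [](auto... args) { return Proxy{args...}; });
//
// timedLock() invokes that factory with (nullptr, kLocked, true) when the
// lock is acquired and with (nullptr, std::uintptr_t{0}, false) on timeout,
// so the proxy handed back to the caller converts to true only in the
// acquired case.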
template <typename Atomic, typename Deadline, typename MakeProxy>
auto timedLock(Atomic& state, Deadline deadline, MakeProxy proxy) {
  while (true) {
    // we put a bit on the central state to show that there is a timed waiter
    // and go to sleep on the central state
    //
    // when this thread goes to unlock the mutex, it will expect a 0b1 in the
    // mutex state (0b1, not 0b11), but then it will see that the value in
    // the mutex state is 0b11 and not 0b1, meaning that there might have
    // been another timed waiter, even though there might not actually have
    // been another timed waiter in the meantime. This sort of missed wakeup
    // is desirable for timed waiters; it helps avoid thundering herds of
    // timed waiters. Because the mutex is packed in 8 bytes, and we need an
    // address to be stored in those 8 bytes, we don't have much room to play
    // with. The only other solution is to issue a futexWake(INT_MAX) to wake
    // up all waiters when a clean unlock is committed by a thread that
    // previously saw a timed waiter in the mutex.
    //
    // putting a 0b11 here works for a set of reasons that is a superset of
    // the set of reasons that make it okay to put a kLocked (0b1) in the
    // mutex state. Now that the thread has put (kTimedWaiter | kLocked)
    // (0b11) in the mutex state and it expects a kLocked (0b1), there are
    // two scenarios possible. The first is when there is no contention
    // chain formation in the mutex between the timed waiter acquiring the
    // lock and unlocking it. In this case, the unlocker sees a 0b11 in the
    // mutex state, adjusts to the presence of a timed waiter and cleanly
    // unlocks with a kUnlocked (0b0). The second is when there is a
    // contention chain. When a thread puts its address in the mutex and
    // sees the timed bit, it records the presence of a timed waiter, and
    // then pretends as if it hadn't seen the timed bit. So future
    // contention chain releases will terminate with a kLocked (0b1) and not
    // a (kLocked | kTimedWaiter) (0b11). This just works naturally with the
    // rest of the algorithm without incurring a perf hit for the regular
    // non-timed case
    //
    // this strategy does, however, mean that when threads try to acquire the
    // mutex and all time out, there will be a wasteful syscall to issue
    // wakeups to waiting threads. We don't do anything to try and minimize
    // this
    //
    // we need to use a fetch_or() here because we need to convey two bits of
    // information - 1, whether the mutex is locked or not, and 2, whether
    // there is a timed waiter. The alternative here is to use the second
    // bit to convey information only; we could use a fetch_set() on the
    // second bit to make this faster, but that comes at the expense of the
    // regular fast path lock attempts, which use a single bit
    // read-modify-write for better performance
    auto data = kTimedWaiter | kLocked;
    auto previous = state.fetch_or(data, std::memory_order_acquire);
    if (!(previous & 0b1)) {
      DCHECK(!previous);
      return proxy(nullptr, kLocked, true);
    }

    // wait on the futex until signalled, if we get a timeout, the try_lock
    // fails
    auto result = folly::atomic_wait_until( // evade ADL
        &state, previous | data, deadline);
    if (result == std::cv_status::timeout) {
      return proxy(nullptr, std::uintptr_t{0}, false);
    }
  }
}

template <template <typename> class Atomic, bool TimePublishing>
template <typename Clock, typename Duration>
typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
DistributedMutex<Atomic, TimePublishing>::try_lock_until(
    const std::chrono::time_point<Clock, Duration>& deadline) {
  // fast path for the uncontended case
  //
  // we get the time after trying to acquire the mutex because in the
  // uncontended case, the price of getting the time is about 1/3 of the
  // actual mutex acquisition. So we only pay the price of that extra bit of
  // latency when needed
  //
  // this cost is even higher when the VDSO is involved on architectures that
  // do not offer a direct interface to the timestamp counter
  if (auto state = try_lock()) {
    return state;
  }

  // fall back to the timed locking algorithm
  using Proxy = DistributedMutexStateProxy;
  return timedLock(state_, deadline, [](auto... as) { return Proxy{as...}; });
}

template <template <typename> class Atomic, bool TimePublishing>
template <typename Rep, typename Period>
typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
DistributedMutex<Atomic, TimePublishing>::try_lock_for(
    const std::chrono::duration<Rep, Period>& duration) {
  // fast path for the uncontended case. Reasoning for doing this here is the
  // same as in try_lock_until()
  if (auto state = try_lock()) {
    return state;
  }

  // fall back to the timed locking algorithm
  using Proxy = DistributedMutexStateProxy;
  auto deadline = std::chrono::steady_clock::now() + duration;
  return timedLock(state_, deadline, [](auto... as) { return Proxy{as...}; });
}

} // namespace distributed_mutex
} // namespace detail
} // namespace folly

namespace std {

template <template <typename> class Atom, bool TimePublishing>
class unique_lock<
    ::folly::detail::distributed_mutex::DistributedMutex<Atom, TimePublishing>>
    : public ::folly::unique_lock_base<
          ::folly::detail::distributed_mutex::
              DistributedMutex<Atom, TimePublishing>> {
 public:
  using ::folly::unique_lock_base<
      ::folly::detail::distributed_mutex::
          DistributedMutex<Atom, TimePublishing>>::unique_lock_base;
};

template <template <typename> class Atom, bool TimePublishing>
class lock_guard<
    ::folly::detail::distributed_mutex::DistributedMutex<Atom, TimePublishing>>
    : public ::folly::unique_lock_guard_base<
          ::folly::detail::distributed_mutex::
              DistributedMutex<Atom, TimePublishing>> {
 public:
  using ::folly::unique_lock_guard_base<
      ::folly::detail::distributed_mutex::
          DistributedMutex<Atom, TimePublishing>>::unique_lock_guard_base;
};

} // namespace std
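// Illustrative usage of the pieces defined in this file. The public
// folly::DistributedMutex alias is assumed to be the default instantiation
// declared in folly/synchronization/DistributedMutex.h; this sketch is for
// orientation only:
//
//   folly::DistributedMutex mutex;
//
//   // timed acquisition: the returned proxy is falsy on timeout
//   if (auto proxy = mutex.try_lock_for(std::chrono::milliseconds{10})) {
//     // ... critical section ...
//     mutex.unlock(std::move(proxy));
//   }
//
//   // the std::unique_lock and std::lock_guard specializations above make
//   // the usual RAII idioms work even though unlock() needs the proxy back
//   {
//     std::unique_lock<folly::DistributedMutex> guard{mutex};
//     // ... critical section ...
//   }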