/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include namespace folly { namespace observer { /** * Observer - a library which lets you create objects which track updates of * their dependencies and get re-computed when any of the dependencies changes. * * * Given an Observer, you can get a snapshot of the current version of the * object it holds: * * Observer myObserver = ...; * Snapshot mySnapshot = myObserver.getSnapshot(); * or simply * Snapshot mySnapshot = *myObserver; * * Snapshot will hold a view of the object, even if object in the Observer * gets updated. * * * What makes Observer powerful is its ability to track updates to other * Observers. Imagine we have two separate Observers A and B which hold * integers. * * Observer observerA = ...; * Observer observerB = ...; * * To compute a sum of A and B we can create a new Observer which would track * updates to A and B and re-compute the sum only when necessary. * * Observer sumObserver = makeObserver( * [observerA, observerB] { * int a = **observerA; * int b = **observerB; * return a + b; * }); * * int sum = **sumObserver; * * Notice that a + b will be only called when either a or b is changed. Getting * a snapshot from sumObserver won't trigger any re-computation. * * See AtomicObserver and TLObserver for optimized reads. * * See ObserverCreator class if you want to wrap any existing subscription API * in an Observer object. */ template class Observer; /** * An AtomicObserver provides read-optimized caching for an Observer using * `std::atomic`. Reading only requires atomic loads unless the cached value * is stale. If the cache needs to be refreshed, a mutex is used to * synchronize the update. This avoids creating a shared_ptr for every read. * * AtomicObserver models CopyConstructible and MoveConstructible. Copying or * moving simply invalidates the cache. * * AtomicObserver is ideal when there are lots of reads on a trivially-copyable * type. if `std::atomic` is not possible but you still want to optimize * reads, consider a TLObserver. * * Observer observer = ...; * AtomicObserver atomicObserver(observer); * auto value = *atomicObserver; */ template class AtomicObserver; /** * A TLObserver provides read-optimized caching for an Observer using * thread-local storage. This avoids creating a shared_ptr for every read. * * The functionality is similar to that of AtomicObserver except it allows types * that don't support atomics. If possible, use AtomicObserver instead. * * TLObserver can consume significant amounts of memory if accessed from many * threads. The problem is exacerbated if you chain several TLObservers. * Therefore, TLObserver should be used sparingly. * * Observer observer = ...; * TLObserver tlObserver(observer); * auto& snapshot = *tlObserver; */ template class TLObserver; /** * A ReadMostlyAtomicObserver guarantees that reading is exactly one relaxed * atomic load. Like AtomicObserver, the value is cached using `std::atomic`. * However, there is no version check when reading which means that the * cached value may be out-of-date with the Observer value. The cached value * will be updated asynchronously in a background thread. * * Because there is no version check when reading, ReadMostlyAtomicObserver * does not capture observer dependencies when used from makeObserver. It is not * possible to create a dependent observer. Therefore, ReadMostlyAtomicObserver * models none of CopyConstructible, MoveConstructible, CopyAssignable, or * MoveAssignable. Dependent observers should be created using the underlying * observer. * * ReadMostlyAtomicObserver is ideal for fastest possible reads on a * trivially-copyable type when a slightly out-of-date value will suffice. It is * perfect for very frequent reads coupled with very infrequent writes. * * Observer observer = ...; * ReadMostlyAtomicObserver atomicObserver(observer); * auto value = *atomicObserver; */ template class ReadMostlyAtomicObserver; /** * A TLObserver that optimizes for getting shared_ptr to data */ template class ReadMostlyTLObserver; /** * HazptrObserver implements a read-optimized Observer which caches an * Observer's snapshot and protects access to it using hazptrs. The cached * snapshot is kept up to date using a callback which fires when the original * observer changes. This implementation incurs an additional allocation * on updates making it less suitable for write-heavy workloads. * * There are 2 main APIs: * 1) getSnapshot: Returns a Snapshot containing a const pointer to T and guards * access to it using folly::hazptr_holder. The pointer is only safe to use * while the returned Snapshot object is alive. * 2) getLocalSnapshot: Same as getSnapshot but backed by folly::hazptr_local. * This API is ~3ns faster than getSnapshot but is unsafe for the current * thread to construct any other hazptr holder type objects (hazptr_holder, * hazptr_array and other hazptr_local) while the returned snapshot exists. * * See folly/synchronization/Hazptr.h for more details on hazptrs. */ template class HazptrObserver; template class Snapshot { public: const T& operator*() const { return *get(); } const T* operator->() const { return get(); } const T* get() const { return data_.get(); } std::shared_ptr getShared() const& { return data_; } std::shared_ptr getShared() && { return std::move(data_); } /** * Return the version of the observed object. */ size_t getVersion() const { return version_; } private: friend class Observer; Snapshot( const observer_detail::Core& core, std::shared_ptr data, size_t version) : data_(std::move(data)), version_(version), core_(&core) { DCHECK(data_); } std::shared_ptr data_; size_t version_; const observer_detail::Core* core_; }; class CallbackHandle { public: CallbackHandle(); template CallbackHandle(Observer observer, Function)> callback); CallbackHandle(const CallbackHandle&) = delete; CallbackHandle(CallbackHandle&&) = default; CallbackHandle& operator=(const CallbackHandle&) = delete; CallbackHandle& operator=(CallbackHandle&&) noexcept; ~CallbackHandle(); // If callback is currently running, waits until it completes. // Callback will never be called after cancel() returns. void cancel(); private: struct Context; std::shared_ptr context_; }; template class ObserverCreator; template class Observer { public: explicit Observer(observer_detail::Core::Ptr core); Snapshot getSnapshot() const; Snapshot operator*() const { return getSnapshot(); } /** * Check if we have a newer version of the observed object than the snapshot. * Snapshot should have been originally from this Observer. */ bool needRefresh(const Snapshot& snapshot) const { DCHECK_EQ(core_.get(), snapshot.core_); return needRefresh(snapshot.getVersion()); } bool needRefresh(size_t version) const { return version < core_->getVersionLastChange(); } CallbackHandle addCallback(Function)> callback) const; private: template friend class ObserverCreator; observer_detail::Core::Ptr core_; }; template Observer unwrap(Observer); template Observer unwrapValue(Observer); template Observer unwrap(Observer>); template Observer unwrapValue(Observer>); /** * makeObserver(...) creates a new Observer object given a functor to * compute it. The functor can return T or std::shared_ptr. * * makeObserver(...) blocks until the initial version of Observer is computed. * If creator functor fails (throws or returns a nullptr) during this first * call, the exception is re-thrown by makeObserver(...). * * For all subsequent updates if creator functor fails (throws or returs a * nullptr), the Observer (and all its dependents) is not updated. */ template Observer> makeObserver(F&& creator); template Observer> makeObserver(F&& creator); template Observer> makeObserver(F&& creator); /** * The returned Observer will proxy updates from the input observer, but will * skip updates that contain the same (according to operator==) value even if * the actual object in the update is different. */ template Observer makeValueObserver(Observer observer); /** * A more efficient short-cut for makeValueObserver(makeObserver(...)). */ template Observer> makeValueObserver(F&& creator); template Observer> makeValueObserver( F&& creator); /** * The returned Observer will never update and always return the passed value. */ template Observer makeStaticObserver(T value); template Observer makeStaticObserver(std::shared_ptr value); template class AtomicObserver { public: explicit AtomicObserver(Observer observer); AtomicObserver(const AtomicObserver& other); AtomicObserver(AtomicObserver&& other) noexcept; AtomicObserver& operator=(const AtomicObserver& other); AtomicObserver& operator=(AtomicObserver&& other) noexcept; AtomicObserver& operator=(Observer observer); T get() const; T operator*() const { return get(); } Observer getUnderlyingObserver() const { return observer_; } private: mutable std::atomic cachedValue_{}; mutable std::atomic cachedVersion_{}; mutable SharedMutex refreshLock_; Observer observer_; }; template class TLObserver { public: explicit TLObserver(Observer observer); TLObserver(const TLObserver& other); const Snapshot& getSnapshotRef() const; const Snapshot& operator*() const { return getSnapshotRef(); } Observer getUnderlyingObserver() const { return observer_; } private: Observer observer_; ThreadLocal> snapshot_; }; template class ReadMostlyAtomicObserver { public: explicit ReadMostlyAtomicObserver(Observer observer); ReadMostlyAtomicObserver(const ReadMostlyAtomicObserver&) = delete; ReadMostlyAtomicObserver& operator=(const ReadMostlyAtomicObserver&) = delete; T get() const; T operator*() const { return get(); } Observer getUnderlyingObserver() const { return observer_; } private: Observer observer_; std::atomic cachedValue_{}; CallbackHandle callback_; }; template class ReadMostlyTLObserver { public: explicit ReadMostlyTLObserver(Observer observer); ReadMostlyTLObserver(const ReadMostlyTLObserver& other); ReadMostlySharedPtr getShared() const; Observer getUnderlyingObserver() const { return observer_; } private: ReadMostlySharedPtr refresh() const; struct LocalSnapshot { LocalSnapshot() {} LocalSnapshot(const ReadMostlyMainPtr& data, size_t version) : data_(data), version_(version) {} ReadMostlyWeakPtr data_; size_t version_; }; Observer observer_; mutable Synchronized, std::mutex> globalData_; mutable std::atomic globalVersion_{0}; ThreadLocal localSnapshot_; }; template class HazptrObserver { template struct HazptrSnapshot { template explicit HazptrSnapshot(const std::atomic& state) : holder_() { make(holder_); ptr_ = get(holder_).protect(state)->snapshot_.get(); } const T& operator*() const { return *get(); } const T* operator->() const { return get(); } const T* get() const { return ptr_; } private: static void make(hazptr_holder<>& holder) { holder = folly::make_hazard_pointer<>(); } static void make(hazptr_local<1>&) {} static hazptr_holder<>& get(hazptr_holder<>& holder) { return holder; } static hazptr_holder<>& get(hazptr_local<1>& holder) { return holder[0]; } Holder holder_; const T* ptr_; }; public: using DefaultSnapshot = HazptrSnapshot>; using LocalSnapshot = HazptrSnapshot>; explicit HazptrObserver(Observer observer) : observer_( makeObserver([o = std::move(observer), alive = alive_, this]() { auto snapshot = o.getSnapshot(); auto rAlive = alive->rlock(); if (*rAlive) { auto* newState = new State(snapshot); auto* oldState = state_.exchange(newState, std::memory_order_acq_rel); if (oldState) { oldState->retire(); } } return snapshot.getShared(); })) {} HazptrObserver(const HazptrObserver& r) : HazptrObserver(r.observer_) {} HazptrObserver& operator=(const HazptrObserver&) = delete; HazptrObserver(HazptrObserver&&) = default; HazptrObserver& operator=(HazptrObserver&&) = default; ~HazptrObserver() { *alive_->wlock() = false; auto* state = state_.load(std::memory_order_acquire); if (state) { state->retire(); } } DefaultSnapshot getSnapshot() const; LocalSnapshot getLocalSnapshot() const; private: struct State : public hazptr_obj_base { explicit State(Snapshot snapshot) : snapshot_(std::move(snapshot)) {} Snapshot snapshot_; }; std::atomic state_{nullptr}; std::shared_ptr> alive_{ std::make_shared>(true)}; Observer observer_; }; /** * Same as makeObserver(...), but creates AtomicObserver. */ template AtomicObserver makeAtomicObserver(Observer observer) { return AtomicObserver(std::move(observer)); } template auto makeAtomicObserver(F&& creator) { return makeAtomicObserver(makeObserver(std::forward(creator))); } /** * Same as makeObserver(...), but creates TLObserver. */ template TLObserver makeTLObserver(Observer observer) { return TLObserver(std::move(observer)); } template auto makeTLObserver(F&& creator) { return makeTLObserver(makeObserver(std::forward(creator))); } /** * Same as makeObserver(...), but creates ReadMostlyAtomicObserver. */ template ReadMostlyAtomicObserver makeReadMostlyAtomicObserver(Observer observer) { return ReadMostlyAtomicObserver(std::move(observer)); } template auto makeReadMostlyAtomicObserver(F&& creator) { return makeReadMostlyAtomicObserver(makeObserver(std::forward(creator))); } /** * Same as makeObserver(...), but creates ReadMostlyTLObserver. */ template ReadMostlyTLObserver makeReadMostlyTLObserver(Observer observer) { return ReadMostlyTLObserver(std::move(observer)); } template auto makeReadMostlyTLObserver(F&& creator) { return makeReadMostlyTLObserver(makeObserver(std::forward(creator))); } /** * Same as makeObserver(...), but creates HazptrObserver. */ template HazptrObserver makeHazptrObserver(Observer observer) { return HazptrObserver(std::move(observer)); } template auto makeHazptrObserver(F&& creator) { return makeHazptrObserver(makeObserver(std::forward(creator))); } template struct ObserverTraits {}; template struct ObserverTraits { using type = Observer; }; template struct ObserverTraits { using type = TLObserver; }; template using ObserverT = typename ObserverTraits::type; } // namespace observer } // namespace folly #include