/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include namespace folly { namespace observer_detail { template observer::Observer> makeObserver(F&& creator) { return observer::makeObserver([creator = std::forward(creator)]() mutable { return std::make_shared>(creator()); }); } template observer::Observer> makeValueObserver(F&& creator) { return observer::makeValueObserver( [creator = std::forward(creator)]() mutable { return std::make_shared>(creator()); }); } } // namespace observer_detail namespace observer { template Snapshot Observer::getSnapshot() const { auto data = core_->getData(); return Snapshot( *core_, std::static_pointer_cast(std::move(data.data)), data.version); } template Observer::Observer(observer_detail::Core::Ptr core) : core_(std::move(core)) {} template Observer unwrap(Observer o) { return o; } template Observer unwrapValue(Observer o) { return makeValueObserver(std::move(o)); } template Observer unwrap(Observer> oo) { return makeObserver([oo = std::move(oo)] { return oo.getSnapshot()->getSnapshot().getShared(); }); } template Observer unwrapValue(Observer> oo) { return makeValueObserver([oo = std::move(oo)] { return oo.getSnapshot()->getSnapshot().getShared(); }); } template Observer> makeObserver( F&& creator) { auto core = observer_detail::Core::create( [creator = std::forward(creator)]() mutable { return std::static_pointer_cast(creator()); }); observer_detail::ObserverManager::initCore(core); return Observer>(core); } template Observer> makeObserver(F&& creator) { return observer_detail::makeObserver(std::forward(creator)); } template Observer> makeObserver(F&& creator) { return unwrap(observer_detail::makeObserver(std::forward(creator))); } template Observer makeStaticObserver(T value) { return makeStaticObserver(std::make_shared(std::move(value))); } template Observer makeStaticObserver(std::shared_ptr value) { return makeObserver([value = std::move(value)] { return value; }); } template AtomicObserver::AtomicObserver(Observer observer) : observer_(std::move(observer)) {} template AtomicObserver::AtomicObserver(const AtomicObserver& other) : AtomicObserver(other.observer_) {} template AtomicObserver::AtomicObserver(AtomicObserver&& other) noexcept : AtomicObserver(std::move(other.observer_)) {} template AtomicObserver& AtomicObserver::operator=( const AtomicObserver& other) { return *this = other.observer_; } template AtomicObserver& AtomicObserver::operator=( AtomicObserver&& other) noexcept { return *this = std::move(other.observer_); } template AtomicObserver& AtomicObserver::operator=(Observer observer) { observer_ = std::move(observer); cachedVersion_.store(0, std::memory_order_release); return *this; } template T AtomicObserver::get() const { auto version = cachedVersion_.load(std::memory_order_acquire); if (UNLIKELY( observer_.needRefresh(version) || observer_detail::ObserverManager::inManagerThread())) { SharedMutex::WriteHolder guard{refreshLock_}; version = cachedVersion_.load(std::memory_order_acquire); if (LIKELY( observer_.needRefresh(version) || observer_detail::ObserverManager::inManagerThread())) { auto snapshot = *observer_; cachedValue_.store(*snapshot, std::memory_order_relaxed); cachedVersion_.store(snapshot.getVersion(), std::memory_order_release); } } return cachedValue_.load(std::memory_order_relaxed); } template TLObserver::TLObserver(Observer observer) : observer_(std::move(observer)), snapshot_([&] { return new Snapshot(observer_.getSnapshot()); }) {} template TLObserver::TLObserver(const TLObserver& other) : TLObserver(other.observer_) {} template const Snapshot& TLObserver::getSnapshotRef() const { auto& snapshot = *snapshot_; if (observer_.needRefresh(snapshot) || observer_detail::ObserverManager::inManagerThread()) { snapshot = observer_.getSnapshot(); } return snapshot; } template ReadMostlyAtomicObserver::ReadMostlyAtomicObserver(Observer observer) : observer_(std::move(observer)), cachedValue_(**observer_), callback_(observer_.addCallback([this](Snapshot snapshot) { cachedValue_.store(*snapshot, std::memory_order_relaxed); })) {} template T ReadMostlyAtomicObserver::get() const { if (UNLIKELY(observer_detail::ObserverManager::inManagerThread())) { return **observer_; } return cachedValue_.load(std::memory_order_relaxed); } template ReadMostlyTLObserver::ReadMostlyTLObserver(Observer observer) : observer_(std::move(observer)) { refresh(); } template ReadMostlyTLObserver::ReadMostlyTLObserver( const ReadMostlyTLObserver& other) : ReadMostlyTLObserver(other.observer_) {} template ReadMostlySharedPtr ReadMostlyTLObserver::getShared() const { if (!observer_.needRefresh(localSnapshot_->version_) && !observer_detail::ObserverManager::inManagerThread()) { if (auto data = localSnapshot_->data_.lock()) { return data; } } return refresh(); } template ReadMostlySharedPtr ReadMostlyTLObserver::refresh() const { auto snapshot = observer_.getSnapshot(); auto globalData = globalData_.lock(); if (globalVersion_.load() < snapshot.getVersion()) { globalData->reset(snapshot.getShared()); globalVersion_ = snapshot.getVersion(); } *localSnapshot_ = LocalSnapshot(*globalData, globalVersion_.load()); return globalData->getShared(); } struct CallbackHandle::Context { Optional> observer; Synchronized canceled{false}; }; inline CallbackHandle::CallbackHandle() {} template CallbackHandle::CallbackHandle( Observer observer, Function)> callback) { context_ = std::make_shared(); context_->observer = makeObserver([observer = std::move(observer), callback = std::move(callback), context = context_]() mutable { auto rCanceled = context->canceled.rlock(); if (*rCanceled) { return folly::unit; } callback(*observer); return folly::unit; }); } inline CallbackHandle& CallbackHandle::operator=( CallbackHandle&& handle) noexcept { cancel(); context_ = std::move(handle.context_); return *this; } inline CallbackHandle::~CallbackHandle() { cancel(); } inline void CallbackHandle::cancel() { if (!context_) { return; } context_->observer.reset(); context_->canceled = true; context_.reset(); } template CallbackHandle Observer::addCallback( Function)> callback) const { return CallbackHandle(*this, std::move(callback)); } template Observer makeValueObserver(Observer observer) { return makeValueObserver( [observer] { return observer.getSnapshot().getShared(); }); } template Observer> makeValueObserver(F&& creator) { return observer_detail::makeValueObserver(std::forward(creator)); } template Observer> makeValueObserver( F&& creator) { return unwrapValue(observer_detail::makeObserver(std::forward(creator))); } template Observer> makeValueObserver( F&& creator) { auto activeValue = creator(); return makeObserver([activeValue = std::move(activeValue), creator = std::forward(creator)]() mutable { auto newValue = creator(); if (!(*activeValue == *newValue)) { activeValue = newValue; } return activeValue; }); } template typename HazptrObserver::DefaultSnapshot HazptrObserver::getSnapshot() const { if (UNLIKELY(observer_detail::ObserverManager::inManagerThread())) { // Wait for updates observer_.getSnapshot(); } return DefaultSnapshot(state_); } template typename HazptrObserver::LocalSnapshot HazptrObserver::getLocalSnapshot() const { if (UNLIKELY(observer_detail::ObserverManager::inManagerThread())) { // Wait for updates observer_.getSnapshot(); } return LocalSnapshot(state_); } } // namespace observer } // namespace folly