/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once namespace folly { namespace observer { namespace detail { template class ObserverCreatorContext { using T = typename Traits::element_type; public: template ObserverCreatorContext(Args&&... args) : observable_(std::forward(args)...) { state_.unsafeGetUnlocked().updateValue(Traits::get(observable_)); } ~ObserverCreatorContext() { if (state_.unsafeGetUnlocked().value) { Traits::unsubscribe(observable_); } } void setCore(observer_detail::Core::WeakPtr coreWeak) { coreWeak_ = std::move(coreWeak); } std::shared_ptr get() { auto state = state_.lock(); state->updateRequested = false; return state->value; } void update() { // This mutex ensures there's no race condition between initial update() // call and update() calls from the subsciption callback. // // Additionally it helps avoid races between two different subscription // callbacks (getting new value from observable and storing it into value_ // is not atomic). auto state = state_.lock(); if (!state->updateValue(Traits::get(observable_))) { // Value didn't change, so we can skip the version update. return; } if (!std::exchange(state->updateRequested, true)) { observer_detail::ObserverManager::scheduleRefreshNewVersion(coreWeak_); } } template void subscribe(F&& callback) { Traits::subscribe(observable_, std::forward(callback)); } private: struct State { bool updateValue(std::shared_ptr newValue) { auto newValuePtr = newValue.get(); if (!newValue) { throw std::logic_error("Observable returned nullptr."); } value.swap(newValue); return newValuePtr != newValue.get(); } std::shared_ptr value; bool updateRequested{false}; }; folly::Synchronized state_; observer_detail::Core::WeakPtr coreWeak_; Observable observable_; }; } // namespace detail // This master shared_ptr allows grabbing derived weak_ptrs, pointing to the // the same Context object, but using a separate reference count. Primary // shared_ptr destructor then blocks until all shared_ptrs obtained from // derived weak_ptrs are released. template class ObserverCreator::ContextPrimaryPtr { public: explicit ContextPrimaryPtr(std::shared_ptr context) : contextPrimary_(std::move(context)), context_( contextPrimary_.get(), [destroyBaton = destroyBaton_](Context*) { destroyBaton->post(); }) {} ~ContextPrimaryPtr() { if (context_) { context_.reset(); destroyBaton_->wait(); } } ContextPrimaryPtr(const ContextPrimaryPtr&) = delete; ContextPrimaryPtr(ContextPrimaryPtr&&) = default; ContextPrimaryPtr& operator=(const ContextPrimaryPtr&) = delete; ContextPrimaryPtr& operator=(ContextPrimaryPtr&&) = default; Context* operator->() const { return contextPrimary_.get(); } std::weak_ptr get_weak() { return context_; } private: std::shared_ptr> destroyBaton_{ std::make_shared>()}; std::shared_ptr contextPrimary_; std::shared_ptr context_; }; template template ObserverCreator::ObserverCreator(Args&&... args) : context_(std::make_shared(std::forward(args)...)) {} template Observer::T> ObserverCreator::getObserver() && { // We want to make sure that Context can only be destroyed when Core is // destroyed. So we have to avoid the situation when subscribe callback is // locking Context shared_ptr and remains the last to release it. // We solve this by having Core hold the master shared_ptr and subscription // callback gets derived weak_ptr. ContextPrimaryPtr contextPrimary(context_); auto contextWeak = contextPrimary.get_weak(); auto observer = makeObserver( [context = std::move(contextPrimary)]() { return context->get(); }); context_->setCore(observer.core_); context_->subscribe([contextWeak = std::move(contextWeak)] { if (auto context = contextWeak.lock()) { context->update(); } }); // Do an extra update in case observable was updated between observer creation // and setting updates callback. context_->update(); context_.reset(); return observer; } } // namespace observer } // namespace folly