/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace folly { namespace observer { template Observer withJitter( Observer observer, std::chrono::milliseconds lag, std::chrono::milliseconds jitter) { class WithJitterObservable { public: using element_type = T; WithJitterObservable( Observer observer, std::chrono::milliseconds lag, std::chrono::milliseconds jitter) : observer_(std::move(observer)), state_(std::make_shared>( State(observer_.getSnapshot().getShared()))), lag_(lag), jitter_(jitter) {} std::shared_ptr get() { return state_->lock()->laggingValue; } void subscribe(std::function callback) { handle_ = observer_.addCallback([state = state_, observer = observer_, callback = std::move(callback), lag = lag_, jitter = jitter_](auto /* snapshot */) { if (std::exchange(state->lock()->delayedRefreshPending, true)) { return; } const auto sleepFor = lag - jitter + std::chrono::milliseconds{Random::rand64(2 * jitter.count())}; auto* executor = dynamic_cast( getGlobalCPUExecutor().get()); CHECK(executor); futures::sleep(sleepFor) .via(executor->weakRef()) .thenValue([callback, observer, state](auto&&) mutable { state->withLock([&](auto& s) { s.laggingValue = observer.getSnapshot().getShared(); s.delayedRefreshPending = false; }); callback(); }); }); } void unsubscribe() { handle_.cancel(); } private: struct State { explicit State(std::shared_ptr value) : laggingValue(std::move(value)) {} std::shared_ptr laggingValue; bool delayedRefreshPending{false}; }; Observer observer_; CallbackHandle handle_; std::shared_ptr> state_; const std::chrono::milliseconds lag_; const std::chrono::milliseconds jitter_; }; if (lag == std::chrono::milliseconds::zero()) { return observer; } if (jitter > lag) { throw std::invalid_argument( fmt::format("lag ({}) cannot be less than jitter ({})", lag, jitter)); } return ObserverCreator(std::move(observer), lag, jitter) .getObserver(); } } // namespace observer } // namespace folly