// Copyright (c) Facebook, Inc. and its affiliates. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #pragma once #include #include #include "yarpl/Observable.h" #include "yarpl/observable/Observer.h" #include "yarpl/observable/Observable.h" namespace yarpl { namespace observable { /** * Base (helper) class for operators. Operators are templated on two types: * D (downstream) and U (upstream). Operators are created by method calls on * an upstream Observable, and are Observables themselves. Multi-stage * pipelines * can be built: a Observable heading a sequence of Operators. */ template class ObservableOperator : public Observable { protected: /// An Operator's subscription. /// /// When a pipeline chain is active, each Observable has a corresponding /// subscription. Except for the first one, the subscriptions are created /// against Operators. Each operator subscription has two functions: as a /// subscriber for the previous stage; as a subscription for the next one, /// the user-supplied subscriber being the last of the pipeline stages. class OperatorSubscription : public ::yarpl::observable::Subscription, public Observer { protected: explicit OperatorSubscription(std::shared_ptr> observer) : observer_(std::move(observer)) { assert(observer_); } void observerOnNext(D value) { if (observer_) { observer_->onNext(std::move(value)); } } /// Terminates both ends of an operator normally. void terminate() { terminateImpl(TerminateState::Both()); } /// Terminates both ends of an operator with an error. void terminateErr(folly::exception_wrapper ex) { terminateImpl(TerminateState::Both(), std::move(ex)); } // Subscription. void cancel() override { Subscription::cancel(); terminateImpl(TerminateState::Up()); } // Observer. void onSubscribe(std::shared_ptr subscription) override { if (upstream_) { DLOG(ERROR) << "attempt to subscribe twice"; subscription->cancel(); return; } upstream_ = std::move(subscription); observer_->onSubscribe(this->ref_from_this(this)); } void onComplete() override { terminateImpl(TerminateState::Down()); } void onError(folly::exception_wrapper ex) override { terminateImpl(TerminateState::Down(), std::move(ex)); } private: struct TerminateState { TerminateState(bool u, bool d) : up{u}, down{d} {} static TerminateState Down() { return TerminateState{false, true}; } static TerminateState Up() { return TerminateState{true, false}; } static TerminateState Both() { return TerminateState{true, true}; } const bool up{false}; const bool down{false}; }; bool isTerminated() const { return !upstream_ && !observer_; } /// Terminates an operator, sending cancel() and on{Complete,Error}() /// signals as necessary. void terminateImpl( TerminateState state, folly::exception_wrapper ex = folly::exception_wrapper{nullptr}) { if (isTerminated()) { return; } if (auto upstream = std::move(upstream_)) { if (state.up) { upstream->cancel(); } } if (auto observer = std::move(observer_)) { if (state.down) { if (ex) { observer->onError(std::move(ex)); } else { observer->onComplete(); } } } } /// This subscription controls the life-cycle of the observer. The /// observer is retained as long as calls on it can be made. (Note: /// the observer in turn maintains a reference on this subscription /// object until cancellation and/or completion.) std::shared_ptr> observer_; /// In an active pipeline, cancel and (possibly modified) request(n) /// calls should be forwarded upstream. Note that `this` is also a /// observer for the upstream stage: thus, there are cycles; all of /// the objects drop their references at cancel/complete. // TODO(lehecka): this is extra field... base class has this member so // remove it std::shared_ptr<::yarpl::observable::Subscription> upstream_; }; }; template class MapOperator : public ObservableOperator { using Super = ObservableOperator; static_assert(std::is_same, F>::value, "undecayed"); static_assert(folly::is_invocable_r::value, "not invocable"); public: template MapOperator(std::shared_ptr> upstream, Func&& function) : upstream_(std::move(upstream)), function_(std::forward(function)) {} std::shared_ptr subscribe( std::shared_ptr> observer) override { auto subscription = std::make_shared( this->ref_from_this(this), std::move(observer)); upstream_->subscribe( // Note: implicit cast to a reference to a observer. subscription); return subscription; } private: class MapSubscription : public Super::OperatorSubscription { using SuperSub = typename Super::OperatorSubscription; public: MapSubscription( std::shared_ptr observable, std::shared_ptr> observer) : SuperSub(std::move(observer)), observable_(std::move(observable)) {} void onNext(U value) override { try { this->observerOnNext(observable_->function_(std::move(value))); } catch (const std::exception& exn) { folly::exception_wrapper ew{std::current_exception(), exn}; this->terminateErr(std::move(ew)); } } private: std::shared_ptr observable_; }; std::shared_ptr> upstream_; F function_; }; template class FilterOperator : public ObservableOperator { using Super = ObservableOperator; static_assert(std::is_same, F>::value, "undecayed"); static_assert(folly::is_invocable_r::value, "not invocable"); public: template FilterOperator(std::shared_ptr> upstream, Func&& function) : upstream_(std::move(upstream)), function_(std::forward(function)) {} std::shared_ptr subscribe( std::shared_ptr> observer) override { auto subscription = std::make_shared( this->ref_from_this(this), std::move(observer)); upstream_->subscribe( // Note: implicit cast to a reference to a observer. subscription); return subscription; } private: class FilterSubscription : public Super::OperatorSubscription { using SuperSub = typename Super::OperatorSubscription; public: FilterSubscription( std::shared_ptr observable, std::shared_ptr> observer) : SuperSub(std::move(observer)), observable_(std::move(observable)) {} void onNext(U value) override { if (observable_->function_(value)) { SuperSub::observerOnNext(std::move(value)); } } private: std::shared_ptr observable_; }; std::shared_ptr> upstream_; F function_; }; template class ReduceOperator : public ObservableOperator { using Super = ObservableOperator; static_assert(std::is_same, F>::value, "undecayed"); static_assert(std::is_assignable::value, "not assignable"); static_assert(folly::is_invocable_r::value, "not invocable"); public: template ReduceOperator(std::shared_ptr> upstream, Func&& function) : upstream_(std::move(upstream)), function_(std::forward(function)) {} std::shared_ptr subscribe( std::shared_ptr> subscriber) override { auto subscription = std::make_shared( this->ref_from_this(this), std::move(subscriber)); upstream_->subscribe( // Note: implicit cast to a reference to a subscriber. subscription); return subscription; } private: class ReduceSubscription : public Super::OperatorSubscription { using SuperSub = typename Super::OperatorSubscription; public: ReduceSubscription( std::shared_ptr observable, std::shared_ptr> observer) : SuperSub(std::move(observer)), observable_(std::move(observable)), accInitialized_(false) {} void onNext(U value) override { if (accInitialized_) { acc_ = observable_->function_(std::move(acc_), std::move(value)); } else { acc_ = std::move(value); accInitialized_ = true; } } void onComplete() override { if (accInitialized_) { SuperSub::observerOnNext(std::move(acc_)); } SuperSub::onComplete(); } private: std::shared_ptr observable_; bool accInitialized_; D acc_; }; std::shared_ptr> upstream_; F function_; }; template class TakeOperator : public ObservableOperator { using Super = ObservableOperator; public: TakeOperator(std::shared_ptr> upstream, int64_t limit) : upstream_(std::move(upstream)), limit_(limit) {} std::shared_ptr subscribe( std::shared_ptr> observer) override { auto subscription = std::make_shared(limit_, std::move(observer)); upstream_->subscribe(subscription); return subscription; } private: class TakeSubscription : public Super::OperatorSubscription { using SuperSub = typename Super::OperatorSubscription; public: TakeSubscription(int64_t limit, std::shared_ptr> observer) : SuperSub(std::move(observer)), limit_(limit) {} void onSubscribe(std::shared_ptr subscription) override { SuperSub::onSubscribe(std::move(subscription)); if (limit_ <= 0) { SuperSub::terminate(); } } void onNext(T value) override { if (limit_-- > 0) { SuperSub::observerOnNext(std::move(value)); if (limit_ == 0) { SuperSub::terminate(); } } } private: int64_t limit_; }; std::shared_ptr> upstream_; const int64_t limit_; }; template class SkipOperator : public ObservableOperator { using Super = ObservableOperator; public: SkipOperator(std::shared_ptr> upstream, int64_t offset) : upstream_(std::move(upstream)), offset_(offset) {} std::shared_ptr subscribe( std::shared_ptr> observer) override { auto subscription = std::make_shared(offset_, std::move(observer)); upstream_->subscribe(subscription); return subscription; } private: class SkipSubscription : public Super::OperatorSubscription { using SuperSub = typename Super::OperatorSubscription; public: SkipSubscription(int64_t offset, std::shared_ptr> observer) : SuperSub(std::move(observer)), offset_(offset) {} void onNext(T value) override { if (offset_ <= 0) { SuperSub::observerOnNext(std::move(value)); } else { --offset_; } } private: int64_t offset_; }; std::shared_ptr> upstream_; const int64_t offset_; }; template class IgnoreElementsOperator : public ObservableOperator { using Super = ObservableOperator; public: explicit IgnoreElementsOperator(std::shared_ptr> upstream) : upstream_(std::move(upstream)) {} std::shared_ptr subscribe( std::shared_ptr> observer) override { auto subscription = std::make_shared(std::move(observer)); upstream_->subscribe(subscription); return subscription; } private: class IgnoreElementsSubscription : public Super::OperatorSubscription { using SuperSub = typename Super::OperatorSubscription; public: IgnoreElementsSubscription(std::shared_ptr> observer) : SuperSub(std::move(observer)) {} void onNext(T) override {} }; std::shared_ptr> upstream_; }; template class SubscribeOnOperator : public ObservableOperator { using Super = ObservableOperator; public: SubscribeOnOperator( std::shared_ptr> upstream, folly::Executor& executor) : upstream_(std::move(upstream)), executor_(executor) {} std::shared_ptr subscribe( std::shared_ptr> observer) override { auto subscription = std::make_shared( executor_, std::move(observer)); executor_.add([subscription, upstream = upstream_]() mutable { upstream->subscribe(std::move(subscription)); }); return subscription; } private: class SubscribeOnSubscription : public Super::OperatorSubscription { using SuperSub = typename Super::OperatorSubscription; public: SubscribeOnSubscription( folly::Executor& executor, std::shared_ptr> observer) : SuperSub(std::move(observer)), executor_(executor) {} void cancel() override { executor_.add([self = this->ref_from_this(this), this] { this->callSuperCancel(); }); } void onNext(T value) override { SuperSub::observerOnNext(std::move(value)); } private: // Trampoline to call superclass method; gcc bug 58972. void callSuperCancel() { SuperSub::cancel(); } folly::Executor& executor_; }; std::shared_ptr> upstream_; folly::Executor& executor_; }; template class FromPublisherOperator : public Observable { static_assert( std::is_same, OnSubscribe>::value, "undecayed"); public: template explicit FromPublisherOperator(F&& function) : function_(std::forward(function)) {} private: class PublisherObserver : public Observer { public: PublisherObserver( std::shared_ptr> inner, std::shared_ptr subscription) : inner_(std::move(inner)) { Observer::onSubscribe(std::move(subscription)); } void onSubscribe(std::shared_ptr) override { DLOG(ERROR) << "not allowed to call"; CHECK(false); } void onComplete() override { if (auto inner = atomic_exchange(&inner_, nullptr)) { inner->onComplete(); } Observer::onComplete(); } void onError(folly::exception_wrapper ex) override { if (auto inner = atomic_exchange(&inner_, nullptr)) { inner->onError(std::move(ex)); } Observer::onError(folly::exception_wrapper()); } void onNext(T t) override { atomic_load(&inner_)->onNext(std::move(t)); } private: AtomicReference> inner_; }; public: std::shared_ptr subscribe( std::shared_ptr> observer) override { auto subscription = Subscription::create(); observer->onSubscribe(subscription); if (!subscription->isCancelled()) { function_(std::make_shared( std::move(observer), subscription), subscription); } return subscription; } private: OnSubscribe function_; }; } // namespace observable } // namespace yarpl #include "yarpl/observable/ObservableConcatOperators.h" #include "yarpl/observable/ObservableDoOperator.h"