// Copyright (c) Facebook, Inc. and its affiliates. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #pragma once #include #include #include #include "yarpl/flowable/Flowable.h" #include "yarpl/flowable/Subscriber.h" #include "yarpl/flowable/Subscription.h" #include "yarpl/utils/credits.h" #include #include #include #include #include namespace yarpl { namespace flowable { /** * Base (helper) class for operators. Operators are templated on two types: D * (downstream) and U (upstream). Operators are created by method calls on an * upstream Flowable, and are Flowables themselves. Multi-stage pipelines can * be built: a Flowable heading a sequence of Operators. */ template class FlowableOperator : public Flowable { protected: /// An Operator's subscription. /// /// When a pipeline chain is active, each Flowable has a corresponding /// subscription. Except for the first one, the subscriptions are created /// against Operators. Each operator subscription has two functions: as a /// subscriber for the previous stage; as a subscription for the next one, the /// user-supplied subscriber being the last of the pipeline stages. class Subscription : public yarpl::flowable::Subscription, public BaseSubscriber { protected: explicit Subscription(std::shared_ptr> subscriber) : subscriber_(std::move(subscriber)) { CHECK(yarpl::atomic_load(&subscriber_)); } // Subscriber will be provided by the init(Subscriber) call Subscription() {} virtual void init(std::shared_ptr> subscriber) { if (yarpl::atomic_load(&subscriber_)) { subscriber->onSubscribe(yarpl::flowable::Subscription::create()); subscriber->onError(std::runtime_error("already initialized")); return; } subscriber_ = std::move(subscriber); } void subscriberOnNext(D value) { if (auto subscriber = yarpl::atomic_load(&subscriber_)) { subscriber->onNext(std::move(value)); } } /// Terminates both ends of an operator normally. void terminate() { std::shared_ptr> null; auto subscriber = yarpl::atomic_exchange(&subscriber_, null); BaseSubscriber::cancel(); if (subscriber) { subscriber->onComplete(); } } /// Terminates both ends of an operator with an error. void terminateErr(folly::exception_wrapper ew) { std::shared_ptr> null; auto subscriber = yarpl::atomic_exchange(&subscriber_, null); BaseSubscriber::cancel(); if (subscriber) { subscriber->onError(std::move(ew)); } } // Subscription. void request(int64_t n) override { BaseSubscriber::request(n); } void cancel() override { std::shared_ptr> null; auto subscriber = yarpl::atomic_exchange(&subscriber_, null); BaseSubscriber::cancel(); } // Subscriber. void onSubscribeImpl() override { yarpl::atomic_load(&subscriber_)->onSubscribe(this->ref_from_this(this)); } void onCompleteImpl() override { std::shared_ptr> null; if (auto subscriber = yarpl::atomic_exchange(&subscriber_, null)) { subscriber->onComplete(); } } void onErrorImpl(folly::exception_wrapper ew) override { std::shared_ptr> null; if (auto subscriber = yarpl::atomic_exchange(&subscriber_, null)) { subscriber->onError(std::move(ew)); } } private: /// This subscription controls the life-cycle of the subscriber. The /// subscriber is retained as long as calls on it can be made. (Note: the /// subscriber in turn maintains a reference on this subscription object /// until cancellation and/or completion.) AtomicReference> subscriber_; }; }; template class MapOperator : public FlowableOperator { using Super = FlowableOperator; static_assert(std::is_same, F>::value, "undecayed"); static_assert(folly::is_invocable_r::value, "not invocable"); static_assert( folly::is_invocable_r< folly::exception_wrapper, EF, folly::exception_wrapper&&>::value, "exception handler not invocable"); public: template MapOperator( std::shared_ptr> upstream, Func&& function, ErrorFunc&& errFunction) : upstream_(std::move(upstream)), function_(std::forward(function)), errFunction_(std::move(errFunction)) {} void subscribe(std::shared_ptr> subscriber) override { upstream_->subscribe(std::make_shared( this->ref_from_this(this), std::move(subscriber))); } private: using SuperSubscription = typename Super::Subscription; class Subscription : public SuperSubscription { public: Subscription( std::shared_ptr flowable, std::shared_ptr> subscriber) : SuperSubscription(std::move(subscriber)), flowable_(std::move(flowable)) {} void onNextImpl(U value) override { try { if (auto flowable = yarpl::atomic_load(&flowable_)) { this->subscriberOnNext(flowable->function_(std::move(value))); } } catch (const std::exception& exn) { folly::exception_wrapper ew{std::current_exception(), exn}; this->terminateErr(std::move(ew)); } } void onErrorImpl(folly::exception_wrapper ew) override { try { if (auto flowable = yarpl::atomic_load(&flowable_)) { SuperSubscription::onErrorImpl(flowable->errFunction_(std::move(ew))); } } catch (const std::exception& exn) { this->terminateErr( folly::exception_wrapper{std::current_exception(), exn}); } } void onTerminateImpl() override { yarpl::atomic_exchange(&flowable_, nullptr); SuperSubscription::onTerminateImpl(); } private: AtomicReference flowable_; }; std::shared_ptr> upstream_; F function_; EF errFunction_; }; template class FilterOperator : public FlowableOperator { // for use in subclasses using Super = FlowableOperator; static_assert(std::is_same, F>::value, "undecayed"); static_assert(folly::is_invocable_r::value, "not invocable"); public: template FilterOperator(std::shared_ptr> upstream, Func&& function) : upstream_(std::move(upstream)), function_(std::forward(function)) {} void subscribe(std::shared_ptr> subscriber) override { upstream_->subscribe(std::make_shared( this->ref_from_this(this), std::move(subscriber))); } private: using SuperSubscription = typename Super::Subscription; class Subscription : public SuperSubscription { public: Subscription( std::shared_ptr flowable, std::shared_ptr> subscriber) : SuperSubscription(std::move(subscriber)), flowable_(std::move(flowable)) {} void onNextImpl(U value) override { if (auto flowable = yarpl::atomic_load(&flowable_)) { if (flowable->function_(value)) { SuperSubscription::subscriberOnNext(std::move(value)); } else { SuperSubscription::request(1); } } } void onTerminateImpl() override { yarpl::atomic_exchange(&flowable_, nullptr); SuperSubscription::onTerminateImpl(); } private: AtomicReference flowable_; }; std::shared_ptr> upstream_; F function_; }; template class ReduceOperator : public FlowableOperator { using Super = FlowableOperator; static_assert(std::is_same, F>::value, "undecayed"); static_assert(std::is_assignable::value, "not assignable"); static_assert(folly::is_invocable_r::value, "not invocable"); public: template ReduceOperator(std::shared_ptr> upstream, Func&& function) : upstream_(std::move(upstream)), function_(std::forward(function)) {} void subscribe(std::shared_ptr> subscriber) override { upstream_->subscribe(std::make_shared( this->ref_from_this(this), std::move(subscriber))); } private: using SuperSubscription = typename Super::Subscription; class Subscription : public SuperSubscription { public: Subscription( std::shared_ptr flowable, std::shared_ptr> subscriber) : SuperSubscription(std::move(subscriber)), flowable_(std::move(flowable)), accInitialized_(false) {} void request(int64_t) override { // Request all of the items. SuperSubscription::request(credits::kNoFlowControl); } void onNextImpl(U value) override { if (accInitialized_) { if (auto flowable = yarpl::atomic_load(&flowable_)) { acc_ = flowable->function_(std::move(acc_), std::move(value)); } } else { acc_ = std::move(value); accInitialized_ = true; } } void onCompleteImpl() override { if (accInitialized_) { SuperSubscription::subscriberOnNext(std::move(acc_)); } SuperSubscription::onCompleteImpl(); } void onTerminateImpl() override { yarpl::atomic_exchange(&flowable_, nullptr); SuperSubscription::onTerminateImpl(); } private: AtomicReference flowable_; bool accInitialized_; D acc_; }; std::shared_ptr> upstream_; F function_; }; template class TakeOperator : public FlowableOperator { using Super = FlowableOperator; public: TakeOperator(std::shared_ptr> upstream, int64_t limit) : upstream_(std::move(upstream)), limit_(limit) {} void subscribe(std::shared_ptr> subscriber) override { upstream_->subscribe( std::make_shared(limit_, std::move(subscriber))); } private: using SuperSubscription = typename Super::Subscription; class Subscription : public SuperSubscription { public: Subscription(int64_t limit, std::shared_ptr> subscriber) : SuperSubscription(std::move(subscriber)), limit_(limit) {} void onSubscribeImpl() override { SuperSubscription::onSubscribeImpl(); if (limit_ <= 0) { SuperSubscription::terminate(); } } void onNextImpl(T value) override { if (limit_-- > 0) { if (pending_ > 0) { --pending_; } SuperSubscription::subscriberOnNext(std::move(value)); if (limit_ == 0) { SuperSubscription::terminate(); } } } void request(int64_t delta) override { delta = std::min(delta, limit_ - pending_); if (delta > 0) { pending_ += delta; SuperSubscription::request(delta); } } private: int64_t pending_{0}; int64_t limit_; }; std::shared_ptr> upstream_; const int64_t limit_; }; template class SkipOperator : public FlowableOperator { using Super = FlowableOperator; public: SkipOperator(std::shared_ptr> upstream, int64_t offset) : upstream_(std::move(upstream)), offset_(offset) {} void subscribe(std::shared_ptr> subscriber) override { upstream_->subscribe( std::make_shared(offset_, std::move(subscriber))); } private: using SuperSubscription = typename Super::Subscription; class Subscription : public SuperSubscription { public: Subscription(int64_t offset, std::shared_ptr> subscriber) : SuperSubscription(std::move(subscriber)), offset_(offset) {} void onNextImpl(T value) override { if (offset_ > 0) { --offset_; } else { SuperSubscription::subscriberOnNext(std::move(value)); } } void request(int64_t delta) override { if (firstRequest_) { firstRequest_ = false; delta = credits::add(delta, offset_); } SuperSubscription::request(delta); } private: int64_t offset_; bool firstRequest_{true}; }; std::shared_ptr> upstream_; const int64_t offset_; }; template class IgnoreElementsOperator : public FlowableOperator { using Super = FlowableOperator; public: explicit IgnoreElementsOperator(std::shared_ptr> upstream) : upstream_(std::move(upstream)) {} void subscribe(std::shared_ptr> subscriber) override { upstream_->subscribe(std::make_shared(std::move(subscriber))); } private: using SuperSubscription = typename Super::Subscription; class Subscription : public SuperSubscription { public: Subscription(std::shared_ptr> subscriber) : SuperSubscription(std::move(subscriber)) {} void onNextImpl(T) override {} }; std::shared_ptr> upstream_; }; template class SubscribeOnOperator : public FlowableOperator { using Super = FlowableOperator; public: SubscribeOnOperator( std::shared_ptr> upstream, folly::Executor& executor) : upstream_(std::move(upstream)), executor_(executor) {} void subscribe(std::shared_ptr> subscriber) override { executor_.add([this, self = this->ref_from_this(this), subscriber] { upstream_->subscribe( std::make_shared(executor_, std::move(subscriber))); }); } private: using SuperSubscription = typename Super::Subscription; class Subscription : public SuperSubscription { public: Subscription( folly::Executor& executor, std::shared_ptr> subscriber) : SuperSubscription(std::move(subscriber)), executor_(executor) {} void request(int64_t delta) override { executor_.add([delta, this, self = this->ref_from_this(this)] { this->callSuperRequest(delta); }); } void cancel() override { executor_.add([this, self = this->ref_from_this(this)] { this->callSuperCancel(); }); } void onNextImpl(T value) override { SuperSubscription::subscriberOnNext(std::move(value)); } private: // Trampoline to call superclass method; gcc bug 58972. void callSuperRequest(int64_t delta) { SuperSubscription::request(delta); } // Trampoline to call superclass method; gcc bug 58972. void callSuperCancel() { SuperSubscription::cancel(); } folly::Executor& executor_; }; std::shared_ptr> upstream_; folly::Executor& executor_; }; template class FromPublisherOperator : public Flowable { static_assert( std::is_same, OnSubscribe>::value, "undecayed"); public: template explicit FromPublisherOperator(F&& function) : function_(std::forward(function)) {} void subscribe(std::shared_ptr> subscriber) override { function_(std::move(subscriber)); } private: OnSubscribe function_; }; template class FlatMapOperator : public FlowableOperator { using Super = FlowableOperator; public: FlatMapOperator( std::shared_ptr> upstream, folly::Function>(T)> func) : upstream_(std::move(upstream)), function_(std::move(func)) {} void subscribe(std::shared_ptr> subscriber) override { upstream_->subscribe(std::make_shared( this->ref_from_this(this), std::move(subscriber))); } private: using SuperSubscription = typename Super::Subscription; class FMSubscription : public SuperSubscription { struct MappedStreamSubscriber; public: FMSubscription( std::shared_ptr flowable, std::shared_ptr> subscriber) : SuperSubscription(std::move(subscriber)), flowable_(std::move(flowable)) {} void onSubscribeImpl() final { liveSubscribers_++; SuperSubscription::onSubscribeImpl(); } void onNextImpl(T value) final { std::shared_ptr> mappedStream; try { mappedStream = flowable_->function_(std::move(value)); } catch (const std::exception& exn) { folly::exception_wrapper ew{std::current_exception(), exn}; { std::lock_guard g(onErrorExGuard_); onErrorEx_ = ew; } // next iteration of drainLoop will cancel this subscriber as well drainLoop(); return; } std::shared_ptr mappedSubscriber = std::make_shared(this->ref_from_this(this)); mappedSubscriber->fmReference_ = mappedSubscriber; { // put into pendingValue queue because once the mappedSubscriber // is subscribed to, it will request elements. We don't want the // drainLoop to execute while it's on withoutValue, and request // a second element before the first arrives. auto l = lists.wlock(); CHECK(!mappedSubscriber->is_linked()); l->pendingValue.push_back(*mappedSubscriber.get()); } liveSubscribers_++; mappedStream->subscribe(mappedSubscriber); drainLoop(); } void drainImpl() { // phase 1: clear out terminated subscribers { auto clearList = [](auto& list, SubscriberList& t) { while (!list.empty()) { auto& elem = list.front(); auto r = elem.sync.wlock(); r->freeze = true; elem.unlink(); t.push_back(elem); } }; SubscriberList clearTrash; if (clearAllSubscribers_.load()) { auto l = lists.wlock(); clearList(l->withValue, clearTrash); clearList(l->withoutValue, clearTrash); clearList(l->pendingValue, clearTrash); } // clear elements while no locks are held while (!clearTrash.empty()) { auto& elem = clearTrash.front(); elem.unlink(); elem.cancel(); elem.fmReference_ = nullptr; } } // phase 2: check if the subscriber should terminate due to error // or all subscribers completing if (!calledDownstreamTerminate_) { folly::exception_wrapper ex; { std::lock_guard exg(onErrorExGuard_); ex = std::move(onErrorEx_); } if (ex) { calledDownstreamTerminate_ = true; cancel(); this->terminateErr(std::move(ex)); } else if (liveSubscribers_ == 0) { calledDownstreamTerminate_ = true; this->terminate(); } } // phase 3: if the downstream has requested elements, pop values out of // subscribers which have received a value and call downstream->onNext while (requested_ != 0) { R val; { auto l = lists.wlock(); if (l->withValue.empty()) { break; } requested_--; auto& elem = l->withValue.front(); elem.unlink(); { auto r = elem.sync.wlock(); CHECK(r->hasValue); r->hasValue = false; val = std::move(r->value); l->withoutValue.push_back(elem); } } SuperSubscription::subscriberOnNext(std::move(val)); } // phase 4: ask any upstream flowables which don't have pending // requests for their next element kick off any more requests. // Put subscribers which have terminated into the trash. { SubscriberList terminatedTrash; while (true) { MappedStreamSubscriber* elem; { auto l = lists.wlock(); if (l->withoutValue.empty()) { break; } elem = &l->withoutValue.front(); auto r = elem->sync.wlock(); CHECK(!r->hasValue) << "failed for elem=" << elem; // sanity elem->unlink(); // Subscribers might call onNext and then terminate; delay // removing its liveSubscriber reference until we've delivered // its element to the downstream subscriber and dropped its // synchronized reference to `r`, as dropping the // flatMapSubscription_ reference may invoke its destructor if (r->isTerminated) { r->freeze = true; terminatedTrash.push_back(*elem); continue; // skips the next elem->request(1) } // else, the stream hasn't terminated, request another // element l->pendingValue.push_back(*elem); } elem->request(1); } // phase 5: destroy any mapped subscribers which have terminated, // enqueue another drain loop run if we do end up discarding any // subscribers, as our live subscriber count may have gone to zero if (!terminatedTrash.empty()) { drainLoopMutex_++; } while (!terminatedTrash.empty()) { auto& elem = terminatedTrash.front(); CHECK(elem.sync.wlock()->isTerminated); elem.unlink(); elem.fmReference_ = nullptr; liveSubscribers_--; } } } // called from MappedStreamSubscriber, receives the R and the // subscriber which generated the R void drainLoop() { auto self = this->ref_from_this(this); if (drainLoopMutex_++ == 0) { do { drainImpl(); } while (drainLoopMutex_-- != 1); } } void onMappedSubscriberNext(MappedStreamSubscriber* elem, R value) { { // `elem` may not be in a list, as it may have been canceled. Push it // on the withValue list and let drainLoop clear it if that's the case. auto l = lists.wlock(); auto r = elem->sync.wlock(); if (r->freeze) { return; } CHECK(!r->hasValue) << "failed for elem=" << elem; r->hasValue = true; r->value = std::move(value); elem->unlink(); l->withValue.push_back(*elem); } drainLoop(); } void onMappedSubscriberTerminate(MappedStreamSubscriber* elem) { { auto r = elem->sync.wlock(); r->isTerminated = true; if (r->onErrorEx) { std::lock_guard exg(onErrorExGuard_); onErrorEx_ = std::move(r->onErrorEx); } if (r->freeze) { return; } } { auto l = lists.wlock(); auto r = elem->sync.wlock(); if (r->freeze) { return; } CHECK(elem->is_linked()); elem->unlink(); if (r->hasValue) { l->withValue.push_back(*elem); } else { liveSubscribers_--; elem->fmReference_ = nullptr; } } drainLoop(); } // onComplete/onError fall through to onTerminateImpl, which // will call drainLoop and update the liveSubscribers_ count void onCompleteImpl() final {} void onErrorImpl(folly::exception_wrapper ex) final { std::lock_guard g(onErrorExGuard_); onErrorEx_ = std::move(ex); clearAllSubscribers_.store(true); } void onTerminateImpl() final { liveSubscribers_--; drainLoop(); flowable_.reset(); } void request(int64_t n) override { if ((n + requested_) < requested_) { requested_ = std::numeric_limits::max(); } else { requested_ += n; } if (n > 0) { // TODO: make max parallelism configurable a-la RxJava 2.x's // FlowableFlatMapOperator SuperSubscription::request(std::numeric_limits::max()); } drainLoop(); } void cancel() override { clearAllSubscribers_.store(true); drainLoop(); } private: // buffers at most a single element of type R struct MappedStreamSubscriber : public BaseSubscriber, public boost::intrusive::list_base_hook< boost::intrusive::link_mode> { MappedStreamSubscriber(std::shared_ptr subscription) : flatMapSubscription_(std::move(subscription)) {} void onSubscribeImpl() final { auto fmsb = yarpl::atomic_load(&flatMapSubscription_); if (!fmsb || fmsb->clearAllSubscribers_) { BaseSubscriber::cancel(); return; } #ifndef NDEBUG if (auto fms = yarpl::atomic_load(&flatMapSubscription_)) { auto l = fms->lists.wlock(); auto r = sync.wlock(); if (!is_in_list(*this, l->pendingValue, l)) { LOG(INFO) << "failed: this=" << this; LOG(INFO) << "in list: "; debug_is_in_list(*this, l); DCHECK(r->freeze); } else { } DCHECK(!r->hasValue); } #endif BaseSubscriber::request(1); } void onNextImpl(R value) final { if (auto fms = yarpl::atomic_load(&flatMapSubscription_)) { fms->onMappedSubscriberNext(this, std::move(value)); } } // noop void onCompleteImpl() final {} void onErrorImpl(folly::exception_wrapper ex) final { auto r = sync.wlock(); r->onErrorEx = std::move(ex); } void onTerminateImpl() override { std::shared_ptr null; if (auto fms = yarpl::atomic_exchange(&flatMapSubscription_, null)) { fms->onMappedSubscriberTerminate(this); } } struct SyncData { R value; bool hasValue{false}; bool isTerminated{false}; bool freeze{false}; folly::exception_wrapper onErrorEx{nullptr}; }; folly::Synchronized sync; // FMSubscription's 'reference' to this object. FMSubscription // clears this reference when it drops the MappedStreamSubscriber // from one of its atomic lists std::shared_ptr fmReference_{nullptr}; // this is both a Subscriber and a Subscription AtomicReference flatMapSubscription_{nullptr}; }; // used to make sure only one thread at a time is calling subscriberOnNext std::atomic drainLoopMutex_{0}; using SubscriberList = boost::intrusive::list< MappedStreamSubscriber, boost::intrusive::constant_time_size>; struct Lists { // subscribers with a ready R SubscriberList withValue{}; // subscribers that have requested 1 R, waiting for it to arrive via // onNext SubscriberList pendingValue{}; // idle subscribers SubscriberList withoutValue{}; }; folly::Synchronized lists; template static bool is_in_list( MappedStreamSubscriber const& elem, SubscriberList const& list, L const& lists) { return in_list_impl(elem, list, lists, true); } template static bool not_in_list( MappedStreamSubscriber const& elem, SubscriberList const& list, L const& lists) { return in_list_impl(elem, list, lists, false); } template static bool in_list_impl( MappedStreamSubscriber const& elem, SubscriberList const& list, L const& lists, bool should) { if (is_in_list(elem, list) != should) { #ifndef NDEBUG debug_is_in_list(elem, lists); #else (void)lists; #endif return false; } return true; } template static void debug_is_in_list( MappedStreamSubscriber const& elem, L const& lists) { LOG(INFO) << "in without: " << is_in_list(elem, lists->withoutValue); LOG(INFO) << "in pending: " << is_in_list(elem, lists->pendingValue); LOG(INFO) << "in withval: " << is_in_list(elem, lists->withValue); } static bool is_in_list( MappedStreamSubscriber const& elem, SubscriberList const& list) { bool found = false; for (auto& e : list) { if (&e == &elem) { found = true; break; } } return found; } std::shared_ptr flowable_; // got a terminating signal from the upstream flowable // always modified in the protected drainImpl() bool calledDownstreamTerminate_{false}; std::mutex onErrorExGuard_; folly::exception_wrapper onErrorEx_{nullptr}; // clear all lists of std::atomic clearAllSubscribers_{false}; std::atomic requested_{0}; // number of subscribers (FMSubscription + MappedStreamSubscriber) which // have not received a terminating signal yet std::atomic liveSubscribers_{0}; }; std::shared_ptr> upstream_; folly::Function>(T)> function_; }; } // namespace flowable } // namespace yarpl #include "yarpl/flowable/FlowableConcatOperators.h" #include "yarpl/flowable/FlowableDoOperator.h" #include "yarpl/flowable/FlowableObserveOnOperator.h" #include "yarpl/flowable/FlowableTimeoutOperator.h"