// Copyright (c) Facebook, Inc. and its affiliates. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #pragma once #include #include #include #include "yarpl/single/Single.h" #include "yarpl/single/SingleObserver.h" #include "yarpl/single/SingleSubscriptions.h" namespace yarpl { namespace single { /** * Base (helper) class for operators. Operators are templated on two types: * D (downstream) and U (upstream). Operators are created by method calls on * an upstream Single, and are Observables themselves. Multi-stage * pipelines * can be built: a Single heading a sequence of Operators. */ template class SingleOperator : public Single { public: explicit SingleOperator(std::shared_ptr> upstream) : upstream_(std::move(upstream)) {} protected: /// /// \brief An Operator's subscription. /// /// When a pipeline chain is active, each Single has a corresponding /// subscription. Except for the first one, the subscriptions are created /// against Operators. Each operator subscription has two functions: as a /// observer for the previous stage; as a subscription for the next one, /// the user-supplied observer being the last of the pipeline stages. template class Subscription : public ::yarpl::single::SingleSubscription, public SingleObserver, public yarpl::enable_get_ref { protected: Subscription( std::shared_ptr single, std::shared_ptr> observer) : single_(std::move(single)), observer_(std::move(observer)) {} ~Subscription() { observer_.reset(); } void observerOnSuccess(D value) { terminateImpl(TerminateState::Down(), folly::Try{std::move(value)}); } void observerOnError(folly::exception_wrapper ew) { terminateImpl(TerminateState::Down(), folly::Try{std::move(ew)}); } std::shared_ptr getOperator() { return single_; } void terminateErr(folly::exception_wrapper ew) { terminateImpl(TerminateState::Both(), std::move(ew)); } // SingleSubscription. void cancel() override { terminateImpl(TerminateState::Up(), folly::Try{}); } // Subscriber. void onSubscribe(std::shared_ptr subscription) override { upstream_ = std::move(subscription); observer_->onSubscribe(this->ref_from_this(this)); } void onError(folly::exception_wrapper ew) override { terminateImpl(TerminateState::Down(), folly::Try{std::move(ew)}); } private: struct TerminateState { TerminateState(bool u, bool d) : up{u}, down{d} {} static TerminateState Down() { return TerminateState{false, true}; } static TerminateState Up() { return TerminateState{true, false}; } static TerminateState Both() { return TerminateState{true, true}; } const bool up{false}; const bool down{false}; }; bool isTerminated() const { return !upstream_ && !observer_; } void terminateImpl(TerminateState state, folly::Try maybe) { if (isTerminated()) { return; } if (auto upstream = std::move(upstream_)) { if (state.up) { upstream->cancel(); } } if (auto observer = std::move(observer_)) { if (state.down) { if (maybe.hasValue()) { observer->onSuccess(std::move(maybe).value()); } else { observer->onError(std::move(maybe).exception()); } } } } /// The Single has the lambda, and other creation parameters. std::shared_ptr single_; /// This subscription controls the life-cycle of the observer. The /// observer is retained as long as calls on it can be made. (Note: /// the observer in turn maintains a reference on this subscription /// object until cancellation and/or completion.) std::shared_ptr> observer_; /// In an active pipeline, cancel and (possibly modified) request(n) /// calls should be forwarded upstream. Note that `this` is also a /// observer for the upstream stage: thus, there are cycles; all of /// the objects drop their references at cancel/complete. std::shared_ptr upstream_; }; std::shared_ptr> upstream_; }; template < typename U, typename D, typename F> class MapOperator : public SingleOperator { using ThisOperatorT = MapOperator; using Super = SingleOperator; using OperatorSubscription = typename Super::template Subscription; static_assert(std::is_same, F>::value, "undecayed"); static_assert(folly::is_invocable_r::value, "not invocable"); public: template MapOperator(std::shared_ptr> upstream, Func&& function) : Super(std::move(upstream)), function_(std::forward(function)) {} void subscribe(std::shared_ptr> observer) override { Super::upstream_->subscribe( // Note: implicit cast to a reference to a observer. std::make_shared( this->ref_from_this(this), std::move(observer))); } private: class MapSubscription : public OperatorSubscription { public: MapSubscription( std::shared_ptr single, std::shared_ptr> observer) : OperatorSubscription(std::move(single), std::move(observer)) {} void onSuccess(U value) override { try { auto map_operator = this->getOperator(); this->observerOnSuccess(map_operator->function_(std::move(value))); } catch (const std::exception& exn) { folly::exception_wrapper ew{std::current_exception(), exn}; this->observerOnError(std::move(ew)); } } }; F function_; }; template class FromPublisherOperator : public Single { static_assert( std::is_same, OnSubscribe>::value, "undecayed"); public: template explicit FromPublisherOperator(F&& function) : function_(std::forward(function)) {} void subscribe(std::shared_ptr> observer) override { function_(std::move(observer)); } private: OnSubscribe function_; }; template class SingleVoidFromPublisherOperator : public Single { static_assert( std::is_same, OnSubscribe>::value, "undecayed"); public: template explicit SingleVoidFromPublisherOperator(F&& function) : function_(std::forward(function)) {} void subscribe(std::shared_ptr> observer) override { function_(std::move(observer)); } private: OnSubscribe function_; }; } // namespace single } // namespace yarpl