// Copyright (c) Facebook, Inc. and its affiliates. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #pragma once #include #include #include "yarpl/Refcounted.h" #include "yarpl/single/SingleObserver.h" #include "yarpl/single/SingleObservers.h" #include "yarpl/single/SingleSubscription.h" namespace yarpl { namespace single { template class Single : public yarpl::enable_get_ref { public: virtual ~Single() = default; virtual void subscribe(std::shared_ptr>) = 0; /** * Subscribe overload that accepts lambdas. */ template < typename Success, typename = typename std::enable_if< folly::is_invocable&, T>::value>::type> void subscribe(Success&& next) { subscribe(SingleObservers::create(std::forward(next))); } /** * Subscribe overload that accepts lambdas. */ template < typename Success, typename Error, typename = typename std::enable_if< folly::is_invocable&, T>::value && folly::is_invocable&, folly::exception_wrapper>:: value>::type> void subscribe(Success next, Error error) { subscribe(SingleObservers::create( std::forward(next), std::forward(error))); } /** * Blocking subscribe that accepts lambdas. * * This blocks the current thread waiting on the response. */ template < typename Success, typename = typename std::enable_if< folly::is_invocable&, T>::value>::type> void subscribeBlocking(Success&& next) { auto waiting_ = std::make_shared>(); subscribe( SingleObservers::create([next = std::forward(next), waiting_](T t) { next(std::move(t)); waiting_->post(); })); // TODO get errors and throw if one is received waiting_->wait(); } template < typename OnSubscribe, typename = typename std::enable_if&, std::shared_ptr>>::value>::type> static std::shared_ptr> create(OnSubscribe&&); template auto map(Function&& function); }; template <> class Single { public: virtual ~Single() = default; virtual void subscribe(std::shared_ptr>) = 0; /** * Subscribe overload taking lambda for onSuccess that is called upon writing * to the network. */ template < typename Success, typename = typename std::enable_if< folly::is_invocable&>::value>::type> void subscribe(Success&& s) { class SuccessSingleObserver : public SingleObserverBase { public: explicit SuccessSingleObserver(Success&& success) : success_{std::forward(success)} {} void onSubscribe( std::shared_ptr subscription) override { SingleObserverBase::onSubscribe(std::move(subscription)); } void onSuccess() override { success_(); SingleObserverBase::onSuccess(); } // No further calls to the subscription after this method is invoked. void onError(folly::exception_wrapper ex) override { SingleObserverBase::onError(std::move(ex)); } private: std::decay_t success_; }; subscribe( std::make_shared(std::forward(s))); } template < typename OnSubscribe, typename = typename std::enable_if&, std::shared_ptr>>::value>::type> static auto create(OnSubscribe&&); }; } // namespace single } // namespace yarpl #include "yarpl/single/SingleOperator.h" namespace yarpl { namespace single { template template std::shared_ptr> Single::create(OnSubscribe&& function) { return std::make_shared>>( std::forward(function)); } template auto Single::create(OnSubscribe&& function) { return std::make_shared< SingleVoidFromPublisherOperator>>( std::forward(function)); } template template auto Single::map(Function&& function) { using D = typename folly::invoke_result_t; return std::make_shared>>( this->ref_from_this(this), std::forward(function)); } } // namespace single } // namespace yarpl