/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <utility> #include <glog/logging.h> #include <folly/Portability.h> #include <folly/detail/MPMCPipelineDetail.h> namespace folly { /** * Helper tag template to use amplification > 1 */ template <class T, size_t Amp> class MPMCPipelineStage; /** * Multi-Producer, Multi-Consumer pipeline. * * A N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h). * * At each stage, you may dequeue the results from the previous stage (possibly * from multiple threads) and enqueue results to the next stage. Regardless of * the order of completion, data is delivered to the next stage in the original * order. Each input is matched with a "ticket" which must be produced * when enqueueing to the next stage. * * A given stage must produce exactly K ("amplification factor", default K=1) * results for every input. This is enforced by requiring that each ticket * is used exactly K times. * * Usage: * * // arguments are queue sizes * MPMCPipeline<int, std::string, int> pipeline(10, 10, 10); * * pipeline.blockingWrite(42); * * { * int val; * auto ticket = pipeline.blockingReadStage<0>(val); * pipeline.blockingWriteStage<0>(ticket, folly::to<std::string>(val)); * } * * { * std::string val; * auto ticket = pipeline.blockingReadStage<1>(val); * int ival = 0; * try { * ival = folly::to<int>(val); * } catch (...) { * // We must produce exactly 1 output even on exception! * } * pipeline.blockingWriteStage<1>(ticket, ival); * } * * int result; * pipeline.blockingRead(result); * // result == 42 * * To specify amplification factors greater than 1, use * MPMCPipelineStage<T, amplification> instead of T in the declaration: * * MPMCPipeline<int, * MPMCPipelineStage<std::string, 2>, * MPMCPipelineStage<int, 4>> * * declares a two-stage pipeline: the first stage produces 2 strings * for each input int, the second stage produces 4 ints for each input string, * so, overall, the pipeline produces 2*4 = 8 ints for each input int. * * Implementation details: we use N+1 MPMCQueue objects; each intermediate * queue connects two adjacent stages. The MPMCQueue implementation is abused; * instead of using it as a queue, we insert in the output queue at the * position determined by the input queue's popTicket_. We guarantee that * all slots are filled (and therefore the queue doesn't freeze) because * we require that each step produces exactly K outputs for every input. */ template <class In, class... Stages> class MPMCPipeline { typedef std::tuple<detail::PipelineStageInfo<Stages>...> StageInfos; typedef std::tuple< detail::MPMCPipelineStageImpl<In>, detail::MPMCPipelineStageImpl< typename detail::PipelineStageInfo<Stages>::value_type>...> StageTuple; static constexpr size_t kAmplification = detail::AmplificationProduct<StageInfos>::value; class TicketBaseDebug { public: TicketBaseDebug() noexcept : owner_(nullptr), value_(0xdeadbeeffaceb00c) {} TicketBaseDebug(TicketBaseDebug&& other) noexcept : owner_(std::exchange(other.owner_, nullptr)), value_(std::exchange(other.value_, 0xdeadbeeffaceb00c)) {} explicit TicketBaseDebug(MPMCPipeline* owner, uint64_t value) noexcept : owner_(owner), value_(value) {} void check_owner(MPMCPipeline* owner) const { CHECK(owner == owner_); } MPMCPipeline* owner_; uint64_t value_; }; class TicketBaseNDebug { public: TicketBaseNDebug() = default; TicketBaseNDebug(TicketBaseNDebug&&) = default; explicit TicketBaseNDebug(MPMCPipeline*, uint64_t value) noexcept : value_(value) {} void check_owner(MPMCPipeline*) const {} uint64_t value_; }; using TicketBase = std::conditional_t<kIsDebug, TicketBaseDebug, TicketBaseNDebug>; public: /** * Ticket, returned by blockingReadStage, must be given back to * blockingWriteStage. Tickets are not thread-safe. */ template <size_t Stage> class Ticket : TicketBase { public: ~Ticket() noexcept { CHECK_EQ(remainingUses_, 0) << "All tickets must be completely used!"; } Ticket() noexcept : remainingUses_(0) {} Ticket(Ticket&& other) noexcept : TicketBase(static_cast<TicketBase&&>(other)), remainingUses_(std::exchange(other.remainingUses_, 0)) {} Ticket& operator=(Ticket&& other) noexcept { if (this != &other) { this->~Ticket(); new (this) Ticket(std::move(other)); } return *this; } private: friend class MPMCPipeline; size_t remainingUses_; Ticket(MPMCPipeline* owner, size_t amplification, uint64_t value) noexcept : TicketBase(owner, value * amplification), remainingUses_(amplification) {} uint64_t use(MPMCPipeline* owner) { CHECK_GT(remainingUses_--, 0); TicketBase::check_owner(owner); return TicketBase::value_++; } }; /** * Default-construct pipeline. Useful to move-assign later, * just like MPMCQueue, see MPMCQueue.h for more details. */ MPMCPipeline() = default; /** * Construct a pipeline with N+1 queue sizes. */ template <class... Sizes> explicit MPMCPipeline(Sizes... sizes) : stages_(sizes...) {} /** * Push an element into (the first stage of) the pipeline. Blocking. */ template <class... Args> void blockingWrite(Args&&... args) { std::get<0>(stages_).blockingWrite(std::forward<Args>(args)...); } /** * Try to push an element into (the first stage of) the pipeline. * Non-blocking. */ template <class... Args> bool write(Args&&... args) { return std::get<0>(stages_).write(std::forward<Args>(args)...); } /** * Read an element for stage Stage and obtain a ticket. Blocking. */ template <size_t Stage> Ticket<Stage> blockingReadStage( typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) { return Ticket<Stage>( this, std::tuple_element<Stage, StageInfos>::type::kAmplification, std::get<Stage>(stages_).blockingRead(elem)); } /** * Try to read an element for stage Stage and obtain a ticket. * Non-blocking. */ template <size_t Stage> bool readStage( Ticket<Stage>& ticket, typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) { uint64_t tval; if (!std::get<Stage>(stages_).readAndGetTicket(tval, elem)) { return false; } ticket = Ticket<Stage>( this, std::tuple_element<Stage, StageInfos>::type::kAmplification, tval); return true; } /** * Complete an element in stage Stage (pushing it for stage Stage+1). * Blocking. */ template <size_t Stage, class... Args> void blockingWriteStage(Ticket<Stage>& ticket, Args&&... args) { std::get<Stage + 1>(stages_).blockingWriteWithTicket( ticket.use(this), std::forward<Args>(args)...); } /** * Pop an element from (the final stage of) the pipeline. Blocking. */ void blockingRead(typename std::tuple_element<sizeof...(Stages), StageTuple>:: type::value_type& elem) { std::get<sizeof...(Stages)>(stages_).blockingRead(elem); } /** * Try to pop an element from (the final stage of) the pipeline. * Non-blocking. */ bool read(typename std::tuple_element<sizeof...(Stages), StageTuple>::type:: value_type& elem) { return std::get<sizeof...(Stages)>(stages_).read(elem); } /** * Estimate queue size, measured as values from the last stage. * (so if the pipeline has an amplification factor > 1, pushing an element * into the first stage will cause sizeGuess() to be == amplification factor) * Elements "in flight" (currently processed as part of a stage, so not * in any queue) are also counted. */ ssize_t sizeGuess() const noexcept { return ssize_t( std::get<0>(stages_).writeCount() * kAmplification - std::get<sizeof...(Stages)>(stages_).readCount()); } private: StageTuple stages_; }; } // namespace folly