/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Implementation details for folly/gen/Parallel.h. The guard below makes this
// file usable only when included through that header.
//
// NOTE(review): in this copy every angle-bracketed span appears to have been
// stripped: the #include targets, template parameter lists (e.g.
// `template class ClosableMPMCQueue`), and template arguments (e.g.
// `MPMCQueue queue_`, `std::atomic producers_`, `std::forward(args)`). The code
// below is kept token-for-token as found; restore the bracketed text from
// upstream folly/gen/Parallel-inl.h before building — TODO confirm.

#ifndef FOLLY_GEN_PARALLEL_H_
#error This file may only be included from folly/gen/Parallel.h
#endif

#include
#include
#include
#include
#include
#include

namespace folly {
namespace gen {
namespace detail {

/**
 * ClosableMPMCQueue - A bounded MPMCQueue that also counts how many producer
 * and consumer endpoints are attached, so each side can tell when the other
 * side has gone away.
 *
 * Endpoints are registered with openProducer()/openConsumer() and released
 * with closeInputProducer()/closeOutputConsumer().
 *
 * Waiting:
 *  - Writers blocked on a full queue wait on wakeProducer_.
 *  - Readers blocked on an empty queue wait on wakeConsumer_.
 *  - Closing the last producer wakes every waiting reader.
 *  - Closing the last consumer wakes every waiting writer.
 *
 * The destructor CHECK-fails unless every endpoint has been closed.
 */
template class ClosableMPMCQueue {
  // Underlying bounded queue holding the values.
  MPMCQueue queue_;
  // Number of currently open producer / consumer endpoints.
  std::atomic producers_{0};
  std::atomic consumers_{0};
  // Writers blocked on a full queue wait here.
  folly::EventCount wakeProducer_;
  // Readers blocked on an empty queue wait here.
  folly::EventCount wakeConsumer_;

 public:
  // `capacity` is passed straight to the underlying queue's constructor.
  explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {}

  ~ClosableMPMCQueue() {
    // Every endpoint must have been closed before the queue is destroyed.
    CHECK(!producers());
    CHECK(!consumers());
  }

  // Registers one more producer / consumer endpoint.
  void openProducer() { ++producers_; }
  void openConsumer() { ++consumers_; }

  // Releases one producer endpoint; CHECK-fails if none was open.
  void closeInputProducer() {
    // Post-decrement: `producers` is the count *before* this close.
    size_t producers = producers_--;
    CHECK(producers);
    if (producers == 1) {
      // last producer
      // Wake all readers blocked in readUnlessClosed() so they re-check
      // producers() and can return false instead of waiting forever.
      wakeConsumer_.notifyAll();
    }
  }

  // Releases one consumer endpoint; CHECK-fails if none was open.
  void closeOutputConsumer() {
    // Post-decrement: `consumers` is the count *before* this close.
    size_t consumers = consumers_--;
    CHECK(consumers);
    if (consumers == 1) {
      // last consumer
      // Wake all writers blocked in writeUnlessClosed() so they re-check
      // consumers() and can give up.
      wakeProducer_.notifyAll();
    }
  }

  // Number of currently open producer endpoints (acquire load).
  size_t producers() const {
    return producers_.load(std::memory_order_acquire);
  }

  // Number of currently open consumer endpoints (acquire load).
  size_t consumers() const {
    return consumers_.load(std::memory_order_acquire);
  }

  /**
   * Non-blocking write.
   *
   * Forwards `args` to queue_.write(). On success, wakes one waiting reader
   * and returns true. Returns false if the value was not enqueued.
   *
   * Declared noexcept, so an exception escaping queue_.write() (e.g. from the
   * element's constructor) would call std::terminate.
   */
  template bool writeUnlessFull(Args&&... args) noexcept {
    if (queue_.write(std::forward(args)...)) {
      // wake consumers to pick up new value
      wakeConsumer_.notify();
      return true;
    }
    return false;
  }

  /**
   * Blocking write that gives up once every consumer has closed.
   *
   * Enqueues the value, waiting for free space as needed.
   *
   * Returns true once the value is enqueued. The first attempt does not look
   * at consumers(), so it can succeed even when no consumer is open.
   *
   * Returns false, without enqueuing, when the queue is full and
   * consumers() is zero.
   *
   * `args` may be forwarded to writeIfNotFull() more than once. This relies
   * on a failed writeIfNotFull() leaving `args` untouched.
   * NOTE(review): confirm against MPMCQueue.
   */
  template bool writeUnlessClosed(Args&&... args) {
    // write if there's room
    if (!queue_.writeIfNotFull(std::forward(args)...)) {
      while (true) {
        // Take the wait key before re-checking, so a notify that lands
        // between the checks below and wait() still wakes us.
        auto key = wakeProducer_.prepareWait();
        // if write fails, check if there are still consumers listening
        if (!consumers()) {
          // no consumers left; bail out
          wakeProducer_.cancelWait();
          return false;
        }
        if (queue_.writeIfNotFull(std::forward(args)...)) {
          wakeProducer_.cancelWait();
          break;
        }
        wakeProducer_.wait(key);
      }
    }
    // wake consumers to pick up new value
    wakeConsumer_.notify();
    return true;
  }

  /**
   * Non-blocking read into `out`.
   *
   * On success, wakes one waiting writer (a slot was freed) and returns true.
   * Returns false if nothing was read.
   */
  bool readUnlessEmpty(T& out) {
    if (queue_.read(out)) {
      // wake producers to fill empty space
      wakeProducer_.notify();
      return true;
    }
    return false;
  }

  /**
   * Blocking read into `out` that gives up once every producer has closed.
   *
   * Each pass attempts a read *before* checking producers(). As a result,
   * values already in the queue are drained before this returns false.
   *
   * Returns true when a value was read.
   * Returns false when the queue is empty and producers() is zero.
   */
  bool readUnlessClosed(T& out) {
    if (!queue_.readIfNotEmpty(out)) {
      while (true) {
        // Take the wait key before re-checking, so a notify that lands
        // between the checks below and wait() still wakes us.
        auto key = wakeConsumer_.prepareWait();
        if (queue_.readIfNotEmpty(out)) {
          wakeConsumer_.cancelWait();
          break;
        }
        if (!producers()) {
          wakeConsumer_.cancelWait();
          // wake producers to fill empty space
          // NOTE(review): producers() was just observed as 0, so in normal use
          // no writer should be waiting here — confirm this notify is needed.
          wakeProducer_.notify();
          return false;
        }
        wakeConsumer_.wait(key);
      }
    }
    // wake writers blocked by full queue
    wakeProducer_.notify();
    return true;
  }
};

/**
 * Sub - Operator that applies `sink_` to the whole source.
 *
 * compose() evaluates `source | sink_` and returns it wrapped in `Just`, a
 * SingleCopy-based type derived from the sink's result type. NOTE(review): the
 * exact type arguments are stripped in this copy — confirm upstream.
 */
template class Sub : public Operator> {
  Sink sink_;

 public:
  explicit Sub(Sink sink) : sink_(sink) {}

  template <
      class Value,
      class Source,
      class Result =
          decltype(std::declval().compose(std::declval())),
      class Just = SingleCopy::type>>
  Just compose(const GenImpl& source) const {
    return Just(source | sink_);
  }
};

/**
 * Parallel - Operator that runs the pipeline `ops_` over the source's values on
 * `threads_` worker threads.
 *
 * The calling thread pulls from the source and pushes values into a bounded
 * input queue. Each worker reads from that queue, runs `ops_`, and writes
 * results into a bounded output queue, which the calling thread drains into
 * the downstream handler/body.
 *
 * Results are handed on in whatever order workers produce them. Nothing here
 * restores source order when more than one worker is running.
 */
template class Parallel : public Operator> {
  // Pipeline each worker applies to its share of the input.
  Ops ops_;
  // Requested worker count; 0 means "pick from the processor count"
  // (resolved in Generator's constructor).
  size_t threads_;

 public:
  Parallel(Ops ops, size_t threads)
      : ops_(std::move(ops)), threads_(threads) {}

  /**
   * Generator - Result of composing Parallel with a source.
   *
   * apply()/foreach() run on the calling thread: they pull from `source_`,
   * feed the workers, and deliver the workers' outputs downstream.
   */
  template <
      class Input,
      class Source,
      class InputDecayed = typename std::decay::type,
      class Composed = decltype(std::declval().compose(Empty())),
      class Output = typename Composed::ValueType,
      class OutputDecayed = typename std::decay::type>
  class Generator : public GenImpl<
                        OutputDecayed&&,
                        Generator<
                            Input,
                            Source,
                            InputDecayed,
                            Composed,
                            Output,
                            OutputDecayed>> {
    Source source_;
    Ops ops_;
    size_t threads_;

    // Queue from the calling thread to the workers.
    using InQueue = ClosableMPMCQueue;
    // Queue from the workers back to the calling thread.
    using OutQueue = ClosableMPMCQueue;

    /**
     * Puller - Generator used on each worker thread.
     *
     * Yields values read from the input queue until the queue is closed and
     * drained (readUnlessClosed() returns false).
     */
    class Puller : public GenImpl {
      InQueue* queue_;

     public:
      explicit Puller(InQueue* queue) : queue_(queue) {}

      // Returns false as soon as `handler` returns false; true once the
      // input queue is closed and empty.
      template bool apply(Handler&& handler) const {
        InputDecayed input;
        while (queue_->readUnlessClosed(input)) {
          if (!handler(std::move(input))) {
            return false;
          }
        }
        return true;
      }

      // Feeds every value to `body` until the input queue is closed and empty.
      template void foreach(Body&& body) const {
        InputDecayed input;
        while (queue_->readUnlessClosed(input)) {
          body(std::move(input));
        }
      }
    };

    /**
     * Pusher - Terminal operator used on each worker thread.
     *
     * Writes every value produced by the worker's pipeline into the output
     * queue. The behavior depends on the `all` flag:
     *  - `all` true: iterate with foreach() and ignore writeUnlessClosed()
     *    failures.
     *  - `all` false: iterate with apply(), whose callback returns the write
     *    result, so pulling stops once a write fails (no consumers left).
     */
    template class Pusher : public Operator> {
      OutQueue* queue_;

     public:
      explicit Pusher(OutQueue* queue) : queue_(queue) {}

      template void compose(const GenImpl& source) const {
        if (all) {
          source.self().foreach([&](Value value) {
            queue_->writeUnlessClosed(std::forward(value));
          });
        } else {
          source.self().apply([&](Value value) {
            return queue_->writeUnlessClosed(std::forward(value));
          });
        }
      }
    };

    /**
     * Executor - Owns both queues and the worker threads for one apply() or
     * foreach() call.
     *
     * Endpoint registration:
     *  - The calling thread is the single producer of inQueue_ and the
     *    single consumer of outQueue_.
     *  - Each worker is a consumer of inQueue_ and a producer of outQueue_.
     *
     * NOTE(review): workers capture `this`, so an Executor must not be copied
     * or moved while its workers are running.
     */
    template class Executor {
      InQueue inQueue_;
      OutQueue outQueue_;
      Puller puller_;
      Pusher pusher_;
      // Worker threads; all are joined in the destructor.
      std::vector workers_;
      const Ops* ops_;

      // Body of each worker: read inputs, run the ops, push outputs.
      void work() { puller_ | *ops_ | pusher_; }

     public:
      /**
       * Sizes both queues at `threads * 4` and starts `threads` workers.
       *
       * Endpoint counts are opened before each thread is spawned, so a queue
       * never reports zero producers/consumers while its users are starting.
       * A worker closes its endpoints via SCOPE_EXIT when work() finishes.
       */
      Executor(size_t threads, const Ops* ops)
          : inQueue_(threads * 4),
            outQueue_(threads * 4),
            puller_(&inQueue_),
            pusher_(&outQueue_),
            ops_(ops) {
        inQueue_.openProducer();
        outQueue_.openConsumer();
        for (size_t t = 0; t < threads; ++t) {
          inQueue_.openConsumer();
          outQueue_.openProducer();
          workers_.emplace_back([this] {
            SCOPE_EXIT {
              inQueue_.closeOutputConsumer();
              outQueue_.closeInputProducer();
            };
            this->work();
          });
        }
      }

      /**
       * Closes the calling thread's endpoints if they are still open, then
       * joins every worker.
       *
       * The endpoints can still be open when apply()/foreach() did not reach
       * their own close calls, e.g. because an exception propagated. Closing
       * them lets blocked workers observe closure and exit.
       */
      ~Executor() {
        if (inQueue_.producers()) {
          inQueue_.closeInputProducer();
        }
        if (outQueue_.consumers()) {
          outQueue_.closeOutputConsumer();
        }
        while (!workers_.empty()) {
          workers_.back().join();
          workers_.pop_back();
        }
        // After all joins, every worker must have released its endpoints.
        CHECK(!inQueue_.consumers());
        CHECK(!outQueue_.producers());
      }

      // Signals the workers that no more input will arrive.
      void closeInputProducer() { inQueue_.closeInputProducer(); }

      // Signals the workers that no more output will be consumed.
      void closeOutputConsumer() { outQueue_.closeOutputConsumer(); }

      // Thin forwarders to the corresponding ClosableMPMCQueue operations.
      bool writeUnlessClosed(Input&& input) {
        return inQueue_.writeUnlessClosed(std::forward(input));
      }

      bool writeUnlessFull(Input&& input) {
        return inQueue_.writeUnlessFull(std::forward(input));
      }

      bool readUnlessClosed(OutputDecayed& output) {
        return outQueue_.readUnlessClosed(output);
      }

      bool readUnlessEmpty(OutputDecayed& output) {
        return outQueue_.readUnlessEmpty(output);
      }
    };

   public:
    // A `threads` of 0 falls back to sysconf(_SC_NPROCESSORS_CONF), clamped
    // to at least 1.
    Generator(Source source, Ops ops, size_t threads)
        : source_(std::move(source)),
          ops_(std::move(ops)),
          threads_(
              threads
                  ? threads
                  : size_t(std::max(1, sysconf(_SC_NPROCESSORS_CONF)))) {}

    /**
     * Runs the pipeline and passes outputs to `handler`.
     *
     * Source loop, for each input:
     *  1. Try a non-blocking write into the input queue.
     *  2. If the queue is full, drain any ready outputs to `handler`.
     *  3. Then block in writeUnlessClosed().
     *
     * Stops pulling from the source when:
     *  - `handler` returns false (`more` becomes false), or
     *  - every worker has closed its input endpoint.
     *
     * Afterwards, input is closed. If `handler` has not asked to stop, the
     * remaining outputs are drained. Closing the output endpoint lets workers
     * blocked on a full output queue give up.
     *
     * Returns false iff `handler` returned false.
     */
    template bool apply(Handler&& handler) const {
      Executor executor(threads_, &ops_);
      bool more = true;
      source_.apply([&](Input input) {
        if (executor.writeUnlessFull(std::forward(input))) {
          return true;
        }
        // Input queue is full: make room downstream by consuming outputs.
        OutputDecayed output;
        while (executor.readUnlessEmpty(output)) {
          if (!handler(std::move(output))) {
            more = false;
            return false;
          }
        }
        // Still full (or refilled): block until there is room or no worker
        // is reading any more.
        if (!executor.writeUnlessClosed(std::forward(input))) {
          return false;
        }
        return true;
      });
      executor.closeInputProducer();

      if (more) {
        OutputDecayed output;
        while (executor.readUnlessClosed(output)) {
          if (!handler(std::move(output))) {
            more = false;
            break;
          }
        }
      }
      executor.closeOutputConsumer();

      return more;
    }

    /**
     * Runs the pipeline and passes every output to `body`.
     *
     * Uses the same feed/drain loop as apply(), but `body` cannot stop
     * iteration. The blocking write is wrapped in CHECK, so a rejected write
     * (no worker still reading input) aborts.
     */
    template void foreach(Body&& body) const {
      Executor executor(threads_, &ops_);
      source_.foreach([&](Input input) {
        if (executor.writeUnlessFull(std::forward(input))) {
          return;
        }
        // Input queue is full: make room downstream by consuming outputs.
        OutputDecayed output;
        while (executor.readUnlessEmpty(output)) {
          body(std::move(output));
        }
        CHECK(executor.writeUnlessClosed(std::forward(input)));
      });
      executor.closeInputProducer();

      OutputDecayed output;
      while (executor.readUnlessClosed(output)) {
        body(std::move(output));
      }
      executor.closeOutputConsumer();
    }
  };

  // Composes with an lvalue source (the source is copied into the Generator).
  template Generator compose(const GenImpl& source) const {
    return Generator(source.self(), ops_, threads_);
  }

  // Composes with an rvalue source (the source is moved into the Generator).
  template Generator compose(GenImpl&& source) const {
    return Generator(std::move(source.self()), ops_, threads_);
  }
};

/**
 * ChunkedRangeSource - For slicing up ranges into a sequence of chunks given a
 * maximum chunk size.
 *
 * Usually used through the 'chunked' helper, like:
 *
 *   int n
 *     = chunked(values)
 *     | parallel  // each thread processes a chunk
 *     | concat    // but can still process values one at a time
 *     | filter(isPrime)
 *     | atomic_count;
 *
 * NOTE(review): apply() only makes progress when chunkSize_ is positive.
 * With 0, subpiece(0, 0) presumably yields an empty chunk, advance(0) moves
 * nothing, and the loop never terminates. Consider validating chunkSize in
 * the constructor.
 */
template class ChunkedRangeSource
    : public GenImpl&&, ChunkedRangeSource> {
  // Maximum number of elements per emitted chunk.
  int chunkSize_;
  // Remaining input to be sliced.
  Range range_;

 public:
  // NOTE(review): leaves chunkSize_ uninitialized; only safe while range_ is
  // empty (apply() then never reads chunkSize_).
  ChunkedRangeSource() = default;
  ChunkedRangeSource(int chunkSize, Range range)
      : chunkSize_(chunkSize), range_(std::move(range)) {}

  /**
   * Hands `handler` a RangeSource over each successive chunk of up to
   * chunkSize_ elements, in order.
   *
   * Returns false as soon as `handler` returns false; true once the whole
   * range has been emitted.
   */
  template bool apply(Handler&& handler) const {
    auto remaining = range_;
    while (!remaining.empty()) {
      auto chunk = remaining.subpiece(0, chunkSize_);
      remaining.advance(chunk.size());
      auto gen = RangeSource(chunk);
      if (!handler(std::move(gen))) {
        return false;
      }
    }
    return true;
  }
};

} // namespace detail
} // namespace gen
} // namespace folly