/* * Copyright 2017-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include namespace folly { /// PriorityMPMCQueue is a thin wrapper on MPMCQueue, providing priorities /// by managing multiple underlying MPMCQueues. As of now, this does /// not implement a blocking interface. For the purposes of this /// class, lower number is higher priority template < typename T, template class Atom = std::atomic, bool Dynamic = false> class PriorityMPMCQueue { public: PriorityMPMCQueue(size_t numPriorities, size_t capacity) { CHECK_GT(numPriorities, 0); queues_.reserve(numPriorities); for (size_t i = 0; i < numPriorities; i++) { queues_.emplace_back(capacity); } } size_t getNumPriorities() { return queues_.size(); } // Add at medium priority by default bool write(T&& item) { return writeWithPriority(std::move(item), getNumPriorities() / 2); } bool writeWithPriority(T&& item, size_t priority) { size_t queue = std::min(getNumPriorities() - 1, priority); CHECK_LT(queue, queues_.size()); return queues_.at(queue).write(std::move(item)); } bool writeWithPriority( T&& item, size_t priority, std::chrono::milliseconds timeout) { size_t queue = std::min(getNumPriorities() - 1, priority); CHECK_LT(queue, queues_.size()); return queues_.at(queue).tryWriteUntil( std::chrono::steady_clock::now() + timeout, std::move(item)); } bool read(T& item) { for (auto& q : queues_) { if (q.readIfNotEmpty(item)) { return true; } } return false; } bool readWithPriority(T& item, size_t priority) { return queues_[priority].readIfNotEmpty(item); } size_t size() const { size_t total_size = 0; for (auto& q : queues_) { // MPMCQueue can have a negative size if there are pending readers. // Since we don't expose a blocking interface this shouldn't happen, // But just in case we put a floor at 0 total_size += std::max(0, q.size()); } return total_size; } size_t sizeGuess() const { size_t total_size = 0; for (auto& q : queues_) { // MPMCQueue can have a negative size if there are pending readers. // Since we don't expose a blocking interface this shouldn't happen, // But just in case we put a floor at 0 total_size += std::max(0, q.sizeGuess()); } return total_size; } /// Returns true if there are no items available for dequeue bool isEmpty() const { return size() == 0; } private: std::vector> queues_; }; } // namespace folly