918 lines
27 KiB
C++
918 lines
27 KiB
C++
/*
|
|
* Copyright (c) Facebook, Inc. and its affiliates.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <sys/types.h>
|
|
|
|
#include <algorithm>
|
|
#include <iterator>
|
|
#include <memory>
|
|
#include <stdexcept>
|
|
#include <utility>
|
|
|
|
#include <boost/intrusive/slist.hpp>
|
|
#include <folly/Exception.h>
|
|
#include <folly/FileUtil.h>
|
|
#include <folly/Likely.h>
|
|
#include <folly/ScopeGuard.h>
|
|
#include <folly/SpinLock.h>
|
|
#include <folly/io/async/DelayedDestruction.h>
|
|
#include <folly/io/async/EventBase.h>
|
|
#include <folly/io/async/EventHandler.h>
|
|
#include <folly/io/async/Request.h>
|
|
#include <folly/portability/Fcntl.h>
|
|
#include <folly/portability/Sockets.h>
|
|
#include <folly/portability/Unistd.h>
|
|
|
|
#include <glog/logging.h>
|
|
|
|
#if defined(__linux__) && !defined(__ANDROID__)
|
|
#define FOLLY_HAVE_EVENTFD
|
|
#include <folly/io/async/EventFDWrapper.h>
|
|
#endif
|
|
|
|
namespace folly {
|
|
|
|
/**
|
|
* A producer-consumer queue for passing messages between EventBase threads.
|
|
*
|
|
* Messages can be added to the queue from any thread. Multiple consumers may
|
|
* listen to the queue from multiple EventBase threads.
|
|
*
|
|
* A NotificationQueue may not be destroyed while there are still consumers
|
|
* registered to receive events from the queue. It is the user's
|
|
* responsibility to ensure that all consumers are unregistered before the
|
|
* queue is destroyed.
|
|
*
|
|
* MessageT should be MoveConstructible (i.e., must support either a move
|
|
* constructor or a copy constructor, or both). Ideally it's move constructor
|
|
* (or copy constructor if no move constructor is provided) should never throw
|
|
* exceptions. If the constructor may throw, the consumers could end up
|
|
* spinning trying to move a message off the queue and failing, and then
|
|
* retrying.
|
|
*/
|
|
template <typename MessageT>
|
|
class NotificationQueue {
|
|
struct Node : public boost::intrusive::slist_base_hook<
|
|
boost::intrusive::cache_last<true>> {
|
|
template <typename MessageTT>
|
|
Node(MessageTT&& msg, std::shared_ptr<RequestContext> ctx)
|
|
: msg_(std::forward<MessageTT>(msg)), ctx_(std::move(ctx)) {}
|
|
MessageT msg_;
|
|
std::shared_ptr<RequestContext> ctx_;
|
|
};
|
|
|
|
public:
|
|
/**
|
|
* A callback interface for consuming messages from the queue as they arrive.
|
|
*/
|
|
class Consumer : public DelayedDestruction, private EventHandler {
|
|
public:
|
|
enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
|
|
|
|
Consumer()
|
|
: queue_(nullptr),
|
|
destroyedFlagPtr_(nullptr),
|
|
maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
|
|
|
|
// create a consumer in-place, without the need to build new class
|
|
template <typename TCallback>
|
|
static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
|
|
TCallback&& callback);
|
|
|
|
/**
|
|
* messageAvailable() will be invoked whenever a new
|
|
* message is available from the pipe.
|
|
*/
|
|
virtual void messageAvailable(MessageT&& message) noexcept = 0;
|
|
|
|
/**
|
|
* Begin consuming messages from the specified queue.
|
|
*
|
|
* messageAvailable() will be called whenever a message is available. This
|
|
* consumer will continue to consume messages until stopConsuming() is
|
|
* called.
|
|
*
|
|
* A Consumer may only consume messages from a single NotificationQueue at
|
|
* a time. startConsuming() should not be called if this consumer is
|
|
* already consuming.
|
|
*/
|
|
void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
|
|
init(eventBase, queue);
|
|
registerHandler(READ | PERSIST);
|
|
}
|
|
|
|
/**
|
|
* Same as above but registers this event handler as internal so that it
|
|
* doesn't count towards the pending reader count for the IOLoop.
|
|
*/
|
|
void startConsumingInternal(
|
|
EventBase* eventBase,
|
|
NotificationQueue* queue) {
|
|
init(eventBase, queue);
|
|
registerInternalHandler(READ | PERSIST);
|
|
}
|
|
|
|
/**
|
|
* Stop consuming messages.
|
|
*
|
|
* startConsuming() may be called again to resume consumption of messages
|
|
* at a later point in time.
|
|
*/
|
|
void stopConsuming();
|
|
|
|
/**
|
|
* Consume messages off the queue until it is empty. No messages may be
|
|
* added to the queue while it is draining, so that the process is bounded.
|
|
* To that end, putMessage/tryPutMessage will throw an std::runtime_error,
|
|
* and tryPutMessageNoThrow will return false.
|
|
*
|
|
* @returns true if the queue was drained, false otherwise. In practice,
|
|
* this will only fail if someone else is already draining the queue.
|
|
*/
|
|
bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
|
|
|
|
/**
|
|
* Get the NotificationQueue that this consumer is currently consuming
|
|
* messages from. Returns nullptr if the consumer is not currently
|
|
* consuming events from any queue.
|
|
*/
|
|
NotificationQueue* getCurrentQueue() const {
|
|
return queue_;
|
|
}
|
|
|
|
/**
|
|
* Set a limit on how many messages this consumer will read each iteration
|
|
* around the event loop.
|
|
*
|
|
* This helps rate-limit how much work the Consumer will do each event loop
|
|
* iteration, to prevent it from starving other event handlers.
|
|
*
|
|
* A limit of 0 means no limit will be enforced. If unset, the limit
|
|
* defaults to kDefaultMaxReadAtOnce (defined to 10 above).
|
|
*/
|
|
void setMaxReadAtOnce(uint32_t maxAtOnce) {
|
|
maxReadAtOnce_ = maxAtOnce;
|
|
}
|
|
uint32_t getMaxReadAtOnce() const {
|
|
return maxReadAtOnce_;
|
|
}
|
|
|
|
EventBase* getEventBase() {
|
|
return base_;
|
|
}
|
|
|
|
void handlerReady(uint16_t events) noexcept override;
|
|
|
|
protected:
|
|
void destroy() override;
|
|
|
|
~Consumer() override {}
|
|
|
|
private:
|
|
/**
|
|
* Consume messages off the the queue until
|
|
* - the queue is empty (1), or
|
|
* - until the consumer is destroyed, or
|
|
* - until the consumer is uninstalled, or
|
|
* - an exception is thrown in the course of dequeueing, or
|
|
* - unless isDrain is true, until the maxReadAtOnce_ limit is hit
|
|
*
|
|
* (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
|
|
*/
|
|
void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
|
|
|
|
void setActive(bool active, bool shouldLock = false) {
|
|
if (!queue_) {
|
|
active_ = active;
|
|
return;
|
|
}
|
|
if (shouldLock) {
|
|
queue_->spinlock_.lock();
|
|
}
|
|
if (!active_ && active) {
|
|
++queue_->numActiveConsumers_;
|
|
} else if (active_ && !active) {
|
|
--queue_->numActiveConsumers_;
|
|
}
|
|
active_ = active;
|
|
if (shouldLock) {
|
|
queue_->spinlock_.unlock();
|
|
}
|
|
}
|
|
void init(EventBase* eventBase, NotificationQueue* queue);
|
|
|
|
NotificationQueue* queue_;
|
|
bool* destroyedFlagPtr_;
|
|
uint32_t maxReadAtOnce_;
|
|
EventBase* base_;
|
|
bool active_{false};
|
|
};
|
|
|
|
class SimpleConsumer {
|
|
public:
|
|
explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) {
|
|
++queue_.numConsumers_;
|
|
}
|
|
|
|
~SimpleConsumer() {
|
|
--queue_.numConsumers_;
|
|
}
|
|
|
|
int getFd() const {
|
|
return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0];
|
|
}
|
|
|
|
template <typename F>
|
|
void consumeUntilDrained(F&& foreach);
|
|
|
|
private:
|
|
NotificationQueue& queue_;
|
|
};
|
|
|
|
enum class FdType {
|
|
PIPE,
|
|
#ifdef FOLLY_HAVE_EVENTFD
|
|
EVENTFD,
|
|
#endif
|
|
};
|
|
|
|
/**
|
|
* Create a new NotificationQueue.
|
|
*
|
|
* If the maxSize parameter is specified, this sets the maximum queue size
|
|
* that will be enforced by tryPutMessage(). (This size is advisory, and may
|
|
* be exceeded if producers explicitly use putMessage() instead of
|
|
* tryPutMessage().)
|
|
*
|
|
* The fdType parameter determines the type of file descriptor used
|
|
* internally to signal message availability. The default (eventfd) is
|
|
* preferable for performance and because it won't fail when the queue gets
|
|
* too long. It is not available on on older and non-linux kernels, however.
|
|
* In this case the code will fall back to using a pipe, the parameter is
|
|
* mostly for testing purposes.
|
|
*/
|
|
explicit NotificationQueue(
|
|
uint32_t maxSize = 0,
|
|
#ifdef FOLLY_HAVE_EVENTFD
|
|
FdType fdType = FdType::EVENTFD)
|
|
#else
|
|
FdType fdType = FdType::PIPE)
|
|
#endif
|
|
: eventfd_(-1),
|
|
pipeFds_{-1, -1},
|
|
advisoryMaxQueueSize_(maxSize),
|
|
pid_(pid_t(getpid())) {
|
|
|
|
#ifdef FOLLY_HAVE_EVENTFD
|
|
if (fdType == FdType::EVENTFD) {
|
|
eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
|
|
if (eventfd_ == -1) {
|
|
if (errno == ENOSYS || errno == EINVAL) {
|
|
// eventfd not availalble
|
|
LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
|
|
<< errno << ", falling back to pipe mode (is your kernel "
|
|
<< "> 2.6.30?)";
|
|
fdType = FdType::PIPE;
|
|
} else {
|
|
// some other error
|
|
folly::throwSystemError(
|
|
"Failed to create eventfd for "
|
|
"NotificationQueue",
|
|
errno);
|
|
}
|
|
}
|
|
}
|
|
#endif
|
|
if (fdType == FdType::PIPE) {
|
|
if (pipe(pipeFds_)) {
|
|
folly::throwSystemError(
|
|
"Failed to create pipe for NotificationQueue", errno);
|
|
}
|
|
try {
|
|
// put both ends of the pipe into non-blocking mode
|
|
if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
|
|
folly::throwSystemError(
|
|
"failed to put NotificationQueue pipe read "
|
|
"endpoint into non-blocking mode",
|
|
errno);
|
|
}
|
|
if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
|
|
folly::throwSystemError(
|
|
"failed to put NotificationQueue pipe write "
|
|
"endpoint into non-blocking mode",
|
|
errno);
|
|
}
|
|
} catch (...) {
|
|
::close(pipeFds_[0]);
|
|
::close(pipeFds_[1]);
|
|
throw;
|
|
}
|
|
}
|
|
}
|
|
|
|
~NotificationQueue() {
|
|
std::unique_ptr<Node> data;
|
|
while (!queue_.empty()) {
|
|
data.reset(&queue_.front());
|
|
queue_.pop_front();
|
|
}
|
|
if (eventfd_ >= 0) {
|
|
::close(eventfd_);
|
|
eventfd_ = -1;
|
|
}
|
|
if (pipeFds_[0] >= 0) {
|
|
::close(pipeFds_[0]);
|
|
pipeFds_[0] = -1;
|
|
}
|
|
if (pipeFds_[1] >= 0) {
|
|
::close(pipeFds_[1]);
|
|
pipeFds_[1] = -1;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Set the advisory maximum queue size.
|
|
*
|
|
* This maximum queue size affects calls to tryPutMessage(). Message
|
|
* producers can still use the putMessage() call to unconditionally put a
|
|
* message on the queue, ignoring the configured maximum queue size. This
|
|
* can cause the queue size to exceed the configured maximum.
|
|
*/
|
|
void setMaxQueueSize(uint32_t max) {
|
|
advisoryMaxQueueSize_ = max;
|
|
}
|
|
|
|
/**
|
|
* Attempt to put a message on the queue if the queue is not already full.
|
|
*
|
|
* If the queue is full, a std::overflow_error will be thrown. The
|
|
* setMaxQueueSize() function controls the maximum queue size.
|
|
*
|
|
* If the queue is currently draining, an std::runtime_error will be thrown.
|
|
*
|
|
* This method may contend briefly on a spinlock if many threads are
|
|
* concurrently accessing the queue, but for all intents and purposes it will
|
|
* immediately place the message on the queue and return.
|
|
*
|
|
* tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
|
|
* may throw any other exception thrown by the MessageT move/copy
|
|
* constructor.
|
|
*/
|
|
template <typename MessageTT>
|
|
void tryPutMessage(MessageTT&& message) {
|
|
putMessageImpl(std::forward<MessageTT>(message), advisoryMaxQueueSize_);
|
|
}
|
|
|
|
/**
|
|
* No-throw versions of the above. Instead returns true on success, false on
|
|
* failure.
|
|
*
|
|
* Only std::overflow_error (the common exception case) and std::runtime_error
|
|
* (which indicates that the queue is being drained) are prevented from being
|
|
* thrown. User code must still catch std::bad_alloc errors.
|
|
*/
|
|
template <typename MessageTT>
|
|
bool tryPutMessageNoThrow(MessageTT&& message) {
|
|
return putMessageImpl(
|
|
std::forward<MessageTT>(message), advisoryMaxQueueSize_, false);
|
|
}
|
|
|
|
/**
|
|
* Unconditionally put a message on the queue.
|
|
*
|
|
* This method is like tryPutMessage(), but ignores the maximum queue size
|
|
* and always puts the message on the queue, even if the maximum queue size
|
|
* would be exceeded.
|
|
*
|
|
* putMessage() may throw
|
|
* - std::bad_alloc if memory allocation fails, and may
|
|
* - std::runtime_error if the queue is currently draining
|
|
* - any other exception thrown by the MessageT move/copy constructor.
|
|
*/
|
|
template <typename MessageTT>
|
|
void putMessage(MessageTT&& message) {
|
|
putMessageImpl(std::forward<MessageTT>(message), 0);
|
|
}
|
|
|
|
/**
|
|
* Put several messages on the queue.
|
|
*/
|
|
template <typename InputIteratorT>
|
|
void putMessages(InputIteratorT first, InputIteratorT last) {
|
|
typedef typename std::iterator_traits<InputIteratorT>::iterator_category
|
|
IterCategory;
|
|
putMessagesImpl(first, last, IterCategory());
|
|
}
|
|
|
|
/**
|
|
* Try to immediately pull a message off of the queue, without blocking.
|
|
*
|
|
* If a message is immediately available, the result parameter will be
|
|
* updated to contain the message contents and true will be returned.
|
|
*
|
|
* If no message is available, false will be returned and result will be left
|
|
* unmodified.
|
|
*/
|
|
bool tryConsume(MessageT& result) {
|
|
SCOPE_EXIT {
|
|
syncSignalAndQueue();
|
|
};
|
|
|
|
checkPid();
|
|
std::unique_ptr<Node> data;
|
|
|
|
{
|
|
folly::SpinLockGuard g(spinlock_);
|
|
|
|
if (UNLIKELY(queue_.empty())) {
|
|
return false;
|
|
}
|
|
|
|
data.reset(&queue_.front());
|
|
queue_.pop_front();
|
|
}
|
|
|
|
result = std::move(data->msg_);
|
|
RequestContext::setContext(std::move(data->ctx_));
|
|
|
|
return true;
|
|
}
|
|
|
|
size_t size() const {
|
|
folly::SpinLockGuard g(spinlock_);
|
|
return queue_.size();
|
|
}
|
|
|
|
/**
|
|
* Check that the NotificationQueue is being used from the correct process.
|
|
*
|
|
* If you create a NotificationQueue in one process, then fork, and try to
|
|
* send messages to the queue from the child process, you're going to have a
|
|
* bad time. Unfortunately users have (accidentally) run into this.
|
|
*
|
|
* Because we use an eventfd/pipe, the child process can actually signal the
|
|
* parent process that an event is ready. However, it can't put anything on
|
|
* the parent's queue, so the parent wakes up and finds an empty queue. This
|
|
* check ensures that we catch the problem in the misbehaving child process
|
|
* code, and crash before signalling the parent process.
|
|
*/
|
|
void checkPid() const {
|
|
CHECK_EQ(pid_, pid_t(getpid()));
|
|
}
|
|
|
|
private:
|
|
// Forbidden copy constructor and assignment operator
|
|
NotificationQueue(NotificationQueue const&) = delete;
|
|
NotificationQueue& operator=(NotificationQueue const&) = delete;
|
|
|
|
inline bool checkQueueSize(size_t maxSize, bool throws = true) const {
|
|
DCHECK(0 == spinlock_.try_lock());
|
|
if (maxSize > 0 && queue_.size() >= maxSize) {
|
|
if (throws) {
|
|
throw std::overflow_error(
|
|
"unable to add message to NotificationQueue: "
|
|
"queue is full");
|
|
}
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
inline bool checkDraining(bool throws = true) {
|
|
if (UNLIKELY(draining_ && throws)) {
|
|
throw std::runtime_error("queue is draining, cannot add message");
|
|
}
|
|
return draining_;
|
|
}
|
|
|
|
void ensureSignalLocked() const {
|
|
// semantics: empty fd == empty queue <=> !signal_
|
|
if (signal_) {
|
|
return;
|
|
}
|
|
|
|
ssize_t bytes_written = 0;
|
|
size_t bytes_expected = 0;
|
|
|
|
do {
|
|
if (eventfd_ >= 0) {
|
|
// eventfd(2) dictates that we must write a 64-bit integer
|
|
uint64_t signal = 1;
|
|
bytes_expected = sizeof(signal);
|
|
bytes_written = ::write(eventfd_, &signal, bytes_expected);
|
|
} else {
|
|
uint8_t signal = 1;
|
|
bytes_expected = sizeof(signal);
|
|
bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
|
|
}
|
|
} while (bytes_written == -1 && errno == EINTR);
|
|
|
|
if (bytes_written == ssize_t(bytes_expected)) {
|
|
signal_ = true;
|
|
} else {
|
|
folly::throwSystemError(
|
|
"failed to signal NotificationQueue after "
|
|
"write",
|
|
errno);
|
|
}
|
|
}
|
|
|
|
void drainSignalsLocked() {
|
|
ssize_t bytes_read = 0;
|
|
if (eventfd_ > 0) {
|
|
uint64_t message;
|
|
bytes_read = readNoInt(eventfd_, &message, sizeof(message));
|
|
CHECK(bytes_read != -1 || errno == EAGAIN);
|
|
} else {
|
|
// There should only be one byte in the pipe. To avoid potential leaks we
|
|
// still drain.
|
|
uint8_t message[32];
|
|
ssize_t result;
|
|
while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) !=
|
|
-1) {
|
|
bytes_read += result;
|
|
}
|
|
CHECK(result == -1 && errno == EAGAIN);
|
|
LOG_IF(ERROR, bytes_read > 1)
|
|
<< "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
|
|
<< bytes_read << " bytes, expected <= 1";
|
|
}
|
|
LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
|
|
<< "[NotificationQueue] Unexpected state while draining signals: signal_="
|
|
<< signal_ << " bytes_read=" << bytes_read;
|
|
|
|
signal_ = false;
|
|
}
|
|
|
|
void ensureSignal() const {
|
|
folly::SpinLockGuard g(spinlock_);
|
|
ensureSignalLocked();
|
|
}
|
|
|
|
void syncSignalAndQueue() {
|
|
folly::SpinLockGuard g(spinlock_);
|
|
|
|
if (queue_.empty()) {
|
|
drainSignalsLocked();
|
|
} else {
|
|
ensureSignalLocked();
|
|
}
|
|
}
|
|
|
|
template <typename MessageTT>
|
|
bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) {
|
|
checkPid();
|
|
bool signal = false;
|
|
{
|
|
auto data = std::make_unique<Node>(
|
|
std::forward<MessageTT>(message), RequestContext::saveContext());
|
|
folly::SpinLockGuard g(spinlock_);
|
|
if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
|
|
return false;
|
|
}
|
|
// We only need to signal an event if not all consumers are
|
|
// awake.
|
|
if (numActiveConsumers_ < numConsumers_) {
|
|
signal = true;
|
|
}
|
|
queue_.push_back(*data.release());
|
|
if (signal) {
|
|
ensureSignalLocked();
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
template <typename InputIteratorT>
|
|
void putMessagesImpl(
|
|
InputIteratorT first,
|
|
InputIteratorT last,
|
|
std::input_iterator_tag) {
|
|
checkPid();
|
|
bool signal = false;
|
|
boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> q;
|
|
try {
|
|
while (first != last) {
|
|
auto data = std::make_unique<Node>(
|
|
std::move(*first), RequestContext::saveContext());
|
|
q.push_back(*data.release());
|
|
++first;
|
|
}
|
|
folly::SpinLockGuard g(spinlock_);
|
|
checkDraining();
|
|
queue_.splice(queue_.end(), q);
|
|
if (numActiveConsumers_ < numConsumers_) {
|
|
signal = true;
|
|
}
|
|
if (signal) {
|
|
ensureSignalLocked();
|
|
}
|
|
} catch (...) {
|
|
std::unique_ptr<Node> data;
|
|
while (!q.empty()) {
|
|
data.reset(&q.front());
|
|
q.pop_front();
|
|
}
|
|
throw;
|
|
}
|
|
}
|
|
|
|
mutable folly::SpinLock spinlock_;
|
|
mutable bool signal_{false};
|
|
int eventfd_;
|
|
int pipeFds_[2]; // to fallback to on older/non-linux systems
|
|
uint32_t advisoryMaxQueueSize_;
|
|
pid_t pid_;
|
|
boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> queue_;
|
|
int numConsumers_{0};
|
|
std::atomic<int> numActiveConsumers_{0};
|
|
bool draining_{false};
|
|
};
|
|
|
|
template <typename MessageT>
|
|
void NotificationQueue<MessageT>::Consumer::destroy() {
|
|
// If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
|
|
// will be non-nullptr. Mark the value that it points to, so that
|
|
// handlerReady() will know the callback is destroyed, and that it cannot
|
|
// access any member variables anymore.
|
|
if (destroyedFlagPtr_) {
|
|
*destroyedFlagPtr_ = true;
|
|
}
|
|
stopConsuming();
|
|
DelayedDestruction::destroy();
|
|
}
|
|
|
|
template <typename MessageT>
|
|
void NotificationQueue<MessageT>::Consumer::handlerReady(
|
|
uint16_t /*events*/) noexcept {
|
|
consumeMessages(false);
|
|
}
|
|
|
|
template <typename MessageT>
|
|
void NotificationQueue<MessageT>::Consumer::consumeMessages(
|
|
bool isDrain,
|
|
size_t* numConsumed) noexcept {
|
|
DestructorGuard dg(this);
|
|
uint32_t numProcessed = 0;
|
|
setActive(true);
|
|
SCOPE_EXIT {
|
|
if (queue_) {
|
|
queue_->syncSignalAndQueue();
|
|
}
|
|
};
|
|
SCOPE_EXIT {
|
|
setActive(false, /* shouldLock = */ true);
|
|
};
|
|
SCOPE_EXIT {
|
|
if (numConsumed != nullptr) {
|
|
*numConsumed = numProcessed;
|
|
}
|
|
};
|
|
while (true) {
|
|
// Now pop the message off of the queue.
|
|
//
|
|
// We have to manually acquire and release the spinlock here, rather than
|
|
// using SpinLockHolder since the MessageT has to be constructed while
|
|
// holding the spinlock and available after we release it. SpinLockHolder
|
|
// unfortunately doesn't provide a release() method. (We can't construct
|
|
// MessageT first since we have no guarantee that MessageT has a default
|
|
// constructor.
|
|
queue_->spinlock_.lock();
|
|
bool locked = true;
|
|
|
|
try {
|
|
if (UNLIKELY(queue_->queue_.empty())) {
|
|
// If there is no message, we've reached the end of the queue, return.
|
|
setActive(false);
|
|
queue_->spinlock_.unlock();
|
|
return;
|
|
}
|
|
|
|
// Pull a message off the queue.
|
|
std::unique_ptr<Node> data;
|
|
data.reset(&queue_->queue_.front());
|
|
queue_->queue_.pop_front();
|
|
|
|
// Check to see if the queue is empty now.
|
|
// We use this as an optimization to see if we should bother trying to
|
|
// loop again and read another message after invoking this callback.
|
|
bool wasEmpty = queue_->queue_.empty();
|
|
if (wasEmpty) {
|
|
setActive(false);
|
|
}
|
|
|
|
// Now unlock the spinlock before we invoke the callback.
|
|
queue_->spinlock_.unlock();
|
|
RequestContextScopeGuard rctx(std::move(data->ctx_));
|
|
|
|
locked = false;
|
|
|
|
// Call the callback
|
|
bool callbackDestroyed = false;
|
|
CHECK(destroyedFlagPtr_ == nullptr);
|
|
destroyedFlagPtr_ = &callbackDestroyed;
|
|
messageAvailable(std::move(data->msg_));
|
|
destroyedFlagPtr_ = nullptr;
|
|
|
|
// If the callback was destroyed before it returned, we are done
|
|
if (callbackDestroyed) {
|
|
return;
|
|
}
|
|
|
|
// If the callback is no longer installed, we are done.
|
|
if (queue_ == nullptr) {
|
|
return;
|
|
}
|
|
|
|
// If we have hit maxReadAtOnce_, we are done.
|
|
++numProcessed;
|
|
if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
|
|
return;
|
|
}
|
|
|
|
// If the queue was empty before we invoked the callback, it's probable
|
|
// that it is still empty now. Just go ahead and return, rather than
|
|
// looping again and trying to re-read from the eventfd. (If a new
|
|
// message had in fact arrived while we were invoking the callback, we
|
|
// will simply be woken up the next time around the event loop and will
|
|
// process the message then.)
|
|
if (wasEmpty) {
|
|
return;
|
|
}
|
|
} catch (const std::exception&) {
|
|
// This catch block is really just to handle the case where the MessageT
|
|
// constructor throws. The messageAvailable() callback itself is
|
|
// declared as noexcept and should never throw.
|
|
//
|
|
// If the MessageT constructor does throw we try to handle it as best as
|
|
// we can, but we can't work miracles. We will just ignore the error for
|
|
// now and return. The next time around the event loop we will end up
|
|
// trying to read the message again. If MessageT continues to throw we
|
|
// will never make forward progress and will keep trying each time around
|
|
// the event loop.
|
|
if (locked) {
|
|
// Unlock the spinlock.
|
|
queue_->spinlock_.unlock();
|
|
}
|
|
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
template <typename MessageT>
|
|
void NotificationQueue<MessageT>::Consumer::init(
|
|
EventBase* eventBase,
|
|
NotificationQueue* queue) {
|
|
eventBase->dcheckIsInEventBaseThread();
|
|
assert(queue_ == nullptr);
|
|
assert(!isHandlerRegistered());
|
|
queue->checkPid();
|
|
|
|
base_ = eventBase;
|
|
|
|
queue_ = queue;
|
|
|
|
{
|
|
folly::SpinLockGuard g(queue_->spinlock_);
|
|
queue_->numConsumers_++;
|
|
}
|
|
queue_->ensureSignal();
|
|
|
|
if (queue_->eventfd_ >= 0) {
|
|
initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->eventfd_));
|
|
} else {
|
|
initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->pipeFds_[0]));
|
|
}
|
|
}
|
|
|
|
template <typename MessageT>
|
|
void NotificationQueue<MessageT>::Consumer::stopConsuming() {
|
|
if (queue_ == nullptr) {
|
|
assert(!isHandlerRegistered());
|
|
return;
|
|
}
|
|
|
|
{
|
|
folly::SpinLockGuard g(queue_->spinlock_);
|
|
queue_->numConsumers_--;
|
|
setActive(false);
|
|
}
|
|
|
|
assert(isHandlerRegistered());
|
|
unregisterHandler();
|
|
detachEventBase();
|
|
queue_ = nullptr;
|
|
}
|
|
|
|
template <typename MessageT>
|
|
bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
|
|
size_t* numConsumed) noexcept {
|
|
DestructorGuard dg(this);
|
|
{
|
|
folly::SpinLockGuard g(queue_->spinlock_);
|
|
if (queue_->draining_) {
|
|
return false;
|
|
}
|
|
queue_->draining_ = true;
|
|
}
|
|
consumeMessages(true, numConsumed);
|
|
{
|
|
folly::SpinLockGuard g(queue_->spinlock_);
|
|
queue_->draining_ = false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
template <typename MessageT>
|
|
template <typename F>
|
|
void NotificationQueue<MessageT>::SimpleConsumer::consumeUntilDrained(
|
|
F&& foreach) {
|
|
SCOPE_EXIT {
|
|
queue_.syncSignalAndQueue();
|
|
};
|
|
|
|
queue_.checkPid();
|
|
|
|
while (true) {
|
|
std::unique_ptr<Node> data;
|
|
{
|
|
folly::SpinLockGuard g(queue_.spinlock_);
|
|
|
|
if (UNLIKELY(queue_.queue_.empty())) {
|
|
return;
|
|
}
|
|
|
|
data.reset(&queue_.queue_.front());
|
|
queue_.queue_.pop_front();
|
|
}
|
|
|
|
RequestContextScopeGuard rctx(std::move(data->ctx_));
|
|
foreach(std::move(data->msg_));
|
|
// Make sure message destructor is called with the correct RequestContext.
|
|
data.reset();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Creates a NotificationQueue::Consumer wrapping a function object
|
|
* Modeled after AsyncTimeout::make
|
|
*
|
|
*/
|
|
|
|
namespace detail {
|
|
|
|
template <typename MessageT, typename TCallback>
|
|
struct notification_queue_consumer_wrapper
|
|
: public NotificationQueue<MessageT>::Consumer {
|
|
template <typename UCallback>
|
|
explicit notification_queue_consumer_wrapper(UCallback&& callback)
|
|
: callback_(std::forward<UCallback>(callback)) {}
|
|
|
|
// we are being stricter here and requiring noexcept for callback
|
|
void messageAvailable(MessageT&& message) noexcept override {
|
|
static_assert(
|
|
noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
|
|
"callback must be declared noexcept, e.g.: `[]() noexcept {}`");
|
|
|
|
callback_(std::forward<MessageT>(message));
|
|
}
|
|
|
|
private:
|
|
TCallback callback_;
|
|
};
|
|
|
|
} // namespace detail
|
|
|
|
template <typename MessageT>
|
|
template <typename TCallback>
|
|
std::unique_ptr<
|
|
typename NotificationQueue<MessageT>::Consumer,
|
|
DelayedDestruction::Destructor>
|
|
NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
|
|
return std::unique_ptr<
|
|
NotificationQueue<MessageT>::Consumer,
|
|
DelayedDestruction::Destructor>(
|
|
new detail::notification_queue_consumer_wrapper<
|
|
MessageT,
|
|
typename std::decay<TCallback>::type>(
|
|
std::forward<TCallback>(callback)));
|
|
}
|
|
|
|
} // namespace folly
|