Rocket.Chat.ReactNative/ios/Pods/Folly/folly/TokenBucket.h

379 lines
12 KiB
C++

/*
* Copyright 2015-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <atomic>
#include <chrono>
#include <folly/Likely.h>
#include <folly/concurrency/CacheLocality.h>
namespace folly {
/**
* Thread-safe (atomic) token bucket implementation.
*
* A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream
* of events with an average rate and some amount of burstiness. The canonical
* example is a packet switched network: the network can accept some number of
* bytes per second and the bytes come in finite packets (bursts). A token
* bucket stores up to a fixed number of tokens (the burst size). Some number
* of tokens are removed when an event occurs. The tokens are replenished at a
* fixed rate.
*
* This implementation records the last time it was updated. This allows the
* token bucket to add tokens "just in time" when tokens are requested.
*
* The "dynamic" base variant allows the token generation rate and maximum
* burst size to change with every token consumption.
*
* @tparam Clock Clock type, must be steady i.e. monotonic.
*/
template <typename Clock = std::chrono::steady_clock>
class BasicDynamicTokenBucket {
static_assert(Clock::is_steady, "clock must be steady");
public:
/**
* Constructor.
*
* @param zeroTime Initial time at which to consider the token bucket
* starting to fill. Defaults to 0, so by default token
* buckets are "full" after construction.
*/
explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept
: zeroTime_(zeroTime) {}
/**
* Copy constructor.
*
* Thread-safe. (Copy constructors of derived classes may not be thread-safe
* however.)
*/
BasicDynamicTokenBucket(const BasicDynamicTokenBucket& other) noexcept
: zeroTime_(other.zeroTime_.load()) {}
/**
* Copy-assignment operator.
*
* Warning: not thread safe for the object being assigned to (including
* self-assignment). Thread-safe for the other object.
*/
BasicDynamicTokenBucket& operator=(
const BasicDynamicTokenBucket& other) noexcept {
zeroTime_ = other.zeroTime_.load();
return *this;
}
/**
* Re-initialize token bucket.
*
* Thread-safe.
*
* @param zeroTime Initial time at which to consider the token bucket
* starting to fill. Defaults to 0, so by default token
* bucket is reset to "full".
*/
void reset(double zeroTime = 0) noexcept {
zeroTime_ = zeroTime;
}
/**
* Returns the current time in seconds since Epoch.
*/
static double defaultClockNow() noexcept {
using dur = std::chrono::duration<double>;
auto const now = Clock::now().time_since_epoch();
return std::chrono::duration_cast<dur>(now).count();
}
/**
* Attempts to consume some number of tokens. Tokens are first added to the
* bucket based on the time elapsed since the last attempt to consume tokens.
* Note: Attempts to consume more tokens than the burst size will always
* fail.
*
* Thread-safe.
*
* @param toConsume The number of tokens to consume.
* @param rate Number of tokens to generate per second.
* @param burstSize Maximum burst size. Must be greater than 0.
* @param nowInSeconds Current time in seconds. Should be monotonically
* increasing from the nowInSeconds specified in
* this token bucket's constructor.
* @return True if the rate limit check passed, false otherwise.
*/
bool consume(
double toConsume,
double rate,
double burstSize,
double nowInSeconds = defaultClockNow()) {
assert(rate > 0);
assert(burstSize > 0);
return consumeImpl(
rate, burstSize, nowInSeconds, [toConsume](double& tokens) {
if (tokens < toConsume) {
return false;
}
tokens -= toConsume;
return true;
});
}
/**
* Similar to consume, but always consumes some number of tokens. If the
* bucket contains enough tokens - consumes toConsume tokens. Otherwise the
* bucket is drained.
*
* Thread-safe.
*
* @param toConsume The number of tokens to consume.
* @param rate Number of tokens to generate per second.
* @param burstSize Maximum burst size. Must be greater than 0.
* @param nowInSeconds Current time in seconds. Should be monotonically
* increasing from the nowInSeconds specified in
* this token bucket's constructor.
* @return number of tokens that were consumed.
*/
double consumeOrDrain(
double toConsume,
double rate,
double burstSize,
double nowInSeconds = defaultClockNow()) {
assert(rate > 0);
assert(burstSize > 0);
double consumed;
consumeImpl(
rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) {
if (tokens < toConsume) {
consumed = tokens;
tokens = 0.0;
} else {
consumed = toConsume;
tokens -= toConsume;
}
return true;
});
return consumed;
}
/**
* Returns the number of tokens currently available.
*
* Thread-safe (but returned value may immediately be outdated).
*/
double available(
double rate,
double burstSize,
double nowInSeconds = defaultClockNow()) const noexcept {
assert(rate > 0);
assert(burstSize > 0);
return std::min((nowInSeconds - this->zeroTime_) * rate, burstSize);
}
private:
template <typename TCallback>
bool consumeImpl(
double rate,
double burstSize,
double nowInSeconds,
const TCallback& callback) {
auto zeroTimeOld = zeroTime_.load();
double zeroTimeNew;
do {
auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
if (!callback(tokens)) {
return false;
}
zeroTimeNew = nowInSeconds - tokens / rate;
} while (
UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew)));
return true;
}
alignas(hardware_destructive_interference_size) std::atomic<double> zeroTime_;
};
/**
* Specialization of BasicDynamicTokenBucket with a fixed token
* generation rate and a fixed maximum burst size.
*/
template <typename Clock = std::chrono::steady_clock>
class BasicTokenBucket {
static_assert(Clock::is_steady, "clock must be steady");
private:
using Impl = BasicDynamicTokenBucket<Clock>;
public:
/**
* Construct a token bucket with a specific maximum rate and burst size.
*
* @param genRate Number of tokens to generate per second.
* @param burstSize Maximum burst size. Must be greater than 0.
* @param zeroTime Initial time at which to consider the token bucket
* starting to fill. Defaults to 0, so by default token
* bucket is "full" after construction.
*/
BasicTokenBucket(
double genRate,
double burstSize,
double zeroTime = 0) noexcept
: tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) {
assert(rate_ > 0);
assert(burstSize_ > 0);
}
/**
* Copy constructor.
*
* Warning: not thread safe!
*/
BasicTokenBucket(const BasicTokenBucket& other) noexcept = default;
/**
* Copy-assignment operator.
*
* Warning: not thread safe!
*/
BasicTokenBucket& operator=(const BasicTokenBucket& other) noexcept = default;
/**
* Returns the current time in seconds since Epoch.
*/
static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) {
return Impl::defaultClockNow();
}
/**
* Change rate and burst size.
*
* Warning: not thread safe!
*
* @param genRate Number of tokens to generate per second.
* @param burstSize Maximum burst size. Must be greater than 0.
* @param nowInSeconds Current time in seconds. Should be monotonically
* increasing from the nowInSeconds specified in
* this token bucket's constructor.
*/
void reset(
double genRate,
double burstSize,
double nowInSeconds = defaultClockNow()) noexcept {
assert(genRate > 0);
assert(burstSize > 0);
const double availTokens = available(nowInSeconds);
rate_ = genRate;
burstSize_ = burstSize;
setCapacity(availTokens, nowInSeconds);
}
/**
* Change number of tokens in bucket.
*
* Warning: not thread safe!
*
* @param tokens Desired number of tokens in bucket after the call.
* @param nowInSeconds Current time in seconds. Should be monotonically
* increasing from the nowInSeconds specified in
* this token bucket's constructor.
*/
void setCapacity(double tokens, double nowInSeconds) noexcept {
tokenBucket_.reset(nowInSeconds - tokens / rate_);
}
/**
* Attempts to consume some number of tokens. Tokens are first added to the
* bucket based on the time elapsed since the last attempt to consume tokens.
* Note: Attempts to consume more tokens than the burst size will always
* fail.
*
* Thread-safe.
*
* @param toConsume The number of tokens to consume.
* @param nowInSeconds Current time in seconds. Should be monotonically
* increasing from the nowInSeconds specified in
* this token bucket's constructor.
* @return True if the rate limit check passed, false otherwise.
*/
bool consume(double toConsume, double nowInSeconds = defaultClockNow()) {
return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds);
}
/**
* Similar to consume, but always consumes some number of tokens. If the
* bucket contains enough tokens - consumes toConsume tokens. Otherwise the
* bucket is drained.
*
* Thread-safe.
*
* @param toConsume The number of tokens to consume.
* @param nowInSeconds Current time in seconds. Should be monotonically
* increasing from the nowInSeconds specified in
* this token bucket's constructor.
* @return number of tokens that were consumed.
*/
double consumeOrDrain(
double toConsume,
double nowInSeconds = defaultClockNow()) {
return tokenBucket_.consumeOrDrain(
toConsume, rate_, burstSize_, nowInSeconds);
}
/**
* Returns the number of tokens currently available.
*
* Thread-safe (but returned value may immediately be outdated).
*/
double available(double nowInSeconds = defaultClockNow()) const {
return tokenBucket_.available(rate_, burstSize_, nowInSeconds);
}
/**
* Returns the number of tokens generated per second.
*
* Thread-safe (but returned value may immediately be outdated).
*/
double rate() const noexcept {
return rate_;
}
/**
* Returns the maximum burst size.
*
* Thread-safe (but returned value may immediately be outdated).
*/
double burst() const noexcept {
return burstSize_;
}
private:
Impl tokenBucket_;
double rate_;
double burstSize_;
};
using TokenBucket = BasicTokenBucket<>;
using DynamicTokenBucket = BasicDynamicTokenBucket<>;
} // namespace folly