Rocket.Chat.ReactNative/ios/Pods/Flipper-RSocket/rsocket/statemachine/RSocketStateMachine.h

350 lines
11 KiB
C++

// Copyright (c) Facebook, Inc. and its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <deque>
#include <memory>
#include "rsocket/ColdResumeHandler.h"
#include "rsocket/DuplexConnection.h"
#include "rsocket/Payload.h"
#include "rsocket/RSocketParameters.h"
#include "rsocket/ResumeManager.h"
#include "rsocket/framing/FrameProcessor.h"
#include "rsocket/framing/FrameSerializer.h"
#include "rsocket/internal/Common.h"
#include "rsocket/internal/KeepaliveTimer.h"
#include "rsocket/statemachine/StreamFragmentAccumulator.h"
#include "rsocket/statemachine/StreamStateMachineBase.h"
#include "rsocket/statemachine/StreamsWriter.h"
#include "yarpl/flowable/Subscriber.h"
#include "yarpl/flowable/Subscription.h"
#include "yarpl/single/SingleObserver.h"
namespace rsocket {
class ClientResumeStatusCallback;
class DuplexConnection;
class FrameTransport;
class Frame_ERROR;
class KeepaliveTimer;
class RSocketConnectionEvents;
class RSocketParameters;
class RSocketResponder;
class RSocketResponderCore;
class RSocketStateMachine;
class RSocketStats;
class ResumeManager;
class RSocketStateMachineTest;
class FrameSink {
public:
virtual ~FrameSink() = default;
/// Terminates underlying connection sending the error frame
/// on the connection.
///
/// This may synchronously deliver terminal signals to all
/// StreamAutomatonBase attached to this ConnectionAutomaton.
virtual void disconnectOrCloseWithError(Frame_ERROR&& error) = 0;
virtual void sendKeepalive(
std::unique_ptr<folly::IOBuf> data = folly::IOBuf::create(0)) = 0;
};
/// Handles connection-level frames and (de)multiplexes streams.
///
/// Instances of this class should be accessed and managed via shared_ptr,
/// instead of the pattern reflected in MemoryMixin and IntrusiveDeleter.
/// The reason why such a simple memory management story is possible lies in the
/// fact that there is no request(n)-based flow control between stream
/// automata and ConnectionAutomaton.
class RSocketStateMachine final
: public FrameSink,
public FrameProcessor,
public StreamsWriterImpl,
public std::enable_shared_from_this<RSocketStateMachine> {
public:
RSocketStateMachine(
std::shared_ptr<RSocketResponderCore> requestResponder,
std::unique_ptr<KeepaliveTimer> keepaliveTimer,
RSocketMode mode,
std::shared_ptr<RSocketStats> stats,
std::shared_ptr<RSocketConnectionEvents> connectionEvents,
std::shared_ptr<ResumeManager> resumeManager,
std::shared_ptr<ColdResumeHandler> coldResumeHandler);
RSocketStateMachine(
std::shared_ptr<RSocketResponder> requestResponder,
std::unique_ptr<KeepaliveTimer> keepaliveTimer,
RSocketMode mode,
std::shared_ptr<RSocketStats> stats,
std::shared_ptr<RSocketConnectionEvents> connectionEvents,
std::shared_ptr<ResumeManager> resumeManager,
std::shared_ptr<ColdResumeHandler> coldResumeHandler);
~RSocketStateMachine();
/// Create a new connection as a server.
void connectServer(std::shared_ptr<FrameTransport>, const SetupParameters&);
/// Resume a connection as a server.
bool resumeServer(std::shared_ptr<FrameTransport>, const ResumeParameters&);
/// Connect as a client. Sends a SETUP frame.
void connectClient(std::shared_ptr<FrameTransport>, SetupParameters);
/// Resume a connection as a client. Sends a RESUME frame.
void resumeClient(
ResumeIdentificationToken,
std::shared_ptr<FrameTransport>,
std::unique_ptr<ClientResumeStatusCallback>,
ProtocolVersion);
/// Disconnect the state machine's connection. Existing streams will stay
/// intact.
void disconnect(folly::exception_wrapper);
/// Whether the connection has been disconnected or closed.
bool isDisconnected() const;
/// Send an ERROR frame, and close the connection and all of its streams.
void closeWithError(Frame_ERROR&&);
/// Disconnect the connection if it is resumable, otherwise send an ERROR
/// frame and close the connection and all of its streams.
void disconnectOrCloseWithError(Frame_ERROR&&) override;
/// Close the connection and all of its streams.
void close(folly::exception_wrapper, StreamCompletionSignal);
void requestStream(
Payload request,
std::shared_ptr<yarpl::flowable::Subscriber<Payload>> responseSink);
std::shared_ptr<yarpl::flowable::Subscriber<Payload>> requestChannel(
Payload request,
bool hasInitialRequest,
std::shared_ptr<yarpl::flowable::Subscriber<Payload>> responseSink);
void requestResponse(
Payload payload,
std::shared_ptr<yarpl::single::SingleObserver<Payload>> responseSink);
/// Send a REQUEST_FNF frame.
void fireAndForget(Payload);
/// Send a METADATA_PUSH frame.
void metadataPush(std::unique_ptr<folly::IOBuf>);
/// Send a KEEPALIVE frame, with the RESPOND flag set.
void sendKeepalive(std::unique_ptr<folly::IOBuf>) override;
class CloseCallback {
public:
virtual ~CloseCallback() = default;
virtual void remove(RSocketStateMachine&) = 0;
};
/// Register a callback to be called when the StateMachine is closed.
/// It will be used to inform the containers, i.e. ConnectionSet or
/// wangle::ConnectionManager, to don't store the StateMachine anymore.
void registerCloseCallback(CloseCallback* callback);
DuplexConnection* getConnection();
// Has active requests?
bool hasStreams() const;
private:
// connection scope signals
void onKeepAliveFrame(
ResumePosition resumePosition,
std::unique_ptr<folly::IOBuf> data,
bool keepAliveRespond);
void onMetadataPushFrame(std::unique_ptr<folly::IOBuf> metadata);
void onResumeOkFrame(ResumePosition resumePosition);
void onErrorFrame(StreamId streamId, ErrorCode errorCode, Payload payload);
// stream scope signals
void onRequestNFrame(StreamId streamId, uint32_t requestN);
void onCancelFrame(StreamId streamId);
void onPayloadFrame(
StreamId streamId,
Payload payload,
bool flagsFollows,
bool flagsComplete,
bool flagsNext);
void onRequestStreamFrame(
StreamId streamId,
uint32_t requestN,
Payload payload,
bool flagsFollows);
void onRequestChannelFrame(
StreamId streamId,
uint32_t requestN,
Payload payload,
bool flagsComplete,
bool flagsNext,
bool flagsFollows);
void
onRequestResponseFrame(StreamId streamId, Payload payload, bool flagsFollows);
void
onFireAndForgetFrame(StreamId streamId, Payload payload, bool flagsFollows);
void onSetupFrame();
void onResumeFrame();
void onReservedFrame();
void onLeaseFrame();
void onExtFrame();
void onUnexpectedFrame(StreamId streamId);
std::shared_ptr<StreamStateMachineBase> getStreamStateMachine(
StreamId streamId);
void connect(std::shared_ptr<FrameTransport>);
/// Terminate underlying connection and connect new connection
void reconnect(
std::shared_ptr<FrameTransport>,
std::unique_ptr<ClientResumeStatusCallback>);
void setResumable(bool);
bool resumeFromPositionOrClose(
ResumePosition serverPosition,
ResumePosition clientPosition);
bool isPositionAvailable(ResumePosition) const;
/// Whether the connection has been closed.
bool isClosed() const;
uint32_t getKeepaliveTime() const;
void sendPendingFrames() override;
// Should buffer the frame if the state machine is disconnected or in the
// process of resuming.
bool shouldQueue() override;
RSocketStats& stats() override {
return *stats_;
}
FrameSerializer& serializer() override {
return *frameSerializer_;
}
template <typename TFrame>
bool deserializeFrameOrError(
TFrame& frame,
std::unique_ptr<folly::IOBuf> buf) {
if (frameSerializer_->deserializeFrom(frame, std::move(buf))) {
return true;
}
closeWithError(Frame_ERROR::connectionError("Invalid frame"));
return false;
}
// FrameProcessor.
void processFrame(std::unique_ptr<folly::IOBuf>) override;
void onTerminal(folly::exception_wrapper) override;
void handleFrame(StreamId, FrameType, std::unique_ptr<folly::IOBuf>);
void closeStreams(StreamCompletionSignal);
void closeFrameTransport(folly::exception_wrapper);
void sendKeepalive(FrameFlags, std::unique_ptr<folly::IOBuf>);
void resumeFromPosition(ResumePosition);
void outputFrame(std::unique_ptr<folly::IOBuf>) override;
void writeNewStream(
StreamId streamId,
StreamType streamType,
uint32_t initialRequestN,
Payload payload) override;
std::shared_ptr<yarpl::flowable::Subscriber<Payload>> onNewStreamReady(
StreamId streamId,
StreamType streamType,
Payload payload,
std::shared_ptr<yarpl::flowable::Subscriber<Payload>> response) override;
void onNewStreamReady(
StreamId streamId,
StreamType streamType,
Payload payload,
std::shared_ptr<yarpl::single::SingleObserver<Payload>> response)
override;
void onStreamClosed(StreamId) override;
bool ensureOrAutodetectFrameSerializer(const folly::IOBuf& firstFrame);
bool ensureNotInResumption();
size_t getConsumerAllowance(StreamId) const;
void setProtocolVersionOrThrow(
ProtocolVersion version,
const std::shared_ptr<FrameTransport>& transport);
bool isNewStreamId(StreamId streamId);
bool registerNewPeerStreamId(StreamId streamId);
StreamId getNextStreamId();
void setNextStreamId(StreamId streamId);
/// Client/server mode this state machine is operating in.
const RSocketMode mode_;
/// Whether the connection was initialized as resumable.
bool isResumable_{false};
/// Whether the connection has closed.
bool isClosed_{false};
/// Whether a cold resume is currently in progress.
bool coldResumeInProgress_{false};
std::shared_ptr<RSocketStats> stats_;
/// Map of all individual stream state machines.
std::unordered_map<StreamId, std::shared_ptr<StreamStateMachineBase>>
streams_;
StreamId nextStreamId_;
StreamId lastPeerStreamId_{0};
// Manages all state needed for warm/cold resumption.
std::shared_ptr<ResumeManager> resumeManager_;
const std::shared_ptr<RSocketResponderCore> requestResponder_;
std::shared_ptr<FrameTransport> frameTransport_;
std::unique_ptr<FrameSerializer> frameSerializer_;
const std::unique_ptr<KeepaliveTimer> keepaliveTimer_;
std::unique_ptr<ClientResumeStatusCallback> resumeCallback_;
std::shared_ptr<ColdResumeHandler> coldResumeHandler_;
std::shared_ptr<RSocketConnectionEvents> connectionEvents_;
CloseCallback* closeCallback_{nullptr};
friend class RSocketStateMachineTest;
};
} // namespace rsocket