[FIX] Unsubscribe from room (#1655)
This commit is contained in:
parent
06085ebffb
commit
97cc18313d
|
@ -9,26 +9,66 @@ import database from '../../database';
|
||||||
import reduxStore from '../../createStore';
|
import reduxStore from '../../createStore';
|
||||||
import { addUserTyping, removeUserTyping, clearUserTyping } from '../../../actions/usersTyping';
|
import { addUserTyping, removeUserTyping, clearUserTyping } from '../../../actions/usersTyping';
|
||||||
import debounce from '../../../utils/debounce';
|
import debounce from '../../../utils/debounce';
|
||||||
|
import RocketChat from '../../rocketchat';
|
||||||
|
|
||||||
const unsubscribe = (subscriptions = []) => Promise.all(subscriptions.map(sub => sub.unsubscribe));
|
export default class RoomSubscription {
|
||||||
const removeListener = listener => listener.stop();
|
constructor(rid) {
|
||||||
|
this.rid = rid;
|
||||||
|
this.isAlive = true;
|
||||||
|
}
|
||||||
|
|
||||||
let promises;
|
subscribe = async() => {
|
||||||
let connectedListener;
|
console.log(`[RCRN] Subscribing to room ${ this.rid }`);
|
||||||
let disconnectedListener;
|
if (this.promises) {
|
||||||
let notifyRoomListener;
|
await this.unsubscribe();
|
||||||
let messageReceivedListener;
|
}
|
||||||
|
this.promises = RocketChat.subscribeRoom(this.rid);
|
||||||
|
|
||||||
export default function subscribeRoom({ rid }) {
|
this.connectedListener = RocketChat.onStreamData('connected', this.handleConnection);
|
||||||
console.log(`[RCRN] Subscribed to room ${ rid }`);
|
this.disconnectedListener = RocketChat.onStreamData('close', this.handleConnection);
|
||||||
|
this.notifyRoomListener = RocketChat.onStreamData('stream-notify-room', this.handleNotifyRoomReceived);
|
||||||
|
this.messageReceivedListener = RocketChat.onStreamData('stream-room-messages', this.handleMessageReceived);
|
||||||
|
if (!this.isAlive) {
|
||||||
|
this.unsubscribe();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const handleConnection = () => {
|
unsubscribe = async() => {
|
||||||
this.loadMissedMessages({ rid }).catch(e => console.log(e));
|
console.log(`[RCRN] Unsubscribing from room ${ this.rid }`);
|
||||||
|
this.isAlive = false;
|
||||||
|
if (this.promises) {
|
||||||
|
try {
|
||||||
|
const subscriptions = await this.promises || [];
|
||||||
|
subscriptions.map(sub => sub.unsubscribe());
|
||||||
|
} catch (e) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.removeListener(this.connectedListener);
|
||||||
|
this.removeListener(this.disconnectedListener);
|
||||||
|
this.removeListener(this.notifyRoomListener);
|
||||||
|
this.removeListener(this.messageReceivedListener);
|
||||||
|
reduxStore.dispatch(clearUserTyping());
|
||||||
|
}
|
||||||
|
|
||||||
|
removeListener = async(promise) => {
|
||||||
|
if (promise) {
|
||||||
|
try {
|
||||||
|
const listener = await promise;
|
||||||
|
listener.stop();
|
||||||
|
} catch (e) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
const handleNotifyRoomReceived = protectedFunction((ddpMessage) => {
|
handleConnection = () => {
|
||||||
|
RocketChat.loadMissedMessages({ rid: this.rid }).catch(e => console.log(e));
|
||||||
|
};
|
||||||
|
|
||||||
|
handleNotifyRoomReceived = protectedFunction((ddpMessage) => {
|
||||||
const [_rid, ev] = ddpMessage.fields.eventName.split('/');
|
const [_rid, ev] = ddpMessage.fields.eventName.split('/');
|
||||||
if (rid !== _rid) {
|
if (this.rid !== _rid) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (ev === 'typing') {
|
if (ev === 'typing') {
|
||||||
|
@ -87,14 +127,14 @@ export default function subscribeRoom({ rid }) {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
const read = debounce((lastOpen) => {
|
read = debounce((lastOpen) => {
|
||||||
this.readMessages(rid, lastOpen);
|
this.readMessages(this.rid, lastOpen);
|
||||||
}, 300);
|
}, 300);
|
||||||
|
|
||||||
const handleMessageReceived = protectedFunction((ddpMessage) => {
|
handleMessageReceived = protectedFunction((ddpMessage) => {
|
||||||
const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0]));
|
const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0]));
|
||||||
const lastOpen = new Date();
|
const lastOpen = new Date();
|
||||||
if (rid !== message.rid) {
|
if (this.rid !== message.rid) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
InteractionManager.runAfterInteractions(async() => {
|
InteractionManager.runAfterInteractions(async() => {
|
||||||
|
@ -126,7 +166,7 @@ export default function subscribeRoom({ rid }) {
|
||||||
batch.push(
|
batch.push(
|
||||||
msgCollection.prepareCreate(protectedFunction((m) => {
|
msgCollection.prepareCreate(protectedFunction((m) => {
|
||||||
m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema);
|
m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema);
|
||||||
m.subscription.id = rid;
|
m.subscription.id = this.rid;
|
||||||
Object.assign(m, message);
|
Object.assign(m, message);
|
||||||
}))
|
}))
|
||||||
);
|
);
|
||||||
|
@ -150,7 +190,7 @@ export default function subscribeRoom({ rid }) {
|
||||||
batch.push(
|
batch.push(
|
||||||
threadsCollection.prepareCreate(protectedFunction((t) => {
|
threadsCollection.prepareCreate(protectedFunction((t) => {
|
||||||
t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema);
|
t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema);
|
||||||
t.subscription.id = rid;
|
t.subscription.id = this.rid;
|
||||||
Object.assign(t, message);
|
Object.assign(t, message);
|
||||||
}))
|
}))
|
||||||
);
|
);
|
||||||
|
@ -178,7 +218,7 @@ export default function subscribeRoom({ rid }) {
|
||||||
threadMessagesCollection.prepareCreate(protectedFunction((tm) => {
|
threadMessagesCollection.prepareCreate(protectedFunction((tm) => {
|
||||||
tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema);
|
tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema);
|
||||||
Object.assign(tm, message);
|
Object.assign(tm, message);
|
||||||
tm.subscription.id = rid;
|
tm.subscription.id = this.rid;
|
||||||
tm.rid = message.tmid;
|
tm.rid = message.tmid;
|
||||||
delete tm.tmid;
|
delete tm.tmid;
|
||||||
}))
|
}))
|
||||||
|
@ -186,7 +226,7 @@ export default function subscribeRoom({ rid }) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
read(lastOpen);
|
this.read(lastOpen);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await db.action(async() => {
|
await db.action(async() => {
|
||||||
|
@ -197,49 +237,4 @@ export default function subscribeRoom({ rid }) {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
const stop = async() => {
|
|
||||||
let params;
|
|
||||||
if (promises) {
|
|
||||||
try {
|
|
||||||
params = await promises;
|
|
||||||
await unsubscribe(params);
|
|
||||||
} catch (error) {
|
|
||||||
// Do nothing
|
|
||||||
}
|
|
||||||
promises = false;
|
|
||||||
}
|
|
||||||
if (connectedListener) {
|
|
||||||
params = await connectedListener;
|
|
||||||
removeListener(params);
|
|
||||||
connectedListener = false;
|
|
||||||
}
|
|
||||||
if (disconnectedListener) {
|
|
||||||
params = await disconnectedListener;
|
|
||||||
removeListener(params);
|
|
||||||
disconnectedListener = false;
|
|
||||||
}
|
|
||||||
if (notifyRoomListener) {
|
|
||||||
params = await notifyRoomListener;
|
|
||||||
removeListener(params);
|
|
||||||
notifyRoomListener = false;
|
|
||||||
}
|
|
||||||
if (messageReceivedListener) {
|
|
||||||
params = await messageReceivedListener;
|
|
||||||
removeListener(params);
|
|
||||||
messageReceivedListener = false;
|
|
||||||
}
|
|
||||||
reduxStore.dispatch(clearUserTyping());
|
|
||||||
};
|
|
||||||
|
|
||||||
connectedListener = this.sdk.onStreamData('connected', handleConnection);
|
|
||||||
disconnectedListener = this.sdk.onStreamData('close', handleConnection);
|
|
||||||
notifyRoomListener = this.sdk.onStreamData('stream-notify-room', handleNotifyRoomReceived);
|
|
||||||
messageReceivedListener = this.sdk.onStreamData('stream-room-messages', handleMessageReceived);
|
|
||||||
|
|
||||||
promises = this.sdk.subscribeRoom(rid);
|
|
||||||
|
|
||||||
return {
|
|
||||||
stop: () => stop()
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ import {
|
||||||
} from '../actions/share';
|
} from '../actions/share';
|
||||||
|
|
||||||
import subscribeRooms from './methods/subscriptions/rooms';
|
import subscribeRooms from './methods/subscriptions/rooms';
|
||||||
import subscribeRoom from './methods/subscriptions/room';
|
|
||||||
|
|
||||||
import protectedFunction from './methods/helpers/protectedFunction';
|
import protectedFunction from './methods/helpers/protectedFunction';
|
||||||
import readMessages from './methods/readMessages';
|
import readMessages from './methods/readMessages';
|
||||||
|
@ -73,7 +72,6 @@ const RocketChat = {
|
||||||
log(e);
|
log(e);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
subscribeRoom,
|
|
||||||
canOpenRoom,
|
canOpenRoom,
|
||||||
createChannel({
|
createChannel({
|
||||||
name, users, type, readOnly, broadcast
|
name, users, type, readOnly, broadcast
|
||||||
|
@ -669,6 +667,9 @@ const RocketChat = {
|
||||||
subscribe(...args) {
|
subscribe(...args) {
|
||||||
return this.sdk.subscribe(...args);
|
return this.sdk.subscribe(...args);
|
||||||
},
|
},
|
||||||
|
subscribeRoom(...args) {
|
||||||
|
return this.sdk.subscribeRoom(...args);
|
||||||
|
},
|
||||||
unsubscribe(subscription) {
|
unsubscribe(subscription) {
|
||||||
return this.sdk.unsubscribe(subscription);
|
return this.sdk.unsubscribe(subscription);
|
||||||
},
|
},
|
||||||
|
|
|
@ -48,6 +48,7 @@ import {
|
||||||
} from '../../commands';
|
} from '../../commands';
|
||||||
import ModalNavigation from '../../lib/ModalNavigation';
|
import ModalNavigation from '../../lib/ModalNavigation';
|
||||||
import { Review } from '../../utils/review';
|
import { Review } from '../../utils/review';
|
||||||
|
import RoomClass from '../../lib/methods/subscriptions/room';
|
||||||
|
|
||||||
const stateAttrsUpdate = [
|
const stateAttrsUpdate = [
|
||||||
'joined',
|
'joined',
|
||||||
|
@ -185,6 +186,7 @@ class RoomView extends React.Component {
|
||||||
this.list = React.createRef();
|
this.list = React.createRef();
|
||||||
this.willBlurListener = props.navigation.addListener('willBlur', () => this.mounted = false);
|
this.willBlurListener = props.navigation.addListener('willBlur', () => this.mounted = false);
|
||||||
this.mounted = false;
|
this.mounted = false;
|
||||||
|
this.sub = new RoomClass(this.rid);
|
||||||
console.timeEnd(`${ this.constructor.name } init`);
|
console.timeEnd(`${ this.constructor.name } init`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -284,7 +286,7 @@ class RoomView extends React.Component {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
await this.unsubscribe();
|
this.unsubscribe();
|
||||||
if (this.didFocusListener && this.didFocusListener.remove) {
|
if (this.didFocusListener && this.didFocusListener.remove) {
|
||||||
this.didFocusListener.remove();
|
this.didFocusListener.remove();
|
||||||
}
|
}
|
||||||
|
@ -294,9 +296,6 @@ class RoomView extends React.Component {
|
||||||
if (this.onForegroundInteraction && this.onForegroundInteraction.cancel) {
|
if (this.onForegroundInteraction && this.onForegroundInteraction.cancel) {
|
||||||
this.onForegroundInteraction.cancel();
|
this.onForegroundInteraction.cancel();
|
||||||
}
|
}
|
||||||
if (this.initInteraction && this.initInteraction.cancel) {
|
|
||||||
this.initInteraction.cancel();
|
|
||||||
}
|
|
||||||
if (this.willBlurListener && this.willBlurListener.remove) {
|
if (this.willBlurListener && this.willBlurListener.remove) {
|
||||||
this.willBlurListener.remove();
|
this.willBlurListener.remove();
|
||||||
}
|
}
|
||||||
|
@ -339,8 +338,7 @@ class RoomView extends React.Component {
|
||||||
this.setLastOpen(null);
|
this.setLastOpen(null);
|
||||||
}
|
}
|
||||||
RocketChat.readMessages(room.rid, newLastOpen).catch(e => console.log(e));
|
RocketChat.readMessages(room.rid, newLastOpen).catch(e => console.log(e));
|
||||||
await this.unsubscribe();
|
this.sub.subscribe();
|
||||||
this.sub = RocketChat.subscribeRoom(room);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -387,9 +385,10 @@ class RoomView extends React.Component {
|
||||||
}
|
}
|
||||||
|
|
||||||
unsubscribe = async() => {
|
unsubscribe = async() => {
|
||||||
if (this.sub && this.sub.stop) {
|
if (this.sub && this.sub.unsubscribe) {
|
||||||
await this.sub.stop();
|
await this.sub.unsubscribe();
|
||||||
}
|
}
|
||||||
|
delete this.sub;
|
||||||
}
|
}
|
||||||
|
|
||||||
observeRoom = (room) => {
|
observeRoom = (room) => {
|
||||||
|
|
Loading…
Reference in New Issue