diff --git a/app/lib/methods/subscriptions/room.js b/app/lib/methods/subscriptions/room.js index bf3a1c635..d2f4bc552 100644 --- a/app/lib/methods/subscriptions/room.js +++ b/app/lib/methods/subscriptions/room.js @@ -9,26 +9,66 @@ import database from '../../database'; import reduxStore from '../../createStore'; import { addUserTyping, removeUserTyping, clearUserTyping } from '../../../actions/usersTyping'; import debounce from '../../../utils/debounce'; +import RocketChat from '../../rocketchat'; -const unsubscribe = (subscriptions = []) => Promise.all(subscriptions.map(sub => sub.unsubscribe)); -const removeListener = listener => listener.stop(); +export default class RoomSubscription { + constructor(rid) { + this.rid = rid; + this.isAlive = true; + } -let promises; -let connectedListener; -let disconnectedListener; -let notifyRoomListener; -let messageReceivedListener; + subscribe = async() => { + console.log(`[RCRN] Subscribing to room ${ this.rid }`); + if (this.promises) { + await this.unsubscribe(); + } + this.promises = RocketChat.subscribeRoom(this.rid); -export default function subscribeRoom({ rid }) { - console.log(`[RCRN] Subscribed to room ${ rid }`); + this.connectedListener = RocketChat.onStreamData('connected', this.handleConnection); + this.disconnectedListener = RocketChat.onStreamData('close', this.handleConnection); + this.notifyRoomListener = RocketChat.onStreamData('stream-notify-room', this.handleNotifyRoomReceived); + this.messageReceivedListener = RocketChat.onStreamData('stream-room-messages', this.handleMessageReceived); + if (!this.isAlive) { + this.unsubscribe(); + } + } - const handleConnection = () => { - this.loadMissedMessages({ rid }).catch(e => console.log(e)); + unsubscribe = async() => { + console.log(`[RCRN] Unsubscribing from room ${ this.rid }`); + this.isAlive = false; + if (this.promises) { + try { + const subscriptions = await this.promises || []; + subscriptions.map(sub => sub.unsubscribe()); + } catch (e) { + // do nothing + } + } + this.removeListener(this.connectedListener); + this.removeListener(this.disconnectedListener); + this.removeListener(this.notifyRoomListener); + this.removeListener(this.messageReceivedListener); + reduxStore.dispatch(clearUserTyping()); + } + + removeListener = async(promise) => { + if (promise) { + try { + const listener = await promise; + listener.stop(); + } catch (e) { + // do nothing + } + } }; - const handleNotifyRoomReceived = protectedFunction((ddpMessage) => { + handleConnection = () => { + RocketChat.loadMissedMessages({ rid: this.rid }).catch(e => console.log(e)); + }; + + handleNotifyRoomReceived = protectedFunction((ddpMessage) => { const [_rid, ev] = ddpMessage.fields.eventName.split('/'); - if (rid !== _rid) { + if (this.rid !== _rid) { return; } if (ev === 'typing') { @@ -87,14 +127,14 @@ export default function subscribeRoom({ rid }) { } }); - const read = debounce((lastOpen) => { - this.readMessages(rid, lastOpen); + read = debounce((lastOpen) => { + this.readMessages(this.rid, lastOpen); }, 300); - const handleMessageReceived = protectedFunction((ddpMessage) => { + handleMessageReceived = protectedFunction((ddpMessage) => { const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0])); const lastOpen = new Date(); - if (rid !== message.rid) { + if (this.rid !== message.rid) { return; } InteractionManager.runAfterInteractions(async() => { @@ -126,7 +166,7 @@ export default function subscribeRoom({ rid }) { batch.push( msgCollection.prepareCreate(protectedFunction((m) => { m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema); - m.subscription.id = rid; + m.subscription.id = this.rid; Object.assign(m, message); })) ); @@ -150,7 +190,7 @@ export default function subscribeRoom({ rid }) { batch.push( threadsCollection.prepareCreate(protectedFunction((t) => { t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema); - t.subscription.id = rid; + t.subscription.id = this.rid; Object.assign(t, message); })) ); @@ -178,7 +218,7 @@ export default function subscribeRoom({ rid }) { threadMessagesCollection.prepareCreate(protectedFunction((tm) => { tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema); Object.assign(tm, message); - tm.subscription.id = rid; + tm.subscription.id = this.rid; tm.rid = message.tmid; delete tm.tmid; })) @@ -186,7 +226,7 @@ export default function subscribeRoom({ rid }) { } } - read(lastOpen); + this.read(lastOpen); try { await db.action(async() => { @@ -197,49 +237,4 @@ export default function subscribeRoom({ rid }) { } }); }); - - const stop = async() => { - let params; - if (promises) { - try { - params = await promises; - await unsubscribe(params); - } catch (error) { - // Do nothing - } - promises = false; - } - if (connectedListener) { - params = await connectedListener; - removeListener(params); - connectedListener = false; - } - if (disconnectedListener) { - params = await disconnectedListener; - removeListener(params); - disconnectedListener = false; - } - if (notifyRoomListener) { - params = await notifyRoomListener; - removeListener(params); - notifyRoomListener = false; - } - if (messageReceivedListener) { - params = await messageReceivedListener; - removeListener(params); - messageReceivedListener = false; - } - reduxStore.dispatch(clearUserTyping()); - }; - - connectedListener = this.sdk.onStreamData('connected', handleConnection); - disconnectedListener = this.sdk.onStreamData('close', handleConnection); - notifyRoomListener = this.sdk.onStreamData('stream-notify-room', handleNotifyRoomReceived); - messageReceivedListener = this.sdk.onStreamData('stream-room-messages', handleMessageReceived); - - promises = this.sdk.subscribeRoom(rid); - - return { - stop: () => stop() - }; } diff --git a/app/lib/rocketchat.js b/app/lib/rocketchat.js index f77494c43..7b1556fcd 100644 --- a/app/lib/rocketchat.js +++ b/app/lib/rocketchat.js @@ -21,7 +21,6 @@ import { } from '../actions/share'; import subscribeRooms from './methods/subscriptions/rooms'; -import subscribeRoom from './methods/subscriptions/room'; import protectedFunction from './methods/helpers/protectedFunction'; import readMessages from './methods/readMessages'; @@ -73,7 +72,6 @@ const RocketChat = { log(e); } }, - subscribeRoom, canOpenRoom, createChannel({ name, users, type, readOnly, broadcast @@ -669,6 +667,9 @@ const RocketChat = { subscribe(...args) { return this.sdk.subscribe(...args); }, + subscribeRoom(...args) { + return this.sdk.subscribeRoom(...args); + }, unsubscribe(subscription) { return this.sdk.unsubscribe(subscription); }, diff --git a/app/views/RoomView/index.js b/app/views/RoomView/index.js index 713da66d0..5b1ccb41a 100644 --- a/app/views/RoomView/index.js +++ b/app/views/RoomView/index.js @@ -48,6 +48,7 @@ import { } from '../../commands'; import ModalNavigation from '../../lib/ModalNavigation'; import { Review } from '../../utils/review'; +import RoomClass from '../../lib/methods/subscriptions/room'; const stateAttrsUpdate = [ 'joined', @@ -185,6 +186,7 @@ class RoomView extends React.Component { this.list = React.createRef(); this.willBlurListener = props.navigation.addListener('willBlur', () => this.mounted = false); this.mounted = false; + this.sub = new RoomClass(this.rid); console.timeEnd(`${ this.constructor.name } init`); } @@ -284,7 +286,7 @@ class RoomView extends React.Component { } } } - await this.unsubscribe(); + this.unsubscribe(); if (this.didFocusListener && this.didFocusListener.remove) { this.didFocusListener.remove(); } @@ -294,9 +296,6 @@ class RoomView extends React.Component { if (this.onForegroundInteraction && this.onForegroundInteraction.cancel) { this.onForegroundInteraction.cancel(); } - if (this.initInteraction && this.initInteraction.cancel) { - this.initInteraction.cancel(); - } if (this.willBlurListener && this.willBlurListener.remove) { this.willBlurListener.remove(); } @@ -339,8 +338,7 @@ class RoomView extends React.Component { this.setLastOpen(null); } RocketChat.readMessages(room.rid, newLastOpen).catch(e => console.log(e)); - await this.unsubscribe(); - this.sub = RocketChat.subscribeRoom(room); + this.sub.subscribe(); } } @@ -387,9 +385,10 @@ class RoomView extends React.Component { } unsubscribe = async() => { - if (this.sub && this.sub.stop) { - await this.sub.stop(); + if (this.sub && this.sub.unsubscribe) { + await this.sub.unsubscribe(); } + delete this.sub; } observeRoom = (room) => {