diff --git a/app/definitions/ISubscription.ts b/app/definitions/ISubscription.ts index 1332fc442..72e0c9a5e 100644 --- a/app/definitions/ISubscription.ts +++ b/app/definitions/ISubscription.ts @@ -93,6 +93,7 @@ export interface ISubscription { avatarETag?: string; teamId?: string; teamMain?: boolean; + unsubscribe: () => Promise; separator?: boolean; // https://nozbe.github.io/WatermelonDB/Relation.html#relation-api messages: RelationModified; diff --git a/app/lib/methods/loadMissedMessages.ts b/app/lib/methods/loadMissedMessages.ts index 2674e8a19..5a114269c 100644 --- a/app/lib/methods/loadMissedMessages.ts +++ b/app/lib/methods/loadMissedMessages.ts @@ -16,7 +16,7 @@ const getLastUpdate = async (rid: string) => { return null; }; -async function load({ rid: roomId, lastOpen }: { rid: string; lastOpen: Date }) { +async function load({ rid: roomId, lastOpen }: { rid: string; lastOpen?: Date }) { let lastUpdate; if (lastOpen) { lastUpdate = new Date(lastOpen).toISOString(); @@ -29,7 +29,7 @@ async function load({ rid: roomId, lastOpen }: { rid: string; lastOpen: Date }) return result; } -export default function loadMissedMessages(args: { rid: string; lastOpen: Date }): Promise { +export default function loadMissedMessages(args: { rid: string; lastOpen?: Date }): Promise { return new Promise(async (resolve, reject) => { try { const data = await load({ rid: args.rid, lastOpen: args.lastOpen }); diff --git a/app/lib/methods/subscriptions/room.js b/app/lib/methods/subscriptions/room.ts similarity index 78% rename from app/lib/methods/subscriptions/room.js rename to app/lib/methods/subscriptions/room.ts index 3aa0a206d..f72137c22 100644 --- a/app/lib/methods/subscriptions/room.js +++ b/app/lib/methods/subscriptions/room.ts @@ -15,11 +15,30 @@ import debounce from '../../../utils/debounce'; import RocketChat from '../../rocketchat'; import { subscribeRoom, unsubscribeRoom } from '../../../actions/room'; import { Encryption } from '../../encryption'; +import { IMessage, TMessageModel, TSubscriptionModel, TThreadMessageModel, TThreadModel } from '../../../definitions'; +import { IDDPMessage } from '../../../definitions/IDDPMessage'; const WINDOW_TIME = 1000; export default class RoomSubscription { - constructor(rid) { + private rid: string; + private isAlive: boolean; + private timer: null | number; + private queue: { [key: string]: IMessage }; + private messagesBatch: {}; + private _messagesBatch: { [key: string]: TMessageModel }; + private threadsBatch: {}; + private _threadsBatch: { [key: string]: TThreadModel }; + private threadMessagesBatch: {}; + private _threadMessagesBatch: { [key: string]: TThreadMessageModel }; + private promises?: Promise; + private connectedListener?: Promise; + private disconnectedListener?: Promise; + private notifyRoomListener?: Promise; + private messageReceivedListener?: Promise; + private lastOpen?: Date; + + constructor(rid: string) { this.rid = rid; this.isAlive = true; this.timer = null; @@ -27,6 +46,10 @@ export default class RoomSubscription { this.messagesBatch = {}; this.threadsBatch = {}; this.threadMessagesBatch = {}; + + this._messagesBatch = {}; + this._threadsBatch = {}; + this._threadMessagesBatch = {}; } subscribe = async () => { @@ -41,7 +64,7 @@ export default class RoomSubscription { this.notifyRoomListener = RocketChat.onStreamData('stream-notify-room', this.handleNotifyRoomReceived); this.messageReceivedListener = RocketChat.onStreamData('stream-room-messages', this.handleMessageReceived); if (!this.isAlive) { - this.unsubscribe(); + await this.unsubscribe(); } reduxStore.dispatch(subscribeRoom(this.rid)); @@ -69,7 +92,7 @@ export default class RoomSubscription { } }; - removeListener = async promise => { + removeListener = async (promise?: Promise): Promise => { if (promise) { try { const listener = await promise; @@ -85,7 +108,7 @@ export default class RoomSubscription { RocketChat.loadMissedMessages({ rid: this.rid }).catch(e => console.log(e)); }; - handleNotifyRoomReceived = protectedFunction(ddpMessage => { + handleNotifyRoomReceived = protectedFunction((ddpMessage: IDDPMessage) => { const [_rid, ev] = ddpMessage.fields.eventName.split('/'); if (this.rid !== _rid) { return; @@ -115,9 +138,9 @@ export default class RoomSubscription { const msgCollection = db.get('messages'); const threadsCollection = db.get('threads'); const threadMessagesCollection = db.get('thread_messages'); - let deleteMessage; - let deleteThread; - let deleteThreadMessage; + let deleteMessage: TMessageModel; + let deleteThread: TThreadModel; + let deleteThreadMessage: TThreadMessageModel; // Delete message try { @@ -142,7 +165,7 @@ export default class RoomSubscription { } catch (e) { // Do nothing } - await db.action(async () => { + await db.write(async () => { await db.batch(deleteMessage, deleteThread, deleteThreadMessage); }); } catch (e) { @@ -153,11 +176,11 @@ export default class RoomSubscription { } }); - read = debounce(lastOpen => { + read = debounce((lastOpen: Date) => { RocketChat.readMessages(this.rid, lastOpen); }, 300); - updateMessage = message => + updateMessage = (message: IMessage): Promise => new Promise(async resolve => { if (this.rid !== message.rid) { return resolve(); @@ -177,15 +200,15 @@ export default class RoomSubscription { const messageRecord = await getMessageById(message._id); if (messageRecord) { operation = messageRecord.prepareUpdate( - protectedFunction(m => { + protectedFunction((m: TMessageModel) => { Object.assign(m, message); }) ); } else { operation = msgCollection.prepareCreate( - protectedFunction(m => { + protectedFunction((m: TMessageModel) => { m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema); - m.subscription.id = this.rid; + if (m.subscription) m.subscription.id = this.rid; Object.assign(m, message); }) ); @@ -202,15 +225,15 @@ export default class RoomSubscription { const threadRecord = await getThreadById(message._id); if (threadRecord) { operation = threadRecord.prepareUpdate( - protectedFunction(t => { + protectedFunction((t: TThreadModel) => { Object.assign(t, message); }) ); } else { operation = threadsCollection.prepareCreate( - protectedFunction(t => { + protectedFunction((t: TThreadModel) => { t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema); - t.subscription.id = this.rid; + if (t.subscription) t.subscription.id = this.rid; Object.assign(t, message); }) ); @@ -228,20 +251,26 @@ export default class RoomSubscription { const threadMessageRecord = await getThreadMessageById(message._id); if (threadMessageRecord) { operation = threadMessageRecord.prepareUpdate( - protectedFunction(tm => { + protectedFunction((tm: TThreadMessageModel) => { Object.assign(tm, message); - tm.rid = message.tmid; - delete tm.tmid; + if (message.tmid) { + tm.rid = message.tmid; + delete tm.tmid; + } }) ); } else { operation = threadMessagesCollection.prepareCreate( - protectedFunction(tm => { + protectedFunction((tm: TThreadMessageModel) => { tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema); Object.assign(tm, message); - tm.subscription.id = this.rid; - tm.rid = message.tmid; - delete tm.tmid; + if (tm.subscription) { + tm.subscription.id = this.rid; + } + if (message.tmid) { + tm.rid = message.tmid; + delete tm.tmid; + } }) ); } @@ -254,7 +283,7 @@ export default class RoomSubscription { return resolve(); }); - handleMessageReceived = ddpMessage => { + handleMessageReceived = (ddpMessage: IDDPMessage) => { if (!this.timer) { this.timer = setTimeout(async () => { // copy variables values to local and clean them @@ -280,7 +309,7 @@ export default class RoomSubscription { try { const db = database.active; - await db.action(async () => { + await db.write(async () => { await db.batch( ...Object.values(this._messagesBatch), ...Object.values(this._threadsBatch), @@ -300,7 +329,7 @@ export default class RoomSubscription { }, WINDOW_TIME); } this.lastOpen = new Date(); - const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0])); + const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0])) as IMessage; this.queue[message._id] = message; }; }