Chore: Migrate subscriptions/room to TS (#3752)

* chore: initial commit

* chore: fix readMessages

* chore: removing some `any`

* chore: removing some `any`

* chore: removing some `any`

* chore: fix erros after merge develop inside this branch

* chore: minor tweak

* chore: applying changes requested

* minor tweak

Co-authored-by: Gleidson Daniel Silva <gleidson10daniel@hotmail.com>
This commit is contained in:
Alex Junior 2022-03-03 21:49:20 -03:00 committed by GitHub
parent 765e526bbf
commit 3a7ec74e5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 28 deletions

View File

@ -93,6 +93,7 @@ export interface ISubscription {
avatarETag?: string; avatarETag?: string;
teamId?: string; teamId?: string;
teamMain?: boolean; teamMain?: boolean;
unsubscribe: () => Promise<any>;
separator?: boolean; separator?: boolean;
// https://nozbe.github.io/WatermelonDB/Relation.html#relation-api // https://nozbe.github.io/WatermelonDB/Relation.html#relation-api
messages: RelationModified<TMessageModel>; messages: RelationModified<TMessageModel>;

View File

@ -16,7 +16,7 @@ const getLastUpdate = async (rid: string) => {
return null; return null;
}; };
async function load({ rid: roomId, lastOpen }: { rid: string; lastOpen: Date }) { async function load({ rid: roomId, lastOpen }: { rid: string; lastOpen?: Date }) {
let lastUpdate; let lastUpdate;
if (lastOpen) { if (lastOpen) {
lastUpdate = new Date(lastOpen).toISOString(); lastUpdate = new Date(lastOpen).toISOString();
@ -29,7 +29,7 @@ async function load({ rid: roomId, lastOpen }: { rid: string; lastOpen: Date })
return result; return result;
} }
export default function loadMissedMessages(args: { rid: string; lastOpen: Date }): Promise<void> { export default function loadMissedMessages(args: { rid: string; lastOpen?: Date }): Promise<void> {
return new Promise(async (resolve, reject) => { return new Promise(async (resolve, reject) => {
try { try {
const data = await load({ rid: args.rid, lastOpen: args.lastOpen }); const data = await load({ rid: args.rid, lastOpen: args.lastOpen });

View File

@ -15,11 +15,30 @@ import debounce from '../../../utils/debounce';
import RocketChat from '../../rocketchat'; import RocketChat from '../../rocketchat';
import { subscribeRoom, unsubscribeRoom } from '../../../actions/room'; import { subscribeRoom, unsubscribeRoom } from '../../../actions/room';
import { Encryption } from '../../encryption'; import { Encryption } from '../../encryption';
import { IMessage, TMessageModel, TSubscriptionModel, TThreadMessageModel, TThreadModel } from '../../../definitions';
import { IDDPMessage } from '../../../definitions/IDDPMessage';
const WINDOW_TIME = 1000; const WINDOW_TIME = 1000;
export default class RoomSubscription { export default class RoomSubscription {
constructor(rid) { private rid: string;
private isAlive: boolean;
private timer: null | number;
private queue: { [key: string]: IMessage };
private messagesBatch: {};
private _messagesBatch: { [key: string]: TMessageModel };
private threadsBatch: {};
private _threadsBatch: { [key: string]: TThreadModel };
private threadMessagesBatch: {};
private _threadMessagesBatch: { [key: string]: TThreadMessageModel };
private promises?: Promise<TSubscriptionModel[]>;
private connectedListener?: Promise<any>;
private disconnectedListener?: Promise<any>;
private notifyRoomListener?: Promise<any>;
private messageReceivedListener?: Promise<any>;
private lastOpen?: Date;
constructor(rid: string) {
this.rid = rid; this.rid = rid;
this.isAlive = true; this.isAlive = true;
this.timer = null; this.timer = null;
@ -27,6 +46,10 @@ export default class RoomSubscription {
this.messagesBatch = {}; this.messagesBatch = {};
this.threadsBatch = {}; this.threadsBatch = {};
this.threadMessagesBatch = {}; this.threadMessagesBatch = {};
this._messagesBatch = {};
this._threadsBatch = {};
this._threadMessagesBatch = {};
} }
subscribe = async () => { subscribe = async () => {
@ -41,7 +64,7 @@ export default class RoomSubscription {
this.notifyRoomListener = RocketChat.onStreamData('stream-notify-room', this.handleNotifyRoomReceived); this.notifyRoomListener = RocketChat.onStreamData('stream-notify-room', this.handleNotifyRoomReceived);
this.messageReceivedListener = RocketChat.onStreamData('stream-room-messages', this.handleMessageReceived); this.messageReceivedListener = RocketChat.onStreamData('stream-room-messages', this.handleMessageReceived);
if (!this.isAlive) { if (!this.isAlive) {
this.unsubscribe(); await this.unsubscribe();
} }
reduxStore.dispatch(subscribeRoom(this.rid)); reduxStore.dispatch(subscribeRoom(this.rid));
@ -69,7 +92,7 @@ export default class RoomSubscription {
} }
}; };
removeListener = async promise => { removeListener = async (promise?: Promise<any>): Promise<void> => {
if (promise) { if (promise) {
try { try {
const listener = await promise; const listener = await promise;
@ -85,7 +108,7 @@ export default class RoomSubscription {
RocketChat.loadMissedMessages({ rid: this.rid }).catch(e => console.log(e)); RocketChat.loadMissedMessages({ rid: this.rid }).catch(e => console.log(e));
}; };
handleNotifyRoomReceived = protectedFunction(ddpMessage => { handleNotifyRoomReceived = protectedFunction((ddpMessage: IDDPMessage) => {
const [_rid, ev] = ddpMessage.fields.eventName.split('/'); const [_rid, ev] = ddpMessage.fields.eventName.split('/');
if (this.rid !== _rid) { if (this.rid !== _rid) {
return; return;
@ -115,9 +138,9 @@ export default class RoomSubscription {
const msgCollection = db.get('messages'); const msgCollection = db.get('messages');
const threadsCollection = db.get('threads'); const threadsCollection = db.get('threads');
const threadMessagesCollection = db.get('thread_messages'); const threadMessagesCollection = db.get('thread_messages');
let deleteMessage; let deleteMessage: TMessageModel;
let deleteThread; let deleteThread: TThreadModel;
let deleteThreadMessage; let deleteThreadMessage: TThreadMessageModel;
// Delete message // Delete message
try { try {
@ -142,7 +165,7 @@ export default class RoomSubscription {
} catch (e) { } catch (e) {
// Do nothing // Do nothing
} }
await db.action(async () => { await db.write(async () => {
await db.batch(deleteMessage, deleteThread, deleteThreadMessage); await db.batch(deleteMessage, deleteThread, deleteThreadMessage);
}); });
} catch (e) { } catch (e) {
@ -153,11 +176,11 @@ export default class RoomSubscription {
} }
}); });
read = debounce(lastOpen => { read = debounce((lastOpen: Date) => {
RocketChat.readMessages(this.rid, lastOpen); RocketChat.readMessages(this.rid, lastOpen);
}, 300); }, 300);
updateMessage = message => updateMessage = (message: IMessage): Promise<void> =>
new Promise(async resolve => { new Promise(async resolve => {
if (this.rid !== message.rid) { if (this.rid !== message.rid) {
return resolve(); return resolve();
@ -177,15 +200,15 @@ export default class RoomSubscription {
const messageRecord = await getMessageById(message._id); const messageRecord = await getMessageById(message._id);
if (messageRecord) { if (messageRecord) {
operation = messageRecord.prepareUpdate( operation = messageRecord.prepareUpdate(
protectedFunction(m => { protectedFunction((m: TMessageModel) => {
Object.assign(m, message); Object.assign(m, message);
}) })
); );
} else { } else {
operation = msgCollection.prepareCreate( operation = msgCollection.prepareCreate(
protectedFunction(m => { protectedFunction((m: TMessageModel) => {
m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema); m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema);
m.subscription.id = this.rid; if (m.subscription) m.subscription.id = this.rid;
Object.assign(m, message); Object.assign(m, message);
}) })
); );
@ -202,15 +225,15 @@ export default class RoomSubscription {
const threadRecord = await getThreadById(message._id); const threadRecord = await getThreadById(message._id);
if (threadRecord) { if (threadRecord) {
operation = threadRecord.prepareUpdate( operation = threadRecord.prepareUpdate(
protectedFunction(t => { protectedFunction((t: TThreadModel) => {
Object.assign(t, message); Object.assign(t, message);
}) })
); );
} else { } else {
operation = threadsCollection.prepareCreate( operation = threadsCollection.prepareCreate(
protectedFunction(t => { protectedFunction((t: TThreadModel) => {
t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema); t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema);
t.subscription.id = this.rid; if (t.subscription) t.subscription.id = this.rid;
Object.assign(t, message); Object.assign(t, message);
}) })
); );
@ -228,20 +251,26 @@ export default class RoomSubscription {
const threadMessageRecord = await getThreadMessageById(message._id); const threadMessageRecord = await getThreadMessageById(message._id);
if (threadMessageRecord) { if (threadMessageRecord) {
operation = threadMessageRecord.prepareUpdate( operation = threadMessageRecord.prepareUpdate(
protectedFunction(tm => { protectedFunction((tm: TThreadMessageModel) => {
Object.assign(tm, message); Object.assign(tm, message);
tm.rid = message.tmid; if (message.tmid) {
delete tm.tmid; tm.rid = message.tmid;
delete tm.tmid;
}
}) })
); );
} else { } else {
operation = threadMessagesCollection.prepareCreate( operation = threadMessagesCollection.prepareCreate(
protectedFunction(tm => { protectedFunction((tm: TThreadMessageModel) => {
tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema); tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema);
Object.assign(tm, message); Object.assign(tm, message);
tm.subscription.id = this.rid; if (tm.subscription) {
tm.rid = message.tmid; tm.subscription.id = this.rid;
delete tm.tmid; }
if (message.tmid) {
tm.rid = message.tmid;
delete tm.tmid;
}
}) })
); );
} }
@ -254,7 +283,7 @@ export default class RoomSubscription {
return resolve(); return resolve();
}); });
handleMessageReceived = ddpMessage => { handleMessageReceived = (ddpMessage: IDDPMessage) => {
if (!this.timer) { if (!this.timer) {
this.timer = setTimeout(async () => { this.timer = setTimeout(async () => {
// copy variables values to local and clean them // copy variables values to local and clean them
@ -280,7 +309,7 @@ export default class RoomSubscription {
try { try {
const db = database.active; const db = database.active;
await db.action(async () => { await db.write(async () => {
await db.batch( await db.batch(
...Object.values(this._messagesBatch), ...Object.values(this._messagesBatch),
...Object.values(this._threadsBatch), ...Object.values(this._threadsBatch),
@ -300,7 +329,7 @@ export default class RoomSubscription {
}, WINDOW_TIME); }, WINDOW_TIME);
} }
this.lastOpen = new Date(); this.lastOpen = new Date();
const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0])); const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0])) as IMessage;
this.queue[message._id] = message; this.queue[message._id] = message;
}; };
} }