Chore: Migrate subscriptions/rooms to TS (#3748)

* chore: migrate subscriptions rooms to ts

* chore: adding a TODO to remember this problem

* chore: removing unnecessary todos

* chore: minor tweak after develop updates

* chore: migrate message service to ts

* chore: minor tweaks

* chore: minor tweak

* chore: minor tweak after merge develop into this branch

* chore: minor tweak after merge with dev

* minor tweak
This commit is contained in:
Alex Junior 2022-03-03 21:23:11 -03:00 committed by GitHub
parent dff60b6703
commit 765e526bbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 78 additions and 55 deletions

View File

@ -0,0 +1,7 @@
export interface IDDPMessage {
msg: string;
fields: {
eventName: string;
args: any;
};
}

View File

@ -15,7 +15,7 @@ interface IRequestTranscript {
} }
export interface IRoom { export interface IRoom {
_id?: string; _id: string;
fname?: string; fname?: string;
id: string; id: string;
rid: string; rid: string;

View File

@ -1,8 +1,8 @@
import { IMessage, IThreadResult } from '../../../definitions'; import { ILastMessage, IMessage, IThreadResult } from '../../../definitions';
import messagesStatus from '../../../constants/messagesStatus'; import messagesStatus from '../../../constants/messagesStatus';
import normalizeMessage from './normalizeMessage'; import normalizeMessage from './normalizeMessage';
export default (message: Partial<IMessage> | IThreadResult): Partial<IMessage> | IThreadResult => { export default (message: Partial<IMessage> | IThreadResult | ILastMessage): IMessage | IThreadResult => {
message.status = messagesStatus.SENT; message.status = messagesStatus.SENT;
return normalizeMessage(message); return normalizeMessage(message);
}; };

View File

@ -5,12 +5,19 @@ import { store as reduxStore } from '../../auxStore';
import { compareServerVersion } from '../../utils'; import { compareServerVersion } from '../../utils';
import findSubscriptionsRooms from './findSubscriptionsRooms'; import findSubscriptionsRooms from './findSubscriptionsRooms';
import normalizeMessage from './normalizeMessage'; import normalizeMessage from './normalizeMessage';
import { ISubscription, IServerRoom, IServerSubscription, IServerSubscriptionItem, IServerRoomItem } from '../../../definitions'; import {
ISubscription,
IServerRoom,
IServerSubscription,
IServerSubscriptionItem,
IServerRoomItem,
IRoom
} from '../../../definitions';
// TODO: delete and update // TODO: delete and update
export const merge = ( export const merge = (
subscription: ISubscription | IServerSubscriptionItem, subscription: ISubscription | IServerSubscriptionItem,
room?: ISubscription | IServerRoomItem room?: ISubscription | IServerRoomItem | IRoom
): ISubscription => { ): ISubscription => {
const serverVersion = reduxStore.getState().server.version as string; const serverVersion = reduxStore.getState().server.version as string;
subscription = EJSON.fromJSONValue(subscription) as ISubscription; subscription = EJSON.fromJSONValue(subscription) as ISubscription;

View File

@ -1,11 +1,11 @@
import { sanitizedRaw } from '@nozbe/watermelondb/RawRecord'; import { sanitizedRaw } from '@nozbe/watermelondb/RawRecord';
import { InteractionManager } from 'react-native'; import { InteractionManager } from 'react-native';
import EJSON from 'ejson'; import EJSON from 'ejson';
import Model from '@nozbe/watermelondb/Model';
import database from '../../database'; import database from '../../database';
import { merge } from '../helpers/mergeSubscriptionsRooms'; import { merge } from '../helpers/mergeSubscriptionsRooms';
import protectedFunction from '../helpers/protectedFunction'; import protectedFunction from '../helpers/protectedFunction';
import messagesStatus from '../../../constants/messagesStatus';
import log from '../../../utils/log'; import log from '../../../utils/log';
import random from '../../../utils/random'; import random from '../../../utils/random';
import { store } from '../../auxStore'; import { store } from '../../auxStore';
@ -19,16 +19,29 @@ import { INAPP_NOTIFICATION_EMITTER } from '../../../containers/InAppNotificatio
import { Encryption } from '../../encryption'; import { Encryption } from '../../encryption';
import { E2E_MESSAGE_TYPE } from '../../encryption/constants'; import { E2E_MESSAGE_TYPE } from '../../encryption/constants';
import updateMessages from '../updateMessages'; import updateMessages from '../updateMessages';
import {
IMessage,
IRoom,
ISubscription,
TMessageModel,
TRoomModel,
TThreadMessageModel,
TThreadModel
} from '../../../definitions';
import sdk from '../../rocketchat/services/sdk';
import { IDDPMessage } from '../../../definitions/IDDPMessage';
import { getSubscriptionByRoomId } from '../../database/services/Subscription';
import { getMessageById } from '../../database/services/Message';
const removeListener = listener => listener.stop(); const removeListener = (listener: { stop: () => void }) => listener.stop();
let streamListener; let streamListener: Promise<any> | false;
let subServer; let subServer: string;
let queue = {}; let queue: { [key: string]: ISubscription } = {};
let subTimer = null; let subTimer: number | null | false = null;
const WINDOW_TIME = 500; const WINDOW_TIME = 500;
const createOrUpdateSubscription = async (subscription, room) => { const createOrUpdateSubscription = async (subscription: ISubscription, room: IRoom | ISubscription) => {
try { try {
const db = database.active; const db = database.active;
const subCollection = db.get('subscriptions'); const subCollection = db.get('subscriptions');
@ -86,12 +99,12 @@ const createOrUpdateSubscription = async (subscription, room) => {
e2eKeyId: s.e2eKeyId, e2eKeyId: s.e2eKeyId,
E2EKey: s.E2EKey, E2EKey: s.E2EKey,
avatarETag: s.avatarETag avatarETag: s.avatarETag
}; } as ISubscription;
} catch (error) { } catch (error) {
try { try {
await db.action(async () => { await db.write(async () => {
await roomsCollection.create( await roomsCollection.create(
protectedFunction(r => { protectedFunction((r: TRoomModel) => {
r._raw = sanitizedRaw({ id: room._id }, roomsCollection.schema); r._raw = sanitizedRaw({ id: room._id }, roomsCollection.schema);
Object.assign(r, room); Object.assign(r, room);
}) })
@ -121,23 +134,15 @@ const createOrUpdateSubscription = async (subscription, room) => {
departmentId: r.departmentId, departmentId: r.departmentId,
livechatData: r.livechatData, livechatData: r.livechatData,
avatarETag: r.avatarETag avatarETag: r.avatarETag
}; } as IRoom;
} catch (error) { } catch (error) {
// Do nothing // Do nothing
} }
} }
let tmp; let tmp = merge(subscription, room);
if (subscription) { tmp = (await Encryption.decryptSubscription(tmp)) as ISubscription;
tmp = merge(subscription, room); const sub = await getSubscriptionByRoomId(tmp.rid);
tmp = await Encryption.decryptSubscription(tmp);
}
let sub;
try {
sub = await subCollection.find(tmp.rid);
} catch (error) {
// Do nothing
}
// If we're receiving a E2EKey of a room // If we're receiving a E2EKey of a room
if (sub && !sub.E2EKey && subscription?.E2EKey) { if (sub && !sub.E2EKey && subscription?.E2EKey) {
@ -151,12 +156,12 @@ const createOrUpdateSubscription = async (subscription, room) => {
e2eKeyId: sub.e2eKeyId e2eKeyId: sub.e2eKeyId
}); });
// Decrypt lastMessage using the received E2EKey // Decrypt lastMessage using the received E2EKey
tmp = await Encryption.decryptSubscription(tmp); tmp = (await Encryption.decryptSubscription(tmp)) as ISubscription;
// Decrypt all pending messages of this room in parallel // Decrypt all pending messages of this room in parallel
Encryption.decryptPendingMessages(tmp.rid); Encryption.decryptPendingMessages(tmp.rid);
} }
const batch = []; const batch: Model[] = [];
if (sub) { if (sub) {
try { try {
const update = sub.prepareUpdate(s => { const update = sub.prepareUpdate(s => {
@ -190,12 +195,7 @@ const createOrUpdateSubscription = async (subscription, room) => {
if (tmp.lastMessage && !rooms.includes(tmp.rid)) { if (tmp.lastMessage && !rooms.includes(tmp.rid)) {
const lastMessage = buildMessage(tmp.lastMessage); const lastMessage = buildMessage(tmp.lastMessage);
const messagesCollection = db.get('messages'); const messagesCollection = db.get('messages');
let messageRecord; const messageRecord = await getMessageById(lastMessage._id);
try {
messageRecord = await messagesCollection.find(lastMessage._id);
} catch (error) {
// Do nothing
}
if (messageRecord) { if (messageRecord) {
batch.push( batch.push(
@ -207,14 +207,16 @@ const createOrUpdateSubscription = async (subscription, room) => {
batch.push( batch.push(
messagesCollection.prepareCreate(m => { messagesCollection.prepareCreate(m => {
m._raw = sanitizedRaw({ id: lastMessage._id }, messagesCollection.schema); m._raw = sanitizedRaw({ id: lastMessage._id }, messagesCollection.schema);
m.subscription.id = lastMessage.rid; if (m.subscription) {
m.subscription.id = lastMessage.rid;
}
return Object.assign(m, lastMessage); return Object.assign(m, lastMessage);
}) })
); );
} }
} }
await db.action(async () => { await db.write(async () => {
await db.batch(...batch); await db.batch(...batch);
}); });
} catch (e) { } catch (e) {
@ -222,11 +224,11 @@ const createOrUpdateSubscription = async (subscription, room) => {
} }
}; };
const getSubQueueId = rid => `SUB-${rid}`; const getSubQueueId = (rid: string) => `SUB-${rid}`;
const getRoomQueueId = rid => `ROOM-${rid}`; const getRoomQueueId = (rid: string) => `ROOM-${rid}`;
const debouncedUpdate = subscription => { const debouncedUpdate = (subscription: ISubscription) => {
if (!subTimer) { if (!subTimer) {
subTimer = setTimeout(() => { subTimer = setTimeout(() => {
const batch = queue; const batch = queue;
@ -257,11 +259,11 @@ const debouncedUpdate = subscription => {
}; };
export default function subscribeRooms() { export default function subscribeRooms() {
const handleStreamMessageReceived = protectedFunction(async ddpMessage => { const handleStreamMessageReceived = protectedFunction(async (ddpMessage: IDDPMessage) => {
const db = database.active; const db = database.active;
// check if the server from variable is the same as the js sdk client // check if the server from variable is the same as the js sdk client
if (this.sdk && this.sdk.client && this.sdk.client.host !== subServer) { if (sdk && sdk.current.client && sdk.current.client.host !== subServer) {
return; return;
} }
if (ddpMessage.msg === 'added') { if (ddpMessage.msg === 'added') {
@ -274,7 +276,7 @@ export default function subscribeRooms() {
if (diff?.statusLivechat) { if (diff?.statusLivechat) {
store.dispatch(setUser({ statusLivechat: diff.statusLivechat })); store.dispatch(setUser({ statusLivechat: diff.statusLivechat }));
} }
if (['settings.preferences.showMessageInMainThread'] in diff) { if ((['settings.preferences.showMessageInMainThread'] as any) in diff) {
store.dispatch(setUser({ showMessageInMainThread: diff['settings.preferences.showMessageInMainThread'] })); store.dispatch(setUser({ showMessageInMainThread: diff['settings.preferences.showMessageInMainThread'] }));
} }
} }
@ -283,13 +285,19 @@ export default function subscribeRooms() {
try { try {
const subCollection = db.get('subscriptions'); const subCollection = db.get('subscriptions');
const sub = await subCollection.find(data.rid); const sub = await subCollection.find(data.rid);
const messages = await sub.messages.fetch(); // TODO - today the Relation type from watermelon just support one to one relations
const threads = await sub.threads.fetch(); // @ts-ignore
const threadMessages = await sub.threadMessages.fetch(); const messages = (await sub.messages.fetch()) as TMessageModel[];
const messagesToDelete = messages.map(m => m.prepareDestroyPermanently()); // @ts-ignore
const threadsToDelete = threads.map(m => m.prepareDestroyPermanently()); const threads = (await sub.threads.fetch()) as TThreadModel[];
const threadMessagesToDelete = threadMessages.map(m => m.prepareDestroyPermanently()); // @ts-ignore
await db.action(async () => { const threadMessages = (await sub.threadMessages.fetch()) as TThreadMessageModel[];
const messagesToDelete = messages?.map((m: TMessageModel) => m.prepareDestroyPermanently());
const threadsToDelete = threads?.map((m: TThreadModel) => m.prepareDestroyPermanently());
const threadMessagesToDelete = threadMessages?.map((m: TThreadMessageModel) => m.prepareDestroyPermanently());
await db.write(async () => {
await db.batch(sub.prepareDestroyPermanently(), ...messagesToDelete, ...threadsToDelete, ...threadMessagesToDelete); await db.batch(sub.prepareDestroyPermanently(), ...messagesToDelete, ...threadsToDelete, ...threadMessagesToDelete);
}); });
@ -318,13 +326,14 @@ export default function subscribeRooms() {
const [args] = ddpMessage.fields.args; const [args] = ddpMessage.fields.args;
const _id = random(17); const _id = random(17);
const message = { const message = {
// @ts-ignore
u: { u: {
_id, _id,
username: 'rocket.cat', username: 'rocket.cat',
name: 'Rocket Cat' name: 'Rocket Cat'
}, },
...buildMessage(EJSON.fromJSONValue(args)) ...buildMessage(EJSON.fromJSONValue(args))
}; } as IMessage;
await updateMessages({ rid: args.rid, update: [message] }); await updateMessages({ rid: args.rid, update: [message] });
} catch (e) { } catch (e) {
log(e); log(e);
@ -383,12 +392,12 @@ export default function subscribeRooms() {
} }
}; };
streamListener = this.sdk.onStreamData('stream-notify-user', handleStreamMessageReceived); streamListener = sdk.onStreamData('stream-notify-user', handleStreamMessageReceived);
try { try {
// set the server that started this task // set the server that started this task
subServer = this.sdk.client.host; subServer = sdk.current.client.host;
this.sdk.subscribeNotifyUser().catch(e => console.log(e)); sdk.current.subscribeNotifyUser().catch((e: unknown) => console.log(e));
return { return {
stop: () => stop() stop: () => stop()

View File

@ -119,7 +119,7 @@ export const getDiscussions = ({
count, count,
text text
}: { }: {
roomId: string | undefined; roomId: string;
text?: string | undefined; text?: string | undefined;
offset: number; offset: number;
count: number; count: number;