diff --git a/app/definitions/IDDPMessage.ts b/app/definitions/IDDPMessage.ts new file mode 100644 index 000000000..7b6ad822a --- /dev/null +++ b/app/definitions/IDDPMessage.ts @@ -0,0 +1,7 @@ +export interface IDDPMessage { + msg: string; + fields: { + eventName: string; + args: any; + }; +} diff --git a/app/definitions/IRoom.ts b/app/definitions/IRoom.ts index ed215e5f9..f63445a5e 100644 --- a/app/definitions/IRoom.ts +++ b/app/definitions/IRoom.ts @@ -15,7 +15,7 @@ interface IRequestTranscript { } export interface IRoom { - _id?: string; + _id: string; fname?: string; id: string; rid: string; diff --git a/app/lib/methods/helpers/buildMessage.ts b/app/lib/methods/helpers/buildMessage.ts index b17574c97..a0fc06013 100644 --- a/app/lib/methods/helpers/buildMessage.ts +++ b/app/lib/methods/helpers/buildMessage.ts @@ -1,8 +1,8 @@ -import { IMessage, IThreadResult } from '../../../definitions'; +import { ILastMessage, IMessage, IThreadResult } from '../../../definitions'; import messagesStatus from '../../../constants/messagesStatus'; import normalizeMessage from './normalizeMessage'; -export default (message: Partial | IThreadResult): Partial | IThreadResult => { +export default (message: Partial | IThreadResult | ILastMessage): IMessage | IThreadResult => { message.status = messagesStatus.SENT; return normalizeMessage(message); }; diff --git a/app/lib/methods/helpers/mergeSubscriptionsRooms.ts b/app/lib/methods/helpers/mergeSubscriptionsRooms.ts index 4cca2707d..da2485ee8 100644 --- a/app/lib/methods/helpers/mergeSubscriptionsRooms.ts +++ b/app/lib/methods/helpers/mergeSubscriptionsRooms.ts @@ -5,12 +5,19 @@ import { store as reduxStore } from '../../auxStore'; import { compareServerVersion } from '../../utils'; import findSubscriptionsRooms from './findSubscriptionsRooms'; import normalizeMessage from './normalizeMessage'; -import { ISubscription, IServerRoom, IServerSubscription, IServerSubscriptionItem, IServerRoomItem } from '../../../definitions'; +import { + ISubscription, + IServerRoom, + IServerSubscription, + IServerSubscriptionItem, + IServerRoomItem, + IRoom +} from '../../../definitions'; // TODO: delete and update export const merge = ( subscription: ISubscription | IServerSubscriptionItem, - room?: ISubscription | IServerRoomItem + room?: ISubscription | IServerRoomItem | IRoom ): ISubscription => { const serverVersion = reduxStore.getState().server.version as string; subscription = EJSON.fromJSONValue(subscription) as ISubscription; diff --git a/app/lib/methods/subscriptions/rooms.js b/app/lib/methods/subscriptions/rooms.ts similarity index 77% rename from app/lib/methods/subscriptions/rooms.js rename to app/lib/methods/subscriptions/rooms.ts index 14cf75d5c..ababd496b 100644 --- a/app/lib/methods/subscriptions/rooms.js +++ b/app/lib/methods/subscriptions/rooms.ts @@ -1,11 +1,11 @@ import { sanitizedRaw } from '@nozbe/watermelondb/RawRecord'; import { InteractionManager } from 'react-native'; import EJSON from 'ejson'; +import Model from '@nozbe/watermelondb/Model'; import database from '../../database'; import { merge } from '../helpers/mergeSubscriptionsRooms'; import protectedFunction from '../helpers/protectedFunction'; -import messagesStatus from '../../../constants/messagesStatus'; import log from '../../../utils/log'; import random from '../../../utils/random'; import { store } from '../../auxStore'; @@ -19,16 +19,29 @@ import { INAPP_NOTIFICATION_EMITTER } from '../../../containers/InAppNotificatio import { Encryption } from '../../encryption'; import { E2E_MESSAGE_TYPE } from '../../encryption/constants'; import updateMessages from '../updateMessages'; +import { + IMessage, + IRoom, + ISubscription, + TMessageModel, + TRoomModel, + TThreadMessageModel, + TThreadModel +} from '../../../definitions'; +import sdk from '../../rocketchat/services/sdk'; +import { IDDPMessage } from '../../../definitions/IDDPMessage'; +import { getSubscriptionByRoomId } from '../../database/services/Subscription'; +import { getMessageById } from '../../database/services/Message'; -const removeListener = listener => listener.stop(); +const removeListener = (listener: { stop: () => void }) => listener.stop(); -let streamListener; -let subServer; -let queue = {}; -let subTimer = null; +let streamListener: Promise | false; +let subServer: string; +let queue: { [key: string]: ISubscription } = {}; +let subTimer: number | null | false = null; const WINDOW_TIME = 500; -const createOrUpdateSubscription = async (subscription, room) => { +const createOrUpdateSubscription = async (subscription: ISubscription, room: IRoom | ISubscription) => { try { const db = database.active; const subCollection = db.get('subscriptions'); @@ -86,12 +99,12 @@ const createOrUpdateSubscription = async (subscription, room) => { e2eKeyId: s.e2eKeyId, E2EKey: s.E2EKey, avatarETag: s.avatarETag - }; + } as ISubscription; } catch (error) { try { - await db.action(async () => { + await db.write(async () => { await roomsCollection.create( - protectedFunction(r => { + protectedFunction((r: TRoomModel) => { r._raw = sanitizedRaw({ id: room._id }, roomsCollection.schema); Object.assign(r, room); }) @@ -121,23 +134,15 @@ const createOrUpdateSubscription = async (subscription, room) => { departmentId: r.departmentId, livechatData: r.livechatData, avatarETag: r.avatarETag - }; + } as IRoom; } catch (error) { // Do nothing } } - let tmp; - if (subscription) { - tmp = merge(subscription, room); - tmp = await Encryption.decryptSubscription(tmp); - } - let sub; - try { - sub = await subCollection.find(tmp.rid); - } catch (error) { - // Do nothing - } + let tmp = merge(subscription, room); + tmp = (await Encryption.decryptSubscription(tmp)) as ISubscription; + const sub = await getSubscriptionByRoomId(tmp.rid); // If we're receiving a E2EKey of a room if (sub && !sub.E2EKey && subscription?.E2EKey) { @@ -151,12 +156,12 @@ const createOrUpdateSubscription = async (subscription, room) => { e2eKeyId: sub.e2eKeyId }); // Decrypt lastMessage using the received E2EKey - tmp = await Encryption.decryptSubscription(tmp); + tmp = (await Encryption.decryptSubscription(tmp)) as ISubscription; // Decrypt all pending messages of this room in parallel Encryption.decryptPendingMessages(tmp.rid); } - const batch = []; + const batch: Model[] = []; if (sub) { try { const update = sub.prepareUpdate(s => { @@ -190,12 +195,7 @@ const createOrUpdateSubscription = async (subscription, room) => { if (tmp.lastMessage && !rooms.includes(tmp.rid)) { const lastMessage = buildMessage(tmp.lastMessage); const messagesCollection = db.get('messages'); - let messageRecord; - try { - messageRecord = await messagesCollection.find(lastMessage._id); - } catch (error) { - // Do nothing - } + const messageRecord = await getMessageById(lastMessage._id); if (messageRecord) { batch.push( @@ -207,14 +207,16 @@ const createOrUpdateSubscription = async (subscription, room) => { batch.push( messagesCollection.prepareCreate(m => { m._raw = sanitizedRaw({ id: lastMessage._id }, messagesCollection.schema); - m.subscription.id = lastMessage.rid; + if (m.subscription) { + m.subscription.id = lastMessage.rid; + } return Object.assign(m, lastMessage); }) ); } } - await db.action(async () => { + await db.write(async () => { await db.batch(...batch); }); } catch (e) { @@ -222,11 +224,11 @@ const createOrUpdateSubscription = async (subscription, room) => { } }; -const getSubQueueId = rid => `SUB-${rid}`; +const getSubQueueId = (rid: string) => `SUB-${rid}`; -const getRoomQueueId = rid => `ROOM-${rid}`; +const getRoomQueueId = (rid: string) => `ROOM-${rid}`; -const debouncedUpdate = subscription => { +const debouncedUpdate = (subscription: ISubscription) => { if (!subTimer) { subTimer = setTimeout(() => { const batch = queue; @@ -257,11 +259,11 @@ const debouncedUpdate = subscription => { }; export default function subscribeRooms() { - const handleStreamMessageReceived = protectedFunction(async ddpMessage => { + const handleStreamMessageReceived = protectedFunction(async (ddpMessage: IDDPMessage) => { const db = database.active; // check if the server from variable is the same as the js sdk client - if (this.sdk && this.sdk.client && this.sdk.client.host !== subServer) { + if (sdk && sdk.current.client && sdk.current.client.host !== subServer) { return; } if (ddpMessage.msg === 'added') { @@ -274,7 +276,7 @@ export default function subscribeRooms() { if (diff?.statusLivechat) { store.dispatch(setUser({ statusLivechat: diff.statusLivechat })); } - if (['settings.preferences.showMessageInMainThread'] in diff) { + if ((['settings.preferences.showMessageInMainThread'] as any) in diff) { store.dispatch(setUser({ showMessageInMainThread: diff['settings.preferences.showMessageInMainThread'] })); } } @@ -283,13 +285,19 @@ export default function subscribeRooms() { try { const subCollection = db.get('subscriptions'); const sub = await subCollection.find(data.rid); - const messages = await sub.messages.fetch(); - const threads = await sub.threads.fetch(); - const threadMessages = await sub.threadMessages.fetch(); - const messagesToDelete = messages.map(m => m.prepareDestroyPermanently()); - const threadsToDelete = threads.map(m => m.prepareDestroyPermanently()); - const threadMessagesToDelete = threadMessages.map(m => m.prepareDestroyPermanently()); - await db.action(async () => { + // TODO - today the Relation type from watermelon just support one to one relations + // @ts-ignore + const messages = (await sub.messages.fetch()) as TMessageModel[]; + // @ts-ignore + const threads = (await sub.threads.fetch()) as TThreadModel[]; + // @ts-ignore + const threadMessages = (await sub.threadMessages.fetch()) as TThreadMessageModel[]; + + const messagesToDelete = messages?.map((m: TMessageModel) => m.prepareDestroyPermanently()); + const threadsToDelete = threads?.map((m: TThreadModel) => m.prepareDestroyPermanently()); + const threadMessagesToDelete = threadMessages?.map((m: TThreadMessageModel) => m.prepareDestroyPermanently()); + + await db.write(async () => { await db.batch(sub.prepareDestroyPermanently(), ...messagesToDelete, ...threadsToDelete, ...threadMessagesToDelete); }); @@ -318,13 +326,14 @@ export default function subscribeRooms() { const [args] = ddpMessage.fields.args; const _id = random(17); const message = { + // @ts-ignore u: { _id, username: 'rocket.cat', name: 'Rocket Cat' }, ...buildMessage(EJSON.fromJSONValue(args)) - }; + } as IMessage; await updateMessages({ rid: args.rid, update: [message] }); } catch (e) { log(e); @@ -383,12 +392,12 @@ export default function subscribeRooms() { } }; - streamListener = this.sdk.onStreamData('stream-notify-user', handleStreamMessageReceived); + streamListener = sdk.onStreamData('stream-notify-user', handleStreamMessageReceived); try { // set the server that started this task - subServer = this.sdk.client.host; - this.sdk.subscribeNotifyUser().catch(e => console.log(e)); + subServer = sdk.current.client.host; + sdk.current.subscribeNotifyUser().catch((e: unknown) => console.log(e)); return { stop: () => stop() diff --git a/app/lib/rocketchat/services/restApi.ts b/app/lib/rocketchat/services/restApi.ts index e9e7937d9..34cd2acea 100644 --- a/app/lib/rocketchat/services/restApi.ts +++ b/app/lib/rocketchat/services/restApi.ts @@ -119,7 +119,7 @@ export const getDiscussions = ({ count, text }: { - roomId: string | undefined; + roomId: string; text?: string | undefined; offset: number; count: number;