verdnatura-chat/app/lib/methods/subscriptions/room.js

246 lines
6.5 KiB
JavaScript

import EJSON from 'ejson';
import { sanitizedRaw } from '@nozbe/watermelondb/RawRecord';
import { InteractionManager } from 'react-native';
import log from '../../../utils/log';
import protectedFunction from '../helpers/protectedFunction';
import buildMessage from '../helpers/buildMessage';
import database from '../../database';
import reduxStore from '../../createStore';
import { addUserTyping, removeUserTyping, clearUserTyping } from '../../../actions/usersTyping';
import debounce from '../../../utils/debounce';
/**
 * Cancels every subscription handle and waits for all cancellations to settle.
 *
 * Each handle's `unsubscribe` method is invoked (not merely referenced), so the
 * returned promise only resolves once every cancellation has completed.
 *
 * @param {Array<{ unsubscribe: Function }>} [subscriptions=[]] Handles to cancel.
 * @returns {Promise<Array>} Resolves with the result of each `unsubscribe()` call,
 * in order; rejects as soon as one of them rejects.
 */
const unsubscribe = (subscriptions = []) => Promise.all(subscriptions.map(sub => sub.unsubscribe()));
/**
 * Stops a stream listener by calling its `stop()` method.
 *
 * @param {{ stop: Function }} listener Listener handle to stop.
 * @returns {*} Whatever `listener.stop()` returns.
 */
const removeListener = (listener) => {
	const stopResult = listener.stop();
	return stopResult;
};
// Module-level handles for the current room subscription. They are assigned by
// `subscribeRoom` and reset to `false` by the `stop()` function it returns.
// NOTE(review): because they live at module scope, a second `subscribeRoom`
// call overwrites them — presumably only one room is subscribed at a time; confirm.

// Result of `this.sdk.subscribeRoom(rid)`; awaited and handed to `unsubscribe` on stop.
let promises;
// Result of `this.sdk.onStreamData('connected', ...)`.
let connectedListener;
// Result of `this.sdk.onStreamData('close', ...)`.
let disconnectedListener;
// Result of `this.sdk.onStreamData('stream-notify-room', ...)`.
let notifyRoomListener;
// Result of `this.sdk.onStreamData('stream-room-messages', ...)`.
let messageReceivedListener;
/**
 * Subscribes to the live streams of a room and mirrors incoming events into
 * the local database and the redux store.
 *
 * Must be invoked with `this` bound to an object that exposes `sdk`,
 * `loadMissedMessages` and `readMessages` (all three are read from `this`).
 * The stream handles are stored in this module's shared variables, so the
 * returned `stop` tears down whatever is currently stored there.
 *
 * @param {{ rid: string }} params Object carrying the id of the room to subscribe to.
 * @returns {{ stop: Function }} Object whose `stop()` returns a promise that
 * unsubscribes from the room, removes the stream listeners and clears the
 * typing state.
 */
export default function subscribeRoom({ rid }) {
	console.log(`[RCRN] Subscribed to room ${ rid }`);

	// Registered below for both the 'connected' and the 'close' stream events.
	const handleConnection = () => {
		this.loadMissedMessages({ rid }).catch(e => console.log(e));
	};

	// Looks a record up by id and stages its permanent deletion. Resolves to
	// null when the lookup or the staging throws, so callers can simply skip it.
	const prepareDestroy = async(collection, id) => {
		try {
			const record = await collection.find(id);
			return record.prepareDestroyPermanently();
		} catch (e) {
			return null;
		}
	};

	// Handles 'stream-notify-room' events: typing notifications and message deletions.
	const handleNotifyRoomReceived = protectedFunction((ddpMessage) => {
		// eventName is split on '/' into the room id and the event type.
		const [_rid, ev] = ddpMessage.fields.eventName.split('/');
		if (rid !== _rid) {
			return;
		}
		if (ev === 'typing') {
			const [username, typing] = ddpMessage.fields.args;
			if (typing) {
				reduxStore.dispatch(addUserTyping(username));
			} else {
				reduxStore.dispatch(removeUserTyping(username));
			}
		} else if (ev === 'deleteMessage') {
			InteractionManager.runAfterInteractions(async() => {
				try {
					// Validated inside the try so a malformed payload is logged
					// instead of becoming an unhandled rejection.
					const { args } = ddpMessage.fields;
					if (!args || args.length === 0) {
						return;
					}
					const { _id } = args[0];
					const db = database.active;
					const msgCollection = db.collections.get('messages');
					const threadsCollection = db.collections.get('threads');
					const threadMessagesCollection = db.collections.get('thread_messages');

					// The same id is looked up in all three collections.
					const deleteMessage = await prepareDestroy(msgCollection, _id);
					const deleteThread = await prepareDestroy(threadsCollection, _id);
					const deleteThreadMessage = await prepareDestroy(threadMessagesCollection, _id);

					// Only batch what was actually found; skip the write when nothing was.
					const deletions = [deleteMessage, deleteThread, deleteThreadMessage].filter(Boolean);
					if (deletions.length === 0) {
						return;
					}
					await db.action(async() => {
						await db.batch(...deletions);
					});
				} catch (e) {
					log(e);
				}
			});
		}
	});

	// Debounced so a burst of incoming messages triggers a single readMessages call.
	// NOTE(review): 300 is presumably milliseconds — confirm against utils/debounce.
	const read = debounce((lastOpen) => {
		this.readMessages(rid, lastOpen);
	}, 300);

	// Handles 'stream-room-messages' events: upserts the message and, when
	// applicable, its thread and thread-message records in one batch.
	const handleMessageReceived = protectedFunction((ddpMessage) => {
		const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0]));
		const lastOpen = new Date();
		if (rid !== message.rid) {
			return;
		}
		InteractionManager.runAfterInteractions(async() => {
			const db = database.active;
			const batch = [];
			const msgCollection = db.collections.get('messages');
			const threadsCollection = db.collections.get('threads');
			const threadMessagesCollection = db.collections.get('thread_messages');
			let messageRecord;
			let threadRecord;
			let threadMessageRecord;

			// Create or update message
			try {
				messageRecord = await msgCollection.find(message._id);
			} catch (error) {
				// A failed lookup is treated as "record does not exist yet"
			}
			if (messageRecord) {
				try {
					const update = messageRecord.prepareUpdate((m) => {
						Object.assign(m, message);
					});
					batch.push(update);
				} catch (e) {
					console.log(e);
				}
			} else {
				batch.push(
					msgCollection.prepareCreate(protectedFunction((m) => {
						m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema);
						m.subscription.id = rid;
						Object.assign(m, message);
					}))
				);
			}

			// Create or update thread (only when the message carries `tlm`)
			if (message.tlm) {
				try {
					threadRecord = await threadsCollection.find(message._id);
				} catch (error) {
					// A failed lookup is treated as "record does not exist yet"
				}
				if (threadRecord) {
					batch.push(
						threadRecord.prepareUpdate(protectedFunction((t) => {
							Object.assign(t, message);
						}))
					);
				} else {
					batch.push(
						threadsCollection.prepareCreate(protectedFunction((t) => {
							t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema);
							t.subscription.id = rid;
							Object.assign(t, message);
						}))
					);
				}
			}

			// Create or update thread message (only when the message carries `tmid`)
			if (message.tmid) {
				try {
					threadMessageRecord = await threadMessagesCollection.find(message._id);
				} catch (error) {
					// A failed lookup is treated as "record does not exist yet"
				}
				if (threadMessageRecord) {
					batch.push(
						threadMessageRecord.prepareUpdate(protectedFunction((tm) => {
							Object.assign(tm, message);
							// Thread messages store the parent thread id in `rid`.
							tm.rid = message.tmid;
							delete tm.tmid;
						}))
					);
				} else {
					batch.push(
						threadMessagesCollection.prepareCreate(protectedFunction((tm) => {
							tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema);
							Object.assign(tm, message);
							tm.subscription.id = rid;
							// Thread messages store the parent thread id in `rid`.
							tm.rid = message.tmid;
							delete tm.tmid;
						}))
					);
				}
			}

			read(lastOpen);

			try {
				await db.action(async() => {
					await db.batch(...batch);
				});
			} catch (e) {
				log(e);
			}
		});
	});

	// Tears down the subscription. Every step is guarded on its own so that a
	// failure in one handle never leaves the remaining listeners attached or
	// the typing state uncleared.
	const stop = async() => {
		let params;
		if (promises) {
			try {
				params = await promises;
				await unsubscribe(params);
			} catch (error) {
				// Best effort: a failed unsubscribe must not block listener removal
			}
			promises = false;
		}
		if (connectedListener) {
			try {
				params = await connectedListener;
				removeListener(params);
			} catch (e) {
				log(e);
			}
			connectedListener = false;
		}
		if (disconnectedListener) {
			try {
				params = await disconnectedListener;
				removeListener(params);
			} catch (e) {
				log(e);
			}
			disconnectedListener = false;
		}
		if (notifyRoomListener) {
			try {
				params = await notifyRoomListener;
				removeListener(params);
			} catch (e) {
				log(e);
			}
			notifyRoomListener = false;
		}
		if (messageReceivedListener) {
			try {
				params = await messageReceivedListener;
				removeListener(params);
			} catch (e) {
				log(e);
			}
			messageReceivedListener = false;
		}
		reduxStore.dispatch(clearUserTyping());
	};

	connectedListener = this.sdk.onStreamData('connected', handleConnection);
	// NOTE(review): 'close' reuses handleConnection, so missed messages are also
	// requested when the stream closes — confirm this is intended.
	disconnectedListener = this.sdk.onStreamData('close', handleConnection);
	notifyRoomListener = this.sdk.onStreamData('stream-notify-room', handleNotifyRoomReceived);
	messageReceivedListener = this.sdk.onStreamData('stream-room-messages', handleMessageReceived);
	promises = this.sdk.subscribeRoom(rid);

	return {
		stop: () => stop()
	};
}