[FIX] Watermelon batches (#1277)

This commit is contained in:
Diego Mello 2019-10-08 09:36:15 -03:00 committed by GitHub
parent 08dac6ff86
commit 145e5c6b55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 286 additions and 131 deletions

View File

@ -91,22 +91,24 @@ class EmojiPicker extends Component {
_addFrequentlyUsed = protectedFunction(async(emoji) => { _addFrequentlyUsed = protectedFunction(async(emoji) => {
const db = database.active; const db = database.active;
const freqEmojiCollection = db.collections.get('frequently_used_emojis'); const freqEmojiCollection = db.collections.get('frequently_used_emojis');
await db.action(async() => { let freqEmojiRecord;
try { try {
const freqEmojiRecord = await freqEmojiCollection.find(emoji.content); freqEmojiRecord = await freqEmojiCollection.find(emoji.content);
} catch (error) {
// Do nothing
}
await db.action(async() => {
if (freqEmojiRecord) {
await freqEmojiRecord.update((f) => { await freqEmojiRecord.update((f) => {
f.count += 1; f.count += 1;
}); });
} catch (error) { } else {
try {
await freqEmojiCollection.create((f) => { await freqEmojiCollection.create((f) => {
f._raw = sanitizedRaw({ id: emoji.content }, freqEmojiCollection.schema); f._raw = sanitizedRaw({ id: emoji.content }, freqEmojiCollection.schema);
Object.assign(f, emoji); Object.assign(f, emoji);
f.count = 1; f.count = 1;
}); });
} catch (e) {
// Do nothing
}
} }
}); });
}) })

View File

@ -6,11 +6,13 @@ import RocketChat from '../lib/rocketchat';
import database from '../lib/database'; import database from '../lib/database';
import protectedFunction from '../lib/methods/helpers/protectedFunction'; import protectedFunction from '../lib/methods/helpers/protectedFunction';
import I18n from '../i18n'; import I18n from '../i18n';
import log from '../utils/log';
class MessageErrorActions extends React.Component { class MessageErrorActions extends React.Component {
static propTypes = { static propTypes = {
actionsHide: PropTypes.func.isRequired, actionsHide: PropTypes.func.isRequired,
message: PropTypes.object message: PropTypes.object,
tmid: PropTypes.string
}; };
// eslint-disable-next-line react/sort-comp // eslint-disable-next-line react/sort-comp
@ -27,17 +29,66 @@ class MessageErrorActions extends React.Component {
} }
handleResend = protectedFunction(async() => { handleResend = protectedFunction(async() => {
const { message } = this.props; const { message, tmid } = this.props;
await RocketChat.resendMessage(message); await RocketChat.resendMessage(message, tmid);
}); });
handleDelete = protectedFunction(async() => { handleDelete = async() => {
const { message } = this.props; try {
const { message, tmid } = this.props;
const db = database.active; const db = database.active;
await db.action(async() => { const deleteBatch = [];
await message.destroyPermanently(); const msgCollection = db.collections.get('messages');
}); const threadCollection = db.collections.get('threads');
// Delete the object (it can be Message or ThreadMessage instance)
deleteBatch.push(message.prepareDestroyPermanently());
// If it's a thread, we find and delete the whole tree, if necessary
if (tmid) {
try {
const msg = await msgCollection.find(message.id);
deleteBatch.push(msg.prepareDestroyPermanently());
} catch (error) {
// Do nothing: message not found
}
try {
// Find the thread header and update it
const msg = await msgCollection.find(tmid);
if (msg.tcount <= 1) {
deleteBatch.push(
msg.prepareUpdate((m) => {
m.tcount = null;
m.tlm = null;
}) })
);
try {
// If the whole thread was removed, delete the thread
const thread = await threadCollection.find(tmid);
deleteBatch.push(thread.prepareDestroyPermanently());
} catch (error) {
// Do nothing: thread not found
}
} else {
deleteBatch.push(
msg.prepareUpdate((m) => {
m.tcount -= 1;
})
);
}
} catch (error) {
// Do nothing: message not found
}
}
await db.action(async() => {
await db.batch(...deleteBatch);
});
} catch (e) {
log(e);
}
}
showActionSheet = () => { showActionSheet = () => {
ActionSheet.showActionSheetWithOptions({ ActionSheet.showActionSheetWithOptions({

View File

@ -52,7 +52,7 @@ MessageInner.displayName = 'MessageInner';
const Message = React.memo((props) => { const Message = React.memo((props) => {
if (props.isThreadReply || props.isThreadSequential || props.isInfo) { if (props.isThreadReply || props.isThreadSequential || props.isInfo) {
const thread = props.isThreadReply ? <RepliedThread isTemp={props.isTemp} {...props} /> : null; const thread = props.isThreadReply ? <RepliedThread {...props} /> : null;
return ( return (
<View style={[styles.container, props.style]}> <View style={[styles.container, props.style]}>
{thread} {thread}

View File

@ -9,9 +9,9 @@ import DisclosureIndicator from '../DisclosureIndicator';
import styles from './styles'; import styles from './styles';
const RepliedThread = React.memo(({ const RepliedThread = React.memo(({
tmid, tmsg, isHeader, isTemp, fetchThreadName, id tmid, tmsg, isHeader, fetchThreadName, id
}) => { }) => {
if (!tmid || !isHeader || isTemp) { if (!tmid || !isHeader) {
return null; return null;
} }
@ -40,9 +40,6 @@ const RepliedThread = React.memo(({
if (prevProps.isHeader !== nextProps.isHeader) { if (prevProps.isHeader !== nextProps.isHeader) {
return false; return false;
} }
if (prevProps.isTemp !== nextProps.isTemp) {
return false;
}
return true; return true;
}); });
@ -51,7 +48,6 @@ RepliedThread.propTypes = {
tmsg: PropTypes.string, tmsg: PropTypes.string,
id: PropTypes.string, id: PropTypes.string,
isHeader: PropTypes.bool, isHeader: PropTypes.bool,
isTemp: PropTypes.bool,
fetchThreadName: PropTypes.func fetchThreadName: PropTypes.func
}; };
RepliedThread.displayName = 'MessageRepliedThread'; RepliedThread.displayName = 'MessageRepliedThread';

View File

@ -3,12 +3,14 @@ import log from '../../utils/log';
export default async function readMessages(rid, lastOpen) { export default async function readMessages(rid, lastOpen) {
try { try {
// RC 0.61.0
const data = await this.sdk.post('subscriptions.read', { rid });
const db = database.active; const db = database.active;
const subscription = await db.collections.get('subscriptions').find(rid);
// RC 0.61.0
await this.sdk.post('subscriptions.read', { rid });
await db.action(async() => { await db.action(async() => {
try { try {
const subscription = await db.collections.get('subscriptions').find(rid);
await subscription.update((s) => { await subscription.update((s) => {
s.open = true; s.open = true;
s.alert = false; s.alert = false;
@ -22,7 +24,6 @@ export default async function readMessages(rid, lastOpen) {
// Do nothing // Do nothing
} }
}); });
return data;
} catch (e) { } catch (e) {
log(e); log(e);
} }

View File

@ -5,81 +5,155 @@ import database from '../database';
import log from '../../utils/log'; import log from '../../utils/log';
import random from '../../utils/random'; import random from '../../utils/random';
export const getMessage = async(rid, msg = '', tmid, user) => {
const _id = random(17);
const { id, username } = user;
try {
const db = database.active;
const msgCollection = db.collections.get('messages');
let message;
await db.action(async() => {
message = await msgCollection.create((m) => {
m._raw = sanitizedRaw({ id: _id }, msgCollection.schema);
m.subscription.id = rid;
m.msg = msg;
m.tmid = tmid;
m.ts = new Date();
m._updatedAt = new Date();
m.status = messagesStatus.TEMP;
m.u = {
_id: id || '1',
username
};
});
});
return message;
} catch (error) {
console.warn('getMessage', error);
}
};
export async function sendMessageCall(message) { export async function sendMessageCall(message) {
const { const {
id: _id, subscription: { id: rid }, msg, tmid id: _id, subscription: { id: rid }, msg, tmid
} = message; } = message;
try {
// RC 0.60.0 // RC 0.60.0
const data = await this.sdk.post('chat.sendMessage', { await this.sdk.post('chat.sendMessage', {
message: { message: {
_id, rid, msg, tmid _id, rid, msg, tmid
} }
}); });
return data; } catch (e) {
const db = database.active;
const msgCollection = db.collections.get('messages');
const threadMessagesCollection = db.collections.get('thread_messages');
const errorBatch = [];
const messageRecord = await msgCollection.find(_id);
errorBatch.push(
messageRecord.prepareUpdate((m) => {
m.status = messagesStatus.ERROR;
})
);
if (tmid) {
const threadMessageRecord = await threadMessagesCollection.find(_id);
errorBatch.push(
threadMessageRecord.prepareUpdate((tm) => {
tm.status = messagesStatus.ERROR;
})
);
}
await db.action(async() => {
await db.batch(...errorBatch);
});
}
} }
export default async function(rid, msg, tmid, user) { export default async function(rid, msg, tmid, user) {
try { try {
const db = database.active; const db = database.active;
const subsCollections = db.collections.get('subscriptions'); const subsCollection = db.collections.get('subscriptions');
const message = await getMessage(rid, msg, tmid, user); const msgCollection = db.collections.get('messages');
if (!message) { const threadCollection = db.collections.get('threads');
return; const threadMessagesCollection = db.collections.get('thread_messages');
} const messageId = random(17);
const batch = [];
const message = {
id: messageId, subscription: { id: rid }, msg, tmid
};
const messageDate = new Date();
let tMessageRecord;
// If it's replying to a thread
if (tmid) {
try {
// Find thread message header in Messages collection
tMessageRecord = await msgCollection.find(tmid);
batch.push(
tMessageRecord.prepareUpdate((m) => {
m.tlm = messageDate;
m.tcount += 1;
})
);
try { try {
const room = await subsCollections.find(rid); // Find thread message header in Threads collection
await db.action(async() => { await threadCollection.find(tmid);
await room.update((r) => { } catch (error) {
// If there's no record, create one
batch.push(
threadCollection.prepareCreate((tm) => {
tm._raw = sanitizedRaw({ id: tmid }, threadCollection.schema);
tm.subscription.id = rid;
tm.tmid = tmid;
tm.msg = tMessageRecord.msg;
tm.ts = tMessageRecord.ts;
tm._updatedAt = messageDate;
tm.status = messagesStatus.SENT; // Original message was sent already
tm.u = tMessageRecord.u;
})
);
}
// Create the message sent in ThreadMessages collection
batch.push(
threadMessagesCollection.prepareCreate((tm) => {
tm._raw = sanitizedRaw({ id: messageId }, threadMessagesCollection.schema);
tm.subscription.id = rid;
tm.rid = tmid;
tm.msg = msg;
tm.ts = messageDate;
tm._updatedAt = messageDate;
tm.status = messagesStatus.TEMP;
tm.u = {
_id: user.id || '1',
username: user.username
};
})
);
} catch (e) {
log(e);
}
}
// Create the message sent in Messages collection
batch.push(
msgCollection.prepareCreate((m) => {
m._raw = sanitizedRaw({ id: messageId }, msgCollection.schema);
m.subscription.id = rid;
m.msg = msg;
m.ts = messageDate;
m._updatedAt = messageDate;
m.status = messagesStatus.TEMP;
m.u = {
_id: user.id || '1',
username: user.username
};
if (tmid) {
m.tmid = tmid;
m.tlm = messageDate;
m.tmsg = tMessageRecord.msg;
}
})
);
try {
const room = await subsCollection.find(rid);
if (room.draftMessage) {
batch.push(
room.prepareUpdate((r) => {
r.draftMessage = null; r.draftMessage = null;
}); })
}); );
}
} catch (e) { } catch (e) {
// Do nothing // Do nothing
} }
try { try {
await sendMessageCall.call(this, message);
await db.action(async() => { await db.action(async() => {
await message.update((m) => { await db.batch(...batch);
m.status = messagesStatus.SENT;
});
}); });
} catch (e) { } catch (e) {
await db.action(async() => { log(e);
await message.update((m) => { return;
m.status = messagesStatus.ERROR;
});
});
} }
await sendMessageCall.call(this, message);
} catch (e) { } catch (e) {
log(e); log(e);
} }

View File

@ -8,6 +8,7 @@ import buildMessage from '../helpers/buildMessage';
import database from '../../database'; import database from '../../database';
import reduxStore from '../../createStore'; import reduxStore from '../../createStore';
import { addUserTyping, removeUserTyping, clearUserTyping } from '../../../actions/usersTyping'; import { addUserTyping, removeUserTyping, clearUserTyping } from '../../../actions/usersTyping';
import debounce from '../../../utils/debounce';
const unsubscribe = subscriptions => subscriptions.forEach(sub => sub.unsubscribe().catch(() => console.log('unsubscribeRoom'))); const unsubscribe = subscriptions => subscriptions.forEach(sub => sub.unsubscribe().catch(() => console.log('unsubscribeRoom')));
const removeListener = listener => listener.stop(); const removeListener = listener => listener.stop();
@ -85,6 +86,10 @@ export default function subscribeRoom({ rid }) {
} }
}); });
const read = debounce((lastOpen) => {
this.readMessages(rid, lastOpen);
}, 300);
const handleMessageReceived = protectedFunction((ddpMessage) => { const handleMessageReceived = protectedFunction((ddpMessage) => {
const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0])); const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0]));
const lastOpen = new Date(); const lastOpen = new Date();
@ -94,20 +99,26 @@ export default function subscribeRoom({ rid }) {
InteractionManager.runAfterInteractions(async() => { InteractionManager.runAfterInteractions(async() => {
const db = database.active; const db = database.active;
const batch = []; const batch = [];
const subCollection = db.collections.get('subscriptions');
const msgCollection = db.collections.get('messages'); const msgCollection = db.collections.get('messages');
const threadsCollection = db.collections.get('threads'); const threadsCollection = db.collections.get('threads');
const threadMessagesCollection = db.collections.get('thread_messages'); const threadMessagesCollection = db.collections.get('thread_messages');
let messageRecord;
let threadRecord;
let threadMessageRecord;
// Create or update message // Create or update message
try { try {
const messageRecord = await msgCollection.find(message._id); messageRecord = await msgCollection.find(message._id);
batch.push(
messageRecord.prepareUpdate((m) => {
Object.assign(m, message);
})
);
} catch (error) { } catch (error) {
// Do nothing
}
if (messageRecord) {
batch.push(
messageRecord.prepareUpdate(protectedFunction((m) => {
Object.assign(m, message);
}))
);
} else {
batch.push( batch.push(
msgCollection.prepareCreate(protectedFunction((m) => { msgCollection.prepareCreate(protectedFunction((m) => {
m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema); m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema);
@ -120,13 +131,18 @@ export default function subscribeRoom({ rid }) {
// Create or update thread // Create or update thread
if (message.tlm) { if (message.tlm) {
try { try {
const threadRecord = await threadsCollection.find(message._id); threadRecord = await threadsCollection.find(message._id);
batch.push(
threadRecord.prepareUpdate((t) => {
Object.assign(t, message);
})
);
} catch (error) { } catch (error) {
// Do nothing
}
if (threadRecord) {
batch.push(
threadRecord.prepareUpdate(protectedFunction((t) => {
Object.assign(t, message);
}))
);
} else {
batch.push( batch.push(
threadsCollection.prepareCreate(protectedFunction((t) => { threadsCollection.prepareCreate(protectedFunction((t) => {
t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema); t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema);
@ -140,15 +156,20 @@ export default function subscribeRoom({ rid }) {
// Create or update thread message // Create or update thread message
if (message.tmid) { if (message.tmid) {
try { try {
const threadMessageRecord = await threadMessagesCollection.find(message._id); threadMessageRecord = await threadMessagesCollection.find(message._id);
} catch (error) {
// Do nothing
}
if (threadMessageRecord) {
batch.push( batch.push(
threadMessageRecord.prepareUpdate((tm) => { threadMessageRecord.prepareUpdate(protectedFunction((tm) => {
Object.assign(tm, message); Object.assign(tm, message);
tm.rid = message.tmid; tm.rid = message.tmid;
delete tm.tmid; delete tm.tmid;
}) }))
); );
} catch (error) { } else {
batch.push( batch.push(
threadMessagesCollection.prepareCreate(protectedFunction((tm) => { threadMessagesCollection.prepareCreate(protectedFunction((tm) => {
tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema); tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema);
@ -161,12 +182,7 @@ export default function subscribeRoom({ rid }) {
} }
} }
try { read(lastOpen);
await subCollection.find(rid);
this.readMessages(rid, lastOpen);
} catch (e) {
console.log('Subscription not found. We probably subscribed to a not joined channel. No need to mark as read.');
}
try { try {
await db.action(async() => { await db.action(async() => {

View File

@ -66,10 +66,10 @@ const createOrUpdateSubscription = async(subscription, room) => {
} catch (error) { } catch (error) {
try { try {
await db.action(async() => { await db.action(async() => {
await roomsCollection.create((r) => { await roomsCollection.create(protectedFunction((r) => {
r._raw = sanitizedRaw({ id: room._id }, roomsCollection.schema); r._raw = sanitizedRaw({ id: room._id }, roomsCollection.schema);
Object.assign(r, room); Object.assign(r, room);
}); }));
}); });
} catch (e) { } catch (e) {
// Do nothing // Do nothing
@ -96,19 +96,25 @@ const createOrUpdateSubscription = async(subscription, room) => {
const tmp = merge(subscription, room); const tmp = merge(subscription, room);
await db.action(async() => { await db.action(async() => {
let sub;
try { try {
const sub = await subCollection.find(tmp.rid); sub = await subCollection.find(tmp.rid);
await sub.update((s) => {
Object.assign(s, tmp);
});
} catch (error) { } catch (error) {
await subCollection.create((s) => { // Do nothing
}
if (sub) {
await sub.update(protectedFunction((s) => {
Object.assign(s, tmp);
}));
} else {
await subCollection.create(protectedFunction((s) => {
s._raw = sanitizedRaw({ id: tmp.rid }, subCollection.schema); s._raw = sanitizedRaw({ id: tmp.rid }, subCollection.schema);
Object.assign(s, tmp); Object.assign(s, tmp);
if (s.roomUpdatedAt) { if (s.roomUpdatedAt) {
s.roomUpdatedAt = new Date(); s.roomUpdatedAt = new Date();
} }
}); }));
} }
}); });
} catch (e) { } catch (e) {

View File

@ -39,7 +39,7 @@ import loadMessagesForRoom from './methods/loadMessagesForRoom';
import loadMissedMessages from './methods/loadMissedMessages'; import loadMissedMessages from './methods/loadMissedMessages';
import loadThreadMessages from './methods/loadThreadMessages'; import loadThreadMessages from './methods/loadThreadMessages';
import sendMessage, { getMessage, sendMessageCall } from './methods/sendMessage'; import sendMessage, { sendMessageCall } from './methods/sendMessage';
import { sendFileMessage, cancelUpload, isUploadActive } from './methods/sendFileMessage'; import { sendFileMessage, cancelUpload, isUploadActive } from './methods/sendFileMessage';
import callJitsi from './methods/callJitsi'; import callJitsi from './methods/callJitsi';
@ -427,11 +427,10 @@ const RocketChat = {
loadMissedMessages, loadMissedMessages,
loadMessagesForRoom, loadMessagesForRoom,
loadThreadMessages, loadThreadMessages,
getMessage,
sendMessage, sendMessage,
getRooms, getRooms,
readMessages, readMessages,
async resendMessage(message) { async resendMessage(message, tmid) {
const db = database.active; const db = database.active;
try { try {
await db.action(async() => { await db.action(async() => {
@ -439,18 +438,21 @@ const RocketChat = {
m.status = messagesStatus.TEMP; m.status = messagesStatus.TEMP;
}); });
}); });
await sendMessageCall.call(this, message); let m = {
} catch (error) { id: message.id,
try { msg: message.msg,
await db.action(async() => { subscription: { id: message.subscription.id }
await message.update((m) => { };
m.status = messagesStatus.ERROR; if (tmid) {
}); m = {
}); ...m,
tmid
};
}
await sendMessageCall.call(this, m);
} catch (e) { } catch (e) {
log(e); log(e);
} }
}
}, },
async search({ text, filterUsers = true, filterRooms = true }) { async search({ text, filterUsers = true, filterRooms = true }) {

View File

@ -775,6 +775,7 @@ class RoomView extends React.Component {
} }
{showErrorActions ? ( {showErrorActions ? (
<MessageErrorActions <MessageErrorActions
tmid={this.tmid}
message={selectedMessage} message={selectedMessage}
actionsHide={this.onErrorActionsHide} actionsHide={this.onErrorActionsHide}
/> />

View File

@ -198,7 +198,13 @@ class ShareListView extends React.Component {
const serversCollection = serversDB.collections.get('servers'); const serversCollection = serversDB.collections.get('servers');
this.servers = await serversCollection.query().fetch(); this.servers = await serversCollection.query().fetch();
this.chats = this.data.slice(0, LIMIT); this.chats = this.data.slice(0, LIMIT);
const serverInfo = await serversCollection.find(server); let serverInfo = {};
try {
serverInfo = await serversCollection.find(server);
} catch (error) {
// Do nothing
}
const canUploadFileResult = canUploadFile(fileInfo || fileData, serverInfo); const canUploadFileResult = canUploadFile(fileInfo || fileData, serverInfo);
this.internalSetState({ this.internalSetState({