var assert = require('assert');
var async = require('async');
var loopback = require('../');
var Change = loopback.Change;
var defineModelTestsWithDataSource = require('./util/model-tests');
var PersistedModel = loopback.PersistedModel;
var expect = require('chai').expect;
var debug = require('debug')('test');

describe('Replication / Change APIs', function() {
  var dataSource, SourceModel, TargetModel;
  var useSinceFilter;
  var tid = 0; // per-test unique id used e.g. to build unique model names

  // Per-test fixture: creates a fresh memory datasource plus a pair of
  // change-tracking models (SourceModel, TargetModel) and publishes them both
  // on the mocha context (`this`) and in the suite-level variables.
  beforeEach(function() {
    // bump the per-test id so the model names built below are unique
    tid++;
    // reset the flag; nested suites may switch it on in their own beforeEach
    useSinceFilter = false;
    var test = this;
    dataSource = this.dataSource = loopback.createDataSource({
      connector: loopback.Memory
    });
    SourceModel = this.SourceModel = PersistedModel.extend(
      'SourceModel-' + tid,
      { id: { id: true, type: String, defaultFn: 'guid' } },
      { trackChanges: true });

    SourceModel.attachTo(dataSource);

    TargetModel = this.TargetModel = PersistedModel.extend(
      'TargetModel-' + tid,
      { id: { id: true, type: String, defaultFn: 'guid' } },
      { trackChanges: true });

    // NOTE(bajtos) At the moment, all models share the same Checkpoint
    // model. This causes the in-process replication to work differently
    // than client-server replication.
    // As a workaround, we manually setup unique Checkpoint for TargetModel.
    var TargetChange = TargetModel.Change;
    TargetChange.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint');
    TargetChange.Checkpoint.attachTo(dataSource);

    TargetModel.attachTo(dataSource);

    // "since" value used by tests that pass an explicit starting checkpoint
    test.startingCheckpoint = -1;

    // Helper exposed on the test context ("Inital" [sic] - the name is used
    // by the conflict-detection suites): creates one source instance,
    // remembers it as `test.model` and replicates it to TargetModel.
    // `cb` receives the create error or whatever `replicate` passes on.
    this.createInitalData = function(cb) {
      SourceModel.create({name: 'foo'}, function(err, inst) {
        if (err) return cb(err);
        test.model = inst;
        SourceModel.replicate(TargetModel, cb);
      });
    };
  });

  describe('Model.changes(since, filter, callback)', function() {
    // A newly created instance must be reported as exactly one change
    // when asking for changes since the starting checkpoint.
    it('Get changes since the given checkpoint', function(done) {
      var test = this;
      this.SourceModel.create({name: 'foo'}, function(err) {
        if (err) return done(err);
        // NOTE(review): the 1ms delay presumably gives change tracking time
        // to record the change - confirm before removing
        setTimeout(function() {
          test.SourceModel.changes(test.startingCheckpoint, {}, function(err, changes) {
            // report a failed query instead of crashing on `changes.length`
            if (err) return done(err);
            assert.equal(changes.length, 1);
            done();
          });
        }, 1);
      });
    });

    // Asking for changes since a checkpoint far in the future must
    // return an empty list.
    it('excludes changes from older checkpoints', function(done) {
      var FUTURE_CHECKPOINT = 999;

      SourceModel.create({ name: 'foo' }, function(err) {
        if (err) return done(err);
        SourceModel.changes(FUTURE_CHECKPOINT, {}, function(err, changes) {
          if (err) return done(err);
          /*jshint -W030 */
          expect(changes).to.be.empty;
          done();
        });
      });
    });
  });

  describe('Model.replicate(since, targetModel, options, callback)', function() {
    // Creates one source instance, replicates it from the starting
    // checkpoint and checks that both models then hold the same data.
    it('Replicate data using the target model', function(done) {
      var ctx = this;
      var replicationOptions = {};
      var fromSource;
      var fromTarget;

      ctx.SourceModel.create({name: 'foo'}, onCreated);

      function onCreated(err) {
        if (err) return done(err);
        ctx.SourceModel.replicate(
          ctx.startingCheckpoint,
          ctx.TargetModel,
          replicationOptions,
          onReplicated);
      }

      function onReplicated(err, conflicts) {
        if (err) return done(err);
        assert(conflicts.length === 0);
        async.parallel([loadSourceData, loadTargetData], compareData);
      }

      function loadSourceData(cb) {
        ctx.SourceModel.find(function(err, found) {
          if (err) return cb(err);
          fromSource = found;
          cb();
        });
      }

      function loadTargetData(cb) {
        ctx.TargetModel.find(function(err, found) {
          if (err) return cb(err);
          fromTarget = found;
          cb();
        });
      }

      function compareData(err) {
        if (err) return done(err);

        assert.deepEqual(fromSource, fromTarget);
        done();
      }
    });

    // Creates one instance per checkpoint and replicates since the current
    // checkpoint only; the instance from the older checkpoint must not
    // reach the target.
    it('applies "since" filter on source changes', function(done) {
      async.series([
        function createModelInSourceCp1(next) {
          SourceModel.create({ id: '1' }, next);
        },
        function checkpoint(next) {
          SourceModel.checkpoint(next);
        },
        function createModelInSourceCp2(next) {
          SourceModel.create({ id: '2' }, next);
        },
        function replicateLastChangeOnly(next) {
          SourceModel.currentCheckpoint(function(err, cp) {
            // report through the series so that `done` has a single caller
            if (err) return next(err);
            SourceModel.replicate(cp, TargetModel, next);
          });
        },
        function verify(next) {
          TargetModel.find(function(err, list) {
            if (err) return next(err);
            // '1' should be skipped by replication
            expect(getIds(list)).to.eql(['2']);
            next();
          });
        }
      ], done);
    });

    it('applies "since" filter on target changes', function(done) {
      // The "since" filter is merely an optimization, so there is no
      // observable outcome to assert on. The test relies on the
      // spyAndStoreSinceArg helper and checks what it stored for
      // TargetModel.diff.
      var SINCE = 10;
      var storedSince = [];
      spyAndStoreSinceArg(TargetModel, 'diff', storedSince);

      SourceModel.replicate(SINCE, TargetModel, function onReplicated(err) {
        if (err) return done(err);
        expect(storedSince).to.eql([SINCE]);
        done();
      });
    });

    // When "since" is given as `{ source, target }`, each side must be
    // queried with its own value.
    it('uses different "since" value for source and target', function(done) {
      var SOURCE_SINCE = 1;
      var TARGET_SINCE = 2;
      var storedForSource = [];
      var storedForTarget = [];

      spyAndStoreSinceArg(SourceModel, 'changes', storedForSource);
      spyAndStoreSinceArg(TargetModel, 'diff', storedForTarget);

      SourceModel.replicate(
        { source: SOURCE_SINCE, target: TARGET_SINCE },
        TargetModel,
        function onReplicated(err) {
          if (err) return done(err);
          expect(storedForSource).to.eql([SOURCE_SINCE]);
          expect(storedForTarget).to.eql([TARGET_SINCE]);
          done();
        });
    });

    // A model created while a replication run is in progress must be
    // delivered by the next run that starts after the remembered checkpoint.
    it('picks up changes made during replication', function(done) {
      setupRaceConditionInReplication(function(cb) {
        // simulate the situation when another model is created
        // while a replication run is in progress
        SourceModel.create({ id: 'racer' }, cb);
      });

      var lastCp;
      async.series([
        function buildSomeDataToReplicate(next) {
          SourceModel.create({ id: 'init' }, next);
        },
        function getLastCp(next) {
          SourceModel.currentCheckpoint(function(err, cp) {
            // report through the series so that `done` has a single caller
            if (err) return next(err);
            lastCp = cp;
            next();
          });
        },
        function replicate(next) {
          SourceModel.replicate(TargetModel, next);
        },
        function verifyAssumptions(next) {
          SourceModel.find(function(err, list) {
            // a failed query must fail the test, not crash inside getIds
            if (err) return next(err);
            expect(getIds(list), 'source ids')
              .to.eql(['init', 'racer']);

            TargetModel.find(function(err, list) {
              if (err) return next(err);
              expect(getIds(list), 'target ids after first sync')
                .to.include.members(['init']);
              next();
            });
          });
        },
        function replicateAgain(next) {
          SourceModel.replicate(lastCp + 1, TargetModel, next);
        },
        function verify(next) {
          TargetModel.find(function(err, list) {
            if (err) return next(err);
            expect(getIds(list), 'target ids').to.eql(['init', 'racer']);
            next();
          });
        }
      ], done);
    });

    // The third callback argument of replicate() must carry the checkpoints
    // following the ones that were current when the replication started.
    it('returns new current checkpoints to callback', function(done) {
      var sourceCp, targetCp;

      // creates a new source checkpoint and remembers its sequence number
      function bumpSourceCheckpoint(cb) {
        SourceModel.checkpoint(function(err, checkpoint) {
          if (err) return cb(err);
          sourceCp = checkpoint.seq;
          cb();
        });
      }

      // creates a new target checkpoint and remembers its sequence number
      function bumpTargetCheckpoint(cb) {
        TargetModel.checkpoint(function(err, checkpoint) {
          if (err) return cb(err);
          targetCp = checkpoint.seq;
          cb();
        });
      }

      function replicate(cb) {
        // the target was bumped twice so the two values must differ
        expect(sourceCp).to.not.equal(targetCp);

        SourceModel.replicate(TargetModel, onReplicated);

        function onReplicated(err, conflicts, newCheckpoints) {
          if (err) return cb(err);
          var expectedCheckpoints = {
            source: sourceCp + 1,
            target: targetCp + 1
          };
          expect(conflicts, 'conflicts').to.eql([]);
          expect(newCheckpoints, 'currentCheckpoints')
            .to.eql(expectedCheckpoints);
          cb();
        }
      }

      async.series([
        bumpSourceCheckpoint,
        bumpTargetCheckpoint,
        bumpTargetCheckpoint,
        replicate
      ], done);
    });

    // After a successful replication no target change may be recorded
    // in the current (or a later) target checkpoint.
    it('leaves current target checkpoint empty', function(done) {
      async.series([
        function createTestData(next) {
          SourceModel.create({}, next);
        },
        replicateExpectingSuccess(),
        function verify(next) {
          TargetModel.currentCheckpoint(function(err, cp) {
            if (err) return next(err);
            TargetModel.getChangeModel().find(
              { where: { checkpoint: { gte: cp } } },
              function(err, changes) {
                // finish via `next` like every other step; `done` is
                // called once by the series itself
                if (err) return next(err);
                expect(changes).to.have.length(0);
                next();
              });
          });
        }
      ], done);
    });

    // Scenarios where somebody else modifies the target while a replication
    // run is in progress (set up via setupRaceConditionInReplication).
    describe('with 3rd-party changes', function() {
      it('detects UPDATE made during UPDATE', function(done) {
        async.series([
          createModel(SourceModel, { id: '1' }),
          replicateExpectingSuccess(),
          function updateModel(next) {
            SourceModel.updateAll({ id: '1' }, { name: 'source' }, next);
          },
          replicateAndResolveConflictCausedBy(updateTargetAs3rdParty),

          replicateExpectingSuccess(),
          verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
        ], done);
      });

      it('detects CREATE made during CREATE', function(done) {
        async.series([
          // FIXME(bajtos) Remove the 'name' property once the implementation
          // of UPDATE is fixed to correctly remove properties
          createModel(SourceModel, { id: '1', name: 'source' }),
          replicateAndResolveConflictCausedBy(createInTargetAs3rdParty),

          replicateExpectingSuccess(),
          verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
        ], done);
      });

      it('detects UPDATE made during DELETE', function(done) {
        async.series([
          createModel(SourceModel, { id: '1' }),
          replicateExpectingSuccess(),
          function deleteModel(next) {
            SourceModel.deleteById('1', next);
          },
          replicateAndResolveConflictCausedBy(updateTargetAs3rdParty),

          replicateExpectingSuccess(),
          verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
        ], done);
      });

      it('handles DELETE made during DELETE', function(done) {
        async.series([
          createModel(SourceModel, { id: '1' }),
          replicateExpectingSuccess(),
          function deleteModel(next) {
            SourceModel.deleteById('1', next);
          },
          function setup3rdPartyModifyingData(next) {
            // both sides delete the same instance - no conflict is expected,
            // so only the race is installed here
            setupRaceConditionInReplication(deleteFromTargetAs3rdParty);
            next();
          },
          replicateExpectingSuccess(),
          verifyInstanceWasReplicated(SourceModel, TargetModel, '1')
        ], done);
      });

      // 3rd-party actions. They talk to the connector directly instead of
      // going through TargetModel's own methods.

      function updateTargetAs3rdParty(cb) {
        TargetModel.dataSource.connector.updateAttributes(
          TargetModel.modelName,
          '1',
          { name: '3rd-party' },
          cb);
      }

      function createInTargetAs3rdParty(cb) {
        TargetModel.dataSource.connector.create(
          TargetModel.modelName,
          { id: '1', name: '3rd-party' },
          cb);
      }

      function deleteFromTargetAs3rdParty(cb) {
        TargetModel.dataSource.connector.destroy(
          TargetModel.modelName,
          '1',
          cb);
      }

      /**
       * Build an async.series step that replicates SourceModel to
       * TargetModel while `thirdPartyAction` runs as the race condition,
       * asserts that exactly the model '1' is reported as conflicted and
       * resolves that conflict.
       * @param {Function} thirdPartyAction `function(cb)` passed to
       *   setupRaceConditionInReplication
       * @returns {Function} step `function(next)`
       */
      function replicateAndResolveConflictCausedBy(thirdPartyAction) {
        return function replicateWith3rdPartyModifyingData(next) {
          setupRaceConditionInReplication(thirdPartyAction);

          SourceModel.replicate(TargetModel, function(err, conflicts) {
            if (err) return next(err);

            var conflictedIds = getPropValue(conflicts || [], 'modelId');
            expect(conflictedIds).to.eql(['1']);

            // resolve the conflict using ours
            conflicts[0].resolve(next);
          });
        };
      }
    });
  });

  // Both sides modify the same replicated instance; the next replication
  // must report it as a single UPDATE conflict.
  describe('conflict detection - both updated', function() {
    beforeEach(function(done) {
      var SourceModel = this.SourceModel;
      var TargetModel = this.TargetModel;
      var test = this;

      test.createInitalData(createConflict);

      function createConflict(err) {
        // a failed initial create/replicate must abort the setup
        if (err) return done(err);
        async.parallel([
          function(cb) {
            SourceModel.findOne(function(err, inst) {
              if (err) return cb(err);
              inst.name = 'source update';
              inst.save(cb);
            });
          },
          function(cb) {
            TargetModel.findOne(function(err, inst) {
              if (err) return cb(err);
              inst.name = 'target update';
              inst.save(cb);
            });
          }
        ], function(err) {
          if (err) return done(err);
          SourceModel.replicate(TargetModel, function(err, conflicts) {
            if (err) return done(err);
            test.conflicts = conflicts;
            test.conflict = conflicts[0];
            done();
          });
        });
      }
    });
    it('should detect a single conflict', function() {
      assert.equal(this.conflicts.length, 1);
      assert(this.conflict);
    });
    it('type should be UPDATE', function(done) {
      this.conflict.type(function(err, type) {
        if (err) return done(err);
        assert.equal(type, Change.UPDATE);
        done();
      });
    });
    it('conflict.changes()', function(done) {
      var test = this;
      this.conflict.changes(function(err, sourceChange, targetChange) {
        if (err) return done(err);
        assert.equal(typeof sourceChange.id, 'string');
        assert.equal(typeof targetChange.id, 'string');
        assert.equal(test.model.getId(), sourceChange.getModelId());
        assert.equal(sourceChange.type(), Change.UPDATE);
        assert.equal(targetChange.type(), Change.UPDATE);
        done();
      });
    });
    it('conflict.models()', function(done) {
      var test = this;
      this.conflict.models(function(err, source, target) {
        if (err) return done(err);
        assert.deepEqual(source.toJSON(), {
          id: test.model.id,
          name: 'source update'
        });
        assert.deepEqual(target.toJSON(), {
          id: test.model.id,
          name: 'target update'
        });
        done();
      });
    });
  });

  // The source deletes the replicated instance while the target updates it;
  // the next replication must report a single DELETE conflict.
  describe('conflict detection - source deleted', function() {
    beforeEach(function(done) {
      var SourceModel = this.SourceModel;
      var TargetModel = this.TargetModel;
      var test = this;

      test.createInitalData(createConflict);

      function createConflict(err) {
        // a failed initial create/replicate must abort the setup
        if (err) return done(err);
        async.parallel([
          function(cb) {
            SourceModel.findOne(function(err, inst) {
              if (err) return cb(err);
              test.model = inst;
              inst.remove(cb);
            });
          },
          function(cb) {
            TargetModel.findOne(function(err, inst) {
              if (err) return cb(err);
              inst.name = 'target update';
              inst.save(cb);
            });
          }
        ], function(err) {
          if (err) return done(err);
          SourceModel.replicate(TargetModel, function(err, conflicts) {
            if (err) return done(err);
            test.conflicts = conflicts;
            test.conflict = conflicts[0];
            done();
          });
        });
      }
    });
    it('should detect a single conflict', function() {
      assert.equal(this.conflicts.length, 1);
      assert(this.conflict);
    });
    it('type should be DELETE', function(done) {
      this.conflict.type(function(err, type) {
        if (err) return done(err);
        assert.equal(type, Change.DELETE);
        done();
      });
    });
    it('conflict.changes()', function(done) {
      var test = this;
      this.conflict.changes(function(err, sourceChange, targetChange) {
        if (err) return done(err);
        assert.equal(typeof sourceChange.id, 'string');
        assert.equal(typeof targetChange.id, 'string');
        assert.equal(test.model.getId(), sourceChange.getModelId());
        assert.equal(sourceChange.type(), Change.DELETE);
        assert.equal(targetChange.type(), Change.UPDATE);
        done();
      });
    });
    it('conflict.models()', function(done) {
      var test = this;
      this.conflict.models(function(err, source, target) {
        if (err) return done(err);
        // the source instance was removed, only the target one is left
        assert.equal(source, null);
        assert.deepEqual(target.toJSON(), {
          id: test.model.id,
          name: 'target update'
        });
        done();
      });
    });
  });

  // The source updates the replicated instance while the target deletes it;
  // the next replication must report a single DELETE conflict.
  describe('conflict detection - target deleted', function() {
    beforeEach(function(done) {
      var SourceModel = this.SourceModel;
      var TargetModel = this.TargetModel;
      var test = this;

      test.createInitalData(createConflict);

      function createConflict(err) {
        // a failed initial create/replicate must abort the setup
        if (err) return done(err);
        async.parallel([
          function(cb) {
            SourceModel.findOne(function(err, inst) {
              if (err) return cb(err);
              test.model = inst;
              inst.name = 'source update';
              inst.save(cb);
            });
          },
          function(cb) {
            TargetModel.findOne(function(err, inst) {
              if (err) return cb(err);
              inst.remove(cb);
            });
          }
        ], function(err) {
          if (err) return done(err);
          SourceModel.replicate(TargetModel, function(err, conflicts) {
            if (err) return done(err);
            test.conflicts = conflicts;
            test.conflict = conflicts[0];
            done();
          });
        });
      }
    });
    it('should detect a single conflict', function() {
      assert.equal(this.conflicts.length, 1);
      assert(this.conflict);
    });
    it('type should be DELETE', function(done) {
      this.conflict.type(function(err, type) {
        if (err) return done(err);
        assert.equal(type, Change.DELETE);
        done();
      });
    });
    it('conflict.changes()', function(done) {
      var test = this;
      this.conflict.changes(function(err, sourceChange, targetChange) {
        if (err) return done(err);
        assert.equal(typeof sourceChange.id, 'string');
        assert.equal(typeof targetChange.id, 'string');
        assert.equal(test.model.getId(), sourceChange.getModelId());
        assert.equal(sourceChange.type(), Change.UPDATE);
        assert.equal(targetChange.type(), Change.DELETE);
        done();
      });
    });
    it('conflict.models()', function(done) {
      var test = this;
      this.conflict.models(function(err, source, target) {
        if (err) return done(err);
        // the target instance was removed, only the source one is left
        assert.equal(target, null);
        assert.deepEqual(source.toJSON(), {
          id: test.model.id,
          name: 'source update'
        });
        done();
      });
    });
  });

  // Both sides delete the replicated instance; since they agree on the
  // outcome, the next replication must not report any conflict.
  describe('conflict detection - both deleted', function() {
    beforeEach(function(done) {
      var SourceModel = this.SourceModel;
      var TargetModel = this.TargetModel;
      var test = this;

      test.createInitalData(createConflict);

      function createConflict(err) {
        // a failed initial create/replicate must abort the setup
        if (err) return done(err);
        async.parallel([
          function(cb) {
            SourceModel.findOne(function(err, inst) {
              if (err) return cb(err);
              test.model = inst;
              inst.remove(cb);
            });
          },
          function(cb) {
            TargetModel.findOne(function(err, inst) {
              if (err) return cb(err);
              inst.remove(cb);
            });
          }
        ], function(err) {
          if (err) return done(err);
          SourceModel.replicate(TargetModel, function(err, conflicts) {
            if (err) return done(err);
            test.conflicts = conflicts;
            test.conflict = conflicts[0];
            done();
          });
        });
      }
    });
    it('should not detect a conflict', function() {
      assert.equal(this.conflicts.length, 0);
      assert(!this.conflict);
    });
  });

  // Every write method of a change-tracking model must leave exactly one
  // pending change for the affected instance in the current checkpoint.
  describe('change detection', function() {
    it('detects "create"', function(done) {
      SourceModel.create({}, function(err, inst) {
        if (err) return done(err);
        assertChangeRecordedForId(inst.id, done);
      });
    });

    it('detects "updateOrCreate"', function(done) {
      givenReplicatedInstance(function(err, created) {
        if (err) return done(err);
        created.name = 'updated';
        SourceModel.updateOrCreate(created, function(err, inst) {
          if (err) return done(err);
          assertChangeRecordedForId(inst.id, done);
        });
      });
    });

    it('detects "findOrCreate"', function(done) {
      // make sure we bypass find+create and call the connector directly
      SourceModel.dataSource.connector.findOrCreate =
        function(model, query, data, callback) {
          this.all(model, query, function(err, list) {
            // an error or an existing match: report "not created"
            if (err || (list && list[0]))
              return callback(err, list && list[0], false);
            this.create(model, data, function(err) {
              callback(err, data, true);
            });
          }.bind(this));
        };

      SourceModel.findOrCreate(
        { where: { name: 'does-not-exist' } },
        { name: 'created' },
        function(err, inst) {
          if (err) return done(err);
          assertChangeRecordedForId(inst.id, done);
        });
    });

    it('detects "deleteById"', function(done) {
      givenReplicatedInstance(function(err, inst) {
        if (err) return done(err);
        SourceModel.deleteById(inst.id, function(err) {
          // a failed delete must fail the test with its own error
          if (err) return done(err);
          assertChangeRecordedForId(inst.id, done);
        });
      });
    });

    it('detects "deleteAll"', function(done) {
      givenReplicatedInstance(function(err, inst) {
        if (err) return done(err);
        SourceModel.deleteAll({ name: inst.name }, function(err) {
          if (err) return done(err);
          assertChangeRecordedForId(inst.id, done);
        });
      });
    });

    it('detects "updateAll"', function(done) {
      givenReplicatedInstance(function(err, inst) {
        if (err) return done(err);
        SourceModel.updateAll(
          { name: inst.name },
          { name: 'updated' },
          function(err) {
            if (err) return done(err);
            assertChangeRecordedForId(inst.id, done);
          });
      });
    });

    it('detects "prototype.save"', function(done) {
      givenReplicatedInstance(function(err, inst) {
        if (err) return done(err);
        inst.name = 'updated';
        inst.save(function(err) {
          if (err) return done(err);
          assertChangeRecordedForId(inst.id, done);
        });
      });
    });

    it('detects "prototype.updateAttributes"', function(done) {
      givenReplicatedInstance(function(err, inst) {
        if (err) return done(err);
        inst.updateAttributes({ name: 'updated' }, function(err) {
          if (err) return done(err);
          assertChangeRecordedForId(inst.id, done);
        });
      });
    });

    it('detects "prototype.delete"', function(done) {
      givenReplicatedInstance(function(err, inst) {
        if (err) return done(err);
        inst.delete(function(err) {
          // a failed delete must fail the test with its own error
          if (err) return done(err);
          assertChangeRecordedForId(inst.id, done);
        });
      });
    });

    /**
     * Create a source instance named 'a-name' and then start a new source
     * checkpoint, so that the creation itself is not part of the changes
     * pending in the current checkpoint.
     * @param {Function} cb `function(err, inst)`
     */
    function givenReplicatedInstance(cb) {
      SourceModel.create({ name: 'a-name' }, function(err, inst) {
        if (err) return cb(err);
        SourceModel.checkpoint(function(err) {
          if (err) return cb(err);
          cb(null, inst);
        });
      });
    }

    /**
     * Assert that exactly one change is pending since the previous
     * checkpoint and that it belongs to the SourceModel instance `id`.
     * @param {*} id Id of the changed model instance
     * @param {Function} cb `function(err)`
     */
    function assertChangeRecordedForId(id, cb) {
      SourceModel.getChangeModel().getCheckpointModel()
        .current(function(err, cp) {
          if (err) return cb(err);
          SourceModel.changes(cp - 1, {}, function(err, pendingChanges) {
            if (err) return cb(err);
            expect(pendingChanges, 'list of changes').to.have.length(1);
            var change = pendingChanges[0].toObject();
            expect(change).to.have.property('checkpoint', cp); // sanity check
            expect(change).to.have.property('modelName', SourceModel.modelName);
            // NOTE(bajtos) Change.modelId is always String
            // regardless of the type of the changed model's id property
            expect(change).to.have.property('modelId', '' + id);
            cb();
          });
        });
    }
  });

  describe('complex setup', function() {
    var sourceInstance, sourceInstanceId, AnotherModel;

    // Creates the instance shared by the tests of this suite, replicates it
    // (default models of replicateExpectingSuccess) and verifies the result.
    beforeEach(function createReplicatedInstance(done) {
      async.series([
        function createInstance(next) {
          SourceModel.create({ id: 'test-instance' }, function(err, result) {
            // `result` is not available on failure; bail out before
            // reading `result.id`
            if (err) return next(err);
            sourceInstance = result;
            sourceInstanceId = result.id;
            next();
          });
        },
        replicateExpectingSuccess(),
        verifySourceWasReplicated()
      ], done);
    });

    // Defines a third change-tracking model (with the same id definition as
    // the source and target models) and attaches it to the shared datasource.
    beforeEach(function setupThirdModel() {
      // `tid` keeps the model name unique per test
      AnotherModel = this.AnotherModel = PersistedModel.extend(
        'AnotherModel-' + tid,
        { id: { id: true, type: String, defaultFn: 'guid' } },
        { trackChanges: true });

      // NOTE(bajtos) At the moment, all models share the same Checkpoint
      // model. This causes the in-process replication to work differently
      // than client-server replication.
      // As a workaround, we manually setup unique Checkpoint for AnotherModel.
      var AnotherChange = AnotherModel.Change;
      AnotherChange.Checkpoint = loopback.Checkpoint.extend('AnotherCheckpoint');
      AnotherChange.Checkpoint.attachTo(dataSource);

      AnotherModel.attachTo(dataSource);
    });

    // Replicates an update and then a delete of the shared instance,
    // never passing a "since" checkpoint to the replication helper.
    it('correctly replicates without checkpoint filter', function(done) {
      function deleteInstance(next) {
        sourceInstance.remove(next);
      }

      function verifyTargetModelWasDeleted(next) {
        TargetModel.find(function(err, found) {
          if (err) return next(err);
          var targetIds = getIds(found);
          expect(targetIds).to.not.contain(sourceInstance.id);
          next();
        });
      }

      var steps = [
        updateSourceInstanceNameTo('updated'),
        replicateExpectingSuccess(),
        verifySourceWasReplicated(),

        deleteInstance,
        replicateExpectingSuccess(),
        verifyTargetModelWasDeleted
      ];
      async.series(steps, done);
    });

    // Two consecutive updates with no replication in between must still
    // end up replicated.
    it('replicates multiple updates within the same CP', function(done) {
      var steps = [
        replicateExpectingSuccess(),
        verifySourceWasReplicated(),

        // two updates, then a single replication run
        updateSourceInstanceNameTo('updated'),
        updateSourceInstanceNameTo('again'),
        replicateExpectingSuccess(),
        verifySourceWasReplicated()
      ];
      async.series(steps, done);
    });

    describe('clientA-server-clientB', function() {
      var ClientA, Server, ClientB;

      // Gives the three models the roles used throughout this suite.
      beforeEach(function assignRoles() {
        // NOTE(bajtos) The tests should ideally pass without the since
        // filter too. Unfortunately that's not possible with the current
        // implementation that remembers only the last two changes made.
        useSinceFilter = true;

        Server = TargetModel;
        ClientA = SourceModel;
        ClientB = AnotherModel;
      });

      // The instance created during setup must reach ClientB via the Server.
      it('replicates new models', function(done) {
        // Note that ClientA->Server was already replicated during setup
        var steps = [
          replicateExpectingSuccess(Server, ClientB),
          verifySourceWasReplicated(ClientB)
        ];
        async.series(steps, done);
      });

      // Several ClientA updates are pushed to the Server and pulled
      // by ClientB; every replication step must succeed.
      it('propagates updates with no false conflicts', function(done) {
        var steps = [
          // first update travels ClientA -> Server -> ClientB
          updateSourceInstanceNameTo('v2'),
          replicateExpectingSuccess(ClientA, Server),

          replicateExpectingSuccess(Server, ClientB),

          // two more updates reach the Server before ClientB pulls again
          updateSourceInstanceNameTo('v3'),
          replicateExpectingSuccess(ClientA, Server),
          updateSourceInstanceNameTo('v4'),
          replicateExpectingSuccess(ClientA, Server),

          replicateExpectingSuccess(Server, ClientB),
          verifySourceWasReplicated(ClientB)
        ];
        async.series(steps, done);
      });

      // A delete made at ClientA is pushed to the Server and then
      // pulled by ClientB; every replication step must succeed.
      it('propagates deletes with no false conflicts', function(done) {
        var steps = [
          deleteSourceInstance(),
          replicateExpectingSuccess(ClientA, Server),
          replicateExpectingSuccess(Server, ClientB),
          verifySourceWasReplicated(ClientB)
        ];
        async.series(steps, done);
      });

      describe('bidirectional sync', function() {
        beforeEach(function finishInitialSync(next) {
          // The fixture setup creates a new model instance and replicates
          // it from ClientA to Server. Since we are performing bidirectional
          // synchronization in this suite, we must complete the first sync,
          // otherwise some of the tests may fail.
          replicateExpectingSuccess(Server, ClientA)(next);
        });

        it('propagates CREATE', function(done) {
          async.series([
            sync(ClientA, Server),
            sync(ClientB, Server)
          ], done);
        });

        it('propagates CREATE+UPDATE', function(done) {
          async.series([
            // NOTE: ClientB has not fetched the new model instance yet
            updateSourceInstanceNameTo('v2'),
            sync(ClientA, Server),

            // ClientB fetches the created & updated instance from the server
            sync(ClientB, Server),
          ], done);
        });

        it('propagates DELETE', function(done) {
          async.series([
            // NOTE: ClientB has not fetched the new model instance yet
            updateSourceInstanceNameTo('v2'),
            sync(ClientA, Server),

            // ClientB fetches the created & updated instance from the server
            sync(ClientB, Server),
          ], done);

        });

        it('does not report false conflicts', function(done) {
          async.series([
            // client A makes some work
            updateSourceInstanceNameTo('v2'),
            sync(ClientA, Server),

            // ClientB fetches the change from the server
            sync(ClientB, Server),
            verifySourceWasReplicated(ClientB),

            // client B makes some work
            updateClientB('v5'),
            sync(Server, ClientB),
            updateClientB('v6'),
            sync(ClientB, Server),

            // client A fetches the changes
            sync(ClientA, Server)
          ], done);
        });

        it('handles UPDATE conflict resolved using "ours"', function(done) {
          testUpdateConflictIsResolved(
            function resolveUsingOurs(conflict, cb) {
              conflict.resolveUsingSource(cb);
            },
            done);
        });

        it('handles UPDATE conflict resolved using "theirs"', function(done) {
          function resolveUsingTheirs(conflict, cb) {
            // We sync ClientA->Server first, therefore the conflict is
            // reported with ClientB as the source model
            var conflictingModelName = conflict.SourceModel.modelName;
            expect(conflictingModelName).to.equal(ClientB.modelName);

            conflict.resolveUsingTarget(cb);
          }

          testUpdateConflictIsResolved(resolveUsingTheirs, done);
        });

        it('handles UPDATE conflict resolved manually', function(done) {
          // resolve the conflict by supplying the final data explicitly
          function resolveManually(conflict, cb) {
            var resolvedData = { name: 'manual' };
            conflict.resolveManually(resolvedData, cb);
          }

          testUpdateConflictIsResolved(resolveManually, done);
        });

        it('handles DELETE conflict resolved using "ours"', function(done) {
          // "ours" keeps the data of the conflict's source model
          function resolveUsingOurs(conflict, cb) {
            conflict.resolveUsingSource(cb);
          }

          testDeleteConflictIsResolved(resolveUsingOurs, done);
        });

        it('handles DELETE conflict resolved using "theirs"', function(done) {
          function resolveUsingTheirs(conflict, cb) {
            // We sync ClientA->Server first, therefore the conflict is
            // reported with ClientB as the source model
            var conflictingModelName = conflict.SourceModel.modelName;
            expect(conflictingModelName).to.equal(ClientB.modelName);

            conflict.resolveUsingTarget(cb);
          }

          testDeleteConflictIsResolved(resolveUsingTheirs, done);
        });

        it('handles DELETE conflict resolved as manual delete', function(done) {
          // passing `null` as the data resolves the conflict as a delete
          function resolveManually(conflict, cb) {
            var resolvedData = null;
            conflict.resolveManually(resolvedData, cb);
          }

          testDeleteConflictIsResolved(resolveManually, done);
        });

        it('handles DELETE conflict resolved manually', function(done) {
          // resolve the conflict by supplying the final data explicitly
          function resolveManually(conflict, cb) {
            var resolvedData = { name: 'manual' };
            conflict.resolveManually(resolvedData, cb);
          }

          testDeleteConflictIsResolved(resolveManually, done);
        });
      });

      /**
       * Run a scenario where ClientA and ClientB both update the same
       * instance, let `resolver` handle the single conflict reported while
       * replicating ClientB -> Server, then check that both clients end up
       * with the same instance.
       *
       * @param {Function} resolver `function(conflict, cb)` resolving the
       *   reported conflict.
       * @param {Function} cb Callback passed to `async.series`.
       */
      function testUpdateConflictIsResolved(resolver, cb) {
        function syncAndResolveConflict(next) {
          replicate(ClientB, Server, function(err, conflicts) {
            if (err) return next(err);

            expect(conflicts).to.have.length(1);

            var conflict = conflicts[0];
            expect(conflict.SourceModel.modelName)
              .to.equal(ClientB.modelName);

            debug('Resolving the conflict %j', conflict);
            resolver(conflict, next);
          });
        }

        var scenario = [
          // sync the new model to ClientB
          sync(ClientB, Server),
          verifyInstanceWasReplicated(ClientA, ClientB, sourceInstanceId),

          // ClientA makes a change
          updateSourceInstanceNameTo('a'),
          sync(ClientA, Server),

          // ClientB changes the same instance
          updateClientB('b'),

          syncAndResolveConflict,

          // repeat the last sync, it should pass now
          sync(ClientB, Server),
          // and sync back to ClientA too
          sync(ClientA, Server),

          verifyInstanceWasReplicated(ClientB, ClientA, sourceInstanceId)
        ];

        async.series(scenario, cb);
      }

      /**
       * Run a scenario where ClientA deletes an instance that ClientB
       * updates, let `resolver` handle the single conflict reported while
       * replicating ClientB -> Server, then check that both clients end up
       * in the same state.
       *
       * @param {Function} resolver `function(conflict, cb)` resolving the
       *   reported conflict.
       * @param {Function} cb Callback passed to `async.series`.
       */
      function testDeleteConflictIsResolved(resolver, cb) {
        function deleteInstanceOnClientA(next) {
          ClientA.deleteById(sourceInstanceId, next);
        }

        function syncAndResolveConflict(next) {
          replicate(ClientB, Server, function(err, conflicts) {
            if (err) return next(err);

            expect(conflicts).to.have.length(1);

            var conflict = conflicts[0];
            expect(conflict.SourceModel.modelName)
              .to.equal(ClientB.modelName);

            debug('Resolving the conflict %j', conflict);
            resolver(conflict, next);
          });
        }

        var scenario = [
          // sync the new model to ClientB
          sync(ClientB, Server),
          verifyInstanceWasReplicated(ClientA, ClientB, sourceInstanceId),

          // ClientA deletes the instance
          deleteInstanceOnClientA,
          sync(ClientA, Server),

          // ClientB changes the same instance
          updateClientB('b'),

          syncAndResolveConflict,

          // repeat the last sync, it should pass now
          sync(ClientB, Server),
          // and sync back to ClientA too
          sync(ClientA, Server),

          verifyInstanceWasReplicated(ClientB, ClientA, sourceInstanceId)
        ];

        async.series(scenario, cb);
      }

      /**
       * Build an async step that renames the ClientB copy of the instance
       * identified by `sourceInstanceId` and saves it.
       *
       * @param {String} name The new value of the `name` property.
       * @returns {Function} `function(next)` suitable for `async.series`.
       */
      function updateClientB(name) {
        return function updateInstanceB(next) {
          ClientB.findById(sourceInstanceId, function(err, instance) {
            if (err) return next(err);

            // Report a missing instance through the callback; dereferencing
            // it would throw a TypeError from inside this async callback.
            if (!instance) {
              return next(new Error(
                'ClientB has no instance with id ' + sourceInstanceId));
            }

            instance.name = name;
            instance.save(next);
          });
        };
      }

      /**
       * Build an async step replicating in both directions between `client`
       * and `server`, expecting no conflicts in either direction.
       *
       * @returns {Function} `function(next)` suitable for `async.series`.
       */
      function sync(client, server) {
        return function syncBothWays(next) {
          // NOTE(bajtos) It's important to replicate from the client to the
          // server first, so that we can resolve any conflicts at the client
          var pushToServer = replicateExpectingSuccess(client, server);
          var pullFromServer = replicateExpectingSuccess(server, client);

          async.series([pushToServer, pullFromServer], next);
        };
      }

    });

    /**
     * Build an async step that sets `sourceInstance.name` to `value` and
     * saves the instance.
     *
     * @returns {Function} `function(next)` suitable for `async.series`.
     */
    function updateSourceInstanceNameTo(value) {
      function updateInstance(next) {
        debug('update source instance name to %j', value);
        sourceInstance.name = value;
        sourceInstance.save(next);
      }

      return updateInstance;
    }

    /**
     * Build an async step that removes `sourceInstance` and then clears the
     * shared `sourceInstance` reference (also when the removal fails).
     *
     * @param {*} value Only included in the debug output.
     * @returns {Function} `function(next)` suitable for `async.series`.
     */
    function deleteSourceInstance(value) {
      return function deleteInstance(next) {
        debug('delete source instance', value);

        var doomed = sourceInstance;
        doomed.remove(function(err) {
          // the reference is dropped before the outcome is reported
          sourceInstance = null;
          next(err);
        });
      };
    }

    /**
     * Build an async step asserting that the instance stored in `target`
     * under `sourceInstanceId` equals `sourceInstance`. A missing instance
     * on either side is compared as a falsy value.
     *
     * @param {Object} [target] Model to look in, defaults to `TargetModel`.
     * @returns {Function} `function(next)` suitable for `async.series`.
     */
    function verifySourceWasReplicated(target) {
      var model = target || TargetModel;

      return function verify(next) {
        model.findById(sourceInstanceId, function(err, targetInstance) {
          if (err) return next(err);

          var actual = targetInstance && targetInstance.toObject();
          var expected = sourceInstance && sourceInstance.toObject();
          expect(actual).to.eql(expected);

          next();
        });
      };
    }
  });

  // Checkpoints returned by the last conflict-free replication run,
  // keyed by '<source modelName>:to:<target modelName>'.
  var _since = {};

  /**
   * Replicate `source` into `target`.
   *
   * When `since` is omitted, it defaults to -1, or - when `useSinceFilter`
   * is enabled - to the checkpoints remembered from the last conflict-free
   * run for the same source/target pair.
   *
   * @param {Object} source Model to replicate from.
   * @param {Object} target Model to replicate to.
   * @param {*} [since] Value passed to `source.replicate` as `since`.
   * @param {Function} next `function(err, conflicts, cps)`.
   */
  function replicate(source, target, since, next) {
    if (typeof since === 'function') {
      next = since;
      since = undefined;
    }

    var sinceIx = source.modelName + ':to:' + target.modelName;
    if (since === undefined) {
      since = -1;
      if (useSinceFilter && _since[sinceIx]) {
        since = _since[sinceIx];
      }
    }

    debug('replicate from %s to %s since %j',
      source.modelName, target.modelName, since);

    source.replicate(since, target, function(err, conflicts, cps) {
      if (err) return next(err);

      // remember the checkpoints only when the run was conflict-free
      var hasConflicts = conflicts.length !== 0;
      if (!hasConflicts) {
        _since[sinceIx] = cps;
      }

      next(err, conflicts, cps);
    });
  }

  /**
   * Build an async step that calls `Model.create(data, next)`.
   *
   * @returns {Function} `function(next)` suitable for `async.series`.
   */
  function createModel(Model, data) {
    function create(next) {
      Model.create(data, next);
    }

    return create;
  }

  /**
   * Build an async step that replicates `source` into `target` and fails
   * when the replication reports an error or any conflict.
   *
   * @param {Object} [source] Defaults to `SourceModel`.
   * @param {Object} [target] Defaults to `TargetModel`.
   * @param {*} [since] Forwarded to `replicate`.
   * @returns {Function} `function(next)` suitable for `async.series`.
   */
  function replicateExpectingSuccess(source, target, since) {
    source = source || SourceModel;
    target = target || TargetModel;

    return function doReplicate(next) {
      replicate(source, target, since, function(err, conflicts) {
        if (err) return next(err);
        if (!conflicts.length) return next();

        var details = conflicts
          .map(function(conflict) { return JSON.stringify(conflict); })
          .join('\n');
        next(new Error('Unexpected conflicts\n' + details));
      });
    };
  }

  /**
   * Replace `TargetModel.bulkUpdate` with a one-shot wrapper that runs `fn`
   * before delegating to the original method.
   *
   * @param {Function} fn `function(cb)` performing the modification; an
   *   error passed to `cb` aborts the bulk update and is reported instead.
   */
  function setupRaceConditionInReplication(fn) {
    var originalBulkUpdate = TargetModel.bulkUpdate;

    TargetModel.bulkUpdate = function(data, cb) {
      // simulate the situation when a 3rd party modifies the database
      // while a replication run is in progress
      var model = this;
      fn(function(err) {
        if (err) return cb(err);
        originalBulkUpdate.call(model, data, cb);
      });

      // apply the 3rd party modification only once: restore the original
      // method right after `fn` was invoked
      TargetModel.bulkUpdate = originalBulkUpdate;
    };
  }

  /**
   * Build an async step asserting that the instance with the given `id`
   * is the same in `source` and `target`. A missing instance on either
   * side is compared as a falsy value.
   *
   * @returns {Function} `function(next)` suitable for `async.series`.
   */
  function verifyInstanceWasReplicated(source, target, id) {
    return function verify(next) {
      source.findById(id, onExpectedLoaded);

      function onExpectedLoaded(err, expected) {
        if (err) return next(err);

        target.findById(id, function onActualLoaded(err, actual) {
          if (err) return next(err);

          var actualData = actual && actual.toObject();
          var expectedData = expected && expected.toObject();
          expect(actualData).to.eql(expectedData);

          debug('replicated instance: %j', actual);
          next();
        });
      }
    };
  }

  /**
   * Wrap `Model[methodName]` so that the first argument (`since`) of every
   * call is appended to `store` before the original method is invoked.
   *
   * The wrapper forwards `this`, all arguments and the return value of the
   * original method.
   *
   * @param {Object} Model Object owning the method to spy on.
   * @param {String} methodName Name of the method to wrap.
   * @param {Array} store Array collecting the observed `since` values.
   */
  function spyAndStoreSinceArg(Model, methodName, store) {
    var orig = Model[methodName];
    Model[methodName] = function(since) {
      store.push(since);
      // hand the result back so that the spy does not hide return values
      return orig.apply(this, arguments);
    };
  }

  /**
   * Read the property `name` from `obj`. When `obj` is an array, the
   * property is read from every item (recursively for nested arrays) and
   * an array of the values is returned.
   */
  function getPropValue(obj, name) {
    if (!Array.isArray(obj)) {
      return obj[name];
    }

    return obj.map(function(item) {
      return getPropValue(item, name);
    });
  }

  /**
   * Collect the `id` property of `list` via `getPropValue`.
   */
  function getIds(list) {
    var ids = getPropValue(list, 'id');
    return ids;
  }
});