Auto-Merge Rule Engines for CouchDB Replication Conflict Resolution & Sync Automation

Edge/IoT deployments and mobile backends routinely operate across partitioned, high-latency networks where concurrent document mutations are inevitable. CouchDB’s replication protocol intentionally avoids automatic field-level merging to preserve data integrity. Instead, it deterministically designates a winning revision purely for the default read: live leaves beat deleted ones, then the highest generation number wins, with the lexicographically highest revision hash as a tiebreaker. Every divergent leaf is retained in the revision tree, and the losing live leaves are surfaced as a computed _conflicts array when a document is read with ?conflicts=true. Production systems cannot rely on manual intervention at scale; they require an external auto-merge rule engine to intercept these conflicts, apply deterministic resolution logic, and push reconciled documents back into the replication stream. This guide details the exact _replicator configuration, a production-grade Python conflict resolver, and deployment patterns for continuous sync automation within the broader Conflict Detection & Automated Resolution Strategies framework.
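The winner selection described above can be approximated in a few lines. This is a sketch for building intuition, not CouchDB's actual implementation; the Leaf type and the pick_winner helper are hypothetical names introduced here.

```python
from typing import List, NamedTuple


class Leaf(NamedTuple):
    """One leaf of a revision tree (hypothetical helper type)."""
    rev: str            # e.g. "3-abc"
    deleted: bool = False


def rev_key(rev: str):
    """Split "N-hash" into (generation, hash) so the generation sorts numerically."""
    generation, _, digest = rev.partition("-")
    return int(generation), digest


def pick_winner(leaves: List[Leaf]) -> Leaf:
    """Live beats deleted, then highest generation, then highest revision hash."""
    return max(leaves, key=lambda leaf: (not leaf.deleted, *rev_key(leaf.rev)))


leaves = [Leaf("2-bbb"), Leaf("3-aaa"), Leaf("3-abc"), Leaf("4-zzz", deleted=True)]
winner = pick_winner(leaves)
# Only live, non-winning leaves would show up in _conflicts.
losers = [leaf.rev for leaf in leaves if leaf is not winner and not leaf.deleted]
print(winner.rev, losers)  # 3-abc ['2-bbb', '3-aaa']
```

Note that the deleted leaf loses despite its higher generation, which is why a tombstone never appears as the default read while a live leaf exists.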

Conflict Surface & _replicator Configuration Schema

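CouchDB exposes conflicts through the _changes feed when queried with style=all_docs (which lists every leaf revision of each changed document) or through the _conflicts array on individual documents. Each _replicator document describes a one-way replication, so bidirectional sync requires a second document with source and target swapped. Replication itself never halts on a conflict: divergent revisions are simply copied to the target as additional leaves.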

Deploy the following _replicator document to establish a continuous, conflict-aware replication pipeline:

{
  "_id": "edge_to_core_sync",
  "source": "http://edge-node-01:5984/iot_telemetry",
  "target": "http://core-cluster-01:5984/iot_telemetry",
  "continuous": true,
  "create_target": true,
  "user_ctx": {
    "name": "replication_svc",
    "roles": ["_admin"]
  },
  "since_seq": 0,
  "owner": "replication_svc"
}

Key operational parameters:

  • continuous: true ensures the replication daemon maintains an open _changes listener, enabling near-real-time sync.
  • create_target: true prevents replication stalls when target databases are provisioned lazily during cluster scaling.
  • Conflict resolution must occur externally. The replication engine keeps syncing non-conflicting documents while conflicting revisions remain as additional leaf branches in the revision tree (visible per document via ?conflicts=true).
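A _replicator document covers one direction only. If changes also need to flow from the core cluster back to the edge node, a mirrored document would be required. The sketch below derives one; the reverse_replication_doc helper and the core_to_edge_sync ID are illustrative names, not part of CouchDB.

```python
import copy
from typing import Any, Dict


def reverse_replication_doc(doc: Dict[str, Any], new_id: str) -> Dict[str, Any]:
    """Return a copy of a _replicator document with source and target swapped."""
    reverse = copy.deepcopy(doc)
    reverse["_id"] = new_id
    reverse["source"], reverse["target"] = doc["target"], doc["source"]
    # Drop fields that CouchDB manages itself so the copy is stored as a new document.
    for managed in ("_rev", "_replication_id", "_replication_state",
                    "_replication_state_time", "_replication_stats"):
        reverse.pop(managed, None)
    return reverse


edge_to_core = {
    "_id": "edge_to_core_sync",
    "source": "http://edge-node-01:5984/iot_telemetry",
    "target": "http://core-cluster-01:5984/iot_telemetry",
    "continuous": True,
    "create_target": True,
}
core_to_edge = reverse_replication_doc(edge_to_core, "core_to_edge_sync")
print(core_to_edge["source"], "->", core_to_edge["target"])
```

The resulting dictionary could then be stored with a PUT to the _replicator database under its new ID, using credentials that are allowed to write there.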

Streaming Conflict Detection via _changes

Monitoring conflict density requires querying the database changes feed with style=all_docs, include_docs=true, and conflicts=true (the last is what causes each returned document to carry its computed _conflicts array). The CouchDB Changes API delivers changes in update order with resumable since sequences, making it well suited to streaming architectures. By keeping only documents that have a non-empty doc._conflicts array, the pipeline isolates the records requiring intervention.
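The filtering step can be sketched as a small generator over the results array of a _changes response. The sample rows are made up for illustration, including their sequence strings and document IDs.

```python
from typing import Any, Dict, Iterable, Iterator


def conflicted_docs(results: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield only the documents whose _conflicts array is non-empty."""
    for change in results:
        doc = change.get("doc") or {}   # tolerate rows without a document body
        if doc.get("_conflicts"):
            yield doc


sample_results = [
    {"seq": "1-g1AAAA", "id": "sensor-1", "doc": {"_id": "sensor-1", "_rev": "1-a"}},
    {"seq": "2-g1AAAB", "id": "sensor-2",
     "doc": {"_id": "sensor-2", "_rev": "2-b", "_conflicts": ["2-a"]}},
    {"seq": "3-g1AAAC", "id": "sensor-3", "deleted": True, "doc": None},
]
print([doc["_id"] for doc in conflicted_docs(sample_results)])  # ['sensor-2']
```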

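This stream feeds directly into the auto-merge rule engine, a consumer whose only state is the last processed sequence. Each conflict event triggers a fetch of every conflicting branch alongside the winning revision, so the same inputs always reconcile to the same result. A concurrent write can still land between the read and the write-back; CouchDB rejects the stale update with a conflict error, which the engine must treat as a signal to re-read and retry.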

Production-Grade Python Auto-Merge Engine

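The rule engine operates as a long-running daemon that consumes the _changes feed, applies a merge rule, and writes reconciled documents using CouchDB’s _bulk_docs endpoint. The following implementation retries transient HTTP failures with exponential backoff (urllib3 applies status-based retries to idempotent methods only, so the _bulk_docs POST is not retried on an error status) and ships a single field-level union rule. Rule routing and persistence of the last processed sequence are left as extension points.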

#!/usr/bin/env python3
"""
CouchDB Auto-Merge Rule Engine
Consumes _changes feed, detects conflicts, applies merge rules, writes reconciled docs.
"""
import os
import time
import logging
import requests
from typing import Dict, List, Optional, Any
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

class CouchDBMergeEngine:
    def __init__(self, db_url: str, username: str, password: str):
        self.db_url = db_url.rstrip("/")
        self.session = requests.Session()
        self.session.auth = (username, password)
        retry_strategy = Retry(
            total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]
        )
        self.session.mount("http://", HTTPAdapter(max_retries=retry_strategy))
        self.session.mount("https://", HTTPAdapter(max_retries=retry_strategy))

    # since_seq is typed Any: CouchDB 2.x+ returns opaque string sequence IDs, not ints.
    def fetch_changes(self, since_seq: Any = 0, limit: int = 100) -> List[Dict[str, Any]]:
        """Poll the _changes feed, including each document's conflict metadata.

        `conflicts=true` is what makes returned docs carry a computed
        `_conflicts` array. You cannot filter by `_conflicts` with a Mango
        selector — it is a read-time projection, not a stored/indexed field —
        so we request all changes and let resolve_conflicts() skip clean docs.
        """
        params = {
            "feed": "normal",
            "style": "all_docs",
            "include_docs": "true",
            "conflicts": "true",
            "since": since_seq,
            "limit": limit,
        }
        resp = self.session.get(f"{self.db_url}/_changes", params=params)
        resp.raise_for_status()
        return resp.json().get("results", [])

    def resolve_conflicts(self, doc: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Apply deterministic merge logic. Extend with custom conflict resolver functions.

        Returns the merged winner plus the losing revisions that must be
        tombstoned, or None when the document has no conflicts.
        """
        conflicts = doc.get("_conflicts", [])
        if not conflicts:
            return None

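        # Fetch conflicting revisions. The doc ID is percent-encoded so IDs
        # containing "/", "?" or "#" still address the right document.
        doc_path = requests.utils.quote(doc["_id"], safe="")
        rev_docs = []
        for rev in conflicts:
            r = self.session.get(f"{self.db_url}/{doc_path}", params={"rev": rev})
            if r.status_code == 200:
                rev_docs.append(r.json())
        if not rev_docs:
            return None  # nothing could be fetched, so there is nothing to merge

        # Deterministic merge: field-level union with higher-generation fallback.
        # The default winner already has the highest generation among live
        # leaves, so in practice this only adds keys the winner lacks.
        merged = doc.copy()
        merged.pop("_conflicts", None)

        for rev_doc in rev_docs:
            for key, value in rev_doc.items():
                if key.startswith("_"):
                    continue
                if key not in merged or self._is_newer(rev_doc, merged):
                    merged[key] = value

        # The merged winner keeps the current winning _rev so the write extends
        # the winning branch. Only revisions that were actually fetched (and
        # therefore merged) are returned for deletion, so a failed fetch never
        # discards unmerged data.
        return {"merged": merged, "losing_revs": [d["_rev"] for d in rev_docs]}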

    def _is_newer(self, candidate: Dict, current: Dict) -> bool:
        """Simple heuristic: compare _rev generation numbers."""
        try:
            cand_gen = int(candidate["_rev"].split("-")[0])
            curr_gen = int(current["_rev"].split("-")[0])
            return cand_gen > curr_gen
        except (ValueError, KeyError):
            return False

    def write_resolved(self, resolution: Dict[str, Any]) -> bool:
        """Commit the merged winner AND delete the losing leaf revisions.

        A conflict is only cleared once every losing leaf is tombstoned, so the
        winner and the deletions are sent together in one _bulk_docs batch.
        """
        merged = resolution["merged"]
        docs = [merged] + [
            {"_id": merged["_id"], "_rev": rev, "_deleted": True}
            for rev in resolution["losing_revs"]
        ]
        resp = self.session.post(f"{self.db_url}/_bulk_docs", json={"docs": docs})
        resp.raise_for_status()
        return all(r.get("ok") for r in resp.json())

    def run(self, poll_interval: float = 2.0):
        """Main event loop."""
        since_seq = 0
        logging.info("Starting CouchDB Auto-Merge Engine...")
        while True:
            try:
                changes = self.fetch_changes(since_seq)
                for change in changes:
                    doc = change.get("doc", {})
                    if not doc or "_id" not in doc:
                        continue
                    
                    resolution = self.resolve_conflicts(doc)
                    if resolution:
                        if self.write_resolved(resolution):
                            logging.info(f"Resolved conflict for {doc['_id']}")
                        else:
                            logging.warning(f"Failed to resolve {doc['_id']}, routing to fallback")
                    since_seq = change.get("seq", since_seq)
                time.sleep(poll_interval)
            except requests.exceptions.RequestException as e:
                logging.error(f"Network error: {e}. Retrying in {poll_interval * 2}s...")
                time.sleep(poll_interval * 2)

if __name__ == "__main__":
    engine = CouchDBMergeEngine(
        db_url=os.getenv("COUCHDB_URL", "http://localhost:5984/iot_telemetry"),
        username=os.getenv("COUCHDB_USER", "admin"),
        password=os.getenv("COUCHDB_PASS", "password")
    )
    engine.run()
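The run() loop above starts from sequence 0 on every launch, so a restart rescans the whole feed. One way to avoid that is to persist the last processed sequence between runs. The following is a minimal sketch assuming a local file is an acceptable checkpoint store; load_checkpoint, save_checkpoint, and the file name are hypothetical.

```python
import json
import os
import tempfile
from typing import Union

Seq = Union[int, str]  # CouchDB 2.x+ sequence IDs are opaque strings


def load_checkpoint(path: str, default: Seq = 0) -> Seq:
    """Return the last saved sequence, or `default` if none was saved yet."""
    try:
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)["since"]
    except (FileNotFoundError, KeyError, json.JSONDecodeError):
        return default


def save_checkpoint(path: str, seq: Seq) -> None:
    """Write atomically: dump to a temp file, then rename it over the old one."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump({"since": seq}, fh)
    os.replace(tmp_path, path)


with tempfile.TemporaryDirectory() as workdir:
    checkpoint = os.path.join(workdir, "merge_engine.seq")
    print(load_checkpoint(checkpoint))        # 0, nothing saved yet
    save_checkpoint(checkpoint, "42-g1AAAA")
    print(load_checkpoint(checkpoint))        # 42-g1AAAA
```

Saving only after a batch has been fully processed means a crash replays some changes rather than skipping them, which is safe as long as the merge rules are deterministic.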

Deterministic Merge Strategies & Rule Routing

Auto-merge engines must prioritize predictability over complexity. When designing resolution logic, teams should align their approach with established Algorithm Selection for Merge principles. Common production strategies include:

  1. Field-Level Union: Non-overlapping keys from conflicting branches are merged. Overlapping keys defer to a deterministic tiebreaker (e.g., highest _rev generation, explicit updated_at timestamp, or source priority).
  2. Last-Write-Wins (LWW): The revision with the most recent application-level timestamp (e.g., updated_at) overwrites all others. The _rev generation counts edits rather than wall-clock time, so it is a poor proxy for recency. Suitable for telemetry streams where recency implies accuracy.
  3. CRDT-Inspired Merges: For collaborative state, use commutative, associative, and idempotent operations (e.g., G-Counters, OR-Sets) to guarantee convergence without coordination.
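Two of the strategies listed above can be sketched briefly. The example assumes each document carries an updated_at field in a fixed ISO 8601 UTC format (so string comparison matches time order) and, for the counter, a counts map keyed by node name. Both field layouts are assumptions made for illustration.

```python
from typing import Any, Dict, List


def lww_merge(revisions: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Last-write-wins: keep the revision with the latest updated_at.
    Ties fall back to comparing _rev strings, purely so every node picks the same one."""
    return max(revisions, key=lambda d: (d.get("updated_at", ""), d["_rev"]))


def g_counter_merge(revisions: List[Dict[str, Any]]) -> Dict[str, int]:
    """G-Counter: take the per-node maximum; the counter value is the sum of slots."""
    merged: Dict[str, int] = {}
    for doc in revisions:
        for node, count in doc.get("counts", {}).items():
            merged[node] = max(merged.get(node, 0), count)
    return merged


a = {"_id": "s1", "_rev": "2-aaa", "updated_at": "2024-05-01T10:00:00Z",
     "counts": {"edge-1": 3, "edge-2": 1}}
b = {"_id": "s1", "_rev": "2-bbb", "updated_at": "2024-05-01T10:05:00Z",
     "counts": {"edge-1": 2, "edge-2": 4}}

print(lww_merge([a, b])["_rev"])        # 2-bbb
counts = g_counter_merge([a, b])
print(counts, sum(counts.values()))     # {'edge-1': 3, 'edge-2': 4} 7
```

The per-node maximum is commutative, associative, and idempotent, so merging the same revisions in any order, or more than once, yields the same counts.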

Rule routing should be implemented via a strategy pattern, mapping document types (doc.type or doc.schema_version) to specific resolver functions. This isolates business logic from the sync pipeline and enables hot-swapping of merge policies without restarting the daemon.

flowchart LR
  C["Conflicted document"] --> R{"Route by doc.type"}
  R -->|telemetry| F["Field-level union"]
  R -->|state| L["Last-write-wins"]
  R -->|counter| D["CRDT merge"]
  F --> WB["_bulk_docs:<br/>winner + delete losers"]
  L --> WB
  D --> WB
  R -->|unresolvable| Q["Manual review queue"]

Deployment Patterns & Sync Automation

Running the auto-merge engine in production requires careful orchestration to prevent duplicate processing and ensure graceful degradation. Containerize the resolver using a lightweight base image, mount configuration via environment variables, and deploy with a single replica per database partition to avoid split-brain resolution attempts.

When deterministic resolution fails due to schema incompatibility or missing metadata, the engine must transition to a safe fallback state. Unresolvable conflicts should be routed to Manual Review Sync Queues where human operators can inspect divergence, apply corrective patches, and re-inject the document into the replication stream.

For high-throughput IoT deployments, implement cache warming for conflict metadata to reduce database read amplification. Monitor conflict density, resolution latency, and fallback queue depth using standard observability stacks. Automate emergency resync workflows by exposing a health endpoint that triggers targeted _replicator document updates when conflict rates exceed predefined thresholds.
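Conflict density can be tracked with a sliding window over recent changes. The sketch below is one possible shape for the check behind such a health endpoint; the class name, window size, and threshold are illustrative assumptions rather than recommended values.

```python
from collections import deque
from typing import Deque


class ConflictRateMonitor:
    """Tracks the share of conflicted documents over the last `window` changes."""

    def __init__(self, window: int = 1000, threshold: float = 0.05):
        self.samples: Deque[bool] = deque(maxlen=window)  # oldest samples fall off
        self.threshold = threshold

    def record(self, had_conflict: bool) -> None:
        self.samples.append(had_conflict)

    @property
    def rate(self) -> float:
        return sum(self.samples) / len(self.samples) if self.samples else 0.0

    def healthy(self) -> bool:
        return self.rate <= self.threshold


monitor = ConflictRateMonitor(window=10, threshold=0.2)
for had_conflict in [False] * 8 + [True] * 2:
    monitor.record(had_conflict)
print(monitor.rate, monitor.healthy())   # 0.2 True

monitor.record(True)                     # the oldest clean sample leaves the window
print(monitor.rate, monitor.healthy())   # 0.3 False
```

A health endpoint could report healthy() and, when it turns false, trigger whatever resync workflow the deployment defines.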