Manual Review Sync Queues for CouchDB Replication

Operational Architecture

In distributed edge, IoT, and mobile backend deployments, network partitions and concurrent writes inevitably produce document-level conflicts. While foundational Conflict Detection & Automated Resolution Strategies handle the majority of divergent states through deterministic merge logic, a subset of conflicts requires human intervention. Manual review sync queues act as a controlled holding layer for documents that exceed automated resolution thresholds. The architecture routes unresolved conflicts to a dedicated CouchDB database (sync_review_queue), where they are indexed, locked, and surfaced to operators via a deterministic API. This pattern prevents replication stalls, maintains idempotent sync pipelines, and preserves audit trails for compliance-heavy environments.

The queue operates as a sidecar to the primary replication topology. Instead of allowing conflicting revisions to accumulate indefinitely or forcing lossy last-write-wins semantics, the system intercepts documents whose computed _conflicts array (read with ?conflicts=true) is non-empty. These documents are cloned into the review queue with an attached review_status field, a locked_by operator identifier, and a resolution_deadline timestamp. Once an operator resolves the conflict, the chosen revision is written back to the source database and a deletion (tombstone) is written for every losing _rev so the conflict is actually cleared; the queue entry is then archived. This decoupled workflow ensures that continuous replication remains uninterrupted while providing a structured escalation path for edge cases.
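The write-back step can be sketched as a pure function that builds the request body. This is a minimal sketch, assuming the resolution is sent as one POST to /{db}/_bulk_docs; the same writes could also be issued as individual PUT and DELETE ?rev= requests. The names build_resolution_payload and READ_ONLY_FIELDS are hypothetical. leaf_revs is every open leaf: the current winning _rev plus the entries of its _conflicts array.

```python
from typing import Any, Dict, List

# Hypothetical list of top-level metadata returned on reads (e.g. with
# ?conflicts=true) that must not be written back as part of the body.
READ_ONLY_FIELDS = {"_id", "_rev", "_conflicts", "_deleted_conflicts",
                    "_revisions", "_revs_info", "_local_seq"}


def build_resolution_payload(doc_id: str, chosen_body: Dict[str, Any],
                             chosen_rev: str, leaf_revs: List[str]) -> Dict[str, Any]:
    """Return a _bulk_docs body that keeps one leaf and tombstones the rest.

    chosen_rev is the leaf the operator picked; chosen_body is its
    (possibly operator-edited) content.
    """
    if chosen_rev not in leaf_revs:
        raise ValueError(f"{chosen_rev} is not an open leaf of {doc_id}")
    # New revision on the chosen branch, carrying the approved body.
    winner = {k: v for k, v in chosen_body.items() if k not in READ_ONLY_FIELDS}
    winner.update({"_id": doc_id, "_rev": chosen_rev})
    docs = [winner]
    # One deletion per losing leaf; without these the conflict never clears.
    for rev in leaf_revs:
        if rev != chosen_rev:
            docs.append({"_id": doc_id, "_rev": rev, "_deleted": True})
    return {"docs": docs}
```

Because the chosen revision is updated rather than merely left alone, the operator's decision is itself recorded as a new revision, which keeps the audit trail in the document history.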

flowchart LR
  CH["_changes (conflicts=true)"] --> EV{"Severity check"}
  EV -->|auto-resolvable| AR["Auto-merge engine"]
  EV -->|needs human| Q[("Review queue DB")]
  Q --> OP["Operator resolves"]
  OP --> WB["Write winner + delete losers"]
  AR --> WB

Replicator Configuration & Conflict Preservation

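CouchDB replication never auto-resolves conflicts: every conflicting revision is copied to the target, where CouchDB deterministically picks the same winning revision as on the source and keeps the others as conflicting leaves. The replicator configuration therefore only needs to scope and harden the sync. Deploy the following document in the _replicator database to establish continuous edge-to-core sync; bidirectional sync requires a second document with source and target swapped: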

{
  "_id": "edge_to_core_replication",
  "source": "https://edge-node-01.local:5984/device_data",
  "target": "https://core-cluster.internal:5984/device_data",
  "continuous": true,
  "create_target": true,
  "user_ctx": {
    "name": "replicator_svc",
    "roles": ["_admin"]
  },
  "selector": {
    "sync_enabled": true
  },
  "heartbeat": 30000,
  "retries_per_request": 5,
  "connection_timeout": 15000
}

Key operational parameters:

  • The top-level selector (a Mango selector) restricts replication to matching documents. Use selector on its own; do not also set filter to _selector in a replicator document, because that value is only meaningful as a _changes-feed filter. Replication inherently preserves every conflicting revision, so no conflicts flag is needed (and none exists); your downstream consumer requests conflicts=true on the _changes feed to surface them. This behavior is documented in the official CouchDB Replication Protocol.
  • heartbeat and connection_timeout prevent stale long-poll connections in constrained IoT networks where NAT tables aggressively prune idle TCP sessions.
  • retries_per_request handles transient network drops at the HTTP layer without losing replication state; the scheduler additionally retries crashed jobs with exponential backoff, ensuring eventual consistency across partitioned nodes.
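The points above can be checked mechanically before documents are deployed. This is a sketch under stated assumptions: validate_replication_doc and make_reverse are hypothetical helpers, the "_reverse" ID suffix is an invented naming convention, and the forward document below omits the transport tuning fields for brevity. Each resulting document would then be PUT to /_replicator/<_id>.

```python
import copy
from typing import Any, Dict, List

# The forward document from above, minus the transport tuning fields.
FORWARD = {
    "_id": "edge_to_core_replication",
    "source": "https://edge-node-01.local:5984/device_data",
    "target": "https://core-cluster.internal:5984/device_data",
    "continuous": True,
    "create_target": True,
    "selector": {"sync_enabled": True},
}


def validate_replication_doc(doc: Dict[str, Any]) -> List[str]:
    """Flag the misconfigurations discussed above; returns a list of problems."""
    problems = []
    for key in ("source", "target"):
        if key not in doc:
            problems.append(f"missing '{key}'")
    if doc.get("filter") == "_selector":
        problems.append("use a top-level 'selector', not filter=_selector")
    if "conflicts" in doc:
        problems.append("'conflicts' is not a replication option; conflicts are always replicated")
    return problems


def make_reverse(forward: Dict[str, Any]) -> Dict[str, Any]:
    """Mirror a replication doc so the pair gives bidirectional sync."""
    reverse = copy.deepcopy(forward)
    reverse["_id"] = f"{forward['_id']}_reverse"  # hypothetical naming convention
    reverse["source"], reverse["target"] = forward["target"], forward["source"]
    reverse.pop("create_target", None)  # the edge database already exists
    return reverse
```

If the core database aggregates many edge nodes, the mirrored selector would send every synced document back to every edge, so in practice the reverse selector probably needs narrowing (for example, by a device or site field).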

Python Conflict Routing Pipeline

The routing pipeline consumes the _changes feed, evaluates conflict severity, and pushes unresolvable documents to the manual queue. This script assumes conflicts have already passed through Auto-Merge Rule Engines and failed deterministic resolution. When Algorithm Selection for Merge cannot guarantee data integrity (e.g., conflicting sensor telemetry with overlapping timestamps and missing provenance), the pipeline routes the document to sync_review_queue.

The implementation below uses a persistent HTTP session, exponential backoff, and explicit since token tracking to provide at-least-once, idempotent processing: the since checkpoint ensures no change is skipped, and deterministic queue IDs (a hash of the source document ID) make re-delivered changes safe. It relies on the requests library for connection pooling, with retry logic supplied by urllib3's Retry class mounted on a requests HTTPAdapter, as detailed in the Requests Library Documentation.

#!/usr/bin/env python3
"""
CouchDB Manual Review Sync Queue Router
Routes unresolved conflicts to a dedicated queue DB for operator review.
"""
import os
import time
import logging
import hashlib
from typing import List, Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

class ConflictRouter:
    def __init__(self, source_db_url: str, queue_db_url: str, auth: Optional[tuple] = None):
        self.source_url = source_db_url.rstrip("/")
        self.queue_url = queue_db_url.rstrip("/")
        self.auth = auth
        self.session = self._init_session()
        self.since_token = "0"
        self.last_checkpoint_file = ".sync_router_since"

    def _init_session(self) -> requests.Session:
        session = requests.Session()
        if self.auth:
            session.auth = self.auth
        retry_strategy = Retry(
            total=5,
            backoff_factor=1.5,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _load_checkpoint(self) -> str:
        if os.path.exists(self.last_checkpoint_file):
            with open(self.last_checkpoint_file, "r") as f:
                return f.read().strip()
        return "0"

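    def _save_checkpoint(self, token: str):
        # Write to a temp file and atomically rename it, so a crash mid-write
        # never leaves a truncated checkpoint behind.
        tmp_path = f"{self.last_checkpoint_file}.tmp"
        with open(tmp_path, "w") as f:
            f.write(token)
        os.replace(tmp_path, self.last_checkpoint_file)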

    def _evaluate_conflict_severity(self, doc: dict, conflicts: List[str]) -> bool:
        """
        Returns True if the document requires manual review.
        Heuristic: three or more conflicting revisions, a missing or empty
        'provenance' field, or a timestamp/last_updated spread wider than 24 hours.
        """
        if len(conflicts) >= 3:
            return True
        if not doc.get("provenance"):
            return True
        if "timestamp" in doc and "last_updated" in doc:
            # Example threshold: both fields are assumed to be epoch milliseconds;
            # flag documents whose write window spans more than 24 hours.
            if abs(doc["timestamp"] - doc["last_updated"]) > 86400000:
                return True
        return False

    def _push_to_queue(self, doc_id: str, doc: dict, conflicts: List[str]):
        now_ms = int(time.time() * 1000)
        queue_doc = {
            # Deterministic key: the same source doc always maps to the same queue entry.
            "_id": f"review_{hashlib.sha256(doc_id.encode()).hexdigest()[:12]}",
            "original_id": doc_id,
            "conflicting_revs": conflicts,
            "payload": doc,
            "review_status": "pending",
            "locked_by": None,
            "created_at": now_ms,
            "resolution_deadline": now_ms + (7 * 86400000),  # 7 days
        }
        resp = self.session.put(f"{self.queue_url}/{queue_doc['_id']}", json=queue_doc)
        if resp.status_code == 409:
            # Entry already exists (re-delivered change): treat as already queued.
            logger.info(f"{doc_id} already queued as {queue_doc['_id']}; skipping")
            return
        resp.raise_for_status()
        logger.info(f"Routed {doc_id} to review queue: {queue_doc['_id']}")

    def run(self):
        self.since_token = self._load_checkpoint()
        logger.info(f"Starting conflict router from since={self.since_token}")

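        changes_url = f"{self.source_url}/_changes"
        # CouchDB only recognizes the literal string "true" for boolean query
        # parameters; requests would serialize Python True as "True".
        params = {
            "feed": "longpoll",
            "since": self.since_token,
            "include_docs": "true",
            "conflicts": "true",
            "timeout": 30000,
            "heartbeat": 15000,
        }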

        while True:
            try:
                # Advance `since` each poll so we only fetch new changes.
                params["since"] = self.since_token
                resp = self.session.get(changes_url, params=params, timeout=35)
                resp.raise_for_status()
                data = resp.json()
                # Hold the new sequence locally and commit it only after the whole
                # batch is routed, so a mid-batch failure replays the batch
                # instead of silently skipping its remaining changes.
                next_since = data.get("last_seq", self.since_token)

                for change in data.get("results", []):
                    doc = change.get("doc")
                    if not doc or "_deleted" in doc:
                        continue

                    conflicts = doc.get("_conflicts", [])
                    if conflicts and self._evaluate_conflict_severity(doc, conflicts):
                        self._push_to_queue(doc["_id"], doc, conflicts)

                self.since_token = next_since
                self._save_checkpoint(self.since_token)

            except requests.exceptions.RequestException as e:
                logger.error(f"Changes feed interrupted: {e}. Retrying in 5s...")
                time.sleep(5)
            except Exception as e:
                logger.error(f"Unexpected routing error: {e}")
                time.sleep(10)

if __name__ == "__main__":
    SOURCE = os.getenv("COUCHDB_SOURCE_URL", "https://edge-node-01.local:5984/device_data")
    QUEUE = os.getenv("COUCHDB_QUEUE_URL", "https://core-cluster.internal:5984/sync_review_queue")
    # Credentials are required: fail fast (KeyError) rather than falling back
    # to a well-known default password.
    USER = os.environ["COUCHDB_USER"]
    PASS = os.environ["COUCHDB_PASS"]

    router = ConflictRouter(SOURCE, QUEUE, auth=(USER, PASS))
    router.run()
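The at-least-once argument can be exercised without a live cluster. The sketch below is hypothetical: InMemoryQueue stands in for sync_review_queue and mimics CouchDB's 409 response for an existing _id, review_queue_id copies the key scheme used in _push_to_queue, and the severity heuristic is skipped so only the deduplication logic is shown. It assumes the router treats 409 as "already queued".

```python
import hashlib
from typing import Dict, Iterable, List


def review_queue_id(doc_id: str) -> str:
    """Same key scheme as _push_to_queue: 'review_' + 12 hex chars of SHA-256."""
    return f"review_{hashlib.sha256(doc_id.encode()).hexdigest()[:12]}"


class InMemoryQueue:
    """Hypothetical stand-in for sync_review_queue; PUT on an existing _id -> 409."""

    def __init__(self) -> None:
        self.docs: Dict[str, dict] = {}

    def put(self, doc: dict) -> int:
        if doc["_id"] in self.docs:
            return 409  # mirrors CouchDB's document update conflict
        self.docs[doc["_id"]] = doc
        return 201


def route_batch(queue: InMemoryQueue, results: Iterable[dict]) -> List[int]:
    """Route every conflicted doc in a _changes batch; returns the PUT statuses."""
    statuses = []
    for change in results:
        doc = change.get("doc") or {}
        if doc.get("_conflicts"):
            statuses.append(queue.put({
                "_id": review_queue_id(doc["_id"]),
                "original_id": doc["_id"],
                "conflicting_revs": doc["_conflicts"],
            }))
    return statuses
```

Replaying the same batch, as the router does when it restarts before saving its checkpoint, leaves exactly one queue entry: the second pass only produces 409 responses.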

Queue Management & Operational Safeguards

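The sync_review_queue database requires strict lifecycle controls to prevent unbounded growth. Operators should deploy a _design document that indexes pending reviews by created_at and resolution_deadline. Automated cleanup jobs must archive resolved documents after a configurable retention window (for example, 90 days, or whatever the applicable compliance regime requires) by copying them to an archive database and then deleting them from the queue. Because deletions leave tombstones, periodic compaction, or purging where the audit policy allows it, is what actually reclaims disk space.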
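A design document along these lines might look like the following sketch. The view names, the "resolved" status value, and the resolved_at field are assumptions (the router above only ever writes "pending"); the document would be PUT to /sync_review_queue/_design/review, and expired_query_params is a hypothetical helper for the cleanup job.

```python
import json
from typing import Dict

DAY_MS = 86_400_000

# Hypothetical design document; map functions are JavaScript source strings.
REVIEW_DESIGN_DOC = {
    "_id": "_design/review",
    "language": "javascript",
    "views": {
        "pending_by_created": {
            "map": "function (doc) { if (doc.review_status === 'pending') { emit(doc.created_at, null); } }"
        },
        "pending_by_deadline": {
            "map": "function (doc) { if (doc.review_status === 'pending') { emit(doc.resolution_deadline, null); } }"
        },
        "resolved_by_time": {
            "map": "function (doc) { if (doc.review_status === 'resolved' && doc.resolved_at) { emit(doc.resolved_at, null); } }"
        },
    },
}


def expired_query_params(now_ms: int, retention_days: int = 90) -> Dict[str, str]:
    """Query params for GET .../_view/resolved_by_time selecting archivable docs."""
    cutoff = now_ms - retention_days * DAY_MS
    # View keys are JSON-encoded; include_docs lets the job copy full docs to the archive.
    return {"endkey": json.dumps(cutoff), "include_docs": "true"}
```

The same pattern covers escalation: querying pending_by_deadline with endkey set to the current time in milliseconds lists reviews that have already missed their deadline.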

To prevent race conditions during operator assignment, the queue relies on CouchDB’s optimistic concurrency control. When an operator claims a document, the pipeline issues a conditional PUT carrying the _rev it last read. If another operator has updated the document since, CouchDB rejects the write with 409 Conflict and the operator receives a conflict notification. Only one claim can therefore succeed against a given revision, so only one engineer can modify a review state at a time. Routing is idempotent for the same reason: the queue key is a hash of the original document ID, so a duplicate conflict event from the _changes feed produces a PUT for an _id that already exists. CouchDB rejects that PUT with 409 Conflict, and the router should treat the response as "already queued" rather than as an error.
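The claim step can be sketched as follows. This is a hypothetical helper, not part of the router: claim_review, ClaimConflict, the "in_review" status, and the locked_at field are all assumptions. The session only needs a put(url, json=...) method returning an object with status_code, json(), and raise_for_status(), so a requests.Session fits.

```python
import time
from typing import Any, Optional


class ClaimConflict(Exception):
    """Raised when the review doc is locked or changed since it was read."""


def claim_review(session: Any, queue_url: str, review_doc: dict, operator: str,
                 now_ms: Optional[int] = None) -> dict:
    """Lock a review entry for `operator` via a conditional PUT on its _rev."""
    if review_doc.get("locked_by"):
        raise ClaimConflict(f"already locked by {review_doc['locked_by']}")
    updated = dict(review_doc)  # keeps _id and, crucially, the _rev we read
    updated["locked_by"] = operator
    updated["review_status"] = "in_review"  # hypothetical status value
    updated["locked_at"] = now_ms if now_ms is not None else int(time.time() * 1000)
    resp = session.put(f"{queue_url.rstrip('/')}/{updated['_id']}", json=updated)
    if resp.status_code == 409:
        # Someone else wrote a newer revision first; re-fetch before retrying.
        raise ClaimConflict(f"{updated['_id']} changed since it was read")
    resp.raise_for_status()
    updated["_rev"] = resp.json()["rev"]  # CouchDB returns {"ok", "id", "rev"}
    return updated
```

The local locked_by check is only a fast path; the 409 from CouchDB is what actually enforces mutual exclusion when two operators act on the same stale copy.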

By isolating high-complexity conflicts from the primary replication stream, teams maintain low-latency sync for standard payloads while preserving a structured, auditable path for edge-case resolution. This architecture scales horizontally across multiple routing instances, provided each instance consumes a partitioned _changes feed or coordinates via distributed locking mechanisms.
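One hypothetical way to partition the work is to let every router instance read the full _changes feed but act only on the documents it owns, chosen by a stable hash of the document ID. The helpers owning_instance and should_route below are illustrative assumptions, not CouchDB features. A digest is used instead of Python's built-in hash(), which is salted per process and would give each instance a different answer.

```python
import hashlib


def owning_instance(doc_id: str, instance_count: int) -> int:
    """Map a document ID to one router instance, stably across processes."""
    if instance_count < 1:
        raise ValueError("instance_count must be >= 1")
    digest = hashlib.sha256(doc_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % instance_count


def should_route(doc_id: str, instance_index: int, instance_count: int) -> bool:
    """True if this instance is responsible for routing doc_id's conflicts."""
    return owning_instance(doc_id, instance_count) == instance_index
```

Under this scheme each instance needs its own checkpoint file, and changing instance_count reassigns ownership; any change routed twice during the transition collapses onto the same deterministic queue ID.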