
Migration Jobs — Full Implementation Guide

Deep dive on every migration job: dump, import, clone, delete, verify, instance-replace. Covers data model, executor wiring, shared helpers, decision trees, and flow diagrams.


1. Overview

Migrations are async jobs queued via REST API or CLI, executed by a background worker.

Architecture:

mermaid
flowchart LR
    UI[Web UI / CLI] -->|POST migrations| H[Handler]
    H -->|insert Job| DB[(Postgres<br/>jobs/job_logs/job_artifacts)]
    W[WorkerMigrationRunner<br/>30s tick] -->|NextQueuedJob| DB
    W -->|dispatch| EX[Executor]
    EX -->|RunDump/Import/Clone/Delete/Verify| MG[internal/migrate]
    MG -->|read/write| MONGO[(MongoDB)]
    EX -->|logs/artifact| DB
    EX -->|notify| NOTIF[Notifications]

Job types (internal/model/migration.go):

| Type | Purpose |
| --- | --- |
| tenant_copy | Direct instance-to-instance tenant copy |
| tenant_clone | Dump + import (within or across instances) |
| tenant_dump | Standalone tenant export → .zip |
| tenant_import | Archive → target instance |
| tenant_delete | Destructive remove + pre-delete safety dump |
| instance_replace | Full-instance migration |

Status machine:

queued ──(worker pick)──▶ running ──(finish)──▶ succeeded
                             └──(error)──────▶ failed

No job-level retry; batch-level retries are handled inside the migrate package (section 14).


2. Data Model

Tables (internal/db/migrations/):

  • jobs — one row per job. Key fields: type, status, tenant_id, source_instance_id, target_instance_id, dry_run, progress, target_customer_id, target_tenant_name, clone_user_assignments, user_remap (jsonb), reuse_existing_users, dump_file_path, import_file_path, include_collections, exclude_collections, error_message, created_tenant_id, created_tenant_code.
  • job_logs — streamed log entries (level, message).
  • job_artifacts — final JSON reports (migration_report, dump_report, import_report, clone_report, delete_report).

Mongo tenant-scope shapes (four variants; the tenant filter must match all of them):

js
{ tenantId: "ABC1234" }                    // scalar (most collections)
{ tenantID: "ABC1234" }                    // scalar (legacy spelling)
{ tenantIDs: ["ABC1234", ...] }            // array (user multi-tenant)
{ byTenant: { ABC1234: {...} } }           // nested map (user, user-session)

Tenant-namespaced collection prefixes: custom_<code>_, x_<code>_, x_mt_<code>_, cx_s_<code>_. Entire collection belongs to one tenant.
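
The $or filter that migrate.go's CollectionFilter builds from these shapes, as a minimal sketch (the real signature also takes the collection name; driver v2 assumed):

go
// Hedged sketch of the tenant-scope filter, assuming exactly the four
// shapes listed above. The real CollectionFilter(name, code) also takes
// the collection name; it is ignored here for brevity.
package migrate

import "go.mongodb.org/mongo-driver/v2/bson"

func collectionFilter(code string) bson.M {
	return bson.M{"$or": []bson.M{
		{"tenantId": code},                            // scalar
		{"tenantID": code},                            // scalar, legacy spelling
		{"tenantIDs": code},                           // array membership
		{"byTenant." + code: bson.M{"$exists": true}}, // nested map key
	}}
}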


3. Collection Classification

Every collection falls into one bucket. This drives every job's decision tree.

mermaid
flowchart TD
    C[Collection] --> A{system.*?}
    A -->|yes| SKIP[Skip always]
    A -->|no| B{Tenant-namespaced<br/>prefix matches code?}
    B -->|yes| NS[Treat as owned<br/>dump all / drop entire]
    B -->|no| D{In excluded list?<br/>appAudit, version-history,<br/>test, exto-modules-old}
    D -->|yes| SKIP
    D -->|no| E{Name in<br/>special list?}
    E -->|user| U[User path<br/>remap / reuse]
    E -->|user-session| US[Fresh _id,<br/>no idMap walk]
    E -->|customer| CUS[Rewrite name/code,<br/>fresh _id]
    E -->|other| GEN[Classified path<br/>idMap deep-walk,<br/>upsert by _id]

Skip-list for import (importSkipCollections): appAudit, user-session. Skip-list for classify + idMap (idClassifySkipList): user, user-session, customer.
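
The namespace check in collections.go plausibly looks like the sketch below (helper name assumed; prefix list per section 2):

go
// Assumed shape of the namespace detection: a collection is tenant-owned
// when its name starts with a known prefix, the tenant code, and "_".
package migrate

import "strings"

func isTenantNamespaced(coll, code string) bool {
	for _, p := range []string{"custom_", "x_", "x_mt_", "cx_s_"} {
		if strings.HasPrefix(coll, p+code+"_") {
			return true
		}
	}
	return false
}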


4. Shared Helpers

| File | What |
| --- | --- |
| classify.go | Pass-1 _id classification. Returns remapSets per collection. |
| idmap.go | IDMap interface. In-memory → bbolt spill at 500k entries. 100k-entry LRU read cache. |
| stream.go | forEachDoc — streams .jsonl (Extended JSON) or .bson (mongodump) from zip. |
| rewrite.go | rewriteDocObjectIDs deep-walks all OIDs. rewriteDocUserRefs handles known email/OID paths + customFields. |
| tenantrefs.go | rewriteTenantRefs, ensureTenantID (backfill scalar), grantReusedUserMembership (atomic pipeline). |
| retry.go | isTransientWriteErr, upsertBatchDocsWithRetry — exp backoff 1s/3s/9s then split-batch recurse. |
| preflight.go | PreflightTargetTenantUnique — fails if code/name exists in target customer. |
| archive_users.go | Active-user extraction from archive/DB. ApplyEffectiveEmail. |
| collections.go | Named constants + namespace detection. |
| log.go | StderrLogger, [HH:MM:SS] msg format. |
| migrate.go | CollectionFilter(name, code) — the $or filter used everywhere. |
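
The IDMap contract implied by the table, sketched as an interface (shape assumed, not the actual declaration):

go
// Assumed idmap.go contract: map a source _id (hex) to its freshly
// allocated ObjectID. Entries live in memory until the 500k spill
// threshold, then in bbolt behind a 100k-entry LRU read cache.
package migrate

import "go.mongodb.org/mongo-driver/v2/bson"

type IDMap interface {
	Put(srcHex string, newID bson.ObjectID) error
	Get(srcHex string) (bson.ObjectID, bool) // served from LRU, then bbolt
	Close() error                            // drops the spill file
}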

5. Dump Job

Purpose: Export every tenant-owned doc + indexes from one Mongo DB into a single .zip.

Entry: migrate.RunDump(ctx, DumpConfig, logger) → DumpReport.

Flow

mermaid
flowchart TD
    S[Start] --> C[Connect Mongo]
    C --> L[List collections]
    L --> F{Filter: system?<br/>excluded?<br/>tenant-ns?}
    F -->|skip| NX[Next]
    F -->|include| DR{dry-run?}
    DR -->|yes| CNT[Count only]
    DR -->|no| Z[Open .zip entry<br/>dbname/coll.jsonl]
    Z --> ST[Stream cursor,<br/>batch 1000 docs]
    ST --> JSONL[Write Extended JSON<br/>one line per doc]
    JSONL --> IDX[List indexes,<br/>strip _id_/v/ns]
    IDX --> IX[Write<br/>dbname/coll.indexes.jsonl]
    IX --> NX
    NX --> MORE{more colls?}
    MORE -->|yes| F
    MORE -->|no| META[Write _metadata.json]
    META --> R[Build DumpReport]

Archive Layout

<tenant>.zip
├─ _metadata.json                       {tenantId, tenantCode, tenantName, dbName, format, exportedAt}
├─ <dbname>/<coll>.jsonl                one doc per line (Extended JSON)
└─ <dbname>/<coll>.indexes.jsonl        index specs, minus _id_, v, ns
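
A condensed sketch of the per-collection streaming loop behind this layout (driver v2 assumed; index handling elided):

go
// Sketch: stream the filtered cursor and write one canonical Extended
// JSON line per document into the collection's zip entry.
package migrate

import (
	"archive/zip"
	"context"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
)

func dumpCollection(ctx context.Context, zw *zip.Writer, db *mongo.Database, coll string, filter any) error {
	w, err := zw.Create(db.Name() + "/" + coll + ".jsonl")
	if err != nil {
		return err
	}
	cur, err := db.Collection(coll).Find(ctx, filter) // CollectionFilter, or bson.D{} for tenant-ns colls
	if err != nil {
		return err
	}
	defer cur.Close(ctx)
	for cur.Next(ctx) {
		line, err := bson.MarshalExtJSON(cur.Current, true, false) // canonical Extended JSON
		if err != nil {
			return err
		}
		if _, err := w.Write(append(line, '\n')); err != nil {
			return err
		}
	}
	return cur.Err()
}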

Decision Tree — "include collection?"

system.* ────────────────────────────▶ skip
in dumpExcludedCollections ──────────▶ skip  (appAudit, version-history, test, exto-modules-old)
matches tenant-ns prefix ────────────▶ include (all docs)
IncludeCollections set & not in it ──▶ skip
ExcludeCollections set & in it ──────▶ skip
otherwise ───────────────────────────▶ include (filter by CollectionFilter)

Errors

  • Connection fail → abort.
  • Per-collection fail → logged, DumpReport.HadErrors=true, continue.

6. Import Job

Purpose: Take a dump archive, write into target Mongo DB as a new tenant with safe _id collision handling, tenant-code rewrite, and user identity reuse/remap.

Entry: migrate.RunImport(ctx, ImportConfig, logger) → MigrationReport.

Phases

mermaid
flowchart TD
    S[Start] --> P0[Phase 0: Preflight<br/>target tenant code/name unique]
    P0 --> P1[Phase 1: Open zip,<br/>read _metadata.json]
    P1 --> P2[Phase 2: Build coll list<br/>user first, rest alpha]
    P2 --> P3[Phase 3: Index preflight<br/>atomic then per-index fallback]
    P3 --> P4[Phase 4: Init IDMap<br/>bbolt spill 500k]
    P4 --> P5[Phase 5: Pre-scan target users<br/>email to OID map]
    P5 --> P6[Phase 6: Pass-1 CLASSIFY<br/>per coll, chunks of 10k _ids]
    P6 --> P7[Phase 7: Pass-2 IMPORT<br/>per coll, per-doc transform]
    P7 --> RPT[MigrationReport]

Pass 1 — Classify (classify.go)

For each non-skip collection, stream src _ids in 10k chunks; query target {_id:{$in:chunk}}:

mermaid
flowchart TD
    ID[src _id] --> Q{in target?}
    Q -->|no| C2[Case 2: keep _id<br/>no remap]
    Q -->|yes| T{same tenant?}
    T -->|yes| C1[Case 1: keep _id<br/>upsert idempotent]
    T -->|no| C3[Case 3: alloc new OID<br/>store src-hex to new-oid in idMap<br/>add to remapSet]
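
Per-chunk classification in sketch form (tenant check simplified to the scalar tenantId shape; case numbers as in the diagram):

go
// Sketch of Pass-1 for one chunk of up to 10k source _ids. Docs already
// present under a different tenant get a fresh ObjectID (Case 3); docs
// absent (Case 2) or owned by the target tenant (Case 1) keep their _id.
package migrate

import (
	"context"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
	"go.mongodb.org/mongo-driver/v2/mongo/options"
)

func classifyChunk(ctx context.Context, tgt *mongo.Collection, chunk []bson.ObjectID,
	tgtCode string, idMap IDMap, remapSet map[string]bool) error {

	cur, err := tgt.Find(ctx, bson.M{"_id": bson.M{"$in": chunk}},
		options.Find().SetProjection(bson.M{"_id": 1, "tenantId": 1}))
	if err != nil {
		return err
	}
	defer cur.Close(ctx)

	foreign := map[string]bool{} // src hex -> collides with another tenant
	for cur.Next(ctx) {
		var hit struct {
			ID       bson.ObjectID `bson:"_id"`
			TenantID string        `bson:"tenantId"`
		}
		if err := cur.Decode(&hit); err != nil {
			return err
		}
		foreign[hit.ID.Hex()] = hit.TenantID != tgtCode // Case 1 if same tenant
	}
	for _, id := range chunk {
		if foreign[id.Hex()] { // Case 3: allocate and record a remap
			if err := idMap.Put(id.Hex(), bson.NewObjectID()); err != nil {
				return err
			}
			remapSet[id.Hex()] = true
		}
	}
	return cur.Err()
}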

Pass 2 — Per-Collection Path Decision

mermaid
flowchart TD
    COL[Collection cf] --> S1{in importSkipCollections?<br/>appAudit, user-session}
    S1 -->|yes| SK[Skip]
    S1 -->|no| U1{name == user?}
    U1 -->|no| S2{name in<br/>idClassifySkipList?<br/>user-session, customer}
    U1 -->|yes| R1{ReuseExistingUsers<br/>or UserRemap set?}
    R1 -->|yes| PA[Path A: user reuse/remap]
    R1 -->|no| PB[Path B: skip-list]
    S2 -->|yes| PB
    S2 -->|no| PC[Path C: classified]

Path A — User with Reuse/Remap

mermaid
flowchart TD
    D[user doc] --> P[prepareCollectionDoc<br/>prune byTenant to target only]
    P --> RT[rewriteTenantRefs<br/>src code to tgt code]
    RT --> EM[Extract email lowercased]
    EM --> RM[ApplyEffectiveEmail<br/>explicit then default then src]
    RM --> EX{effective email<br/>in target users?}
    EX -->|yes| REU[action=reused<br/>grantReusedUserMembership<br/>atomic pipeline<br/>tenantIDs = existing minus src plus tgt<br/>populate maps, no insert]
    EX -->|no| INS{email changed?}
    INS -->|yes| IR[action=inserted_renamed<br/>new _id, set email]
    INS -->|no| II[action=inserted<br/>new _id]
    IR --> BA[Batch insert]
    II --> BA
    REU --> NX[Next doc]
    BA --> NX

grantReusedUserMembership pipeline:

js
[
  { $set: {
      tenantIDs: {
        $setUnion: [
          { $filter: { input: "$tenantIDs", cond: { $ne: ["$$this", srcCode] }}},
          [tgtCode]
        ]
      }
  }}
]
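
The same update issued from Go, using the driver's update-with-aggregation-pipeline form (usage sketch, not the exact helper body; driver v2 assumed):

go
// Atomic membership grant: swap the source code for the target code in
// tenantIDs with a single pipeline update (no read-modify-write race).
package migrate

import (
	"context"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
)

func grantMembership(ctx context.Context, users *mongo.Collection, userID bson.ObjectID, srcCode, tgtCode string) error {
	update := mongo.Pipeline{{{Key: "$set", Value: bson.M{
		"tenantIDs": bson.M{"$setUnion": bson.A{
			bson.M{"$filter": bson.M{
				"input": "$tenantIDs",
				"cond":  bson.M{"$ne": bson.A{"$$this", srcCode}},
			}},
			bson.A{tgtCode},
		}},
	}}}}
	_, err := users.UpdateByID(ctx, userID, update)
	return err
}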

Path B — Skip-List (user-session, customer)

  • customer: overwrite name = TargetTenantName, code = TargetTenantCode.
  • Fresh _id via bson.NewObjectID().
  • rewriteDocUserRefs on known paths + customFields.
  • No idMap deep-walk.
  • InsertMany (not upsert — fresh ids can't collide).
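
Path B reduced to a sketch (parameter names assumed from the job columns in section 2):

go
// Path B sketch: every doc gets a fresh _id (so plain InsertMany is
// safe); customer docs additionally take the target tenant's name/code.
package migrate

import "go.mongodb.org/mongo-driver/v2/bson"

func prepareSkipListDoc(doc bson.M, collName, targetName, targetCode string) bson.M {
	doc["_id"] = bson.NewObjectID() // fresh id: cannot collide
	if collName == "customer" {
		doc["name"] = targetName
		doc["code"] = targetCode
	}
	return doc // rewriteDocUserRefs still runs; no idMap deep-walk
}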

Path C — Classified (default)

mermaid
flowchart TD
    D[doc] --> P[prepareCollectionDoc]
    P --> RT[rewriteTenantRefs]
    RT --> ET[ensureTenantID<br/>backfill scalar tenantId]
    ET --> ID{src _id in<br/>remapSet for coll?}
    ID -->|yes| SW[Swap: doc._id = idMap.Get hex]
    ID -->|no| KP[Keep src _id]
    SW --> RO[rewriteDocObjectIDs<br/>deep-walk, replace<br/>any OID in idMap]
    KP --> RO
    RO --> RU[rewriteDocUserRefs<br/>known paths + customFields]
    RU --> B[Batch]
    B --> F{batch full?}
    F -->|yes| UP[upsertBatchDocsWithRetry<br/>ReplaceOne upsert by _id]
    UP --> NX[Next doc]
    F -->|no| NX
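
The Path C flush in sketch form: ReplaceOne-with-upsert keyed by _id, which is what makes re-imports idempotent (driver v2 assumed):

go
// Sketch of the happy path inside upsertBatchDocsWithRetry: an unordered
// bulk of ReplaceOne upserts keyed by _id.
package migrate

import (
	"context"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
	"go.mongodb.org/mongo-driver/v2/mongo/options"
)

func upsertBatch(ctx context.Context, coll *mongo.Collection, batch []bson.M) error {
	models := make([]mongo.WriteModel, 0, len(batch))
	for _, doc := range batch {
		models = append(models, mongo.NewReplaceOneModel().
			SetFilter(bson.M{"_id": doc["_id"]}).
			SetReplacement(doc).
			SetUpsert(true))
	}
	_, err := coll.BulkWrite(ctx, models, options.BulkWrite().SetOrdered(false))
	return err
}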

Index Preflight

For each coll with .indexes.jsonl:
  parse specs → createIndexes atomic
    → success ✓
    → fail → per-index retry
      → unique-index fail → ABORT entire import (schema violation on target data)
      → non-unique fail → log, continue
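
The fallback, sketched (only the key and unique flag are read from the parsed spec; other index options elided):

go
// Index preflight sketch: try the batch atomically, then per index.
// Only a unique-index failure aborts, since it means existing target
// data would violate the schema being imported.
package migrate

import (
	"context"
	"log"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
	"go.mongodb.org/mongo-driver/v2/mongo/options"
)

func ensureIndexes(ctx context.Context, coll *mongo.Collection, specs []bson.M) error {
	models := make([]mongo.IndexModel, len(specs))
	for i, s := range specs {
		opt := options.Index()
		if u, _ := s["unique"].(bool); u {
			opt.SetUnique(true)
		}
		models[i] = mongo.IndexModel{Keys: s["key"], Options: opt}
	}
	if _, err := coll.Indexes().CreateMany(ctx, models); err == nil {
		return nil // atomic path succeeded
	}
	for i, s := range specs {
		if _, err := coll.Indexes().CreateOne(ctx, models[i]); err != nil {
			if u, _ := s["unique"].(bool); u {
				return err // abort the entire import
			}
			log.Printf("non-unique index skipped: %v", err)
		}
	}
	return nil
}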

Errors & Idempotency

| Error | Behavior |
| --- | --- |
| target tenant code/name exists | abort (preflight) |
| unique-index build fails | abort |
| transient write (pipe/EOF) | retry 1s/3s/9s → split batch → single-doc recurse |
| per-coll stream error | log, HadErrors=true, continue |
| re-run of same archive | safe — upsert-by-_id is idempotent |

7. Clone Job

Purpose: Copy tenant within or across instances. Thin wrapper: Preflight → Dump → Import.

Entry: migrate.RunClone(ctx, CloneConfig, logger) → MigrationReport.

mermaid
flowchart LR
    S[Start] --> PF[PreflightTargetTenantUnique<br/>target code/name free?]
    PF -->|fail| X[Abort]
    PF -->|ok| D[RunDump to temp zip<br/>progress 0 to 50 percent]
    D --> DR{dry-run?}
    DR -->|yes| DONE[Return, keep archive]
    DR -->|no| I[RunImport from temp zip<br/>progress 50 to 100 percent]
    I --> CLN[defer: delete temp zip]
    CLN --> DONE

Why preflight first: it saves minutes of dump work when the target already has a code/name collision.

ReuseExistingUsers matters here for same-DB clones (source and target in the same Mongo database): without it, user docs would collide on the email unique index.
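
The whole wrapper in sketch form (dumpConfigFor / importConfigFor and the preflight arguments are hypothetical; only the Preflight → Dump → Import ordering is from the flow above):

go
// RunClone's shape: fail fast on preflight, dump to a temp zip, import,
// clean up. Dry-run returns after the dump and keeps the archive.
package migrate

import (
	"context"
	"os"
	"path/filepath"

	"go.mongodb.org/mongo-driver/v2/bson"
)

func runClone(ctx context.Context, cfg CloneConfig, log *StderrLogger) (*MigrationReport, error) {
	if err := PreflightTargetTenantUnique(ctx, cfg.TargetURI, cfg.TargetTenantCode, cfg.TargetTenantName); err != nil {
		return nil, err // saves minutes of dump work
	}
	zipPath := filepath.Join(os.TempDir(), "clone-"+bson.NewObjectID().Hex()+".zip")
	if _, err := RunDump(ctx, dumpConfigFor(cfg, zipPath), log); err != nil {
		return nil, err
	}
	if cfg.DryRun {
		return &MigrationReport{}, nil // keep archive for inspection
	}
	defer os.Remove(zipPath) // temp archive deleted on exit
	return RunImport(ctx, importConfigFor(cfg, zipPath), log)
}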


8. Delete Job

Purpose: Remove all tenant data. A safety dump always runs first (skipped only on dry-run).

Entry: migrate.RunDelete(ctx, DeleteConfig, logger) → DeleteReport.

Flow

mermaid
flowchart TD
    S[Start] --> SD{dry-run?}
    SD -->|no| DUMP[Safety dump<br/>via runDumpForTenant]
    SD -->|yes| L[List collections]
    DUMP --> L
    L --> LOOP[For each coll]
    LOOP --> DEC{classify}
    DEC -->|system prefix| SK[Skip]
    DEC -->|excluded list| SK
    DEC -->|tenant-ns prefix| DROP[Drop collection]
    DEC -->|user / user-session| UPD[UpdateMany:<br/>$pull tenantIDs<br/>$unset byTenant.code<br/>then delete orphans]
    DEC -->|other| DEL{dry-run?}
    DEL -->|yes| CNT[CountDocuments]
    DEL -->|no| DM[DeleteMany CollectionFilter]
    DROP --> NX[Next]
    UPD --> NX
    SK --> NX
    CNT --> NX
    DM --> NX
    NX --> MORE{more?}
    MORE -->|yes| LOOP
    MORE -->|no| VER{Verify flag?}
    VER -->|yes| V[RunVerifyWithDB<br/>embed in report]
    VER -->|no| R[DeleteReport]
    V --> R

Orphan-User Cleanup

After $pull/$unset on user coll:

js
db.user.deleteMany({
  $and: [
    { $or: [{ tenantIDs: { $size: 0 } }, { tenantIDs: { $exists: false } }] },
    { username: { $ne: "dev" } }
  ]
})

dev account preserved (bootstrap).
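
The preceding $pull/$unset step, sketched in Go (driver v2 assumed):

go
// Sketch of the membership strip that precedes the orphan delete: one
// UpdateMany pulls the code from tenantIDs and drops the byTenant entry.
package migrate

import (
	"context"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
)

func stripTenantMembership(ctx context.Context, users *mongo.Collection, code string) error {
	filter := bson.M{"$or": []bson.M{
		{"tenantIDs": code},
		{"byTenant." + code: bson.M{"$exists": true}},
	}}
	update := bson.M{
		"$pull":  bson.M{"tenantIDs": code},
		"$unset": bson.M{"byTenant." + code: ""},
	}
	_, err := users.UpdateMany(ctx, filter, update)
	return err
}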

Idempotency

  • $pull / $unset → no-op if already absent.
  • DeleteMany with filter → no-op if no matches.
  • Re-running delete is safe.

9. Verify Job

Purpose: Read-only scan — confirm no tenant data remains.

Entry: migrate.RunVerify or RunVerifyWithDB.

mermaid
flowchart TD
    S[Start] --> L[List collections]
    L --> LOOP[For each non-system coll]
    LOOP --> D{tenant-ns prefix?}
    D -->|yes| C1[Count all docs<br/>finding: coll should be dropped]
    D -->|no| C2[CountDocuments CollectionFilter]
    C1 --> CH{count non-zero?}
    C2 --> CH
    CH -->|yes| F[Add finding<br/>Passed=false]
    CH -->|no| NX[Next]
    F --> NX
    NX --> MORE{more?}
    MORE -->|yes| LOOP
    MORE -->|no| R[VerifyReport]

Passed = true only when zero findings.
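
One verify step in sketch form (Finding shape assumed; CollectionFilter and VerifyReport per earlier sections):

go
// Sketch: count surviving docs under the tenant filter; any non-zero
// count becomes a finding and flips Passed to false.
package migrate

import (
	"context"

	"go.mongodb.org/mongo-driver/v2/mongo"
)

type Finding struct { // assumed shape of a report entry
	Collection string
	Count      int64
}

func verifyCollection(ctx context.Context, db *mongo.Database, coll, code string, rep *VerifyReport) error {
	n, err := db.Collection(coll).CountDocuments(ctx, CollectionFilter(coll, code))
	if err != nil {
		return err
	}
	if n > 0 {
		rep.Findings = append(rep.Findings, Finding{Collection: coll, Count: n})
		rep.Passed = false
	}
	return nil
}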


10. Worker / Executor Wiring

Poller: internal/worker/migration_runner.go — 30s tick.

mermaid
flowchart TD
    T[Ticker 30s] --> N[repo.NextQueuedJob]
    N -->|none| TR[tracker.Record idle]
    N -->|job| RUN[status=running]
    RUN --> DP{job.Type}
    DP -->|tenant_copy| EC[executeTenantCopy]
    DP -->|tenant_dump| ED[executeTenantDump]
    DP -->|tenant_import| EI[executeTenantImport]
    DP -->|tenant_clone| ECL[executeTenantClone]
    DP -->|tenant_delete| EDL[executeTenantDelete]
    DP -->|instance_replace| ER[executeInstanceReplace]
    EC --> FIN[store artifact<br/>status succeeded/failed<br/>notify]
    ED --> FIN
    EI --> FIN
    ECL --> FIN
    EDL --> FIN
    ER --> FIN

Executor Responsibilities

executor_tenant_dump.go:

  • Resolve tenant + instance URI.
  • runDumpForTenant (shared with delete) → migrate.RunDump.
  • Set job.dump_file_path.

executor_tenant_import.go:

  1. Resolve target URI + DB name.
  2. Read archive _metadata.json.
  3. preflightImportUserIDP — scan active users from archive, apply remap, verify each effective email has a Zitadel identity. Fail job if any missing.
  4. tenantRepo.CreateMinimalTenant (unless dry-run).
  5. migrate.RunImport.
  6. On fail → delete the half-created tenant record.
  7. userRepo.OnboardUsersToTenant — create Console membership rows.
  8. pushTenantStatusToInstance — notify instance cache.

executor_tenant_clone.go: same as import but source reads live DB (not archive) for user scan, and uses tenantRepo.CloneTenantRecord.

executor_tenant_delete.go:

  1. Pre-delete safety dump (via runDumpForTenant).
  2. POST /internal/tenant-deprovision to instance API.
  3. migrate.RunDelete.
  4. userRepo.DeleteAssignmentsByTenant.
  5. tenantRepo.SetStatus(archived).

IDP Preflight (internal/worker/idp_preflight.go)

mermaid
flowchart TD
    S[Archive / source DB] --> E[ReadActiveUserEmails<br/>status=1 only]
    E --> R[ApplyEffectiveEmail<br/>per remap config]
    R --> Q[Query Zitadel<br/>for each effective email]
    Q --> CH{all identities<br/>exist?}
    CH -->|no| FAIL[Fail job BEFORE<br/>creating tenant]
    CH -->|yes| OK[Proceed to import/clone]

Prevents an orphaned tenant: without the check, the tenant would be half-created with users who can't log in.


11. REST Endpoints

Handler: internal/handler/migrations/handler.go.

| Method | Path | Creates | Notes |
| --- | --- | --- | --- |
| POST | /api/v1/migrations | tenant_copy | src ≠ tgt instance; ≤3 concurrent |
| POST | /api/v1/migrations/tenant-clone | tenant_clone | defaults tgt=src |
| POST | /api/v1/migrations/tenant-delete | tenant_delete | fails if tenant archived |
| POST | /api/v1/migrations/tenant-dump | tenant_dump | |
| POST | /api/v1/migrations/tenant-import | tenant_import | multipart upload, saved to DumpDir |
| POST | /api/v1/migrations/instance-replace | instance_replace | |
| GET | /api/v1/jobs?status= | | list jobs |
| GET | /api/v1/jobs/:id | | |
| GET | /api/v1/jobs/:id/logs | | JobLog[] |
| GET | /api/v1/jobs/:id/artifacts | | JobArtifact[] |
| POST | /api/v1/jobs/:id/download-token | | 60s, single-use |
| GET | /api/v1/jobs/:id/download?token= | | serves dump .zip |
| GET | /api/v1/instances/:id/jobs | | jobs where instance is src/tgt |

12. CLI

Each CLI binary calls the same migrate.Run* function as the executor, so manual and worker runs behave identically.

| Binary | Flags (key) |
| --- | --- |
| tenant-dump | --mongo-uri, --tenant-code, -o, --dry-run, -v, -r |
| tenant-import | -z, --mongo-uri, --tenant-code, --tenant-name, --batch-size, -m, --reuse-existing-users, --dry-run, -r |
| tenant-delete | --mongo-uri, --tenant-code or -f, --dry-run, --verify, -y, -r |
| tenant-verify | --mongo-uri, --tenant-code or -f, -r |

See tenant-cli.md and tenant-import-idempotent.md for examples.


13. Per-Document Transform Pipeline (Import Pass 2)

mermaid
flowchart TD
    D[Raw doc from zip] --> IS{tenant-namespaced<br/>collection?}
    IS -->|yes| SKIP_TR[Skip prepare/rewrite<br/>entire coll belongs to tenant]
    IS -->|no| PR[prepareCollectionDoc<br/>prune byTenant to tgt only]
    PR --> TR[rewriteTenantRefs<br/>src code to tgt code]
    SKIP_TR --> PATH{path?}
    TR --> PATH
    PATH -->|A user reuse| A_EM[email remap<br/>reuse decision]
    PATH -->|B skip-list| B_CUS[customer name/code<br/>user-session fresh _id]
    PATH -->|C classified| C_ID[ensureTenantID<br/>_id decision]
    C_ID --> C_OID[rewriteDocObjectIDs<br/>deep-walk idMap]
    C_OID --> C_US[rewriteDocUserRefs<br/>emailMap + userOIDMap]
    A_EM --> BAT[Batch]
    B_CUS --> BAT
    C_US --> BAT
    BAT --> FL{batch full?}
    FL -->|yes path A or B| INS[InsertMany]
    FL -->|yes path C| UPS[upsertBatchDocsWithRetry<br/>ReplaceOne upsert]
    FL -->|no| NXT[Next doc]
    INS --> NXT
    UPS --> NXT

User Ref Rewrite Scope (rewriteDocUserRefs)

  • Tier 1: known email paths (createdBy, updatedBy, + array variants, + nested array variants like checklist[].createdBy).
  • Tier 2: full deep-walk on customFields subtree.
  • Out of scope: free-text fields, partial emails, non-standard OID refs.
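
Tier 1 reduced to a sketch (two scalar paths shown; the real list also covers array and nested-array variants):

go
// Sketch of the Tier-1 rewrite: only known email-bearing paths are
// remapped through emailMap; everything else is deliberately untouched.
package migrate

import "strings"

func rewriteKnownUserRefs(doc map[string]any, emailMap map[string]string) {
	for _, path := range []string{"createdBy", "updatedBy"} {
		if email, ok := doc[path].(string); ok {
			if mapped, ok := emailMap[strings.ToLower(email)]; ok {
				doc[path] = mapped
			}
		}
	}
}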

14. Retries, Batching, Memory

| Knob | Value |
| --- | --- |
| Batch size | 1000 docs (configurable) |
| Classify chunk | 10 000 src _ids per $in query |
| IDMap in-mem threshold | 500 000 entries → spill to bbolt |
| IDMap read cache | 100 000-entry LRU on top of bolt |
| Write retry | transient only; 1s / 3s / 9s; then split batch; down to single doc |
| Wire limit | 48 MB per Mongo request; split-batch handles overflow |
| Connections | per-operation open/close |
| Temp files | dumps → DumpDir or /tmp/console-dumps; clone → /tmp/clone-<oid>.zip, deleted on exit |

Transient classification: broken pipe, conn reset, timeout, EOF. Context cancel is NOT transient (user abort stops everything).
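
The retry ladder as a sketch (upsertBatch is the bulk flush sketched in section 6; isTransientWriteErr per section 4):

go
// Sketch of upsertBatchDocsWithRetry's shape: transient-only retries at
// 1s/3s/9s, then split the batch in half and recurse down to single docs.
package migrate

import (
	"context"
	"time"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
)

func flushWithRetry(ctx context.Context, coll *mongo.Collection, batch []bson.M) error {
	var err error
	for _, delay := range []time.Duration{time.Second, 3 * time.Second, 9 * time.Second} {
		if err = upsertBatch(ctx, coll, batch); err == nil || !isTransientWriteErr(err) {
			return err // success, or a permanent error (incl. context cancel)
		}
		time.Sleep(delay)
	}
	if len(batch) > 1 { // still transient: split and recurse
		mid := len(batch) / 2
		if err := flushWithRetry(ctx, coll, batch[:mid]); err != nil {
			return err
		}
		return flushWithRetry(ctx, coll, batch[mid:])
	}
	return err // single doc still failing transiently
}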


15. Full Job Decision Tree (which job should I run?)

Goal ─▶ tenant data out to file ───────────────────────▶ DUMP
Goal ─▶ tenant data in from file ──────────────────────▶ IMPORT
Goal ─▶ same tenant exists twice (new code/name) ──────▶ CLONE (preflight+dump+import)
Goal ─▶ tenant gone, keep safety copy ─────────────────▶ DELETE (safety-dump+drop+$pull)
Goal ─▶ confirm previous delete left nothing ──────────▶ VERIFY
Goal ─▶ copy tenant instance→instance, one shot ───────▶ COPY (tenant_copy executor)
Goal ─▶ migrate whole instance ────────────────────────▶ INSTANCE_REPLACE

16. Error Scenario Summary

| Scenario | Dump | Import | Clone | Delete | Verify |
| --- | --- | --- | --- | --- | --- |
| Connection fail | abort | abort | abort | abort | abort |
| Target code/name collision | n/a | abort (preflight) | abort (preflight) | n/a | n/a |
| Missing IDP identity (preflight) | n/a | abort before tenant create | abort | n/a | n/a |
| Per-collection error | log + continue | log + continue | log + continue | log + continue | log + finding |
| Unique-index build fails | n/a | abort | abort | n/a | n/a |
| Transient write error | n/a | retry + split | retry + split | retry + split | n/a |
| Dry-run | count only | preflight skipped, no writes | dump real, import skipped | count only | same (read-only) |
| Re-run after partial success | same output | idempotent (upsert) | idempotent via import | idempotent ($pull) | same |

17. Key Files Index