A model representing a migration to Metabase Cloud.

(ns metabase.cloud-migration.models.cloud-migration
  (:require
   [clj-http.client :as http]
   [clojure.java.io :as io]
   [clojure.set :as set]
   [metabase.cloud-migration.settings :as cloud-migration.settings]
   [metabase.cmd.copy :as copy]
   [metabase.cmd.dump-to-h2 :as dump-to-h2]
   [metabase.config :as config]
   [metabase.db :as mdb]
   [metabase.models.interface :as mi]
   [metabase.models.setting.cache :as setting.cache]
   [metabase.util :as u]
   [metabase.util.i18n :refer [tru]]
   [metabase.util.json :as json]
   [metabase.util.log :as log]
   [methodical.core :as methodical]
   [toucan2.core :as t2]
   [toucan2.pipeline :as t2.pipeline])
  (:import
   (java.io File InputStream)
   (org.apache.commons.io.input BoundedInputStream)))
(set! *warn-on-reflection* true)
(doto :model/CloudMigration
  (derive :metabase/model)
  (derive :hook/timestamped?))
(methodical/defmethod t2/table-name :model/CloudMigration [_model] :cloud_migration)
(t2/deftransforms :model/CloudMigration
  {:state mi/transform-keyword})
(def ^:private read-only-mode-inclusions
  (->> copy/entities (map t2/table-name) (into #{})))
(def ^:private read-only-mode-exceptions
  (update-keys {;; Migrations need to update their own state
                :model/CloudMigration :all :model/Setting :all
                ;; Users need to log in, make queries, and we need to audit them.
                :model/User :all :model/Session :all :model/LoginHistory :all
                :model/UserParameterValue :all
                :model/AuditLog :all :model/ViewLog :all
                ;; Cards need to be able to update their last-used-at timestamp, but we don't
                ;; want to create new cards or update other fields.
                :model/Card #{:id :last_used_at :updated_at}}
               ;; These exceptions use table names instead of model names because you can bypass
               ;; the model and call toucan2 functions that interact with the table directly.
               t2/table-name))
(defn- update-exempted?
  [table-name {:keys [changes]}]
  (or (= (read-only-mode-exceptions table-name) :all)
      (and (seq changes)
           (set/subset? (->> changes keys (into #{}))
                        (read-only-mode-exceptions table-name)))))
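
For illustration, an update is exempted only when every changed column falls inside that table's exception set. A hypothetical REPL sketch (values invented; :model/Card resolves to its table name via t2/table-name):

(comment
  ;; exempt: the only changed column is in the Card exception set
  (update-exempted? (t2/table-name :model/Card)
                    {:changes {:last_used_at (java.time.OffsetDateTime/now)}}) ;; => truthy
  ;; not exempt: :name is outside the exception set
  (update-exempted? (t2/table-name :model/Card)
                    {:changes {:name "Renamed card"}})) ;; => falsey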

Block write calls to most tables in read-only mode.

(methodical/defmethod t2.pipeline/build :before [#_query-type     :toucan.statement-type/DML
                                                 #_model          :default
                                                 #_resolved-query :default]
  [_query-type model parsed-args resolved-query]
  (let [table-name (t2/table-name model)]
    (when (and (cloud-migration.settings/read-only-mode)
               (read-only-mode-inclusions table-name)
               (not (update-exempted? table-name parsed-args)))
      (throw (ex-info (tru "Metabase is in read-only mode!")
                      {:status-code 403}))))
  resolved-query)
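
A hypothetical REPL sketch of the guard (Dashboard is one of the copied entities, so its table is in the inclusion set, while Card updates restricted to exempted columns still go through):

(comment
  (cloud-migration.settings/read-only-mode! true)
  ;; blocked: throws "Metabase is in read-only mode!" with {:status-code 403}
  (t2/insert! :model/Dashboard {:name "New dashboard"})
  ;; allowed: only touches exempted Card columns
  (t2/update! :model/Card 1 {:last_used_at (java.time.OffsetDateTime/now)})
  (cloud-migration.settings/read-only-mode! false))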

Helpers

Store API URL for migrations.

(defn migration-url
  ([]
   (str (cloud-migration.settings/store-api-url) "/api/v2/migration"))
  ([external-id path]
   (str (migration-url) "/" external-id path)))
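
For example, assuming store-api-url returns "https://store-api.metabase.com":

(comment
  ;; "https://store-api.metabase.com/api/v2/migration"
  (migration-url)
  ;; "https://store-api.metabase.com/api/v2/migration/some-external-id/uploaded"
  (migration-url "some-external-id" "/uploaded"))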

Cloud migration states that are terminal.

(def terminal-states
  #{:done :error :cancelled})

EXPERIMENTAL. Returns true if this Metabase instance is part of a cluster. Works by checking how many Quartz scheduler nodes there are. See https://github.com/quartz-scheduler/quartz/issues/733

(defn cluster?
  []
  (>= (t2/count (if (= (mdb/db-type) :postgres)
                  "qrtz_scheduler_state"
                  "QRTZ_SCHEDULER_STATE"))
      2))

File input stream that calls on-percent-progress with the current read progress as an int from 0 to 100.

(defn- progress-file-input-stream
  ^InputStream [^File file on-percent-progress]
  (let [input-stream             (io/input-stream file)
        length                   (.length file)
        *bytes                   (atom 0)
        add-bytes                #(on-percent-progress (int (* 100 (/ (swap! *bytes + %) length))))
        f                        (fn [ret & single?]
                                   (cond
                                     ;; -1 is end of stream
                                     (= -1 ret)  (on-percent-progress 100)
                                     single?     (add-bytes 1)
                                     (int? ret)  (add-bytes ret))
                                   ret)]
    (proxy [InputStream] []
      (read
        ([]
         (f (.read input-stream) true))
        ([^bytes b]
         (f (.read input-stream b)))
        ([^bytes b off len]
         (f (.read input-stream b off len))))
      (close [] (.close input-stream)))))
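
A hypothetical usage sketch: drain a file through the wrapper and log each progress tick. (InputStream/transferTo needs Java 9+; OutputStream/nullOutputStream needs Java 11+.)

(comment
  (with-open [in (progress-file-input-stream
                  (io/file "/tmp/example.bin")
                  #(log/info "read progress:" % "%"))]
    (.transferTo in (java.io.OutputStream/nullOutputStream))))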

Attempt to set the migration with the given id to state and progress. Throws if the migration has already reached a terminal state (e.g. cancelled). This is the main cluster coordination mechanism for migrations, since any instance can cancel the migration, not just the one that initiated it.

(defn- set-progress
  [id state progress]
  (when (= 0 (t2/update! :model/CloudMigration :id id :state [:not-in terminal-states]
                         {:state state :progress progress}))
    (throw (ex-info "Cannot update migration in terminal state" {:terminal true}))))

Returns absolute progress from a relative progress. E.g. if you're at relative 50 from 51 to 99, that's absolute 75.

(defn abs-progress
  [relative-progress from to]
  (int (+ from (* (- to from) (/ relative-progress 100)))))
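
A few worked values at the REPL:

(comment
  (abs-progress 50 51 99)   ;; => 75
  (abs-progress 0 51 99)    ;; => 51
  (abs-progress 100 51 99)) ;; => 99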

Like subs, but for input-streams. Returns a sub-stream that should be used inside with-open.

(defn sub-stream
  [^InputStream stream start end]
  (.skip stream start)
  (BoundedInputStream. stream (- end start)))
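
A hypothetical usage sketch, reading bytes 10 (inclusive) to 20 (exclusive) of a file:

(comment
  (with-open [in   (io/input-stream (io/file "/tmp/example.bin"))
              part (sub-stream in 10 20)]
    (String. (.readAllBytes part))))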

Put file, whole or from start to end, on url, reporting on-progress. Retries up to 3 times.

(defn- put-file
  [url ^File file on-progress & {:keys [headers start end]}]
  (u/auto-retry 3
    (with-open [file-stream (progress-file-input-stream file on-progress)]
      (let [[stream length] (if (and start end)
                              [(sub-stream file-stream start end) (- end start)]
                              [file-stream (.length file)])]
        (http/put url {:headers headers :length length :body stream})))))

~100 MB

(def ^:private part-size 100e6)
(defn- upload [{:keys [id external_id upload_url]} ^File dump-file]
  (let [;; memoize so we don't have repeats for each percent and don't go back on retries
        set-progress-memo (memoize set-progress)
        ;; 50 is set before we start the upload
        on-progress       #(set-progress-memo id :upload (abs-progress % 51 99))
        ;; the migration-dump-file setting is used for testing older dumps in the rich comment
        ;; at the end of this file
        file              (if (cloud-migration.settings/migration-dump-file)
                            (io/file (cloud-migration.settings/migration-dump-file))
                            dump-file)
        file-length       (.length file)]
    (if (<= file-length part-size)
      ;; single put uses SSE, but multipart doesn't support it.
      (put-file upload_url file on-progress :headers {"x-amz-server-side-encryption" "aws:kms"})
      (let [parts
            (partition 2 1 (-> (range 0 file-length part-size)
                               vec
                               (conj file-length)))
            {:keys [multipart-upload-id multipart-urls]}
            (-> (http/put (migration-url external_id "/multipart")
                          {:form-params  {:part_count (count parts)}
                           :content-type :json})
                :body
                json/decode+kw)
            etags
            (->> (map (fn [[start end] [part-number url]]
                        [part-number
                         (get-in (put-file url file on-progress :start start :end end)
                                 [:headers "ETag"])])
                      parts multipart-urls)
                 (into {}))]
        (http/put (migration-url external_id "/multipart/complete")
                  {:form-params  {:multipart_upload_id multipart-upload-id
                                  :multipart_etags     etags}
                   :content-type :json})))))
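
The part boundaries above come from partitioning offsets into consecutive [start end) pairs; a worked example with a hypothetical 250 MB file:

(comment
  ;; => ((0.0 1.0E8) (1.0E8 2.0E8) (2.0E8 2.5E8))
  (let [file-length 250e6
        part-size   100e6]
    (partition 2 1 (-> (range 0 file-length part-size)
                       vec
                       (conj file-length)))))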

Migrate this instance to Metabase Cloud. Will exit early if migration has been cancelled in any cluster instance. Should run in a separate thread since it can take a long time to complete.

(defn migrate!
  [{:keys [id external_id] :as migration} & {:keys [retry?]}]
  ;; dump-to-h2 starts behaving oddly if you try to dump repeatedly to the same file
  ;; in the same process, so use a random name.
  ;; The Docker image process runs as non-root, so write to a dir it can access.
  (let [dump-file (io/file (System/getProperty "java.io.tmpdir")
                           (str "cloud_migration_dump_" (random-uuid) ".mv.db"))]
    (try
      (when retry?
        (t2/update! :model/CloudMigration :id id {:state :init}))
      (log/info "Setting read-only mode")
      (set-progress id :setup 1)
      (cloud-migration.settings/read-only-mode! true)
      (when (cluster?)
        (log/info "Cluster detected, waiting for read-only mode to propagate")
        (Thread/sleep (int (* 1.5 setting.cache/cache-update-check-interval-ms))))
      (log/info "Dumping h2 backup to" (.getAbsolutePath dump-file))
      (set-progress id :dump 20)
      (dump-to-h2/dump-to-h2! (.getAbsolutePath dump-file) {:dump-plaintext? true})
      (when-not (cloud-migration.settings/read-only-mode)
        (throw (ex-info "Read-only mode disabled before h2 dump was completed, contents might not be self-consistent!"
                        {:id id})))
      (cloud-migration.settings/read-only-mode! false)
      (log/info "Uploading dump to store")
      (set-progress id :upload 50)
      (upload migration dump-file)
      (log/info "Notifying store that upload is done")
      (http/put (migration-url external_id "/uploaded"))
      (log/info "Migration finished")
      (set-progress id :done 100)
      (catch Exception e
        ;; See set-progress for when :terminal is set.
        (if (-> e ex-data :terminal)
          (log/info "Migration interruped due to terminal state")
          (do
            (t2/update! :model/CloudMigration id {:state :error})
            (log/info "Migration failed")
            (throw (ex-info "Error performing migration" {:error e})))))
      (finally
        (cloud-migration.settings/read-only-mode! false)
        (io/delete-file dump-file :silently)))))

Calls the Store and returns {:external_id ,,, :upload_url ,,,}.

(defn get-store-migration
  []
  (-> (migration-url)
      (http/post {:form-params  {:local_mb_version (or (cloud-migration.settings/migration-dump-version)
                                                       (config/mb-version-info :tag))}
                  :content-type :json})
      :body
      json/decode+kw
      (select-keys [:id :upload_url])
      (set/rename-keys {:id :external_id})))
(comment
  ;; are we in read-only mode?
  (cloud-migration.settings/read-only-mode)
  ;; test settings you might want to change manually
  ;; force prod even if in dev
  #_(cloud-migration.settings/migration-use-staging! false)
  ;; make sure to use a version that store supports, and a dump for that version.
  #_(cloud-migration.settings/migration-dump-version! "v0.49.7")
  ;; make a new dump with any released metabase jar using the command below:
  ;;   java --add-opens java.base/java.nio=ALL-UNNAMED -jar metabase.jar dump-to-h2 dump --dump-plaintext
  #_(cloud-migration.settings/migration-dump-file! "/path/to/dump.mv.db")
  ;; force multipart migration with a smaller part size (~6 MB is the minimum)
  #_(def ^:private part-size 6e6)
  ;; add a new migration
  (t2/insert-returning-instance! :model/CloudMigration (get-store-migration))
  ;; get migration
  @(def mig (t2/select-one :model/CloudMigration {:order-by [[:created_at :desc]]}))
  ;; migrate
  (migrate! mig)
  ;; retry failed migration
  (migrate! mig :retry? true)
  ;; cancel all
  (t2/update! :model/CloudMigration {:state [:not-in terminal-states]} {:state :cancelled}))