(ns metabase.cloud-migration.models.cloud-migration
  "A model representing a migration to cloud."
  (:require
   [clj-http.client :as http]
   [clojure.java.io :as io]
   [clojure.set :as set]
   [metabase.cloud-migration.settings :as cloud-migration.settings]
   [metabase.cmd.copy :as copy]
   [metabase.cmd.dump-to-h2 :as dump-to-h2]
   [metabase.config :as config]
   [metabase.db :as mdb]
   [metabase.models.interface :as mi]
   [metabase.models.setting.cache :as setting.cache]
   [metabase.util :as u]
   [metabase.util.i18n :refer [tru]]
   [metabase.util.json :as json]
   [metabase.util.log :as log]
   [methodical.core :as methodical]
   [toucan2.core :as t2]
   [toucan2.pipeline :as t2.pipeline])
  (:import
   (java.io File InputStream)
   (org.apache.commons.io.input BoundedInputStream)))
(set! *warn-on-reflection* true)
(doto :model/CloudMigration
  (derive :metabase/model)
  (derive :hook/timestamped?))
(methodical/defmethod t2/table-name :model/CloudMigration [_model] :cloud_migration)
(t2/deftransforms :model/CloudMigration
  {:state mi/transform-keyword})
(def ^:private read-only-mode-inclusions
  (->> copy/entities (map t2/table-name) (into #{})))
(def ^:private read-only-mode-exceptions
(update-keys {;; Migrations need to update their own state
:model/CloudMigration :all :model/Setting :all
                ;; Users need to log in, make queries, and we need to audit them.
:model/User :all :model/Session :all :model/LoginHistory :all
:model/UserParameterValue :all
:model/AuditLog :all :model/ViewLog :all
                ;; Cards need to be able to update their last used at timestamp, but we don't want to create
;; new cards or update other fields.
:model/Card #{:id :last_used_at :updated_at}}
;; These exceptions use table name instead of model name because you can actually bypass the model
;; and write toucan2 functions that interact with table directly.
               t2/table-name))
(defn- update-exempted?
  [table-name {:keys [changes]}]
  (or (= (read-only-mode-exceptions table-name) :all)
      (set/subset? (->> changes keys (into #{}))
                   (read-only-mode-exceptions table-name))))
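;; A sketch of the exemption logic, assuming :model/Card's table is :report_card
;; (values are illustrative only):
(comment
  ;; updating only exempted Card columns is allowed in read-only mode
  (update-exempted? :report_card {:changes {:last_used_at :%now}}) ;; => true
  ;; touching any other column is blocked
  (update-exempted? :report_card {:changes {:name "renamed"}})     ;; => false
  ;; tables marked :all are always exempt
  (update-exempted? (t2/table-name :model/Session) {:changes {}})) ;; => truthy (:all)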
(methodical/defmethod t2.pipeline/build :before [#_query-type     :toucan.statement-type/DML
                                                 #_model          :default
                                                 #_resolved-query :default]
  "Block write calls to most tables in read-only mode."
  [_query-type model parsed-args resolved-query]
  (let [table-name (t2/table-name model)]
    (when (and (cloud-migration.settings/read-only-mode)
               (read-only-mode-inclusions table-name)
               (not (update-exempted? table-name parsed-args)))
      (throw (ex-info (tru "Metabase is in read-only mode!")
                      {:status-code 403}))))
  resolved-query)
;;; Helpers
(defn migration-url
  "Store API URL for migrations."
  ([]
   (str (cloud-migration.settings/store-api-url) "/api/v2/migration"))
  ([external-id path]
   (str (migration-url) "/" external-id path)))
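;; Sketch of the URLs this builds, assuming store-api-url returns
;; "https://store-api.metabase.com" (the actual value comes from settings):
(comment
  (migration-url)                                  ;; => ".../api/v2/migration"
  (migration-url "some-external-id" "/multipart")) ;; => ".../api/v2/migration/some-external-id/multipart"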
(def terminal-states
  "Cloud migration states that are terminal."
#{:done :error :cancelled}) |
(defn cluster?
  "EXPERIMENTAL. Returns true if this Metabase instance is part of a cluster.
  Works by checking how many Quartz nodes there are.
  See https://github.com/quartz-scheduler/quartz/issues/733"
[]
(>= (t2/count (if (= (mdb/db-type) :postgres)
"qrtz_scheduler_state"
"QRTZ_SCHEDULER_STATE"))
2)) |
(defn- progress-file-input-stream
  "File input stream that calls on-percent-progress with the current read progress as an int from 0 to 100."
^InputStream [^File file on-percent-progress]
(let [input-stream (io/input-stream file)
length (.length file)
*bytes (atom 0)
add-bytes #(on-percent-progress (int (* 100 (/ (swap! *bytes + %) length))))
f (fn [ret & single?]
(cond
;; -1 is end of stream
(= -1 ret) (on-percent-progress 100)
single? (add-bytes 1)
(int? ret) (add-bytes ret))
ret)]
(proxy [InputStream] []
(read
([]
(f (.read input-stream) true))
([^bytes b]
(f (.read input-stream b)))
([^bytes b off len]
(f (.read input-stream b off len))))
(close [] (.close input-stream))))) |
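;; Usage sketch (hypothetical file path): read a file through the wrapper,
;; logging the running percentage as reads advance.
(comment
  (with-open [in (progress-file-input-stream (io/file "/tmp/example.bin")
                                             #(log/info "read progress:" % "%"))]
    (slurp in)))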
(defn- set-progress
  "Attempt to set id to state and progress. Throws if the migration has already reached a terminal
  state (e.g. cancelled). This is the main cluster coordination mechanism for migrations, since any
  instance can cancel the migration, not just the one that initiated it."
[id state progress]
(when (= 0 (t2/update! :model/CloudMigration :id id :state [:not-in terminal-states]
{:state state :progress progress}))
(throw (ex-info "Cannot update migration in terminal state" {:terminal true})))) |
(defn abs-progress
  "Returns absolute progress from a relative progress.
  E.g. if you're at relative 50 from 51 to 99, that's absolute 75."
  [relative-progress from to]
  (int (+ from (* (- to from) (/ relative-progress 100)))))
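;; Worked examples of the 0-100 relative scale mapped onto an absolute range
;; (the 51..99 range is the one upload uses below):
(comment
  (abs-progress 0 51 99)    ;; => 51
  (abs-progress 50 51 99)   ;; => 75
  (abs-progress 100 51 99)) ;; => 99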
(defn sub-stream
  "Like subs, but for input streams. Returns a sub-stream that should be used inside with-open."
  [^InputStream stream start end]
  (.skip stream start)
  (BoundedInputStream. stream (- end start)))
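;; Usage sketch (hypothetical file path): read only bytes 100..200 of a stream.
(comment
  (with-open [^InputStream in (sub-stream (io/input-stream (io/file "/tmp/example.bin")) 100 200)]
    (count (.readAllBytes in)))) ;; => 100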
(defn- put-file
  "Put file, whole or from start to end, on url, reporting on-progress. Retries up to 3 times."
[url ^File file on-progress & {:keys [headers start end]}]
(u/auto-retry 3
(with-open [file-stream (progress-file-input-stream file on-progress)]
(let [[stream length] (if (and start end)
[(sub-stream file-stream start end) (- end start)]
[file-stream (.length file)])]
(http/put url {:headers headers :length length :body stream}))))) |
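;; Usage sketch (hypothetical URL and file): PUT only the first 1e6 bytes of a
;; dump, printing progress percentages as they arrive.
(comment
  (put-file "https://example.com/upload-part" (io/file "/tmp/dump.mv.db") println
            :start 0 :end 1000000))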
(def ^:private part-size
  "~100mb"
  100e6)
(defn- upload [{:keys [id external_id upload_url]} ^File dump-file]
(let [;; memoize so we don't have repeats for each percent and don't go back on retries
set-progress-memo (memoize set-progress)
;; 50 is set before we start the upload
on-progress #(set-progress-memo id :upload (abs-progress % 51 99))
;; the migration-dump-file setting is used for testing older dumps in the rich comment
;; at the end of this file
file (if (cloud-migration.settings/migration-dump-file)
(io/file (cloud-migration.settings/migration-dump-file))
dump-file)
file-length (.length file)]
(if-not (> file-length part-size)
;; single put uses SSE, but multipart doesn't support it.
(put-file upload_url file on-progress :headers {"x-amz-server-side-encryption" "aws:kms"})
(let [parts
(partition 2 1 (-> (range 0 file-length part-size)
vec
(conj file-length)))
{:keys [multipart-upload-id multipart-urls]}
(-> (http/put (migration-url external_id "/multipart")
{:form-params {:part_count (count parts)}
:content-type :json})
:body
json/decode+kw)
etags
(->> (map (fn [[start end] [part-number url]]
[part-number
(get-in (put-file url file on-progress :start start :end end)
[:headers "ETag"])])
parts multipart-urls)
(into {}))]
(http/put (migration-url external_id "/multipart/complete")
{:form-params {:multipart_upload_id multipart-upload-id
:multipart_etags etags}
                   :content-type :json})))))
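;; Sketch of how the multipart ranges above are derived: pad the part offsets
;; with the file length, then pair them up (illustrated with a 250mb file and
;; 100mb parts):
(comment
  (partition 2 1 (-> (range 0 250e6 100e6) vec (conj 250e6))))
;; => ((0 1.0E8) (1.0E8 2.0E8) (2.0E8 2.5E8))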
(defn migrate!
  "Migrate this instance to Metabase Cloud. Will exit early if the migration has been cancelled on
  any cluster instance. Should run in a separate thread since it can take a long time to complete."
[{:keys [id external_id] :as migration} & {:keys [retry?]}]
  ;; dump-to-h2 starts behaving oddly if you try to dump repeatedly to the same file
;; in the same process, so use a random name.
;; The docker image process runs in non-root, so write to a dir it can access.
(let [dump-file (io/file (System/getProperty "java.io.tmpdir")
(str "cloud_migration_dump_" (random-uuid) ".mv.db"))]
(try
(when retry?
(t2/update! :model/CloudMigration :id id {:state :init}))
(log/info "Setting read-only mode")
(set-progress id :setup 1)
(cloud-migration.settings/read-only-mode! true)
(when (cluster?)
(log/info "Cluster detected, waiting for read-only mode to propagate")
(Thread/sleep (int (* 1.5 setting.cache/cache-update-check-interval-ms))))
(log/info "Dumping h2 backup to" (.getAbsolutePath dump-file))
(set-progress id :dump 20)
(dump-to-h2/dump-to-h2! (.getAbsolutePath dump-file) {:dump-plaintext? true})
(when-not (cloud-migration.settings/read-only-mode)
(throw (ex-info "Read-only mode disabled before h2 dump was completed, contents might not be self-consistent!"
{:id id})))
(cloud-migration.settings/read-only-mode! false)
(log/info "Uploading dump to store")
(set-progress id :upload 50)
(upload migration dump-file)
(log/info "Notifying store that upload is done")
(http/put (migration-url external_id "/uploaded"))
(log/info "Migration finished")
(set-progress id :done 100)
(catch Exception e
;; See set-progress for when :terminal is set.
(if (-> e ex-data :terminal)
(log/info "Migration interruped due to terminal state")
(do
(t2/update! :model/CloudMigration id {:state :error})
(log/info "Migration failed")
(throw (ex-info "Error performing migration" {:error e})))))
(finally
(cloud-migration.settings/read-only-mode! false)
(io/delete-file dump-file :silently))))) |
(defn get-store-migration
  "Calls Store and returns {:external_id ,,, :upload_url ,,,}."
[]
(-> (migration-url)
(http/post {:form-params {:local_mb_version (or (cloud-migration.settings/migration-dump-version)
(config/mb-version-info :tag))}
:content-type :json})
:body
json/decode+kw
(select-keys [:id :upload_url])
      (set/rename-keys {:id :external_id})))
(comment
  ;; are we in read-only mode?
  (cloud-migration.settings/read-only-mode)
;; test settings you might want to change manually
;; force prod if even in dev
  #_(cloud-migration.settings/migration-use-staging! false)
;; make sure to use a version that store supports, and a dump for that version.
  #_(cloud-migration.settings/migration-dump-version! "v0.49.7")
;; make a new dump with any released metabase jar using the command below:
;; java --add-opens java.base/java.nio=ALL-UNNAMED -jar metabase.jar dump-to-h2 dump --dump-plaintext
  #_(cloud-migration.settings/migration-dump-file! "/path/to/dump.mv.db")
;; force migration with a smaller multipart threshold (~6mb is minimum)
#_(def ^:private part-size 6e6)
;; add new
(t2/insert-returning-instance! :model/CloudMigration (get-store-migration))
;; get migration
@(def mig (t2/select-one :model/CloudMigration {:order-by [[:created_at :desc]]}))
;; migrate
(migrate! mig)
;; retry failed migration
(migrate! mig :retry? true)
;; cancel all
  (t2/update! :model/CloudMigration {:state [:not-in terminal-states]} {:state :cancelled}))