(ns metabase.driver.mysql.ddl (:require [clojure.core.async :as a] [clojure.string :as str] [honey.sql :as sql] [java-time.api :as t] [metabase.driver.ddl.interface :as ddl.i] [metabase.driver.sql-jdbc.connection :as sql-jdbc.conn] [metabase.driver.sql-jdbc.execute :as sql-jdbc.execute] [metabase.driver.sql.ddl :as sql.ddl] [metabase.public-settings :as public-settings] [metabase.query-processor.compile :as qp.compile] [metabase.util.i18n :refer [trs]] [metabase.util.log :as log]) (:import (java.sql SQLNonTransientConnectionException))) | |
(set! *warn-on-reflection* true) | |
(defn- exec-async [driver conn-chan db-spec sql+params]
(a/thread
(sql-jdbc.execute/do-with-connection-with-options
driver
db-spec
{:write? true}
(fn [^java.sql.Connection conn]
(try
(let [pid (:pid (first (sql.ddl/jdbc-query conn ["select connection_id() pid"])))]
(a/put! conn-chan pid)
(sql.ddl/jdbc-query conn sql+params))
(catch SQLNonTransientConnectionException _e
;; Our connection may be killed due to timeout, [[kill!]] will throw an appropriate exception
nil)
(catch Exception e
(log/warn e)
e)))))) | |
(defn- kill! [conn pid]
(let [results (sql.ddl/jdbc-query conn ["show processlist"])
result? (some (fn [r]
(and (= (:id r) pid)
(str/starts-with? (or (:info r) ) "-- Metabase")))
results)]
(when result?
;; Can't use a prepared parameter with these statements
(sql.ddl/execute! conn [(str "kill " pid)])
(throw (Exception. (trs "Killed MySQL process id {0} due to timeout." pid)))))) | |
Spins up another channel to execute the statement.
If | (defn- execute-with-timeout!
[driver conn db-spec timeout-ms sql+params]
(let [conn-chan (a/promise-chan)
exec-chan (exec-async driver conn-chan db-spec sql+params)
pid (a/<!! conn-chan)
_ (a/close! conn-chan)
timeout-chan (a/timeout timeout-ms)
[v port] (a/alts!! [timeout-chan exec-chan])]
(a/close! exec-chan)
(cond
(= port timeout-chan) (kill! conn pid)
(= port exec-chan) (if (instance? Exception v)
(throw v)
v)))) |
(defmethod ddl.i/refresh! :mysql
[driver database definition dataset-query]
(let [{:keys [query params]} (qp.compile/compile dataset-query)
db-spec (sql-jdbc.conn/db->pooled-connection-spec database)]
(sql-jdbc.execute/do-with-connection-with-options
driver
database
{:write? true}
(fn [conn]
(sql.ddl/execute! conn [(sql.ddl/drop-table-sql database (:table-name definition))])
;; It is possible that this fails and rollback would not restore the table.
;; That is ok, the persisted-info will be marked inactive and the next refresh will try again.
(execute-with-timeout! driver
conn
db-spec
(.toMillis (t/minutes 10))
(into [(sql.ddl/create-table-sql database definition query)] params))
{:state :success})))) | |
(defmethod ddl.i/unpersist! :mysql
[driver database persisted-info]
(sql-jdbc.execute/do-with-connection-with-options
driver
database
{:write? true}
(fn [^java.sql.Connection conn]
(try
(sql.ddl/execute! conn [(sql.ddl/drop-table-sql database (:table_name persisted-info))])
(catch Exception e
(log/warn e)
(throw e)))))) | |
(defmethod ddl.i/check-can-persist :mysql
[{driver :engine, :as database}]
(let [schema-name (ddl.i/schema-name database (public-settings/site-uuid))
table-name (format "persistence_check_%s" (rand-int 10000))
db-spec (sql-jdbc.conn/db->pooled-connection-spec database)
steps [[:persist.check/create-schema
(fn check-schema [conn]
(let [existing-schemas (->> ["select schema_name from information_schema.schemata"]
(sql.ddl/jdbc-query conn)
(map :schema_name)
(into #{}))]
(or (contains? existing-schemas schema-name)
(sql.ddl/execute! conn [(sql.ddl/create-schema-sql database)]))))
(fn undo-check-schema [conn]
(sql.ddl/execute! conn [(sql.ddl/drop-schema-sql database)]))]
[:persist.check/create-table
(fn create-table [conn]
(execute-with-timeout! driver
conn
db-spec
(.toMillis (t/minutes 10))
[(sql.ddl/create-table-sql
database
{:table-name table-name
:field-definitions [{:field-name "field"
:base-type :type/Text}]}
"select 1")]))
(fn undo-create-table [conn]
(sql.ddl/execute! conn [(sql.ddl/drop-table-sql database table-name)]))]
[:persist.check/read-table
(fn read-table [conn]
(sql.ddl/jdbc-query conn [(format "select * from %s.%s"
schema-name table-name)]))
(constantly nil)]
[:persist.check/delete-table
(fn delete-table [conn]
(sql.ddl/execute! conn [(sql.ddl/drop-table-sql database table-name)]))
;; This will never be called, if the last step fails it does not need to be undone
(constantly nil)]
[:persist.check/create-kv-table
(fn create-kv-table [conn]
(sql.ddl/execute! conn [(format "drop table if exists %s.cache_info"
schema-name)])
(sql.ddl/execute! conn (sql/format
(ddl.i/create-kv-table-honey-sql-form schema-name)
{:dialect :mysql})))]
[:persist.check/populate-kv-table
(fn create-kv-table [conn]
(sql.ddl/execute! conn (sql/format
(ddl.i/populate-kv-table-honey-sql-form
schema-name)
{:dialect :mysql})))]]]
;; Unlike postgres, mysql ddl clauses will not rollback in a transaction.
;; So we keep track of undo-steps to manually rollback previous, completed steps.
(sql-jdbc.execute/do-with-connection-with-options
driver
db-spec
{:write? true}
(fn [conn]
(loop [[[step stepfn undofn] & remaining] steps
undo-steps []]
(let [result (try (stepfn conn)
(log/infof "Step %s was successful for db %s" step (:name database))
::valid
(catch Exception e
(log/warnf e "Error in `%s` while checking for model persistence permissions." step)
(try
(doseq [[undo-step undofn] (reverse undo-steps)]
(log/warnf "Undoing step `%s` for db %s" undo-step (:name database))
(undofn conn))
(catch Exception _e
(log/warn "Unable to rollback database check for model persistence")))
step))]
(cond (and (= result ::valid) remaining)
(recur remaining (conj undo-steps [step undofn]))
(= result ::valid)
[true :persist.check/valid]
:else
[false step]))))))) | |