(ns metabase.driver.sql-jdbc.actions (:require [clojure.java.jdbc :as jdbc] [clojure.set :as set] [clojure.string :as str] [flatland.ordered.set :as ordered-set] [medley.core :as m] [metabase.actions.core :as actions] [metabase.driver :as driver] [metabase.driver.sql-jdbc.execute :as sql-jdbc.execute] [metabase.driver.sql.query-processor :as sql.qp] [metabase.driver.util :as driver.u] [metabase.legacy-mbql.schema :as mbql.s] [metabase.lib.metadata.protocols :as lib.metadata.protocols] [metabase.lib.schema.actions :as lib.schema.actions] [metabase.lib.schema.common :as lib.schema.common] [metabase.lib.schema.id :as lib.schema.id] [metabase.query-processor.preprocess :as qp.preprocess] [metabase.query-processor.store :as qp.store] [metabase.util :as u] [metabase.util.honey-sql-2 :as h2x] [metabase.util.i18n :refer [tru]] [metabase.util.log :as log] [metabase.util.malli :as mu] [metabase.util.malli.registry :as mr]) (:import (java.sql Connection SQLException))) | |
(set! *warn-on-reflection* true) | |
+----------------------------------------------------------------------------------------------------------------+ | Error handling | +----------------------------------------------------------------------------------------------------------------+ | |
Try to parse the SQL error message returned by JDBC driver. The methods should returns a map of: - type: the error type. Check [[metabase.actions.error]] for the full list - message: a nice message summarized of what went wrong - errors: a map from field-name => sepcific error message. This is used by UI to display per fields error If non per-column error is available, returns an empty map. Or return | (defmulti maybe-parse-sql-error {:changelog-test/ignore true, :arglists '([driver error-type database action-type error-message]), :added "0.48.0"} (fn [driver error-type _database _action-type _error-message] [(driver/dispatch-on-initialized-driver driver) error-type]) :hierarchy #'driver/hierarchy) |
(defmethod maybe-parse-sql-error :default [_driver _error-type _database _e] nil) | |
(defn- parse-sql-error [driver database action-type e] (let [parsers-for-driver (keep (fn [[[method-driver error-type] method]] (when (= method-driver driver) (partial method driver error-type))) (dissoc (methods maybe-parse-sql-error) :default))] (try (some #(% database action-type (ex-message e)) parsers-for-driver) ;; Catch errors in parse-sql-error and log them so more errors in the future don't break the entire action. ;; We'll still get the original unparsed error message. (catch Throwable new-e (log/errorf new-e "Error parsing SQL error message %s: %s" (pr-str (ex-message e)) (ex-message new-e)) nil)))) | |
(defn- do-with-auto-parse-sql-error [driver database action thunk] (try (thunk) (catch SQLException e (throw (ex-info (or (ex-message e) "Error executing action.") (merge (or (some-> (parse-sql-error driver database action e) ;; the columns in error message should match with columns ;; in the parameter. It's usually got from calling ;; GET /api/action/:id/execute, and in there all column names are slugified (m/update-existing :errors update-keys u/slugify)) (assoc (ex-data e) :message (ex-message e))) {:status-code 400})))))) | |
Execute body and if there is an exception, try to parse the error message to search for known sql errors then throw a regular (and easier to understand/process) exception. | (defmacro ^:private with-auto-parse-sql-exception [driver database action-type & body] `(do-with-auto-parse-sql-error ~driver ~database ~action-type (fn [] ~@body))) |
(defn- mbql-query->raw-hsql [driver {database-id :database, :as query}] (qp.store/with-metadata-provider database-id ;; catch errors in the query (qp.preprocess/preprocess query) (sql.qp/mbql->honeysql driver query))) | |
+----------------------------------------------------------------------------------------------------------------+ | Action Execution | +----------------------------------------------------------------------------------------------------------------+ | |
Return a map of [[metabase.types]] type to SQL string type name. Used for casting. Looks like we're just copypasting this from implementations of [[metabase.test.data.sql/field-base-type->sql-type]] so go find that stuff if you need to write more implementations for this. | (defmulti base-type->sql-type-map {:changelog-test/ignore true, :arglists '([driver]), :added "0.44.0"} driver/dispatch-on-initialized-driver :hierarchy #'driver/hierarchy) |
(mu/defn- cast-values :- ::lib.schema.actions/row "Certain value types need to have their honeysql form updated to work properly during update/creation. This function uses honeysql casting to wrap values in the map that need to be cast with their column's type, and passes through types that do not need casting like integer or string." [driver :- :keyword column->value :- ::lib.schema.actions/row database-id :- ::lib.schema.id/database table-id :- ::lib.schema.id/table] (let [type->sql-type (base-type->sql-type-map driver) column->field (actions/cached-value [::cast-values table-id] (fn [] (into {} #_{:clj-kondo/ignore [:deprecated-var]} (map (juxt :name qp.store/->legacy-metadata)) (qp.store/with-metadata-provider database-id (lib.metadata.protocols/fields (qp.store/metadata-provider) table-id)))))] (m/map-kv-vals (fn [col-name value] (let [col-name (u/qualified-name col-name) {base-type :base_type :as field} (get column->field col-name)] (if-let [sql-type (type->sql-type base-type)] (h2x/cast sql-type value) (try (sql.qp/->honeysql driver [:value value field]) (catch Exception e (throw (ex-info (str "column cast failed: " (pr-str col-name)) {:column col-name :status-code 400} e))))))) column->value))) | |
Data warehouse JDBC Connection to use for doing CRUD Actions. Bind this to reuse the same Connection/transaction throughout a single bulk Action. | (def ^:private ^:dynamic ^Connection *connection* nil) |
Impl function for [[with-jdbc-transaction]]. Why not just use [[jdbc/with-db-transaction]] to do this stuff? Why reinvent the wheel? There are a few reasons:
| (defn do-with-jdbc-transaction [database-id f] (if *connection* (f *connection*) (let [driver (driver.u/database->driver database-id)] (sql-jdbc.execute/do-with-connection-with-options driver database-id {:write? true} (fn [^Connection conn] ;; execute inside of a transaction. (.setAutoCommit conn false) (log/tracef "BEGIN transaction on conn %s@0x%s" (.getCanonicalName (class conn)) (System/identityHashCode conn)) (try (let [result (binding [*connection* conn] (f conn))] (log/debug "f completed successfully; committing transaction.") (log/tracef "COMMIT transaction on conn %s@0x%s" (.getCanonicalName (class conn)) (System/identityHashCode conn)) (.commit conn) result) (catch Throwable e (log/debugf "f threw Exception; rolling back transaction. Error: %s" (ex-message e)) (log/tracef "ROLLBACK transaction on conn %s@0x%s" (.getCanonicalName (class conn)) (System/identityHashCode conn)) (.rollback conn) (throw e)))))))) |
Execute | (defmacro with-jdbc-transaction {:style/indent 1} [[connection-binding database-id] & body] `(do-with-jdbc-transaction ~database-id (fn [~(vary-meta connection-binding assoc :tag 'Connection)] ~@body))) |
Multimethod for preparing a honeysql query | (defmulti prepare-query* {:changelog-test/ignore true, :arglists '([driver action hsql-query]), :added "0.46.0"} (fn [driver action _] [(driver/dispatch-on-initialized-driver driver) (keyword action)]) :hierarchy #'driver/hierarchy) |
(defmethod prepare-query* :default [_driver _action hsql-query] hsql-query) | |
(defn- prepare-query [hsql-query driver action] (prepare-query* driver action hsql-query)) | |
(defmethod actions/perform-action!* [:sql-jdbc :row/delete] [driver action database {database-id :database, :as query}] (let [raw-hsql (mbql-query->raw-hsql driver query) delete-hsql (-> raw-hsql (dissoc :select) (assoc :delete []) (prepare-query driver action)) sql-args (sql.qp/format-honeysql driver delete-hsql)] (with-jdbc-transaction [conn database-id] ;; TODO -- this should probably be using [[metabase.driver/execute-write-query!]] (let [rows-deleted (with-auto-parse-sql-exception driver database action (first (jdbc/execute! {:connection conn} sql-args {:transaction? false})))] (when-not (= rows-deleted 1) (throw (ex-info (if (zero? rows-deleted) (tru "Sorry, the row you''re trying to delete doesn''t exist") (tru "Sorry, this would delete {0} rows, but you can only act on 1" rows-deleted)) {:staus-code 400}))) {:rows-deleted [1]})))) | |
(defmethod actions/perform-action!* [:sql-jdbc :row/update] [driver action database {database-id :database :keys [update-row] :as query}] (let [raw-hsql (mbql-query->raw-hsql driver query) target-table (first (:from raw-hsql)) update-hsql (-> raw-hsql (select-keys [:where]) (assoc :update target-table :set (cast-values driver update-row database-id (get-in query [:query :source-table]))) (prepare-query driver action)) sql-args (sql.qp/format-honeysql driver update-hsql)] (with-jdbc-transaction [conn database-id] ;; TODO -- this should probably be using [[metabase.driver/execute-write-query!]] (let [rows-updated (with-auto-parse-sql-exception driver database action (first (jdbc/execute! {:connection conn} sql-args {:transaction? false})))] (when-not (= rows-updated 1) (throw (ex-info (if (zero? rows-updated) (tru "Sorry, the row you''re trying to update doesn''t exist") (tru "Sorry, this would update {0} rows, but you can only act on 1" rows-updated)) {:staus-code 400}))) {:rows-updated [1]})))) | |
Multimethod for converting the result of an insert into the created row.
| (defmulti select-created-row {:changelog-test/ignore true, :arglists '([driver create-hsql conn result]), :added "0.46.0"} (fn [driver _ _ _] (driver/dispatch-on-initialized-driver driver)) :hierarchy #'driver/hierarchy) |
(mr/def ::created-row [:map-of :string :any]) | |
H2 and MySQL are dumb and | (mu/defmethod select-created-row :default :- [:maybe ::created-row] [driver create-hsql conn result] (let [select-hsql (-> create-hsql (dissoc :insert-into :values) (assoc :select [:*] :from [(:insert-into create-hsql)] ;; :and with a single clause will be optimized in HoneySQL :where (into [:and] (for [[col val] result] [:= (keyword col) val])))) select-sql-args (sql.qp/format-honeysql driver select-hsql)] (log/tracef ":row/create SELECT HoneySQL:\n\n%s" (u/pprint-to-str select-hsql)) (log/tracef ":row/create SELECT SQL + args:\n\n%s" (u/pprint-to-str select-sql-args)) (first (jdbc/query {:connection conn} select-sql-args {:identifiers identity, :transaction? false, :keywordize? false})))) |
(mu/defmethod actions/perform-action!* [:sql-jdbc :row/create] :- [:map [:created-row [:maybe ::created-row]]] [driver action database {database-id :database :keys [create-row] :as query} :- ::mbql.s/Query] (let [raw-hsql (mbql-query->raw-hsql driver query) create-hsql (-> raw-hsql (assoc :insert-into (first (:from raw-hsql))) (assoc :values [(cast-values driver create-row database-id (get-in query [:query :source-table]))]) (dissoc :select :from) (prepare-query driver action)) sql-args (sql.qp/format-honeysql driver create-hsql)] (log/tracef ":row/create HoneySQL:\n\n%s" (u/pprint-to-str create-hsql)) (log/tracef ":row/create SQL + args:\n\n%s" (u/pprint-to-str sql-args)) (with-jdbc-transaction [conn database-id] (let [result (with-auto-parse-sql-exception driver database action (jdbc/execute! {:connection conn} sql-args {:return-keys true :identifiers identity :transaction? false :keywordize? false})) _ (log/tracef ":row/create INSERT returned\n\n%s" (u/pprint-to-str result)) row (select-created-row driver create-hsql conn result)] (log/tracef ":row/create returned row %s" (pr-str row)) {:created-row row})))) | |
Bulk actions | |
Execute Why do we need this? With things like bulk insert, we want to collect all the errors for all the rows in one go. Say you have 4 rows, 1 2 3 and 4. If 1 errors then depending on the DBMS, the transaction enters an error state that disallows doing anything else. 2, 3, and 4 will error with a "transaction has been aborted" error that you can't clear (AFAIK). This affects Postgres but not H2. Not sure about other DBs yet. Without using nested transactions, if you have errors in rows 2 and 4 you'd only see the error in row 2 since 3 and 4 would fail with "transaction has been aborted" or whatever. So the point of using nested transactions is that if 2 is done inside a nested transaction we can rollback the nested transaction which allows the top-level transaction to proceed even tho part of it errored. | (defmulti do-nested-transaction {:changelog-test/ignore true :arglists '([driver ^java.sql.Connection connection thunk]), :added "0.44.0"} driver/dispatch-on-initialized-driver :hierarchy #'driver/hierarchy) |
(defn- perform-bulk-action-with-repeated-single-row-actions! [{:keys [driver database action rows xform] :or {xform identity}}] (assert (seq rows)) (with-jdbc-transaction [conn (u/the-id database)] (transduce (comp xform (m/indexed)) (fn ([] [[] []]) ([[errors successes]] (when (seq errors) (.rollback conn)) [errors successes]) ([[errors successes] [row-index arg-map]] (try (let [result (do-nested-transaction driver conn (fn [] (actions/perform-action!* driver action database arg-map)))] [errors (conj successes result)]) (catch Throwable e [(conj errors {:index row-index, :error (ex-message e)}) successes])))) rows))) | |
| |
(mu/defmethod actions/perform-action!* [:sql-jdbc :bulk/create] [driver :- :keyword _action database :- [:map [:id ::lib.schema.id/database]] {:keys [table-id rows]} :- [:map [:table-id ::lib.schema.id/table] [:rows [:sequential ::lib.schema.actions/row]]]] (log/tracef "Inserting %d rows" (count rows)) (perform-bulk-action-with-repeated-single-row-actions! {:driver driver :database database :action :row/create :rows rows :xform (comp (map (fn [row] {:database (u/the-id database) :type :query :query {:source-table table-id} :create-row row})) #(completing % (fn [[errors successes]] (when (seq errors) (throw (ex-info (tru "Error(s) inserting rows.") {:status-code 400, :errors errors}))) {:created-rows (map :created-row successes)})))})) | |
Shared stuff for both | |
(mu/defn- table-id->pk-field-name->id :- [:map-of ::lib.schema.common/non-blank-string ::lib.schema.id/field] "Given a `table-id` return a map of string Field name -> Field ID for the primary key columns for that Table." [database-id :- ::lib.schema.id/database table-id :- ::lib.schema.id/table] (into {} (comp (filter (fn [{:keys [semantic-type], :as _field}] (isa? semantic-type :type/PK))) (map (juxt :name :id))) (qp.store/with-metadata-provider database-id (lib.metadata.protocols/fields (qp.store/metadata-provider) table-id)))) | |
Given [[field-name->id]] as returned by [[table-id->pk-field-name->id]] or similar and a | (defn- row->mbql-filter-clause [field-name->id row] (when (empty? row) (throw (ex-info (tru "Cannot build filter clause: row cannot be empty.") {:field-name->id field-name->id, :row row, :status-code 400}))) (into [:and] (for [[field-name value] row :let [field-id (get field-name->id (u/qualified-name field-name)) ;; if the field isn't in `field-name->id` then it's an error in our code. Not ;; i18n'ed because this is not something that should be User facing unless our ;; backend code is broken. ;; ;; Unknown column names in user input WILL NOT trigger this error. ;; [[row->mbql-filter-clause]] is only used for *known* PK columns that are ;; used for the MBQL `:filter` clause. Unknown columns will trigger an error in ;; the DW but not here. _ (assert field-id (format "Field %s is not present in field-name->id map" (pr-str field-name)))]] [:= [:field field-id nil] value]))) |
| |
Make sure all | (defn- check-rows-have-expected-columns-and-no-other-keys [rows expected-columns] ;; we only actually need to check the first map since [[check-consistent-row-keys]] should have checked that ;; they all have the same keys. (let [expected-columns (set expected-columns) actual-columns (set (keys (first rows)))] (when-not (= actual-columns expected-columns) (throw (ex-info (tru "Rows have the wrong columns: expected {0}, but got {1}" expected-columns actual-columns) {:status-code 400, :expected-columns expected-columns, :actual-columns actual-columns}))))) |
Make sure all | (defn- check-consistent-row-keys [rows] (let [all-row-column-sets (reduce (fn [seen-set row] (conj seen-set (set (keys row)))) #{} rows)] (when (> (count all-row-column-sets) 1) (throw (ex-info (tru "Some rows have different sets of columns: {0}" (str/join ", " (map pr-str all-row-column-sets))) {:status-code 400, :column-sets all-row-column-sets}))))) |
Make sure all | (defn- check-unique-rows [rows] (when-let [repeats (not-empty (into ;; ordered set so the results are deterministic for test purposes (ordered-set/ordered-set) (filter (fn [[_row repeat-count]] (> repeat-count 1))) (frequencies rows)))] (throw (ex-info (tru "Rows need to be unique: repeated rows {0}" (str/join ", " (for [[row repeat-count] repeats] (format "%s × %d" (pr-str row) repeat-count)))) {:status-code 400, :repeated-rows repeats})))) |
(defmethod actions/perform-action!* [:sql-jdbc :bulk/delete] [driver _action {database-id :id, :as database} {:keys [table-id rows]}] (log/tracef "Deleting %d rows" (count rows)) (let [pk-name->id (table-id->pk-field-name->id database-id table-id)] ;; validate the keys in `rows` (check-consistent-row-keys rows) (check-rows-have-expected-columns-and-no-other-keys rows (keys pk-name->id)) (check-unique-rows rows) ;; now do one `:row/delete` for each row (perform-bulk-action-with-repeated-single-row-actions! {:driver driver :database database :action :row/delete :rows rows :xform (comp (map (fn [row] {:database database-id :type :query :query {:source-table table-id :filter (row->mbql-filter-clause pk-name->id row)}})) #(completing % (fn [[errors _successes]] (when (seq errors) (throw (ex-info (tru "Error(s) deleting rows.") {:status-code 400, :errors errors}))) ;; `:bulk/delete` just returns a simple status message on success. {:success true})))}))) | |
| |
Return a 400 if | (mu/defn- check-row-has-all-pk-columns [row :- ::lib.schema.actions/row pk-names :- [:set :string]] (doseq [pk-key pk-names :when (not (contains? row pk-key))] (throw (ex-info (tru "Row is missing required primary key column. Required {0}; got {1}" (pr-str pk-names) (pr-str (set (keys row)))) {:row row, :pk-names pk-names, :status-code 400})))) |
Return a 400 if | (mu/defn- check-row-has-some-non-pk-columns [row :- ::lib.schema.actions/row pk-names :- [:set :string]] (let [non-pk-names (set/difference (set (keys row)) pk-names)] (when (empty? non-pk-names) (throw (ex-info (tru "Invalid update row map: no non-PK columns. Got {0}, all of which are PKs." (pr-str (set (keys row)))) {:status-code 400 :row row :all-keys (set (keys row)) :pk-names pk-names}))))) |
Create a function to use to transform each row coming in to a | (defn- bulk-update-row-xform [{database-id :id, :as _database} table-id] ;; TODO -- make sure all rows specify the PK columns (let [pk-name->id (table-id->pk-field-name->id database-id table-id) pk-names (set (keys pk-name->id))] (fn [row] (check-row-has-all-pk-columns row pk-names) (let [pk-column->value (select-keys row pk-names)] (check-row-has-some-non-pk-columns row pk-names) {:database database-id :type :query :query {:source-table table-id :filter (row->mbql-filter-clause pk-name->id pk-column->value)} :update-row (apply dissoc row pk-names)})))) |
(defmethod actions/perform-action!* [:sql-jdbc :bulk/update] [driver _action database {:keys [table-id rows]}] (log/tracef "Updating %d rows" (count rows)) (perform-bulk-action-with-repeated-single-row-actions! {:driver driver :database database :action :row/update :rows rows :xform (comp (map (bulk-update-row-xform database table-id)) #(completing % (fn [[errors successes]] (when (seq errors) (throw (ex-info (tru "Error(s) updating rows.") {:status-code 400, :errors errors}))) ;; `:bulk/update` returns {:rows-updated <number-of-rows-updated>} on success. (transduce (map (comp first :rows-updated)) (completing + (fn [num-rows-updated] {:rows-updated num-rows-updated})) 0 successes))))})) | |