This module handles the analysis of queries, which determines their data dependencies. It is also used to audit these dependencies for issues, for example, a query referencing a column that no longer exists. Analysis is typically performed on a background worker thread, and the [[queue-analysis!]] function is used to add cards to the corresponding queue. TODO -- not all of this stuff needs to live here or be public now that [[metabase.query-analysis.task.analyze-queries]] is part of this module instead of living somewhere else. See if we can clean up the API a little further. | (ns metabase.query-analysis.core (:require [clojure.set :as set] [medley.core :as m] [metabase.config :as config] [metabase.legacy-mbql.util :as mbql.u] [metabase.lib.core :as lib] [metabase.lib.util :as lib.util] [metabase.public-settings :as public-settings] [metabase.query-analysis.native-query-analyzer :as nqa] [metabase.query-analysis.native-query-analyzer.replacement :as nqa.replacement] [metabase.util :as u] [metabase.util.log :as log] [metabase.util.queue :as queue] [toucan2.core :as t2])) |
(set! *warn-on-reflection* true) | |
The maximum number of cards which can be queued for async analysis. When exceeded, additional cards will be dropped. | (def ^:private realtime-queue-capacity 1000) |
The in-memory queue used to throttle analysis and reduce the chance of race conditions. | (def ^:private worker-queue (queue/bounded-transfer-queue realtime-queue-capacity {:dedupe? false})) |
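The drop-on-overflow behavior of a bounded queue can be sketched with the JDK's `ArrayBlockingQueue`, whose non-blocking `offer` returns false when the queue is full. This is only an illustration: `make-bounded-queue` and `offer-card!` are hypothetical names, and the sketch assumes nothing about how `metabase.util.queue/bounded-transfer-queue` is actually implemented.

```clojure
(import '(java.util.concurrent ArrayBlockingQueue))

;; Hypothetical helpers; this is NOT the metabase.util.queue API.
(defn make-bounded-queue
  "Create a queue that holds at most `capacity` items."
  [capacity]
  (ArrayBlockingQueue. (int capacity)))

(defn offer-card!
  "Try to enqueue `card-id` without blocking. Returns true if the card was
  accepted, or false if the queue was full and the card was dropped."
  [^ArrayBlockingQueue q card-id]
  (.offer q card-id))

;; With a capacity of 2, the third card is dropped.
(def card-queue (make-bounded-queue 2))
(mapv #(offer-card! card-queue %) [101 102 103])
;; => [true true false]
```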
Managing a background thread in the REPL is likely to confuse and infuriate, especially when running tests. For this reason, we run analysis on the main thread by default. | (def ^:dynamic *analyze-execution-in-dev?* ::immediate) |
A card's query is normally analyzed on every create/update. For most tests, this is an unnecessary expense; hence we disable analysis by default. | (def ^:dynamic *analyze-execution-in-test?* ::disabled) |
Override the default execution mode, except in prod. | (defmacro with-execution* [execution & body] (assert (not config/is-prod?)) `(binding [*analyze-execution-in-dev?* ~execution *analyze-execution-in-test?* ~execution] ~@body)) |
Override the default execution mode to always use the queue. Does nothing in prod - only use this in tests. | (defmacro with-queued-analysis [& body] `(with-execution* ::queued ~@body)) |
Override the default execution mode to always use the current thread. Does nothing in prod - only use this in tests. | (defmacro with-immediate-analysis [& body] `(with-execution* ::immediate ~@body)) |
Override the default execution mode to disable analysis entirely. Does nothing in prod - only use this in tests. | (defmacro without-analysis [& body] `(with-execution* ::disabled ~@body)) |
The execution strategy for analysis. It can be overridden by a dynamic variable in dev and tests. By default, it is async in production for the backfill. | (defn- execution [] (case config/run-mode :prod ::queued :dev *analyze-execution-in-dev?* :test *analyze-execution-in-test?*)) |
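The strategy selection, together with the `with-execution*` override, can be reduced to a small self-contained sketch. Here `run-mode` is a hypothetical stand-in for `metabase.config/run-mode`, the dynamic var names are illustrative, and plain keywords replace the namespaced `::immediate`, `::queued` and `::disabled`.

```clojure
;; Hypothetical stand-in for metabase.config/run-mode.
(def run-mode :test)

(def ^:dynamic *execution-in-dev* :immediate)
(def ^:dynamic *execution-in-test* :disabled)

(defn execution
  "Always queue in prod; otherwise use the dynamic var for the current mode."
  []
  (case run-mode
    :prod :queued
    :dev  *execution-in-dev*
    :test *execution-in-test*))

(defmacro with-execution
  "Rebind both dynamic vars for the duration of `body`."
  [mode & body]
  `(binding [*execution-in-dev*  ~mode
             *execution-in-test* ~mode]
     ~@body))

(execution)                          ;; => :disabled
(with-execution :queued (execution)) ;; => :queued
```

Because prod ignores the dynamic vars entirely, rebinding them can only change behavior in dev and test.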
Is analysis of the given query type enabled? | (defn enabled-type? [query-type] (and (public-settings/query-analysis-enabled) (case query-type :native (public-settings/sql-parsing-enabled) :query true :mbql/query true false))) |
(defn- explicit-field-references [field-ids] (when (seq field-ids) ;; We add this flag in code because a `true` selected in SQL would be returned as 1 by MySQL-based drivers. (map #(assoc % :explicit-reference true) (t2/select :model/QueryField {:select [[:t.id :table-id] [:t.name :table] [:f.id :field-id] [:f.name :column]] :from [[(t2/table-name :model/Field) :f]] :join [[(t2/table-name :model/Table) :t] [:= :t.id :f.table_id]] :where [:in :f.id field-ids]})))) | |
(defn- explicit-references [field-ids] (let [field-refs (explicit-field-references field-ids)] {:fields (distinct field-refs) :tables (distinct (map #(dissoc % :field-id :column :explicit-reference) field-refs))})) | |
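To see how table references are derived from field references, here is the same grouping applied to a few made-up rows shaped like those returned by `explicit-field-references`. The name `group-references` and the sample data are hypothetical; the real `explicit-references` takes field ids and queries the application database first.

```clojure
(defn group-references
  "Split field-level reference rows into {:fields ... :tables ...}.
  Table rows are the field rows minus their field-specific keys, so
  several fields from one table collapse into a single table entry."
  [field-refs]
  {:fields (distinct field-refs)
   :tables (distinct (map #(dissoc % :field-id :column :explicit-reference) field-refs))})

;; Made-up rows: two fields from `orders`, one from `people`.
(def sample-refs
  [{:table-id 10 :table "orders" :field-id 1 :column "total"   :explicit-reference true}
   {:table-id 10 :table "orders" :field-id 2 :column "created" :explicit-reference true}
   {:table-id 20 :table "people" :field-id 3 :column "email"   :explicit-reference true}])

(:tables (group-references sample-refs))
;; => ({:table-id 10, :table "orders"} {:table-id 20, :table "people"})
```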
Find the ids of all fields used in a query. Conforms to the same protocol as [[query-analyzer/field-ids-for-sql]], so it returns a map of `:fields` and `:tables` references, or a map with an `:error` key if analysis fails. Does not track wildcards for queries rendered as tables afterward. | (defn- query-references ([query] (query-references query (lib/normalized-query-type query))) ([query query-type] (case query-type :native (try (nqa/references-for-native query) (catch Exception e (log/debug e "Failed to analyze native query" query) {:error :query-analysis.error/exception, :context {:exception e}})) ;; For now, all model references are resolved transitively to the ultimate field ids. ;; We may want to change to record model references directly rather than resolving them. ;; This would remove the need to invalidate consuming cards when a given model changes. :query (explicit-references (mbql.u/referenced-field-ids query)) :mbql/query (explicit-references (lib.util/referenced-field-ids query))))) |
(defn- truncate-string [x] (if (and (string? x) (> (count x) 254)) (subs x 0 254) x)) | |
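This truncation is applied to every value of each row through the `safely` wrapper inside `update-query-analysis-for-card!`. A standalone sketch of that combination (the simplified `table->row` builder and the sample input are illustrative only) shows long strings being cut to 254 characters while non-string values pass through unchanged. `update-vals` requires Clojure 1.11 or later.

```clojure
(defn truncate-string
  "Cut strings longer than 254 characters down to 254; leave other values alone."
  [x]
  (if (and (string? x) (> (count x) 254))
    (subs x 0 254)
    x))

(defn safely
  "Wrap a row-building fn so that every value of the row it returns is truncated."
  [f]
  (fn [m] (update-vals (f m) truncate-string)))

;; Illustrative row builder, simplified from the one in update-query-analysis-for-card!.
(def table->row
  (safely (fn [{:keys [table table-id]}] {:table table :table_id table-id})))

(def row (table->row {:table (apply str (repeat 300 \x)) :table-id 7}))
(count (:table row)) ;; => 254
(:table_id row)      ;; => 7
```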
Clears the QueryFields associated with this card and creates fresh, up-to-date ones. Returns the references found by the analysis, or nil if analysis is not enabled for the card's query type. | (defn- update-query-analysis-for-card! [{card-id :id, query :dataset_query}] (let [query-type (lib/normalized-query-type query)] (when (enabled-type? query-type) (let [analysis-id (t2/insert-returning-pk! :model/QueryAnalysis {:card_id card-id :status "running"}) result (query-references query query-type) safely (fn [f] (fn [m] (update-vals (f m) truncate-string))) table->row (safely (fn [{:keys [schema table table-id]}] {:card_id card-id :analysis_id analysis-id :schema schema :table table :table_id table-id})) field->row (safely (fn [{:keys [schema table column table-id field-id explicit-reference]}] {:card_id card-id :analysis_id analysis-id :schema schema :table table :column column :table_id table-id :field_id field-id :explicit_reference explicit-reference}))] (if (contains? result :error) ;; TODO we should track cases where the driver is disabled or not-supported differently. (t2/update! :model/QueryAnalysis analysis-id {:status "failed"}) (do (t2/insert! :model/QueryField (map field->row (:fields result))) (t2/insert! :model/QueryTable (map table->row (:tables result))) (t2/update! :model/QueryAnalysis analysis-id {:status "complete"}))) (t2/delete! :model/QueryAnalysis {:where [:and [:= :card_id card-id] [:not= :id analysis-id]]}) result)))) |
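The bookkeeping performed on each run (insert a `running` analysis, mark it `complete` or `failed`, then delete every older analysis for the same card) can be sketched with an atom standing in for the `QueryAnalysis` table. This is an in-memory illustration only: `record-analysis!` and the atoms are hypothetical, and the insertion of `QueryField`/`QueryTable` rows on success is omitted.

```clojure
;; Hypothetical in-memory stand-in for the QueryAnalysis table: id -> row.
(def analyses (atom {}))
(def last-id (atom 0))

(defn record-analysis!
  "Run `analyze-fn` for `card-id`, store the outcome, and keep only the
  newest analysis row for that card. Returns the analysis result."
  [card-id analyze-fn]
  (let [analysis-id (swap! last-id inc)
        _           (swap! analyses assoc analysis-id {:card_id card-id :status "running"})
        result      (analyze-fn)
        status      (if (contains? result :error) "failed" "complete")]
    (swap! analyses assoc-in [analysis-id :status] status)
    ;; As in the original, older rows are removed whether or not this run failed.
    (swap! analyses
           (fn [rows]
             (into {}
                   (remove (fn [[id row]]
                             (and (= card-id (:card_id row))
                                  (not= id analysis-id))))
                   rows)))
    result))

(record-analysis! 42 (fn [] {:fields [] :tables []}))
(record-analysis! 42 (fn [] {:error :query-analysis.error/exception}))
@analyses
;; => {2 {:card_id 42, :status "failed"}}
```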
Substitute new references for certain fields and tables, based upon the given mappings. | (defn- replaced-inner-query-for-native-card [query {:keys [fields tables] :as _replacement-ids}] (let [keyvals-set #(set/union (set (keys %)) (set (vals %))) id->field (if (empty? fields) {} (m/index-by :id (t2/query {:select [[:f.id :id] [:f.name :column] [:t.name :table] [:t.schema :schema]] :from [[:metabase_field :f]] :join [[:metabase_table :t] [:= :f.table_id :t.id]] :where [:in :f.id (keyvals-set fields)]}))) id->table (if (empty? tables) {} (m/index-by :id (t2/query {:select [[:t.id :id] [:t.name :table] [:t.schema :schema]] :from [[:metabase_table :t]] :where [:in :t.id (keyvals-set tables)]}))) remove-id #(select-keys % [:column :table :schema]) get-or-throw-from (fn [m] (fn [k] (if (contains? m k) (remove-id (get m k)) (throw (ex-info "ID not found" {:id k :available m}))))) ids->replacements (fn [id->replacement-id id->row row->identifier] (-> id->replacement-id (u/update-keys-vals (get-or-throw-from id->row)) (update-vals row->identifier))) ;; Note: we are naively providing unqualified new identifier names as the replacements. ;; this will break if previously unambiguous identifiers become ambiguous due to the replacements column-replacements (ids->replacements fields id->field :column) table-replacements (ids->replacements tables id->table :table)] (nqa.replacement/replace-names query {:columns column-replacements :tables table-replacements}))) |
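The core of the substitution is `ids->replacements`: both sides of an old-id to new-id map are resolved to metadata rows, and the values are then reduced to bare identifiers, giving a map from the fully described old column to the new column name. The sketch below reproduces that step with made-up rows; `update-keys-vals` here is a local stand-in, assumed to behave like the `metabase.util` helper of the same name (applying `f` to every key and every value).

```clojure
(defn update-keys-vals
  "Apply `f` to every key and every value of map `m`.
  Local stand-in, assumed to behave like metabase.util/update-keys-vals."
  [m f]
  (into {} (map (fn [[k v]] [(f k) (f v)])) m))

(defn get-or-throw-from
  "Return a fn that looks an id up in `m`, keeping only the identifier keys."
  [m]
  (fn [k]
    (if (contains? m k)
      (select-keys (get m k) [:column :table :schema])
      (throw (ex-info "ID not found" {:id k :available m})))))

(defn ids->replacements
  [id->replacement-id id->row row->identifier]
  (-> id->replacement-id
      (update-keys-vals (get-or-throw-from id->row))
      (update-vals row->identifier)))

;; Made-up metadata for two fields of the same table.
(def id->field
  {1 {:id 1 :column "total"    :table "orders" :schema "public"}
   2 {:id 2 :column "subtotal" :table "orders" :schema "public"}})

(ids->replacements {1 2} id->field :column)
;; => {{:column "total", :table "orders", :schema "public"} "subtotal"}
```

Keeping the full row on the key side identifies the old column precisely, while only the bare new name is substituted, which is why the code comment warns that replacements can become ambiguous.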
Given a card and a map of the form `{:fields {1 2, 3 4} :tables {100 101}}`, return the card's native query rewritten so that its references to the Field with ID 1 are replaced by Field 2, etc. Throws for cards with MBQL queries, which are not yet supported. | (defn replace-fields-and-tables [{card-type :query_type, q :dataset_query :as card} replacements] (case card-type :native (replaced-inner-query-for-native-card q replacements) (throw (ex-info "We don't (yet) support replacing field and table refs in cards with MBQL queries" {:card card :replacements replacements})))) |
Given a partial card or its id, ensure that we have all the fields required for analysis. | (defn ->analyzable [card-or-id] ;; If we don't know whether a card has been archived, give it the benefit of the doubt. (if (every? #(some? (% card-or-id)) [:id :dataset_query]) card-or-id ;; If we need to query the database though, find out for sure. (t2/select-one [:model/Card :id :archived :dataset_query] (u/the-id card-or-id)))) |
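The fast path in `->analyzable` can be illustrated with a map standing in for the database lookup. In this sketch `fetch-card` is a hypothetical parameter replacing `t2/select-one`, and the inline id extraction replaces `u/the-id`.

```clojure
(defn ->analyzable
  "Return `card-or-id` unchanged if it already carries :id and :dataset_query;
  otherwise fetch the full card by id with `fetch-card`."
  [card-or-id fetch-card]
  ;; Keywords return nil when applied to a non-map, so a bare id fails this check.
  (if (every? #(some? (% card-or-id)) [:id :dataset_query])
    card-or-id
    (fetch-card (if (map? card-or-id) (:id card-or-id) card-or-id))))

;; Hypothetical card store; a Clojure map can be called as a lookup fn.
(def cards {7 {:id 7 :archived true :dataset_query {:type :native}}})

(->analyzable {:id 7 :dataset_query {:type :native}} cards)
;; => {:id 7, :dataset_query {:type :native}}   (returned as-is, no lookup)
(->analyzable 7 cards)
;; => {:id 7, :archived true, :dataset_query {:type :native}}
```

The first result has no `:archived` key, so a caller checking `(:archived card)` treats it as active; this is the "benefit of the doubt" described in the code comment.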
Update the analysis for a given card if it is active. Should only be called from [[metabase.query-analysis.task.analyze-queries]]; otherwise, favor [[analyze!]]. | (defn analyze!* [card-or-id] (let [card (->analyzable card-or-id) card-id (u/the-id card-or-id)] (cond (not card) (log/warnf "Card not found: %s" card-id) (:archived card) (log/debugf "Skipping archived card: %s" card-id) :else (do (log/debugf "Performing query analysis for card %s" card-id) (update-query-analysis-for-card! card))))) |
Get the next card (or card id) to be analyzed. May block indefinitely, as it relies on a producer. Should only be called from [[metabase.query-analysis.task.analyze-queries]]. | (defn next-card-or-id! ([] (next-card-or-id! worker-queue)) ([queue] (next-card-or-id! queue Long/MAX_VALUE)) ([queue timeout] (queue/blocking-take! queue timeout))) |
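Taking from a queue with a timeout can be pictured as polling a JDK `BlockingQueue`: wait up to the timeout, then yield nil if nothing arrived. The sketch uses `LinkedBlockingQueue.poll` directly; `next-item!` is a hypothetical name, and the millisecond unit is an assumption of the sketch, not a statement about `queue/blocking-take!`.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

(defn next-item!
  "Take the next item from `q`, waiting at most `timeout-ms` milliseconds.
  Returns nil if nothing arrives in time."
  [^LinkedBlockingQueue q timeout-ms]
  (.poll q (long timeout-ms) TimeUnit/MILLISECONDS))

(def ^LinkedBlockingQueue pending (LinkedBlockingQueue.))
(.put pending 42)
(next-item! pending 100) ;; => 42
(next-item! pending 100) ;; => nil, after waiting about 100 ms
```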
Indirection used to modify the execution strategy for analysis in dev and tests. | (defn- queue-or-analyze! [offer-fn! card-or-id] (case (execution) ::immediate (analyze!* card-or-id) ::queued (offer-fn! card-or-id) ::disabled nil)) |
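The dispatch can be exercised in isolation by passing the mode and both actions explicitly. In this sketch `analyze-fn` and `offer-fn!` are hypothetical parameters that record which path was taken, and plain keywords stand in for `::immediate`, `::queued` and `::disabled`.

```clojure
(defn queue-or-analyze!
  "Analyze now, hand off via `offer-fn!`, or do nothing, depending on `mode`."
  [mode analyze-fn offer-fn! card-or-id]
  (case mode
    :immediate (analyze-fn card-or-id)
    :queued    (offer-fn! card-or-id)
    :disabled  nil))

;; Record which path each call takes.
(def calls (atom []))
(defn analyze-now [id] (swap! calls conj [:analyzed id]))
(defn enqueue! [id] (swap! calls conj [:queued id]))

(queue-or-analyze! :immediate analyze-now enqueue! 1)
(queue-or-analyze! :queued    analyze-now enqueue! 2)
(queue-or-analyze! :disabled  analyze-now enqueue! 3)
@calls ;; => [[:analyzed 1] [:queued 2]]
```

Since `case` has no default clause here, any other mode throws an `IllegalArgumentException` rather than silently skipping analysis.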
(defn- blocking-put! [queue timeout card-or-id] (let [id (u/the-id card-or-id)] (log/debugf "Queueing Card %s for analysis" id) (queue/blocking-put! queue timeout card-or-id))) | |
Assuming analysis is enabled, analyze the card immediately (and in the current thread). | (defn analyze! [card-or-id] (when-not (= ::disabled (execution)) (try (analyze!* card-or-id) ;; Don't throw exceptions on the main thread path, analysis is best-effort. (catch Exception e (log/errorf e "Failure analysing card %s" (u/the-id card-or-id)))))) |
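The best-effort pattern amounts to catching any `Exception` around the work and reporting it instead of rethrowing. A minimal sketch, in which `best-effort` is a hypothetical helper and printing to `*err*` stands in for `log/errorf`:

```clojure
(defn best-effort
  "Call (f card-id). If that throws, report the failure and return nil
  instead of propagating the exception to the caller."
  [f card-id]
  (try
    (f card-id)
    (catch Exception e
      (binding [*out* *err*]
        (println "Failure analysing card" card-id "-" (ex-message e)))
      nil)))

(best-effort inc 1) ;; => 2
(best-effort (fn [_] (throw (ex-info "boom" {}))) 7) ;; => nil, with a message on stderr
```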
Synchronously hand off the given card for analysis, at a low priority. May block indefinitely, as it relies on a consumer. Note that only the hand-off is synchronous; if we're using the queue, the processing could happen asynchronously. | (defn queue-analysis! ([card-or-id] (queue-analysis! card-or-id worker-queue)) ([card-or-id queue] (queue-or-analyze! (partial queue/blocking-put! queue Long/MAX_VALUE) card-or-id))) |