This module handles the analysis of queries, which determines their data dependencies. It is also used to audit these dependencies for issues, for example the use of a column that no longer exists. Analysis is typically performed on a background worker thread, and the [[queue-analysis!]] function is used to add cards to the corresponding queue. TODO -- not all of this stuff needs to live here or be public now that [[metabase.query-analysis.task.analyze-queries]] is part of this module instead of living somewhere else. See if we can clean up the API a little further. | (ns metabase.query-analysis.core (:require [clojure.set :as set] [medley.core :as m] [metabase.config :as config] [metabase.legacy-mbql.util :as mbql.u] [metabase.lib.core :as lib] [metabase.lib.util :as lib.util] [metabase.public-settings :as public-settings] [metabase.query-analysis.native-query-analyzer :as nqa] [metabase.query-analysis.native-query-analyzer.replacement :as nqa.replacement] [metabase.util :as u] [metabase.util.log :as log] [metabase.util.namespaces :as shared.ns] [metabase.util.queue :as queue] [toucan2.core :as t2])) |
(set! *warn-on-reflection* true) | |
(shared.ns/import-fns [nqa tables-for-native]) | |
The maximum number of cards which can be queued for async analysis. When exceeded, additional cards will be dropped. | (def ^:private realtime-queue-capacity 1000) |
The in-memory queue used to throttle analysis and reduce the chance of race conditions. | (def ^:private worker-queue
(queue/bounded-transfer-queue realtime-queue-capacity {:dedupe? false})) |
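The "dropped when full" behavior of a bounded queue can be illustrated with a minimal sketch. It assumes a plain `java.util.concurrent.ArrayBlockingQueue` as a stand-in; the real `queue/bounded-transfer-queue` may behave differently, and `demo-queue` and `offer-card!` are hypothetical names used only for illustration.

```clojure
(import '(java.util.concurrent ArrayBlockingQueue))

;; Hypothetical stand-in for the worker queue, with a tiny capacity of 2.
(def demo-queue (ArrayBlockingQueue. 2))

(defn offer-card!
  "Try to enqueue `card-id` without blocking. Returns true if accepted,
  false if the queue is full and the card was dropped."
  [^ArrayBlockingQueue q card-id]
  (.offer q card-id))

;; Offer three cards to a queue that only holds two.
(def offer-results (mapv #(offer-card! demo-queue %) [1 2 3]))
;; offer-results is [true true false]: the third card is dropped.
```

A non-blocking offer trades completeness for back-pressure: producers never stall, but a dropped card has to be picked up later by some other mechanism.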
Managing a background thread in the REPL is likely to confuse and infuriate, especially when running tests. For this reason, we run analysis on the main thread by default. | (def ^:dynamic *analyze-execution-in-dev?* ::immediate) |
A card's query is normally analyzed on every create/update. For most tests, this is an unnecessary expense; hence we disable analysis by default. | (def ^:dynamic *analyze-execution-in-test?* ::disabled) |
Override the default execution mode, except in prod. | (defmacro with-execution*
[execution & body]
(assert (not config/is-prod?))
`(binding [*analyze-execution-in-dev?* ~execution
*analyze-execution-in-test?* ~execution]
~@body)) |
Override the default execution mode to always use the queue. Does nothing in prod - only use this in tests. | (defmacro with-queued-analysis [& body] `(with-execution* ::queued ~@body)) |
Override the default execution mode to always use the current thread. Does nothing in prod - only use this in tests. | (defmacro with-immediate-analysis [& body] `(with-execution* ::immediate ~@body)) |
Override the default execution mode to disable analysis entirely. Does nothing in prod - only use this in tests. | (defmacro without-analysis [& body] `(with-execution* ::disabled ~@body)) |
The execution strategy for analysis. It can be overridden by a dynamic variable in dev and tests. By default, it is async in production for the backfill. | (defn- execution
[]
(case config/run-mode
:prod ::queued
:dev *analyze-execution-in-dev?*
:test *analyze-execution-in-test?*)) |
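The way the execution mode is resolved can be sketched in isolation. This is a simplified illustration, assuming the run mode is passed in as an argument rather than read from `config/run-mode`; `*dev-mode*`, `*test-mode*`, and `execution-for` are hypothetical names.

```clojure
;; Hypothetical stand-ins for the two dynamic vars in this namespace.
(def ^:dynamic *dev-mode* :immediate)
(def ^:dynamic *test-mode* :disabled)

(defn execution-for
  "Resolve the execution strategy for a given run mode."
  [run-mode]
  (case run-mode
    :prod :queued        ; prod ignores the dynamic vars
    :dev  *dev-mode*
    :test *test-mode*))

(def default-test-mode (execution-for :test))

;; Rebinding the dynamic var changes the result for :test only.
(def overridden-test-mode
  (binding [*test-mode* :queued]
    (execution-for :test)))

(def prod-mode-under-binding
  (binding [*dev-mode* :disabled
            *test-mode* :disabled]
    (execution-for :prod)))
```

Because prod never consults the dynamic vars, the override macros can only ever influence dev and test runs.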
Is analysis of the given query type enabled? | (defn enabled-type?
[query-type]
(and (public-settings/query-analysis-enabled)
(case query-type
:native (public-settings/sql-parsing-enabled)
:query true
:mbql/query true
false))) |
(defn- explicit-field-references [field-ids]
(when (seq field-ids)
;; We set this flag in code, because MySQL-based drivers would return a selected `true` as 1.
(map #(assoc % :explicit-reference true)
(t2/select :model/QueryField {:select [[:t.id :table-id] [:t.name :table]
[:f.id :field-id] [:f.name :column]]
:from [[(t2/table-name :model/Field) :f]]
:join [[(t2/table-name :model/Table) :t] [:= :t.id :f.table_id]]
:where [:in :f.id field-ids]})))) | |
(defn- explicit-references [field-ids]
(let [field-refs (explicit-field-references field-ids)]
{:fields (distinct field-refs)
:tables (distinct (map #(dissoc % :field-id :column :explicit-reference) field-refs))})) | |
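As a small illustration of how the table references are derived from the field references, here is a sketch over hypothetical in-memory rows (no database involved; the sample data is made up):

```clojure
;; Hypothetical field references, shaped like the rows selected above.
(def field-refs
  [{:table-id 1 :table "orders" :field-id 10 :column "id"    :explicit-reference true}
   {:table-id 1 :table "orders" :field-id 11 :column "total" :explicit-reference true}
   {:table-id 2 :table "people" :field-id 20 :column "name"  :explicit-reference true}])

;; Dropping the field-specific keys leaves one map per table; `distinct`
;; collapses the duplicates while preserving first-seen order.
(def table-refs
  (distinct (map #(dissoc % :field-id :column :explicit-reference) field-refs)))
;; => ({:table-id 1, :table "orders"} {:table-id 2, :table "people"})
```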
Find out the ids of all fields used in a query. Conforms to the same protocol as [[query-analyzer/field-ids-for-sql]], so returns a map of `:fields` and `:tables` references, or a map containing `:error` if native analysis throws. Does not track wildcards for queries rendered as tables afterward. | (defn- query-references
([query]
(query-references query (lib/normalized-query-type query)))
([query query-type]
(case query-type
:native (try
(nqa/references-for-native query)
(catch Exception e
(log/debug e "Failed to analyze native query" query)
{:error :query-analysis.error/exception, :context {:exception e}}))
;; For now, all model references are resolved transitively to the ultimate field ids.
;; We may want to change to record model references directly rather than resolving them.
;; This would remove the need to invalidate consuming cards when a given model changes.
:query (explicit-references (mbql.u/referenced-field-ids query))
:mbql/query (explicit-references (lib.util/referenced-field-ids query))))) |
(defn- truncate-string [x]
(if (and (string? x) (> (count x) 254))
(subs x 0 254)
x)) | |
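The truncation is applied to every value of a row map before insertion. A minimal, self-contained sketch of that pattern follows; `safely` mirrors the local helper defined inside `update-query-analysis-for-card!`, the sample row is hypothetical, and `update-vals` requires Clojure 1.11 or later.

```clojure
(defn truncate-string
  "Cap strings at 254 characters; pass every other value through unchanged."
  [x]
  (if (and (string? x) (> (count x) 254))
    (subs x 0 254)
    x))

(defn safely
  "Wrap a row-building fn so that every value in its result is truncated."
  [f]
  (fn [m] (update-vals (f m) truncate-string)))

(def table->row
  (safely (fn [{:keys [table table-id]}]
            {:table table :table_id table-id})))

;; A 300-character table name is cut to 254; the integer id is untouched.
(def row (table->row {:table (apply str (repeat 300 "x")) :table-id 7}))
```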
Clears QueryFields associated with this card and creates fresh, up-to-date ones. Returns the result of the analysis, or nil if analysis is disabled for the query type. | (defn- update-query-analysis-for-card!
[{card-id :id, query :dataset_query}]
(let [query-type (lib/normalized-query-type query)]
(when (enabled-type? query-type)
(let [analysis-id (t2/insert-returning-pk! :model/QueryAnalysis {:card_id card-id :status "running"})
result (query-references query query-type)
safely (fn [f] (fn [m] (update-vals (f m) truncate-string)))
table->row (safely
(fn [{:keys [schema table table-id]}]
{:card_id card-id
:analysis_id analysis-id
:schema schema
:table table
:table_id table-id}))
field->row (safely
(fn [{:keys [schema table column table-id field-id explicit-reference]}]
{:card_id card-id
:analysis_id analysis-id
:schema schema
:table table
:column column
:table_id table-id
:field_id field-id
:explicit_reference explicit-reference}))]
(if (contains? result :error)
;; TODO we should track cases where the driver is disabled or not-supported differently.
(t2/update! :model/QueryAnalysis analysis-id {:status "failed"})
(do
(t2/insert! :model/QueryField (map field->row (:fields result)))
(t2/insert! :model/QueryTable (map table->row (:tables result)))
(t2/update! :model/QueryAnalysis analysis-id {:status "complete"})))
(t2/delete! :model/QueryAnalysis
{:where [:and
[:= :card_id card-id]
[:not= :id analysis-id]]})
result)))) |
Substitute new references for certain fields and tables, based upon the given mappings. | (defn- replaced-inner-query-for-native-card
[query {:keys [fields tables] :as _replacement-ids}]
(let [keyvals-set #(set/union (set (keys %))
(set (vals %)))
id->field (if (empty? fields)
{}
(m/index-by :id
(t2/query {:select [[:f.id :id]
[:f.name :column]
[:t.name :table]
[:t.schema :schema]]
:from [[:metabase_field :f]]
:join [[:metabase_table :t] [:= :f.table_id :t.id]]
:where [:in :f.id (keyvals-set fields)]})))
id->table (if (empty? tables)
{}
(m/index-by :id
(t2/query {:select [[:t.id :id]
[:t.name :table]
[:t.schema :schema]]
:from [[:metabase_table :t]]
:where [:in :t.id (keyvals-set tables)]})))
remove-id #(select-keys % [:column :table :schema])
get-or-throw-from (fn [m] (fn [k] (if (contains? m k)
(remove-id (get m k))
(throw (ex-info "ID not found" {:id k :available m})))))
ids->replacements (fn [id->replacement-id id->row row->identifier]
(-> id->replacement-id
(u/update-keys-vals (get-or-throw-from id->row))
(update-vals row->identifier)))
;; Note: we are naively providing unqualified new identifier names as the replacements.
;; this will break if previously unambiguous identifiers become ambiguous due to the replacements
column-replacements (ids->replacements fields id->field :column)
table-replacements (ids->replacements tables id->table :table)]
(nqa.replacement/replace-names query {:columns column-replacements
:tables table-replacements}))) |
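The id-to-name translation above can be sketched without a database. This is an illustrative approximation: it assumes `u/update-keys-vals` applies the given function to both keys and values, and it uses a hypothetical in-memory `id->field` map in place of the `metabase_field` query.

```clojure
;; Hypothetical rows standing in for the metabase_field lookup.
(def id->field
  {1 {:id 1 :column "total"  :table "orders" :schema "public"}
   2 {:id 2 :column "amount" :table "orders" :schema "public"}})

(defn ids->replacements
  "Turn an {old-id new-id} map into {old-row new-identifier}.
  Throws if any id is missing from `id->row`."
  [id->replacement-id id->row row->identifier]
  (let [lookup (fn [id]
                 (if (contains? id->row id)
                   (select-keys (get id->row id) [:column :table :schema])
                   (throw (ex-info "ID not found" {:id id}))))]
    (into {}
          (map (fn [[old-id new-id]]
                 [(lookup old-id) (row->identifier (lookup new-id))]))
          id->replacement-id)))

;; Field 1 is to be replaced by Field 2: the key describes the old column,
;; and the value is the unqualified name of the new one.
(def column-replacements (ids->replacements {1 2} id->field :column))
;; => {{:column "total", :table "orders", :schema "public"} "amount"}
```

Keying by the full old row lets the replacement step match qualified references, while the unqualified new name is what makes the ambiguity caveat in the comment above possible.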
Given a card and a map of the form `{:fields {1 2, 3 4} :tables {100 101}}`, rewrite the card's query so that its references to the Field with ID 1 are replaced by Field 2, etc. Only native queries are currently supported. | (defn replace-fields-and-tables
[{card-type :query_type, q :dataset_query :as card} replacements]
(case card-type
:native (replaced-inner-query-for-native-card q replacements)
(throw (ex-info "We don't (yet) support replacing field and table refs in cards with MBQL queries"
{:card card :replacements replacements})))) |
Given a partial card or its id, ensure that we have all the fields required for analysis. | (defn ->analyzable
[card-or-id]
;; If we don't know whether a card has been archived, give it the benefit of the doubt.
(if (every? #(some? (% card-or-id)) [:id :dataset_query])
card-or-id
;; If we need to query the database though, find out for sure.
(t2/select-one [:model/Card :id :archived :dataset_query] (u/the-id card-or-id)))) |
Update the analysis for a given card if it is active. Should only be called from [[metabase.query-analysis.task.analyze-queries]]; otherwise favor [[analyze!]] | (defn analyze!*
[card-or-id]
(let [card (->analyzable card-or-id)
card-id (:id card)]
(cond
(not card) (log/warnf "Card not found: %s" (u/the-id card-or-id))
(:archived card) (log/debugf "Skipping archived card: %s" card-id)
:else (do
(log/debugf "Performing query analysis for card %s" card-id)
(update-query-analysis-for-card! card))))) |
Get the next card (or card id) to be analyzed. May block indefinitely; relies on the producer. Should only be called from [[metabase.query-analysis.task.analyze-queries]]. | (defn next-card-or-id!
([]
(next-card-or-id! worker-queue))
([queue]
(next-card-or-id! queue Long/MAX_VALUE))
([queue timeout]
(queue/blocking-take! queue timeout))) |
Indirection used to modify the execution strategy for analysis in dev and tests. | (defn- queue-or-analyze!
[offer-fn! card-or-id]
(case (execution)
::immediate (analyze!* card-or-id)
::queued (offer-fn! card-or-id)
::disabled nil)) |
(defn- blocking-put! [queue timeout card-or-id]
(let [id (u/the-id card-or-id)]
(log/debugf "Synchronously analyzing Card %s" id)
(queue/blocking-put! queue timeout card-or-id))) | |
Assuming analysis is enabled, analyze the card immediately (and in the current thread). | (defn analyze!
[card-or-id]
(when-not (= ::disabled (execution))
(try
(analyze!* card-or-id)
;; Don't throw exceptions on the main thread path, analysis is best-effort.
(catch Exception e
(log/errorf e "Failure analysing card %s" (u/the-id card-or-id)))))) |
Synchronously hand off the given card for analysis, at a low priority. May block indefinitely; relies on the consumer. Note that only the hand-off is sync; if we're using the queue, the processing could happen asynchronously. | (defn queue-analysis!
([card-or-id]
(queue-analysis! card-or-id worker-queue))
([card-or-id queue]
(queue-or-analyze! (partial queue/blocking-put! queue Long/MAX_VALUE) card-or-id))) |