This module handles the analysis of queries, which determines their data dependencies. It is also used to audit these dependencies for issues, for example the use of a column that no longer exists. Analysis is typically performed on a background worker thread, and [[queue-analysis!]] is used to add cards to the corresponding queue.

TODO -- not all of this stuff needs to live here or be public now that [[metabase.query-analysis.task.analyze-queries]] is part of this module instead of living somewhere else. See if we can clean up the API a little further.

(ns metabase.query-analysis.core
  (:require
   [clojure.set :as set]
   [medley.core :as m]
   [metabase.config :as config]
   [metabase.legacy-mbql.util :as mbql.u]
   [metabase.lib.core :as lib]
   [metabase.lib.util :as lib.util]
   [metabase.public-settings :as public-settings]
   [metabase.query-analysis.native-query-analyzer :as nqa]
   [metabase.query-analysis.native-query-analyzer.replacement :as nqa.replacement]
   [metabase.util :as u]
   [metabase.util.log :as log]
   [metabase.util.queue :as queue]
   [toucan2.core :as t2]))

(set! *warn-on-reflection* true)

The maximum number of cards which can be queued for async analysis. When exceeded, additional cards will be dropped.

(def ^:private realtime-queue-capacity
  1000)

The in-memory queue used to throttle analysis and reduce the chance of race conditions.

(def ^:private worker-queue
  (queue/bounded-transfer-queue realtime-queue-capacity {:dedupe? false}))

Managing a background thread in the REPL is likely to confuse and infuriate, especially when running tests. For this reason, we run analysis on the main thread by default.

(def ^:dynamic *analyze-execution-in-dev?*
  ::immediate)

A card's query is normally analyzed on every create/update. For most tests, this is an unnecessary expense; hence we disable analysis by default.

(def ^:dynamic *analyze-execution-in-test?*
  ::disabled)

Override the default execution mode, except in prod.

(defmacro with-execution*
  [execution & body]
  (assert (not config/is-prod?))
  `(binding [*analyze-execution-in-dev?*  ~execution
             *analyze-execution-in-test?* ~execution]
     ~@body))

Override the default execution mode to always use the queue. Not available in prod, where the underlying macro's assertion fails; only use this in tests.

(defmacro with-queued-analysis
  [& body]
  `(with-execution* ::queued ~@body))

Override the default execution mode to always use the current thread. Not available in prod, where the underlying macro's assertion fails; only use this in tests.

(defmacro with-immediate-analysis
  [& body]
  `(with-execution* ::immediate ~@body))

Override the default execution mode to skip analysis entirely. Not available in prod, where the underlying macro's assertion fails; only use this in tests.

(defmacro without-analysis
  [& body]
  `(with-execution* ::disabled ~@body))

The execution strategy for analysis. It can be overridden by a dynamic variable in dev and tests.

By default, it is async in production for the backfill.

(defn- execution
  []
  (case config/run-mode
    :prod ::queued
    :dev  *analyze-execution-in-dev?*
    :test *analyze-execution-in-test?*))

Is analysis of the given query type enabled?

(defn enabled-type?
  [query-type]
  (and (public-settings/query-analysis-enabled)
       (case query-type
         :native     (public-settings/sql-parsing-enabled)
         :query      true
         :mbql/query true
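         false)))

Look up the table and column names for the given field ids, marking each resulting row as an explicit reference.

(defn- explicit-field-references [field-ids]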
  (when (seq field-ids)
    ;; We set this flag in code rather than in SQL, because `true` would be returned as 1 by MySQL-based drivers.
    (map #(assoc % :explicit-reference true)
         (t2/select :model/QueryField {:select [[:t.id :table-id] [:t.name :table]
                                                [:f.id :field-id] [:f.name :column]]
                                       :from   [[(t2/table-name :model/Field) :f]]
                                       :join   [[(t2/table-name :model/Table) :t] [:= :t.id :f.table_id]]
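                                       :where  [:in :f.id field-ids]}))))

Build the `:fields` and `:tables` reference lists for the given field ids. The tables are derived from the field rows by dropping the field-specific keys.

(defn- explicit-references [field-ids]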
  (let [field-refs (explicit-field-references field-ids)]
    {:fields (distinct field-refs)
     :tables (distinct (map #(dissoc % :field-id :column :explicit-reference) field-refs))}))
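
As a rough illustration of how the table list is derived from the field rows, the sketch below uses made-up rows in place of the database lookup. The ids and names are assumptions for the example only.

(comment
  ;; Two hypothetical field rows from the same table collapse into one table reference.
  (let [field-refs [{:table-id 10 :table "orders" :field-id 1 :column "id"    :explicit-reference true}
                    {:table-id 10 :table "orders" :field-id 2 :column "total" :explicit-reference true}]]
    (distinct (map #(dissoc % :field-id :column :explicit-reference) field-refs)))
  ;; => ({:table-id 10, :table "orders"})
  )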

Find the fields and tables referenced by a query. Returns a map with `:fields` and `:tables` keys, or a map with an `:error` key if a native query could not be analyzed.

Does not track wildcards for queries rendered as tables afterward.

(defn- query-references
  ([query]
   (query-references query (lib/normalized-query-type query)))
  ([query query-type]
   (case query-type
     :native     (try
                   (nqa/references-for-native query)
                   (catch Exception e
                     (log/debug e "Failed to analyze native query" query)
                     {:error :query-analysis.error/exception, :context {:exception e}}))
     ;; For now, all model references are resolved transitively to the ultimate field ids.
     ;; We may want to change to record model references directly rather than resolving them.
     ;; This would remove the need to invalidate consuming cards when a given model changes.
     :query      (explicit-references (mbql.u/referenced-field-ids query))
     :mbql/query (explicit-references (lib.util/referenced-field-ids query)))))

Truncate strings longer than 254 characters; return any other value unchanged.

(defn- truncate-string [x]
  (if (and (string? x) (> (count x) 254))
    (subs x 0 254)
    x))
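
[[update-query-analysis-for-card!]] applies this truncation to every value of a row via `update-vals`. The following is a minimal, self-contained sketch of that step; the row contents are made up and the local `truncate` simply mirrors the function above.

(comment
  ;; Only over-long strings are shortened; numbers and nil pass through untouched.
  (let [truncate (fn [x] (if (and (string? x) (> (count x) 254)) (subs x 0 254) x))
        row      {:card_id 1 :table (apply str (repeat 300 \a)) :schema nil}
        safe-row (update-vals row truncate)]
    [(count (:table safe-row)) (:card_id safe-row) (:schema safe-row)])
  ;; => [254 1 nil]
  )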

Clears QueryFields associated with this card and creates fresh, up-to-date ones.

If analysis fails (for example, because of a parse error), records the analysis as failed and returns the error map. Returns nil and leaves the database records as-is if analysis is disabled for the given query type.

(defn- update-query-analysis-for-card!
  [{card-id :id, query :dataset_query}]
  (let [query-type (lib/normalized-query-type query)]
    (when (enabled-type? query-type)
      (let [analysis-id (t2/insert-returning-pk! :model/QueryAnalysis {:card_id card-id :status "running"})
            result      (query-references query query-type)
            safely      (fn [f] (fn [m] (update-vals (f m) truncate-string)))
            table->row  (safely
                         (fn [{:keys [schema table table-id]}]
                           {:card_id     card-id
                            :analysis_id analysis-id
                            :schema      schema
                            :table       table
                            :table_id    table-id}))
            field->row  (safely
                         (fn [{:keys [schema table column table-id field-id explicit-reference]}]
                           {:card_id            card-id
                            :analysis_id        analysis-id
                            :schema             schema
                            :table              table
                            :column             column
                            :table_id           table-id
                            :field_id           field-id
                            :explicit_reference explicit-reference}))]
        (if (contains? result :error)
          ;; TODO we should track cases where the driver is disabled or not-supported differently.
          (t2/update! :model/QueryAnalysis analysis-id {:status "failed"})
          (do
            (t2/insert! :model/QueryField (map field->row (:fields result)))
            (t2/insert! :model/QueryTable (map table->row (:tables result)))
            (t2/update! :model/QueryAnalysis analysis-id {:status "complete"})))
        (t2/delete! :model/QueryAnalysis
                    {:where [:and
                             [:= :card_id card-id]
                             [:not= :id analysis-id]]})
        result))))

Substitute new references for certain fields and tables, based upon the given mappings.

(defn- replaced-inner-query-for-native-card
  [query {:keys [fields tables] :as _replacement-ids}]
  (let [keyvals-set         #(set/union (set (keys %))
                                        (set (vals %)))
        id->field           (if (empty? fields)
                              {}
                              (m/index-by :id
                                          (t2/query {:select [[:f.id :id]
                                                              [:f.name :column]
                                                              [:t.name :table]
                                                              [:t.schema :schema]]
                                                     :from   [[:metabase_field :f]]
                                                     :join   [[:metabase_table :t] [:= :f.table_id :t.id]]
                                                     :where  [:in :f.id (keyvals-set fields)]})))
        id->table           (if (empty? tables)
                              {}
                              (m/index-by :id
                                          (t2/query {:select [[:t.id :id]
                                                              [:t.name :table]
                                                              [:t.schema :schema]]
                                                     :from   [[:metabase_table :t]]
                                                     :where  [:in :t.id (keyvals-set tables)]})))
        remove-id           #(select-keys % [:column :table :schema])
        get-or-throw-from   (fn [m] (fn [k] (if (contains? m k)
                                              (remove-id (get m k))
                                              (throw (ex-info "ID not found" {:id k :available m})))))
        ids->replacements   (fn [id->replacement-id id->row row->identifier]
                              (-> id->replacement-id
                                  (u/update-keys-vals (get-or-throw-from id->row))
                                  (update-vals row->identifier)))
        ;; Note: we are naively providing unqualified new identifier names as the replacements.
        ;; This will break if previously unambiguous identifiers become ambiguous due to the replacements.
        column-replacements (ids->replacements fields id->field :column)
        table-replacements  (ids->replacements tables id->table :table)]
    (nqa.replacement/replace-names query {:columns column-replacements
                                          :tables  table-replacements})))

Given a card and a map of the form

{:fields {1 2, 3 4} :tables {100 101}}

Return the card's query rewritten so that its references to the Field with ID 1 are replaced by Field 2, etc. Only native queries are currently supported.

(defn replace-fields-and-tables
  [{card-type :query_type, q :dataset_query :as card} replacements]
  (case card-type
    :native (replaced-inner-query-for-native-card q replacements)
    (throw (ex-info "We don't (yet) support replacing field and table refs in cards with MBQL queries"
                    {:card card :replacements replacements}))))
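
To make the shape of the replacement maps more concrete, here is a self-contained sketch of how an id-to-id mapping might be turned into the name-based mapping handed to the native query rewriter. The rows are made up, and the sketch assumes `u/update-keys-vals` applies its function to both keys and values, as its name suggests.

(comment
  ;; Hypothetical rows standing in for the metabase_field / metabase_table lookup.
  (let [fields    {1 2}                       ; replace Field 1 with Field 2
        id->field {1 {:id 1 :column "total"  :table "orders" :schema "public"}
                   2 {:id 2 :column "amount" :table "orders" :schema "public"}}
        describe  (fn [id] (select-keys (id->field id) [:column :table :schema]))]
    ;; Keys become full descriptions of the old field; values become the new column name.
    (into {}
          (map (fn [[old-id new-id]]
                 [(describe old-id) (:column (describe new-id))]))
          fields))
  ;; => {{:column "total", :table "orders", :schema "public"} "amount"}
  )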

Given a partial card or its id, ensure that we have all the fields required for analysis.

(defn ->analyzable
  [card-or-id]
  ;; If we don't know whether a card has been archived, give it the benefit of the doubt.
  (if (every? #(some? (% card-or-id)) [:id :dataset_query])
    card-or-id
    ;; If we need to query the database though, find out for sure.
    (t2/select-one [:model/Card :id :archived :dataset_query] (u/the-id card-or-id))))

Update the analysis for a given card if it is active. Should only be called from [[metabase.query-analysis.task.analyze-queries]]; otherwise favor [[analyze!]].

(defn analyze!*
  [card-or-id]
  (let [card    (->analyzable card-or-id)
        card-id (:id card)]
    (cond
      ;; `card-id` is nil when the card is missing, so log the id we were asked for.
      (not card)       (log/warnf "Card not found: %s" (u/the-id card-or-id))
      (:archived card) (log/debugf "Skipping archived card: %s" card-id)
      :else            (do
                         (log/debugf "Performing query analysis for card %s" card-id)
                         (update-query-analysis-for-card! card)))))

Get the next card, or card id, to be analyzed. May block indefinitely, as it relies on a producer. Should only be called from [[metabase.query-analysis.task.analyze-queries]].

(defn next-card-or-id!
  ([]
   (next-card-or-id! worker-queue))
  ([queue]
   (next-card-or-id! queue Long/MAX_VALUE))
  ([queue timeout]
   (queue/blocking-take! queue timeout)))

Indirection used to modify the execution strategy for analysis in dev and tests.

(defn- queue-or-analyze!
  [offer-fn! card-or-id]
  (case (execution)
    ::immediate (analyze!* card-or-id)
    ::queued    (offer-fn! card-or-id)
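    ::disabled  nil))

Log the card id, then put the card or id onto the queue, blocking until there is room or the timeout elapses.

(defn- blocking-put! [queue timeout card-or-id]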
  (let [id (u/the-id card-or-id)]
    (log/debugf "Synchronously analyzing Card %s" id)
    (queue/blocking-put! queue timeout card-or-id)))

Assuming analysis is enabled, analyze the card immediately (and in the current thread).

(defn analyze!
  [card-or-id]
  (when-not (= ::disabled (execution))
    (try
      (analyze!* card-or-id)
      ;; Don't throw exceptions on the main thread path; analysis is best-effort.
      (catch Exception e
        (log/errorf e "Failure analysing card %s" (u/the-id card-or-id))))))

Synchronously hand off the given card for analysis, at a low priority. May block indefinitely, as it relies on a consumer.

Note that only the hand-off is sync; if we're using the queue the processing could happen asynchronously.

(defn queue-analysis!
  ([card-or-id]
   (queue-analysis! card-or-id worker-queue))
  ([card-or-id queue]
   (queue-or-analyze! (partial queue/blocking-put! queue Long/MAX_VALUE) card-or-id)))
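
The execution modes defined earlier in this namespace change what [[queue-analysis!]] does outside of prod. The following is a hypothetical REPL or test sketch, not part of the module's API; card id 1 is an assumed example, and the first two forms need a running application database.

(comment
  ;; Analyze on the current thread; the queue is never touched.
  (with-immediate-analysis
    (queue-analysis! 1))

  ;; Hand the card off to the worker queue. This blocks until a consumer takes it.
  (with-queued-analysis
    (queue-analysis! 1))

  ;; Analysis is skipped entirely; nothing is analyzed or queued.
  (without-analysis
    (queue-analysis! 1))
  ;; => nil
  )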