(ns metabase.query-processor.middleware.fetch-source-query (:require [metabase.driver :as driver] [metabase.driver.ddl.interface :as ddl.i] [metabase.lib.card :as lib.card] [metabase.lib.metadata :as lib.metadata] [metabase.lib.metadata.protocols :as lib.metadata.protocols] [metabase.lib.query :as lib.query] [metabase.lib.schema :as lib.schema] [metabase.lib.schema.common :as lib.schema.common] [metabase.lib.schema.id :as lib.schema.id] [metabase.lib.schema.metadata :as lib.schema.metadata] [metabase.lib.walk :as lib.walk] [metabase.public-settings :as public-settings] [metabase.query-processor.error-type :as qp.error-type] [metabase.query-processor.store :as qp.store] [metabase.query-processor.util.persisted-cache :as qp.persisted] [metabase.util :as u] [metabase.util.i18n :refer [trs tru]] [metabase.util.log :as log] [metabase.util.malli :as mu] [weavejester.dependency :as dep])) | |
MongoDB native queries consist of a collection and a pipelne (query). TODO -- it's not great that this code lives here. This should be part of the MongoDB driver. We should NOT be hardcoding driver-specific behavior in generic QP middleware. TODO -- consider whether [[normalize-card-query]] should be moved into [[metabase.lib.card]], seems like it would make sense but it would involve teasing out some QP-specific stuff to make it work. | (defn- fix-mongodb-first-stage
[[first-stage & more]]
(let [first-stage (cond-> first-stage
(and (= driver/*driver* :mongo)
(= (:lib/type first-stage) :mbql.stage/native))
(update :native (fn [x]
(if (map? x)
x
{:collection (:collection first-stage)
:projections (:projections first-stage)
:query x}))))]
(cons first-stage more))) |
(mu/defn normalize-card-query :- ::lib.schema.metadata/card
"Convert Card's query (`:datasaet-query`) to pMBQL as needed; splice in stage metadata and some extra keys."
[metadata-providerable :- ::lib.schema.metadata/metadata-providerable
{card-id :id, :as card} :- ::lib.schema.metadata/card]
(let [persisted-info (:lib/persisted-info card)
persisted? (qp.persisted/can-substitute? card persisted-info)]
(when persisted?
(log/infof "Found substitute cached query for card %s from %s.%s"
card-id
(ddl.i/schema-name {:id (:database-id card)} (public-settings/site-uuid))
(:table-name persisted-info)))
(letfn [(update-stages [stages]
(let [stages (fix-mongodb-first-stage stages)
stages (for [stage stages]
;; This is for detecting circular refs below, and is later used as part of
;; permissions enforcement
(assoc stage :qp/stage-is-from-source-card card-id))
card-metadata (into [] (remove :remapped-from)
(lib.card/card-metadata-columns metadata-providerable card))
last-stage (cond-> (last stages)
(seq card-metadata) (assoc-in [:lib/stage-metadata :columns] card-metadata)
;; This will be applied, if still appropriate, by
;; the [[metabase.query-processor.middleware.persistence]] middleware
;;
;; TODO -- not 100% sure I did this right, there are almost no tests for this
persisted? (assoc :persisted-info/native
(qp.persisted/persisted-info-native-query
(:database-id card)
persisted-info)))]
(conj (vec (butlast stages)) last-stage)))
(update-query [query]
(-> (lib.query/query metadata-providerable query)
(update :stages update-stages)))]
(update card :dataset-query update-query)))) | |
(mu/defn- card :- ::lib.schema.metadata/card
[metadata-providerable :- ::lib.schema.metadata/metadata-providerable
card-id :- ::lib.schema.id/card]
(let [card (or (lib.metadata/card metadata-providerable card-id)
(throw (ex-info (tru "Card {0} does not exist." (pr-str card-id))
{:type qp.error-type/invalid-query, :card-id card-id})))]
;; make sure the Card has a valid query
(when-not (:dataset-query card)
(throw (ex-info (tru "Missing source query in Card {0}" card-id)
{:type qp.error-type/invalid-query, :card-id card-id})))
;; make sure this Card is from the same Database as the one we're running queries against.
(let [source-database-id (u/the-id (lib.metadata/database metadata-providerable))]
(when-not (= (:database-id card) source-database-id)
(throw (ex-info (tru "Card {0} is from a different Database." (pr-str card-id))
{:type qp.error-type/invalid-query
:source-database-id source-database-id
:card-database-id (:database-id card)}))))
(normalize-card-query metadata-providerable card))) | |
(mu/defn- resolve-source-cards-in-stage :- [:maybe ::lib.schema/stages]
[query :- ::lib.schema/query
stage :- ::lib.schema/stage
dep-graph :- (lib.schema.common/instance-of-class clojure.lang.Volatile)]
(when (and (= (:lib/type stage) :mbql.stage/mbql)
(:source-card stage))
;; make sure nested queries are enabled before resolving them.
(when-not (public-settings/enable-nested-queries)
(throw (ex-info (trs "Nested queries are disabled")
{:type qp.error-type/disabled-feature, :card-id (:source-card stage)})))
;; If the first stage came from a different source card (i.e., we are doing recursive resolution) record the
;; dependency of the previously-resolved source card on the one we're about to resolve. We can check for circular
;; dependencies this way.
(when (:qp/stage-is-from-source-card stage)
(u/prog1 (vswap! dep-graph
dep/depend
(tru "Card {0}" (:qp/stage-is-from-source-card stage))
(tru "Card {0}" (:source-card stage)))
;; This will throw if there's a cycle
(dep/topo-sort <>)))
(let [card (card query (:source-card stage))
card-stages (get-in card [:dataset-query :stages])
;; this information is used by [[metabase.query-processor.middleware.annotate/col-info-for-field-clause*]]
stage' (-> stage
;; these keys are used by the [[metabase.query-processor.middleware.annotate]] middleware to
;; decide whether to "flow" the Card's metadata or not (whether to use it preferentially over
;; the metadata associated with Fields themselves)
(assoc :qp/stage-had-source-card (:id card)
:source-query/model? (= (:type card) :model))
(dissoc :source-card))]
(into (vec card-stages) [stage'])))) | |
(def ^:private max-recursion-depth 50) | |
(defn- resolve-source-cards* [original-query recursion-depth dep-graph]
;; this is mostly to catch programmer bugs and avoid infinite loops, thus not i18n'ed. Dep graph should circular
;; dependencies if they occur
(assert (<= recursion-depth max-recursion-depth)
(format "Source Cards not fully resolved after %d iterations." max-recursion-depth))
(let [updated-query (lib.walk/walk-stages
original-query
(fn [query _path stage]
(resolve-source-cards-in-stage query stage dep-graph)))
card-id (some :qp/stage-had-source-card
(reverse (:stages updated-query)))
;; `:qp/source-card-id` is used by [[metabase.query-processor.middleware.results-metadata/record-metadata!]] to
;; decide whether to record metadata as well as by the [[add-dataset-info]] post-processing middleware, and
;; by [[metabase.query-processor.middleware.permissions/check-query-permissions*]]
updated-query (cond-> updated-query
card-id (-> (update :qp/source-card-id #(or % card-id))
(update-in [:info :card-id] #(or % card-id))))]
(if (= updated-query original-query)
original-query
;; if any resolution happened, recursively resolve things in the updated query in case we need to do MORE.
(recur updated-query (inc recursion-depth) dep-graph)))) | |
(mu/defn resolve-source-cards :- ::lib.schema/query
"If a stage has a `:source-card`, fetch the Card and prepend its underlying stages to the pipeline."
[query :- ::lib.schema/query]
(let [query (dissoc query :source-card-id :qp/source-card-id)] ; `:source-card-id` was the old key
(resolve-source-cards* query 0 (volatile! (dep/graph))))) | |
Post-processing middleware that adds TODO -- we should remove remove the | (defn add-dataset-info
[{:qp/keys [source-card-id], :as _preprocessed-query} rff]
(if-not source-card-id
rff
(let [model? (= (:type (lib.metadata.protocols/card (qp.store/metadata-provider) source-card-id)) :model)]
(fn rff' [metadata]
(rff (cond-> metadata model? (assoc :dataset model?, :model model?))))))) |