(ns metabase.query-processor.middleware.fetch-source-query (:require [metabase.driver :as driver] [metabase.driver.ddl.interface :as ddl.i] [metabase.lib.card :as lib.card] [metabase.lib.metadata :as lib.metadata] [metabase.lib.metadata.protocols :as lib.metadata.protocols] [metabase.lib.query :as lib.query] [metabase.lib.schema :as lib.schema] [metabase.lib.schema.common :as lib.schema.common] [metabase.lib.schema.id :as lib.schema.id] [metabase.lib.schema.metadata :as lib.schema.metadata] [metabase.lib.walk :as lib.walk] [metabase.public-settings :as public-settings] [metabase.query-processor.error-type :as qp.error-type] [metabase.query-processor.store :as qp.store] [metabase.query-processor.util.persisted-cache :as qp.persisted] [metabase.util :as u] [metabase.util.i18n :refer [trs tru]] [metabase.util.log :as log] [metabase.util.malli :as mu] [weavejester.dependency :as dep])) | |
;;; TODO -- consider whether [[normalize-card-query]] should be moved into [[metabase.lib.card]], seems like it would
;;; make sense but it would involve teasing out some QP-specific stuff to make it work.

(defn- fix-mongodb-first-stage
  "MongoDB native queries consist of a collection and a pipeline (query). When the current driver is `:mongo` and the
  first stage is a native stage, make sure its `:native` value is a map: a value that is already a map is kept as-is,
  anything else is wrapped as `{:collection ..., :projections ..., :query <original value>}` using the `:collection`
  and `:projections` keys of the first stage. All remaining stages are passed through untouched. Returns a seq of
  stages.

  TODO -- it's not great that this code lives here. This should be part of the MongoDB driver. We should NOT be
  hardcoding driver-specific behavior in generic QP middleware."
  [[first-stage & more]]
  (letfn [(mongo-native-stage? [stage]
            (and (= driver/*driver* :mongo)
                 (= (:lib/type stage) :mbql.stage/native)))
          (wrap-native [native]
            (if (map? native)
              native
              {:collection  (:collection first-stage)
               :projections (:projections first-stage)
               :query       native}))]
    (cons (if (mongo-native-stage? first-stage)
            (update first-stage :native wrap-native)
            first-stage)
          more)))
(mu/defn normalize-card-query :- ::lib.schema.metadata/card
  "Convert Card's query (`:dataset-query`) to pMBQL as needed; splice in stage metadata and some extra keys.

  Every stage of the Card's query is tagged with `:qp/stage-is-from-source-card`. The last stage additionally gets
  the Card's metadata columns (minus remapped ones) under `[:lib/stage-metadata :columns]` when there are any, and
  `:persisted-info/native` when the Card can be substituted with its persisted version. Returns `card` with an
  updated `:dataset-query`."
  [metadata-providerable   :- ::lib.schema.metadata/metadata-providerable
   {card-id :id, :as card} :- ::lib.schema.metadata/card]
  (let [database-id    (:database-id card)
        persisted-info (:lib/persisted-info card)
        use-persisted? (qp.persisted/can-substitute? card persisted-info)
        ;; This tag is for detecting circular refs during source Card resolution, and is later used as part of
        ;; permissions enforcement
        tag-stage      (fn [stage]
                         (assoc stage :qp/stage-is-from-source-card card-id))
        finalize-stage (fn [stage]
                         (let [columns (into []
                                             (remove :remapped-from)
                                             (lib.card/card-metadata-columns metadata-providerable card))]
                           (cond-> stage
                             (seq columns)
                             (assoc-in [:lib/stage-metadata :columns] columns)

                             ;; This will be applied, if still appropriate, by
                             ;; the [[metabase.query-processor.middleware.persistence]] middleware
                             ;;
                             ;; TODO -- not 100% sure I did this right, there are almost no tests for this
                             use-persisted?
                             (assoc :persisted-info/native
                                    (qp.persisted/persisted-info-native-query database-id persisted-info)))))
        rewrite-stages (fn [stages]
                         ;; [[fix-mongodb-first-stage]] always returns at least one element, so the vector below
                         ;; is never empty and `peek`/`pop` are safe.
                         (let [tagged (mapv tag-stage (fix-mongodb-first-stage stages))]
                           (conj (pop tagged) (finalize-stage (peek tagged)))))]
    (when use-persisted?
      (log/infof "Found substitute cached query for card %s from %s.%s"
                 card-id
                 (ddl.i/schema-name {:id database-id} (public-settings/site-uuid))
                 (:table-name persisted-info)))
    (update card :dataset-query
            (fn [query]
              (update (lib.query/query metadata-providerable query) :stages rewrite-stages)))))
(mu/defn- card :- ::lib.schema.metadata/card
  "Fetch the Card with `card-id` from `metadata-providerable`, validate it, and return it with its query normalized
  by [[normalize-card-query]].

  Throws an `ex-info` of type `qp.error-type/invalid-query` when the Card does not exist, has no `:dataset-query`,
  or belongs to a different Database than the one `metadata-providerable` is for."
  [metadata-providerable :- ::lib.schema.metadata/metadata-providerable
   card-id               :- ::lib.schema.id/card]
  (let [card (or (lib.metadata/card metadata-providerable card-id)
                 (throw (ex-info (tru "Card {0} does not exist." (pr-str card-id))
                                 {:type qp.error-type/invalid-query, :card-id card-id})))]
    ;; make sure the Card has a valid query
    (when-not (:dataset-query card)
      ;; `pr-str` the ID like the other messages here do, so a numeric ID is interpolated as a plain string rather
      ;; than being number-formatted (e.g. with grouping separators) by the message formatter.
      (throw (ex-info (tru "Missing source query in Card {0}" (pr-str card-id))
                      {:type qp.error-type/invalid-query, :card-id card-id})))
    ;; make sure this Card is from the same Database as the one we're running queries against.
    (let [source-database-id (u/the-id (lib.metadata/database metadata-providerable))]
      (when-not (= (:database-id card) source-database-id)
        (throw (ex-info (tru "Card {0} is from a different Database." (pr-str card-id))
                        {:type               qp.error-type/invalid-query
                         :source-database-id source-database-id
                         :card-database-id   (:database-id card)}))))
    (normalize-card-query metadata-providerable card)))
(mu/defn- resolve-source-cards-in-stage :- [:maybe ::lib.schema/stages]
  "If `stage` is an MBQL stage with a `:source-card`, return the source Card's stages followed by `stage` itself
  (with `:source-card` removed and `:qp/stage-had-source-card` / `:source-query/model?` added). Returns `nil` for
  any other stage.

  Throws if nested queries are disabled. When `stage` itself came from a source Card, the dependency between the two
  Cards is recorded in the `dep-graph` volatile and the graph is topologically sorted, which throws on a cycle."
  [query     :- ::lib.schema/query
   stage     :- ::lib.schema/stage
   dep-graph :- (lib.schema.common/instance-of-class clojure.lang.Volatile)]
  (let [source-card-id (:source-card stage)
        parent-card-id (:qp/stage-is-from-source-card stage)]
    (when (and (= (:lib/type stage) :mbql.stage/mbql)
               source-card-id)
      ;; make sure nested queries are enabled before resolving them.
      (when-not (public-settings/enable-nested-queries)
        (throw (ex-info (trs "Nested queries are disabled")
                        {:type qp.error-type/disabled-feature, :card-id source-card-id})))
      ;; If this stage came from a different source card (i.e., we are doing recursive resolution) record the
      ;; dependency of the previously-resolved source card on the one we're about to resolve. We can check for
      ;; circular dependencies this way.
      (when parent-card-id
        (let [graph (vswap! dep-graph
                            dep/depend
                            (tru "Card {0}" parent-card-id)
                            (tru "Card {0}" source-card-id))]
          ;; This will throw if there's a cycle
          (dep/topo-sort graph)))
      (let [resolved-card (card query source-card-id)
            ;; these keys are used by the [[metabase.query-processor.middleware.annotate]] middleware to decide
            ;; whether to "flow" the Card's metadata or not (whether to use it preferentially over the metadata
            ;; associated with Fields themselves)
            stage'        (-> stage
                              (dissoc :source-card)
                              (assoc :qp/stage-had-source-card (:id resolved-card)
                                     :source-query/model?      (= (:type resolved-card) :model)))]
        (conj (vec (get-in resolved-card [:dataset-query :stages])) stage')))))
;; Highest recursion depth (counting from 0) that [[resolve-source-cards*]] accepts before its assertion fails.
(def ^{:private true} max-recursion-depth
  50)
(defn- resolve-source-cards*
  "Resolve `:source-card` references in every stage of `original-query`, repeating until a pass leaves the query
  unchanged. `recursion-depth` is the current pass number and `dep-graph` is the volatile dependency graph handed to
  [[resolve-source-cards-in-stage]]. Returns the fully resolved query."
  [original-query recursion-depth dep-graph]
  ;; this is mostly to catch programmer bugs and avoid infinite loops, thus not i18n'ed. Dep graph should detect
  ;; circular dependencies if they occur
  (assert (<= recursion-depth max-recursion-depth)
          (format "Source Cards not fully resolved after %d iterations." max-recursion-depth))
  (let [resolve-stage (fn [query _path stage]
                        (resolve-source-cards-in-stage query stage dep-graph))
        walked        (lib.walk/walk-stages original-query resolve-stage)
        ;; the source Card ID recorded on the last top-level stage that has one
        card-id       (->> (:stages walked)
                           reverse
                           (some :qp/stage-had-source-card))
        ;; never overwrite a value that is already present
        keep-or-set   (fn [existing]
                        (or existing card-id))
        ;; `:qp/source-card-id` is used by
        ;; [[metabase.query-processor.middleware.results-metadata/record-metadata!]] to decide whether to record
        ;; metadata as well as by the [[add-dataset-info]] post-processing middleware, and
        ;; by [[metabase.query-processor.middleware.permissions/check-query-permissions*]]
        updated-query (if card-id
                        (-> walked
                            (update :qp/source-card-id keep-or-set)
                            (update-in [:info :card-id] keep-or-set))
                        walked)]
    (if (not= updated-query original-query)
      ;; if any resolution happened, recursively resolve things in the updated query in case we need to do MORE.
      (recur updated-query (inc recursion-depth) dep-graph)
      original-query)))
(mu/defn resolve-source-cards :- ::lib.schema/query
  "If a stage has a `:source-card`, fetch the Card and prepend its underlying stages to the pipeline.

  Any `:source-card-id` (the old key) or `:qp/source-card-id` already on `query` is dropped before resolution
  starts, and resolution begins at depth 0 with a fresh dependency graph."
  [query :- ::lib.schema/query]
  (let [fresh-dep-graph (volatile! (dep/graph))]
    (-> query
        (dissoc :source-card-id :qp/source-card-id) ; `:source-card-id` was the old key
        (resolve-source-cards* 0 fresh-dep-graph))))
(defn add-dataset-info
  "Post-processing middleware that adds `:dataset` and `:model` keys (both `true`) to the results metadata when the
  preprocessed query has a `:qp/source-card-id` and that Card's `:type` is `:model`. When there is no source Card
  the original `rff` is returned unchanged; when the source Card is not a model the metadata is passed through
  as-is.

  TODO -- we should remove the ... (the original note is cut off at this point; presumably it refers to the
  `:dataset` key -- confirm before acting on it)."
  [{:qp/keys [source-card-id], :as _preprocessed-query} rff]
  (if source-card-id
    (let [source-card (lib.metadata.protocols/card (qp.store/metadata-provider) source-card-id)
          model?      (= (:type source-card) :model)]
      (fn rff' [metadata]
        (rff (if model?
               (assoc metadata :dataset model?, :model model?)
               metadata))))
    rff))