(ns metabase.xrays.transforms.core (:require [medley.core :as m] [metabase.driver :as driver] [metabase.legacy-mbql.schema :as mbql.s] [metabase.lib.ident :as lib.ident] [metabase.lib.schema.id :as lib.schema.id] [metabase.lib.util.match :as lib.util.match] [metabase.models.field :refer [Field]] [metabase.models.interface :as mi] [metabase.models.table :as table :refer [Table]] [metabase.query-processor.preprocess :as qp.preprocess] [metabase.util :as u] [metabase.util.i18n :refer [tru]] [metabase.util.malli :as mu] [metabase.util.malli.schema :as ms] [metabase.xrays.domain-entities.core :as de :refer [Bindings DimensionBindings SourceEntity SourceName]] [metabase.xrays.domain-entities.specs :refer [domain-entity-specs DomainEntitySpec]] [metabase.xrays.transforms.materialize :as tf.materialize] [metabase.xrays.transforms.specs :refer [Step transform-specs TransformSpec]] [toucan2.core :as t2])) | |
(mu/defn- add-bindings :- Bindings [bindings :- Bindings source :- SourceName new-bindings :- [:maybe DimensionBindings]] (reduce-kv (fn [bindings name definition] (->> definition (de/resolve-dimension-clauses bindings source) (assoc-in bindings [source :dimensions name]))) bindings new-bindings)) | |
(defn- mbql-reference->col-name [field-clause] (lib.util.match/match-one field-clause [:field (field-name :guard string?) _] field-name [:field (id :guard integer?) _] (t2/select-one-fn :name Field :id id))) | |
(mu/defn- infer-resulting-dimensions :- DimensionBindings [bindings :- Bindings {:keys [joins name]} :- Step query :- mbql.s/Query] (let [flattened-bindings (merge (apply merge (map (comp :dimensions bindings :source) joins)) (get-in bindings [name :dimensions]))] (into {} (for [{:keys [name] :as col} (qp.preprocess/query->expected-cols query)] [(if (flattened-bindings name) name ;; If the col is not one of our own we have to reconstruct to what it refers in ;; our parlance (or (some->> flattened-bindings (m/find-first (comp #{name} mbql-reference->col-name)) key) ;; If that doesn't work either, it's a duplicated col from a join name)) (de/mbql-reference col)])))) | |
(defn- maybe-add-fields [bindings {:keys [aggregation source]} query] (if-not aggregation (assoc query :fields (vals (get-in bindings [source :dimensions]))) query)) | |
(defn- maybe-add-expressions [bindings {:keys [expressions name]} query] (if expressions (let [expr-clauses (->> expressions keys (select-keys (get-in bindings [name :dimensions])))] (-> query (assoc :expressions expr-clauses :expression-idents (update-vals expr-clauses (fn [_] (u/generate-nano-id)))) (update :fields concat (for [expression (keys expressions)] [:expression expression])))) query)) | |
(defn- indexed-idents [seqable] (when (seq seqable) (into {} (map (fn [i] [i (u/generate-nano-id)])) (range (count seqable))))) | |
(defn- maybe-add-aggregation [bindings {:keys [name aggregation]} query] (let [aggs (->> (for [agg (keys aggregation)] [:aggregation-options (get-in bindings [name :dimensions agg]) {:name agg}]) not-empty)] (m/assoc-some query :aggregation aggs :aggregation-idents (indexed-idents aggs)))) | |
(defn- maybe-add-breakout [bindings {:keys [name breakout]} query] (let [breakouts (not-empty (for [breakout breakout] (de/resolve-dimension-clauses bindings name breakout)))] (m/assoc-some query :breakout breakouts :breakout-idents (indexed-idents breakouts)))) | |
Serialize | (mu/defn- ->source-table-reference [entity :- SourceEntity] (if (mi/instance-of? Table entity) (u/the-id entity) (str "card__" (u/the-id entity)))) |
(defn- maybe-add-joins [bindings {context-source :source joins :joins} query] (m/assoc-some query :joins (not-empty (for [{:keys [source condition strategy]} joins] (-> {:condition (de/resolve-dimension-clauses bindings context-source condition) :source-table (-> source bindings :entity ->source-table-reference) :alias source :ident (u/generate-nano-id) :fields :all} (m/assoc-some :strategy strategy)))))) | |
(defn- maybe-add-filter [bindings {:keys [name filter]} query] (m/assoc-some query :filter (de/resolve-dimension-clauses bindings name filter))) | |
(defn- maybe-add-limit [_bindings {:keys [limit]} query] (m/assoc-some query :limit limit)) | |
(mu/defn- transform-step! :- Bindings [bindings :- Bindings {:keys [name source aggregation expressions] :as step} :- Step] (let [source-entity (get-in bindings [source :entity]) local-bindings (-> bindings (add-bindings name (get-in bindings [source :dimensions])) (add-bindings name expressions) (add-bindings name aggregation)) inner-query (->> {:source-table (->source-table-reference source-entity)} (maybe-add-fields local-bindings step) (maybe-add-expressions local-bindings step) (maybe-add-aggregation local-bindings step) (maybe-add-breakout local-bindings step) (maybe-add-joins local-bindings step) (maybe-add-filter local-bindings step) (maybe-add-limit local-bindings step)) inner-query (-> inner-query (m/assoc-some :expression-idents (lib.ident/indexed-idents (:expression inner-query))) (m/assoc-some :aggregation-idents (lib.ident/indexed-idents (:aggregation inner-query))) (m/assoc-some :breakout-idents (lib.ident/indexed-idents (:breakout inner-query)))) query {:type :query :query inner-query :database ((some-fn :db_id :database_id) source-entity)}] (assoc bindings name {:entity (tf.materialize/make-card-for-step! step query) :dimensions (infer-resulting-dimensions local-bindings step query)}))) | |
(def ^:private Tableset [:sequential (ms/InstanceOf Table)]) | |
(mu/defn- find-tables-with-domain-entity :- Tableset [tableset :- Tableset domain-entity-spec :- DomainEntitySpec] (filter #(-> % :domain_entity :type (isa? (:type domain-entity-spec))) tableset)) | |
(mu/defn- tableset->bindings :- Bindings [tableset :- Tableset] (into {} (for [{{domain-entity-name :name dimensions :dimensions} :domain_entity :as table} tableset] [domain-entity-name {:dimensions (m/map-vals de/mbql-reference dimensions) :entity table}]))) | |
(mu/defn- apply-transform-to-tableset! :- Bindings [tableset :- Tableset {:keys [steps _provides]} :- TransformSpec] (driver/with-driver (-> tableset first table/database :engine) (reduce transform-step! (tableset->bindings tableset) (vals steps)))) | |
(mu/defn- resulting-entities :- [:sequential SourceEntity] [bindings :- Bindings {:keys [provides]} :- TransformSpec] (map (comp :entity val) (select-keys bindings provides))) | |
(mu/defn- validate-results :- Bindings [bindings :- Bindings {:keys [provides]} :- TransformSpec] (doseq [domain-entity-name provides] (assert (de/satisfies-requierments? (get-in bindings [domain-entity-name :entity]) (@domain-entity-specs domain-entity-name)) (str (tru "Resulting transforms do not conform to expectations.\nExpected: {0}" domain-entity-name)))) bindings) | |
(mu/defn- tables-matching-requirements :- [:maybe Tableset] [tableset :- Tableset {:keys [requires]} :- TransformSpec] (let [matches (map (comp (partial find-tables-with-domain-entity tableset) @domain-entity-specs) requires)] (when (every? (comp #{1} count) matches) (map first matches)))) | |
(mu/defn- tableset :- Tableset [db-id :- ::lib.schema.id/database schema :- [:maybe :string]] (-> (t2/select :model/Table :db_id db-id :schema schema) de/with-domain-entity (t2/hydrate :fields))) | |
Apply transform defined by transform spec The algorithm is as follows: 1) Try to find a set of tables in the given schema that have required domain entities. 2) If found, use these tables and their fields as the initial bindings. 3) Go through the transform steps, materialize them as cards, and accure these and their result cols to the bindings. 4) Check that all output cards have the expected result shape. 5) Return the output cards. | (mu/defn apply-transform! [db-id :- ::lib.schema.id/database schema :- [:maybe :string] spec :- TransformSpec] (tf.materialize/fresh-collection-for-transform! spec) (some-> (tableset db-id schema) (tables-matching-requirements spec) (apply-transform-to-tableset! spec) (validate-results spec) (resulting-entities spec))) |
Return a list of candidate transforms for a given table. | (defn candidates [table] (filter (comp (partial some (comp #{(u/the-id table)} u/the-id)) (partial tables-matching-requirements (tableset (:db_id table) (:schema table)))) @transform-specs)) |