(ns metabase.lib.aggregation (:refer-clojure :exclude [count distinct max min var]) (:require [medley.core :as m] [metabase.lib.common :as lib.common] [metabase.lib.dispatch :as lib.dispatch] [metabase.lib.equality :as lib.equality] [metabase.lib.hierarchy :as lib.hierarchy] [metabase.lib.metadata :as lib.metadata] [metabase.lib.metadata.calculation :as lib.metadata.calculation] [metabase.lib.options :as lib.options] [metabase.lib.ref :as lib.ref] [metabase.lib.schema :as lib.schema] [metabase.lib.schema.aggregation :as lib.schema.aggregation] [metabase.lib.schema.common :as lib.schema.common] [metabase.lib.schema.metadata :as lib.schema.metadata] [metabase.lib.temporal-bucket :as lib.temporal-bucket] [metabase.lib.types.isa :as lib.types.isa] [metabase.lib.util :as lib.util] [metabase.util :as u] [metabase.util.i18n :as i18n] [metabase.util.malli :as mu])) | |
(mu/defn column-metadata->aggregation-ref :- :mbql.clause/aggregation
"Given `:metadata/column` column metadata for an aggregation, construct an `:aggregation` reference."
[metadata :- ::lib.schema.metadata/column]
(let [options {:lib/uuid (str (random-uuid))
:effective-type ((some-fn :effective-type :base-type) metadata)
:lib/source-name (:name metadata)}
ag-uuid (:lib/source-uuid metadata)]
(assert ag-uuid "Metadata for an aggregation reference should include :lib/source-uuid")
[:aggregation options ag-uuid])) | |
(mu/defn resolve-aggregation :- ::lib.schema.aggregation/aggregation
"Resolve an aggregation with a specific `ag-uuid`."
[query :- ::lib.schema/query
stage-number :- :int
ag-uuid :- :string]
(let [{aggregations :aggregation} (lib.util/query-stage query stage-number)
found (m/find-first (comp #{ag-uuid} :lib/uuid second) aggregations)]
(when-not found
(throw (ex-info (i18n/tru "No aggregation with uuid {0}" ag-uuid)
{:uuid ag-uuid
:query query
:stage-number stage-number})))
found)) | |
(defmethod lib.metadata.calculation/describe-top-level-key-method :aggregation
[query stage-number _k]
(when-let [aggregations (not-empty (:aggregation (lib.util/query-stage query stage-number)))]
(lib.util/join-strings-with-conjunction
(i18n/tru "and")
(for [aggregation aggregations]
(lib.metadata.calculation/display-name query stage-number aggregation :long))))) | |
(defmethod lib.metadata.calculation/metadata-method :aggregation
[query stage-number [_ag {:keys [base-type effective-type], :as _opts} index, :as _aggregation-ref]]
(let [aggregation (resolve-aggregation query stage-number index)]
(merge
(lib.metadata.calculation/metadata query stage-number aggregation)
{:lib/source :source/aggregations
:lib/source-uuid (:lib/uuid (second aggregation))}
(when base-type
{:base-type base-type})
(when effective-type
{:effective-type effective-type})))) | |
TODO -- merge this stuff into | |
(defmethod lib.metadata.calculation/display-name-method :aggregation [query stage-number [_tag _opts index] style] (lib.metadata.calculation/display-name query stage-number (resolve-aggregation query stage-number index) style)) | |
(lib.hierarchy/derive ::count-aggregation ::aggregation) | |
count and cumulative count can both be used either with no args (count of rows) or with one arg (count of X, which
I think means count where X is not NULL or something like that. Basically | (doseq [tag [:count
:cum-count
:count-where]]
(lib.hierarchy/derive tag ::count-aggregation)) |
(defmethod lib.metadata.calculation/display-name-method ::count-aggregation
[query stage-number [tag _opts x] style]
;; x is optional.
(if x
(let [x-display-name (lib.metadata.calculation/display-name query stage-number x style)]
(case tag
:count (i18n/tru "Count of {0}" x-display-name)
:cum-count (i18n/tru "Cumulative count of {0}" x-display-name)))
(case tag
:count (i18n/tru "Count")
:cum-count (i18n/tru "Cumulative count")))) | |
(defmethod lib.metadata.calculation/column-name-method ::count-aggregation
[_query _stage-number [tag :as _clause]]
(case tag
:count "count"
:cum-count "count"
:count-where "count_where")) | |
(defmethod lib.metadata.calculation/metadata-method ::quantity-aggregation
[query stage-number clause]
(assoc ((get-method lib.metadata.calculation/metadata-method ::aggregation) query stage-number clause)
:semantic-type :type/Quantity)) | |
(lib.hierarchy/derive ::quantity-aggregation ::aggregation) (lib.hierarchy/derive ::count-aggregation ::quantity-aggregation) (lib.hierarchy/derive :distinct ::quantity-aggregation) | |
(defmethod lib.metadata.calculation/display-name-method :case [_query _stage-number _case _style] (i18n/tru "Case")) | |
(defmethod lib.metadata.calculation/column-name-method :case [_query _stage-number _case] "case") | |
(defmethod lib.metadata.calculation/display-name-method :if [_query _stage-number _case _style] (i18n/tru "If")) | |
(defmethod lib.metadata.calculation/column-name-method :if [_query _stage-number _case] "if") | |
TODO - Should | |
(lib.hierarchy/derive ::unary-aggregation ::aggregation) | |
(doseq [tag [:avg
:cum-sum
:distinct
:max
:median
:min
:stddev
:sum
:var]]
(lib.hierarchy/derive tag ::unary-aggregation)) | |
(defmethod lib.metadata.calculation/column-name-method ::unary-aggregation
[_query _stage-number [tag _opts _arg]]
(case tag
:avg "avg"
:cum-sum "sum"
:distinct "count"
:max "max"
:median "median"
:min "min"
:stddev "stddev"
:sum "sum"
:var "var")) | |
(defmethod lib.metadata.calculation/display-name-method ::unary-aggregation
[query stage-number [tag _opts arg] style]
(let [arg (lib.metadata.calculation/display-name query stage-number arg style)]
(case tag
:avg (i18n/tru "Average of {0}" arg)
:cum-sum (i18n/tru "Cumulative sum of {0}" arg)
:distinct (i18n/tru "Distinct values of {0}" arg)
:max (i18n/tru "Max of {0}" arg)
:median (i18n/tru "Median of {0}" arg)
:min (i18n/tru "Min of {0}" arg)
:stddev (i18n/tru "Standard deviation of {0}" arg)
:sum (i18n/tru "Sum of {0}" arg)
:var (i18n/tru "Variance of {0}" arg)))) | |
(defmethod lib.metadata.calculation/display-name-method :percentile
[query stage-number [_percentile _opts x p] style]
(i18n/tru "{0}th percentile of {1}" p (lib.metadata.calculation/display-name query stage-number x style))) | |
(defmethod lib.metadata.calculation/column-name-method :percentile [_query _stage-number _clause] "percentile") | |
(lib.hierarchy/derive ::no-semantic-type ::aggregation) (doseq [tag [:percentile :var]] (lib.hierarchy/derive tag ::no-semantic-type)) | |
The default preserves the semantic type. But for ::no-semantic-type we should drop | |
(defmethod lib.metadata.calculation/metadata-method ::no-semantic-type
[query stage-number clause]
(dissoc ((get-method lib.metadata.calculation/metadata-method ::aggregation) query stage-number clause)
:semantic-type)) | |
we don't currently have sophisticated logic for generating nice display names for filter clauses. TODO : wait a minute, we do have that stuff now! | |
(defmethod lib.metadata.calculation/display-name-method :sum-where
[query stage-number [_sum-where _opts x _pred] style]
(i18n/tru "Sum of {0} matching condition" (lib.metadata.calculation/display-name query stage-number x style))) | |
(defmethod lib.metadata.calculation/column-name-method :sum-where [query stage-number [_sum-where _opts x _pred]] (str "sum_where_" (lib.metadata.calculation/column-name query stage-number x))) | |
(lib.hierarchy/derive :sum-where ::aggregation) | |
(defmethod lib.metadata.calculation/display-name-method :share [_query _stage-number _share _style] (i18n/tru "Share of rows matching condition")) | |
(defmethod lib.metadata.calculation/column-name-method :share [_query _stage-number _share] "share") | |
(defmethod lib.metadata.calculation/metadata-method :share
[query stage-number clause]
(assoc ((get-method lib.metadata.calculation/metadata-method ::aggregation) query stage-number clause)
:semantic-type :type/Percentage)) | |
(lib.hierarchy/derive :share ::aggregation) | |
(defmethod lib.metadata.calculation/display-name-method :count-where [_query _stage-number _count-where _style] (i18n/tru "Count of rows matching condition")) | |
(defmethod lib.metadata.calculation/metadata-method ::aggregation
[query stage-number [_tag _opts first-arg :as clause]]
(merge
;; flow the `:options` from the field we're aggregating. This is important, for some reason.
;; See [[metabase.query-processor-test.aggregation-test/field-settings-for-aggregate-fields-test]]
(when first-arg
(select-keys (lib.metadata.calculation/metadata query stage-number first-arg) [:settings :semantic-type]))
((get-method lib.metadata.calculation/metadata-method :default) query stage-number clause))) | |
(lib.common/defop count [] [x]) (lib.common/defop cum-count [] [x]) (lib.common/defop count-where [x y]) (lib.common/defop avg [x]) (lib.common/defop distinct [x]) (lib.common/defop max [x]) (lib.common/defop median [x]) (lib.common/defop min [x]) (lib.common/defop percentile [x y]) (lib.common/defop share [x]) (lib.common/defop stddev [x]) (lib.common/defop sum [x]) (lib.common/defop cum-sum [x]) (lib.common/defop sum-where [x y]) (lib.common/defop var [x]) | |
(defmethod lib.ref/ref-method :aggregation [aggregation-clause] aggregation-clause) | |
Schema for something you can pass to [[aggregate]] to add to a query as an aggregation. | (def ^:private Aggregable [:or ::lib.schema.aggregation/aggregation ::lib.schema.common/external-op ::lib.schema.metadata/metric]) |
(mu/defn aggregate :- ::lib.schema/query
"Adds an aggregation to query."
([query aggregable]
(aggregate query -1 aggregable))
([query :- ::lib.schema/query
stage-number :- :int
aggregable :- Aggregable]
;; if this is a Metric metadata, convert it to `:metric` MBQL clause before adding.
(if (= (lib.dispatch/dispatch-value aggregable) :metadata/metric)
(recur query stage-number (lib.ref/ref aggregable))
(lib.util/add-summary-clause query stage-number :aggregation aggregable)))) | |
(mu/defn aggregations :- [:maybe [:sequential ::lib.schema.aggregation/aggregation]]
"Get the aggregations in a given stage of a query."
([query]
(aggregations query -1))
([query :- ::lib.schema/query
stage-number :- :int]
(not-empty (:aggregation (lib.util/query-stage query stage-number))))) | |
(mu/defn aggregations-metadata :- [:maybe [:sequential ::lib.schema.metadata/column]]
"Get metadata about the aggregations in a given stage of a query."
([query]
(aggregations-metadata query -1))
([query :- ::lib.schema/query
stage-number :- :int]
(some->> (not-empty (:aggregation (lib.util/query-stage query stage-number)))
(into [] (map (fn [aggregation]
(let [metadata (lib.metadata.calculation/metadata query stage-number aggregation)]
(-> metadata
(u/assoc-default :effective-type (or (:base-type metadata) :type/*))
(assoc :lib/source :source/aggregations
:lib/source-uuid (:lib/uuid (second aggregation))))))))))) | |
(def ^:private OperatorWithColumns
[:merge
::lib.schema.aggregation/operator
[:map
[:columns {:optional true} [:sequential ::lib.schema.metadata/column]]]]) | |
(defmethod lib.metadata.calculation/display-name-method :operator/aggregation
[_query _stage-number {:keys [display-info]} _display-name-style]
(:display-name (display-info))) | |
(defmethod lib.metadata.calculation/display-info-method :operator/aggregation
[_query _stage-number {:keys [display-info requires-column? selected?] short-name :short}]
(cond-> (assoc (display-info)
:short-name (u/qualified-name short-name)
:requires-column requires-column?)
(some? selected?) (assoc :selected selected?))) | |
(mu/defn aggregation-operator-columns :- [:maybe [:sequential ::lib.schema.metadata/column]] "Returns the columns for which `aggregation-operator` is applicable." [aggregation-operator :- OperatorWithColumns] (:columns aggregation-operator)) | |
(mu/defn available-aggregation-operators :- [:maybe [:sequential OperatorWithColumns]]
"Returns the available aggegation operators for the stage with `stage-number` of `query`.
If `stage-number` is omitted, uses the last stage."
([query]
(available-aggregation-operators query -1))
([query :- ::lib.schema/query
stage-number :- :int]
(let [db-features (or (:features (lib.metadata/database query)) #{})
stage (lib.util/query-stage query stage-number)
columns (lib.metadata.calculation/visible-columns query stage-number stage)
with-columns (fn [{:keys [requires-column? supported-field] :as operator}]
(cond
(not requires-column?)
operator
(= supported-field :any)
(assoc operator :columns columns)
:else
(when-let [cols (->> columns
(filterv #(lib.types.isa/field-type? supported-field %))
not-empty)]
(assoc operator :columns cols))))]
(not-empty
(into []
(comp (filter (fn [op]
(let [feature (:driver-feature op)]
(or (nil? feature) (db-features feature)))))
(keep with-columns)
(map #(assoc % :lib/type :operator/aggregation)))
lib.schema.aggregation/aggregation-operators))))) | |
(mu/defn aggregation-clause :- ::lib.schema.aggregation/aggregation
"Returns a standalone aggregation clause for an `aggregation-operator` and
a `column`.
For aggregations requiring an argument `column` is mandatory, otherwise
it is optional."
([aggregation-operator :- ::lib.schema.aggregation/operator]
(if-not (:requires-column? aggregation-operator)
(lib.options/ensure-uuid [(:short aggregation-operator) {}])
(throw (ex-info (lib.util/format "aggregation operator %s requires an argument"
(:short aggregation-operator))
{:aggregation-operator aggregation-operator}))))
([aggregation-operator :- ::lib.schema.aggregation/operator
column]
(lib.options/ensure-uuid [(:short aggregation-operator) {} (lib.common/->op-arg column)]))) | |
(def ^:private SelectedOperatorWithColumns
[:merge
::lib.schema.aggregation/operator
[:map
[:columns {:optional true} [:sequential ::lib.schema.metadata/column]]
[:selected? {:optional true} :boolean]]]) | |
(mu/defn selected-aggregation-operators :- [:maybe [:sequential SelectedOperatorWithColumns]]
"Mark the operator and the column (if any) in `agg-operators` selected by `agg-clause`."
[agg-operators :- [:maybe [:sequential OperatorWithColumns]]
agg-clause]
(when (seq agg-operators)
(let [[op _ agg-col] agg-clause
agg-temporal-unit (-> agg-col lib.options/options :temporal-unit)]
(mapv (fn [agg-op]
(cond-> agg-op
(= (:short agg-op) op)
(-> (assoc :selected? true)
(m/update-existing
:columns
(fn [cols]
(if (lib.util/ref-clause? agg-col)
(let [cols (lib.equality/mark-selected-columns
cols
[(lib.options/update-options agg-col dissoc :temporal-unit)])]
(mapv (fn [c]
(cond-> c
(some? agg-temporal-unit)
(lib.temporal-bucket/with-temporal-bucket agg-temporal-unit)))
cols))
cols))))))
agg-operators)))) | |
(mu/defn aggregation-ref :- :mbql.clause/aggregation
"Find the aggregation at `ag-index` and create an `:aggregation` ref for it. Intended for use
when creating queries using threading macros e.g.
(-> (lib/query ...)
(lib/aggregate (lib/avg ...))
(as-> <> (lib/order-by <> (lib/aggregation-ref <> 0))))"
([query ag-index]
(aggregation-ref query -1 ag-index))
([query :- ::lib.schema/query
stage-number :- :int
ag-index :- ::lib.schema.common/int-greater-than-or-equal-to-zero]
(if-let [[_ {ag-uuid :lib/uuid}] (get (:aggregation (lib.util/query-stage query stage-number)) ag-index)]
(lib.options/ensure-uuid [:aggregation {} ag-uuid])
(throw (ex-info (str "Undefined aggregation " ag-index)
{:aggregation-index ag-index
:query query
:stage-number stage-number}))))) | |
(mu/defn aggregation-at-index :- [:maybe ::lib.schema.aggregation/aggregation]
"Get the aggregation at `index` in a stage of the query if it exists, otherwise `nil`. This is mostly for working
with legacy references like
[:aggregation 0]"
[query :- ::lib.schema/query
stage-number :- :int
index :- ::lib.schema.common/int-greater-than-or-equal-to-zero]
(let [ags (aggregations query stage-number)]
(when (> (clojure.core/count ags) index)
(nth ags index)))) | |
(mu/defn aggregation-column :- [:maybe ::lib.schema.metadata/column]
"Returns the column consumed by this aggregation, eg. the column being summed.
Returns nil for aggregations like `[:count]` that don't specify a column."
[query :- ::lib.schema/query
stage-number :- :int
[_operator _opts column-ref :as _aggregation] :- ::lib.schema.aggregation/aggregation]
(when column-ref
(->> (lib.util/query-stage query stage-number)
(lib.metadata.calculation/visible-columns query stage-number)
(lib.equality/find-matching-column column-ref)))) | |