(ns metabase.query-processor.reducible
  (:require
   [clojure.core.async :as a]
   [metabase.query-processor.pipeline :as qp.pipeline]
   [metabase.util.log :as log]
   [metabase.util.malli :as mu]
   [metabase.util.performance :as perf])) | |
(set! *warn-on-reflection* true) | |
Default reducing function factory: takes the result metadata and returns a reducing function. Results are returned in the 'standard' map format, e.g. {:data {:cols [...], :rows [...]}, :row_count ...} | (defn default-rff
  [metadata]
  (let [row-count (volatile! 0)
        rows      (volatile! (transient []))]
    (fn default-rf
      ([]
       {:data metadata})
      ([result]
       {:pre [(map? (unreduced result))]}
       ;; if the result is a clojure.lang.Reduced, unwrap it so we always get back the standard-format map
       (-> (unreduced result)
           (assoc :row_count @row-count
                  :status :completed)
           (assoc-in [:data :rows] (persistent! @rows))))
      ([result row]
       (vswap! row-count inc)
       (vswap! rows conj! row)
       result)))) |
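As a minimal usage sketch, the reducing function returned by `default-rff` can be driven with `transduce`. This assumes a REPL with Metabase on the classpath; `example-metadata` and `example-result` are hypothetical names used only for illustration, and real metadata would come from the driver.

```clojure
(require '[metabase.query-processor.reducible :as qp.reducible])

;; Hypothetical column metadata, for illustration only.
(def example-metadata {:cols [{:name "id"} {:name "name"}]})

;; `transduce` calls the zero-arity to get the initial value, the two-arity
;; once per row, and the one-arity to build the final standard-format map.
(def example-result
  (transduce identity
             (qp.reducible/default-rff example-metadata)
             [[1 "a"] [2 "b"]]))

;; example-result
;; => {:data      {:cols [{:name "id"} {:name "name"}]
;;                 :rows [[1 "a"] [2 "b"]]}
;;     :row_count 2
;;     :status    :completed}
```

The returned function keeps its row count and rows in volatiles, so each call to `default-rff` should back exactly one reduction.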
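Utility function for generating reducible rows when implementing [[metabase.driver/execute-reducible-query]]. `row-thunk` is a function that returns the next row each time it is called, or a falsey value when no rows remain. Reduction also stops early if a message is available on `canceled-chan`.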
| (defn reducible-rows
  ([row-thunk]
   (reducible-rows row-thunk qp.pipeline/*canceled-chan*))
  ([row-thunk canceled-chan]
   (reify
     clojure.lang.IReduceInit
     (reduce [_ rf init]
       (loop [acc init]
         (cond
           (reduced? acc)
           @acc
           (some-> canceled-chan a/poll!)
           acc
           :else
           (if-let [row (row-thunk)]
             (recur (rf acc row))
             (do
               (log/trace "All rows consumed.")
               acc)))))))) |
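The sketch below shows one way a row thunk might be built and reduced, along with the effect of cancellation. It assumes a REPL with Metabase on the classpath; `example-row-thunk`, `all-rows`, and `rows-after-cancel` are hypothetical names, and a real driver would typically read rows from a result set rather than an in-memory collection.

```clojure
(require '[clojure.core.async :as a]
         '[metabase.query-processor.reducible :as qp.reducible])

(defn example-row-thunk
  "Hypothetical helper: returns a thunk that yields each element of `rows`
  in turn, then nil once the rows are exhausted."
  [rows]
  (let [it (.iterator ^Iterable rows)]
    (fn []
      (when (.hasNext it)
        (.next it)))))

;; With nothing on the canceled channel, every row is reduced.
(def all-rows
  (reduce conj []
          (qp.reducible/reducible-rows
           (example-row-thunk [[1 "a"] [2 "b"] [3 "c"]])
           (a/promise-chan))))
;; all-rows => [[1 "a"] [2 "b"] [3 "c"]]

;; Once a message is on the canceled channel, the reduction stops before
;; fetching a row and returns the accumulator as it stands.
(def rows-after-cancel
  (let [canceled-chan (a/promise-chan)]
    (a/>!! canceled-chan :cancel)
    (reduce conj []
            (qp.reducible/reducible-rows
             (example-row-thunk [[1 "a"] [2 "b"]])
             canceled-chan))))
;; rows-after-cancel => []
```

Because the loop treats any falsey return value as the end of the results, a row thunk should never return `nil` or `false` for a real row.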
Utility function for creating a reducing function that reduces results using `primary-rf` and any number of `additional-rfs`, then passes the primary accumulator and the completed results of the additional reducing functions to `combine` to produce the final result.

    (fn my-xform [rf]
      (combine-additional-reducing-fns
       rf
       [((take 100) conj)]
       (fn combine [result first-100-values]
         (rf (assoc result :first-100 first-100-values)))))

This is useful for post-processing steps that need to reduce the result rows to provide some metadata that can be added to the final result.

This is conceptually similar to a combination of [[redux.core/juxt]] and [[redux.core/post-complete]], with these differences:
| (mu/defn combine-additional-reducing-fns
  [primary-rf     :- ifn?
   additional-rfs :- [:sequential ifn?]
   combine        :- ifn?]
  (let [additional-accs (volatile! (perf/mapv (fn [rf] (rf))
                                              additional-rfs))]
    (fn combine-additional-reducing-fns-rf*
      ([] (primary-rf))
      ([acc]
       (let [additional-results (perf/mapv (fn [rf acc]
                                             (rf (unreduced acc)))
                                           additional-rfs
                                           @additional-accs)]
         (apply combine acc additional-results)))
      ([acc x]
       (vswap! additional-accs (fn [accs]
                                 (perf/mapv (fn [rf acc]
                                              (if (reduced? acc)
                                                acc
                                                (rf acc x)))
                                            additional-rfs
                                            accs)))
       (primary-rf acc x))))) |
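As a rough end-to-end sketch, the combined reducing function can be run with `transduce` to see how an additional reducing function contributes to the final result. This assumes a REPL with Metabase on the classpath; `combined-result` and the `:first-2` key are hypothetical and chosen only for illustration.

```clojure
(require '[metabase.query-processor.reducible :as qp.reducible])

(def combined-result
  (let [primary-rf (qp.reducible/default-rff {:cols [{:name "n"}]})
        rf         (qp.reducible/combine-additional-reducing-fns
                    primary-rf
                    ;; one additional rf: collect only the first two rows
                    [((take 2) conj)]
                    (fn combine [result first-2]
                      ;; attach the extra value, then let the primary rf
                      ;; complete the result as usual
                      (primary-rf (assoc result :first-2 first-2))))]
    (transduce identity rf [[1] [2] [3]])))

;; combined-result
;; => {:data      {:cols [{:name "n"}], :rows [[1] [2] [3]]}
;;     :row_count 3
;;     :status    :completed
;;     :first-2   [[1] [2]]}
```

Once `(take 2)` marks its accumulator as reduced, that additional reducing function is no longer called, while the primary reducing function keeps receiving every row.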