(ns metabase.query-processor.reducible (:require [clojure.core.async :as a] [metabase.query-processor.pipeline :as qp.pipeline] [metabase.util.log :as log] [metabase.util.malli :as mu] [metabase.util.performance :as perf])) | |
(set! *warn-on-reflection* true) | |
Default function returning a reducing function. Results are returned in the 'standard' map format e.g. {:data {:cols [...], :rows [...]}, :row_count ...} | (defn default-rff [metadata] (let [row-count (volatile! 0) rows (volatile! (transient []))] (fn default-rf ([] {:data metadata}) ([result] {:pre [(map? (unreduced result))]} ;; if the result is a clojure.lang.Reduced, unwrap it so we always get back the standard-format map (-> (unreduced result) (assoc :row_count @row-count :status :completed) (assoc-in [:data :rows] (persistent! @rows)))) ([result row] (vswap! row-count inc) (vswap! rows conj! row) result)))) |
Utility function for generating reducible rows when implementing [[metabase.driver/execute-reducible-query]].
| (defn reducible-rows ([row-thunk] (reducible-rows row-thunk qp.pipeline/*canceled-chan*)) ([row-thunk canceled-chan] (reify clojure.lang.IReduceInit (reduce [_ rf init] (loop [acc init] (cond (reduced? acc) @acc (some-> canceled-chan a/poll!) acc :else (if-let [row (row-thunk)] (recur (rf acc row)) (do (log/trace "All rows consumed.") acc)))))))) |
Utility function for creating a reducing function that reduces results using (fn my-xform [rf] (combine-additional-reducing-fns rf [((take 100) conj)] (fn combine [result first-100-values] (rf (assoc result :first-100 first-100-values))))) This is useful for post-processing steps that need to reduce the result rows to provide some metadata that can be added to the final result. This is conceptually similar to a combination of [[redux.core/juxt]] and [[redux.core/post-complete]], with these differences:
| (mu/defn combine-additional-reducing-fns [primary-rf :- ifn? additional-rfs :- [:sequential ifn?] combine :- ifn?] (let [additional-accs (volatile! (perf/mapv (fn [rf] (rf)) additional-rfs))] (fn combine-additional-reducing-fns-rf* ([] (primary-rf)) ([acc] (let [additional-results (perf/mapv (fn [rf acc] (rf (unreduced acc))) additional-rfs @additional-accs)] (apply combine acc additional-results))) ([acc x] (vswap! additional-accs (fn [accs] (perf/mapv (fn [rf acc] (if (reduced? acc) acc (rf acc x))) additional-rfs accs))) (primary-rf acc x))))) |