(ns metabase.query-processor.streaming
  (:require
   [metabase.async.streaming-response :as streaming-response]
   [metabase.legacy-mbql.util :as mbql.u]
   [metabase.lib.schema.common :as lib.schema.common]
   [metabase.models.visualization-settings :as mb.viz]
   [metabase.query-processor.pipeline :as qp.pipeline]
   [metabase.query-processor.schema :as qp.schema]
   [metabase.query-processor.streaming.csv :as qp.csv]
   [metabase.query-processor.streaming.interface :as qp.si]
   [metabase.query-processor.streaming.json :as qp.json]
   [metabase.query-processor.streaming.xlsx :as qp.xlsx]
   [metabase.util :as u]
   [metabase.util.log :as log]
   [metabase.util.malli :as mu])
  (:import
   (clojure.core.async.impl.channels ManyToManyChannel)
   (java.io OutputStream)
   (metabase.async.streaming_response StreamingResponse)
   (org.eclipse.jetty.io EofException)))
(set! *warn-on-reflection* true)
;; these namespaces are loaded for side-effects so their impls of `qp.si/stream-options` and
;; `qp.si/streaming-results-writer` are available
(comment qp.csv/keep-me
         qp.json/keep-me
         qp.xlsx/keep-me)
(defn- deduplicate-col-names
  "Deduplicate column names that would otherwise conflict.

  TODO: This function includes logic that is normally done by the annotate middleware, but that middleware hasn't
  been run yet at this point in the code. We should eventually refactor this (#17195)"
  [cols]
  (map (fn [col unique-name]
         (let [col-with-display-name (if (:display_name col)
                                       col
                                       (assoc col :display_name (:name col)))]
           (assoc col-with-display-name :name unique-name)))
       cols
       (mbql.u/uniquify-names (map :name cols))))
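;; Hypothetical REPL sketch of [[deduplicate-col-names]]: the exact deduplicated names come from
;; [[mbql.u/uniquify-names]] (typically a "_2"-style suffix), and a missing :display_name is backfilled from :name.
(comment
  (deduplicate-col-names
   [{:name "ID"}
    {:name "ID", :display_name "Product ID"}])
  ;; => something like
  ;; ({:name "ID",   :display_name "ID"}
  ;;  {:name "ID_2", :display_name "Product ID"})
  )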
(defn- validate-table-columms
  "Validate that all of the columns in `table-columns` refer to actual columns in `cols`, either by field ref or by
  name. Returns `table-columns` if the check passes, otherwise nil, in which case the natural ordering of `cols` is
  used instead."
  [table-columns cols]
  (let [col-field-refs (set (remove nil? (map :field_ref cols)))
        col-names      (set (remove nil? (map :name cols)))]
    (when (every? (fn [table-col]
                    (or (col-field-refs (::mb.viz/table-column-field-ref table-col))
                        (col-names (::mb.viz/table-column-name table-col))))
                  table-columns)
      table-columns)))
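;; Hypothetical sketch: a table-columns entry that matches neither a field ref nor a name in `cols` causes the whole
;; setting to be rejected (nil), so callers fall back to the natural ordering of `cols`.
(comment
  (validate-table-columms
   [{::mb.viz/table-column-name "TOTAL", ::mb.viz/table-column-enabled true}]
   [{:name "TOTAL", :field_ref [:field 10 nil]}])
  ;; => the table-columns, unchanged

  (validate-table-columms
   [{::mb.viz/table-column-name "NO_SUCH_COL", ::mb.viz/table-column-enabled true}]
   [{:name "TOTAL", :field_ref [:field 10 nil]}])
  ;; => nil
  )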
(defn- pivot-grouping-exists?
  "Returns a truthy value if `cols` contains the synthetic \"pivot-grouping\" column added by the pivot query
  processor, i.e. if we are exporting pivot results."
  [cols]
  (some #(= (:name %) "pivot-grouping") cols))
(defn- export-column-order
  "For each entry in `table-columns` that is enabled, find the index of the corresponding entry in `cols`, matching by
  field ref or by name. If a column has been remapped, use the index of the column it was remapped to.

  The resulting list of indices determines the order of column names and data in exports."
  [cols table-columns]
  (if (pivot-grouping-exists? cols)
    ;; If the columns contain a pivot-grouping, we're exporting a pivot and the cols order is not used,
    ;; so we can just pass the indices in order.
    (range (count cols))
    (let [table-columns'     (or (validate-table-columms table-columns cols)
                                 ;; If table-columns is not provided (e.g. for saved cards), we can construct a fake
                                 ;; one that retains the original column ordering in `cols`
                                 (for [col cols]
                                   (let [col-name   (:name col)
                                         id-or-name (or (:id col) col-name)
                                         field-ref  (:field_ref col)]
                                     {::mb.viz/table-column-field-ref (or field-ref [:field id-or-name nil])
                                      ::mb.viz/table-column-enabled   true
                                      ::mb.viz/table-column-name      col-name})))
          enabled-table-cols (filter ::mb.viz/table-column-enabled table-columns')
          cols-vector        (into [] cols)
          ;; cols-index is a map from keys representing fields to their indices into `cols`
          cols-index         (reduce-kv (fn [m i col]
                                          ;; Always add col-name as a key, so that native queries and remapped fields
                                          ;; work correctly
                                          (let [m' (assoc m (:name col) i)]
                                            (if-let [field-ref (:field_ref col)]
                                              ;; Add a map key based on the column's field-ref, if available
                                              (assoc m' field-ref i)
                                              m')))
                                        {}
                                        cols-vector)]
      (->> (map (fn [{field-ref ::mb.viz/table-column-field-ref, col-name ::mb.viz/table-column-name}]
                  (let [index              (or (get cols-index field-ref)
                                               (get cols-index col-name))
                        col                (get cols-vector index)
                        remapped-to-name   (:remapped_to col)
                        remapped-from-name (:remapped_from col)]
                    (cond
                      remapped-to-name
                      (get cols-index remapped-to-name)

                      (not remapped-from-name)
                      index)))
                enabled-table-cols)
           (remove nil?)))))
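;; Hypothetical sketch of [[export-column-order]]: indices come back in table-column order, disabled columns are
;; dropped, and remapped columns resolve to the column they were remapped to.
(comment
  (export-column-order
   ;; cols as returned by the QP
   [{:name "ID",    :field_ref [:field 1 nil]}
    {:name "TOTAL", :field_ref [:field 2 nil]}]
   ;; ::mb.viz/table-columns from viz settings: TOTAL first, ID disabled
   [{::mb.viz/table-column-field-ref [:field 2 nil], ::mb.viz/table-column-enabled true}
    {::mb.viz/table-column-field-ref [:field 1 nil], ::mb.viz/table-column-enabled false}])
  ;; => (1)
  )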
(defn order-cols
  "Dedups and orders `cols` based on the `table-columns` entry in `viz-settings`. Returns the ordered columns along
  with `output-order`, a list of indices mapping the new order back to the original order of `cols`, which is used to
  reorder the values in each row."
  [cols viz-settings]
  (let [deduped-cols (deduplicate-col-names cols)
        output-order (export-column-order deduped-cols (::mb.viz/table-columns viz-settings))
        ordered-cols (if output-order
                       (let [v (into [] deduped-cols)]
                         (for [i output-order] (v i)))
                       deduped-cols)]
    [ordered-cols output-order]))
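;; Hypothetical sketch: [[order-cols]] returns both the reordered columns and the index order; export writers use the
;; same index order to pull values out of each row.
(comment
  (order-cols
   [{:name "ID",    :field_ref [:field 1 nil]}
    {:name "TOTAL", :field_ref [:field 2 nil]}]
   {::mb.viz/table-columns
    [{::mb.viz/table-column-field-ref [:field 2 nil], ::mb.viz/table-column-enabled true}
     {::mb.viz/table-column-field-ref [:field 1 nil], ::mb.viz/table-column-enabled true}]})
  ;; => [(<TOTAL col> <ID col>) (1 0)]
  ;; so a row [123 9.99] would be written as (9.99 123)
  )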
(mu/defn- streaming-rff :- ::qp.schema/rff
  "Reducing function factory that streams results to `results-writer` as they are reduced: [[qp.si/begin!]] is called
  with the ordered column metadata before any rows, and [[qp.si/write-row!]] is called once per row."
  [results-writer :- (lib.schema.common/instance-of-class metabase.query_processor.streaming.interface.StreamingResultsWriter)]
  (fn [{:keys [cols viz-settings] :as initial-metadata}]
    (let [[ordered-cols output-order] (order-cols cols viz-settings)
          viz-settings'               (assoc viz-settings :output-order output-order)
          row-count                   (volatile! 0)]
      (fn
        ([]
         (log/trace "Writing initial metadata to results writer.")
         (qp.si/begin! results-writer
                       {:data (assoc initial-metadata :ordered-cols ordered-cols)}
                       viz-settings')
         {:data initial-metadata})

        ([result]
         (assoc result
                :row_count @row-count
                :status :completed))

        ([metadata row]
         (log/trace "Writing one row to results writer.")
         (qp.si/write-row! results-writer row (dec (vswap! row-count inc)) ordered-cols viz-settings')
         metadata)))))
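;; Hypothetical sketch of the reducing-function contract [[streaming-rff]] builds, assuming
;; `qp.si/StreamingResultsWriter` is the protocol backing results writers (the no-op writer below is purely for
;; illustration): a 0-arity init call triggers begin!, each 2-arity call writes one row, and the final 1-arity call
;; adds :row_count and :status.
(comment
  (let [noop-writer (reify qp.si/StreamingResultsWriter
                      (begin! [_ _ _])
                      (write-row! [_ _ _ _ _])
                      (finish! [_ _]))
        rf          ((streaming-rff noop-writer) {:cols [{:name "ID"} {:name "TOTAL"}]})]
    (-> (rf)
        (rf [1 9.99])
        (rf [2 12.49])
        (rf)))
  ;; => {:data {...}, :row_count 2, :status :completed}
  )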
(mu/defn- streaming-result-fn :- fn?
  "Builds the `qp.pipeline/*result*` handler: once the query completes, finalize `results-writer`, flush and close
  `os`, then hand the result off to [[qp.pipeline/default-result-handler]]."
  [results-writer :- (lib.schema.common/instance-of-class metabase.query_processor.streaming.interface.StreamingResultsWriter)
   ^OutputStream os :- (lib.schema.common/instance-of-class OutputStream)]
  (fn result [result]
    (when (= (:status result) :completed)
      (log/debug "Finished writing results; closing results writer.")
      (try
        (qp.si/finish! results-writer result)
        (catch EofException _e
          (log/warn "Client closed connection prematurely")))
      (u/ignore-exceptions
        (.flush os)
        (.close os)))
    (qp.pipeline/default-result-handler result)))
(defn do-with-streaming-rff
  "Context to pass to the QP to stream results to `os` as `export-format`, e.g.

    (with-open [os ...]
      (qp.streaming/do-with-streaming-rff
       :csv os
       (fn [rff]
         (qp/process-query query rff))))"
  [export-format os f]
  (let [results-writer (qp.si/streaming-results-writer export-format os)
        rff            (streaming-rff results-writer)]
    (binding [qp.pipeline/*result* (streaming-result-fn results-writer os)]
      (f rff))))
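;; Hypothetical sketch of capturing an export in memory rather than streaming it to an HTTP response, assuming
;; `metabase.query-processor` is aliased as `qp` and `query` is a valid query map.
(comment
  (with-open [os (java.io.ByteArrayOutputStream.)]
    (do-with-streaming-rff
     :csv os
     (fn [rff]
       (qp/process-query query rff)))
    (String. (.toByteArray os) "UTF-8")))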
(defn -streaming-response
  "Impl for [[streaming-response]]."
  ^StreamingResponse [export-format filename-prefix f]
  (streaming-response/streaming-response (qp.si/stream-options export-format filename-prefix) [os canceled-chan]
    (do-with-streaming-rff
     export-format os
     (^:once fn* [rff]
      (let [result (try
                     (binding [qp.pipeline/*canceled-chan* canceled-chan]
                       (f rff))
                     (catch Throwable e
                       e))]
        (assert (some? result) "QP unexpectedly returned nil.")
        ;; if you see this, it's because it's old code written before the changes in #35465... rework the code in
        ;; question to return a response directly instead of a core.async channel
        (assert (not (instance? ManyToManyChannel result)) "QP should not return a core.async channel.")
        (when (or (instance? Throwable result)
                  (= (:status result) :failed))
          (streaming-response/write-error! os result export-format)))))))
(defmacro streaming-response
  "Return results of processing a query as a streaming response. This response implements the appropriate
  Ring/Compojure protocols, so return or `respond` with it directly.

  Typical example:

    (api/defendpoint-schema GET \"/whatever\" []
      (qp.streaming/streaming-response [rff :json]
        (qp/process-query (qp/userland-query-with-default-constraints query) rff)))

  Handles either async or sync QP results, but you should prefer returning sync results so we can handle query
  cancelations properly."
  {:style/indent 1}
  [[map-binding export-format filename-prefix] & body]
  `(-streaming-response ~export-format ~filename-prefix (^:once fn* [~map-binding] ~@body)))
(defn export-formats
  "Set of valid streaming response formats. Currently, this is every format with a loaded implementation of
  [[qp.si/stream-options]]; the built-in download formats are required above, and plugins may add more."
  []
  (set (keys (methods qp.si/stream-options))))
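;; Hypothetical REPL sketch: with the namespaces required above loaded, the set should include at least the built-in
;; download formats; exact contents depend on which impls (and plugins) are loaded.
(comment
  (export-formats)
  ;; => e.g. #{:csv :json :xlsx ...}
  )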