| (ns metabase.query-processor.streaming (:require [metabase.legacy-mbql.util :as mbql.u] [metabase.lib.schema.common :as lib.schema.common] [metabase.models.visualization-settings :as mb.viz] [metabase.query-processor.pipeline :as qp.pipeline] [metabase.query-processor.schema :as qp.schema] [metabase.query-processor.streaming.csv :as qp.csv] [metabase.query-processor.streaming.interface :as qp.si] [metabase.query-processor.streaming.json :as qp.json] [metabase.query-processor.streaming.xlsx :as qp.xlsx] [metabase.server.streaming-response :as streaming-response] [metabase.util :as u] [metabase.util.log :as log] [metabase.util.malli :as mu]) (:import (clojure.core.async.impl.channels ManyToManyChannel) (java.io OutputStream) (metabase.server.streaming_response StreamingResponse) (org.eclipse.jetty.io EofException))) | |
| (set! *warn-on-reflection* true) | |
| these are loaded for side-effects so their impls of  | (comment qp.csv/keep-me
         qp.json/keep-me
         qp.xlsx/keep-me) | 
| Deduplicate column names that would otherwise conflict. TODO: This function includes logic that is normally is done by the annotate middleware, but hasn't been run yet at this point in the code. We should eventually refactor this (#17195) | (defn- deduplicate-col-names
  [cols]
  (map (fn [col unique-name]
         (let [col-with-display-name (if (:display_name col)
                                       col
                                       (assoc col :display_name (:name col)))]
           (assoc col-with-display-name :name unique-name)))
       cols
       (mbql.u/uniquify-names (map :name cols)))) | 
| Validate that all of the columns in  | (defn- validate-table-columms
  [table-columns cols]
  (let [col-field-refs (set (remove nil? (map :field_ref cols)))
        col-names      (set (remove nil? (map :name cols)))]
    (when (every? (fn [table-col] (or (col-field-refs (::mb.viz/table-column-field-ref table-col))
                                      (col-names (::mb.viz/table-column-name table-col))))
                  table-columns)
      table-columns))) | 
| Returns  | (defn- pivot-grouping-exists? [cols] (some #(= (:name %) "pivot-grouping") cols)) | 
| For each entry in  The resulting list of indices determines the order of column names and data in exports. | (defn- export-column-order
  [cols table-columns]
  (if (pivot-grouping-exists? cols)
    ;; If the columns contain a pivot-grouping, we're exporting a pivot and the cols order is not used,
    ;; so we can just pass the indices in order.
    (range (count cols))
    (let [table-columns'     (or (validate-table-columms table-columns cols)
                                 ;; If table-columns is not provided (e.g. for saved cards), we can construct a fake one
                                 ;; that retains the original column ordering in `cols`
                                 (for [col cols]
                                   (let [col-name   (:name col)
                                         id-or-name (or (:id col) col-name)
                                         field-ref  (:field_ref col)]
                                     {::mb.viz/table-column-field-ref (or field-ref [:field id-or-name nil])
                                      ::mb.viz/table-column-enabled   true
                                      ::mb.viz/table-column-name      col-name})))
          enabled-table-cols (filter ::mb.viz/table-column-enabled table-columns')
          cols-vector        (into [] cols)
          ;; cols-index is a map from keys representing fields to their indices into `cols`
          cols-index         (reduce-kv (fn [m i col]
                                          ;; Always add col-name as a key, so that native queries and remapped fields work correctly
                                          (let [m' (assoc m (:name col) i)]
                                            (if-let [field-ref (:field_ref col)]
                                              ;; Add a map key based on the column's field-ref, if available
                                              (assoc m' field-ref i)
                                              m')))
                                        {}
                                        cols-vector)]
      (->> (map
            (fn [{field-ref ::mb.viz/table-column-field-ref, col-name ::mb.viz/table-column-name}]
              (let [index              (or (get cols-index field-ref)
                                           (get cols-index col-name))
                    col                (get cols-vector index)
                    remapped-to-name   (:remapped_to col)
                    remapped-from-name (:remapped_from col)]
                (cond
                  remapped-to-name
                  (get cols-index remapped-to-name)
                  (not remapped-from-name)
                  index)))
            enabled-table-cols)
           (remove nil?))))) | 
| Dedups and orders  | (defn order-cols
  [cols viz-settings]
  (let [deduped-cols  (deduplicate-col-names cols)
        output-order  (export-column-order deduped-cols (::mb.viz/table-columns viz-settings))
        ordered-cols  (if output-order
                        (let [v (into [] deduped-cols)]
                          (for [i output-order] (v i)))
                        deduped-cols)]
    [ordered-cols output-order])) | 
| (mu/defn- streaming-rff :- ::qp.schema/rff
  [results-writer :- (lib.schema.common/instance-of-class metabase.query_processor.streaming.interface.StreamingResultsWriter)]
  (fn [{:keys [cols viz-settings] :as initial-metadata}]
    (let [[ordered-cols output-order] (order-cols cols viz-settings)
          viz-settings'               (assoc viz-settings :output-order output-order)
          row-count                   (volatile! 0)]
      (fn
        ([]
         (log/trace "Writing initial metadata to results writer.")
         (qp.si/begin! results-writer
                       {:data (assoc initial-metadata :ordered-cols ordered-cols)}
                       viz-settings')
         {:data initial-metadata})
        ([result]
         (assoc result
                :row_count @row-count
                :status :completed))
        ([metadata row]
         (log/trace "Writing one row to results writer.")
         (qp.si/write-row! results-writer row (dec (vswap! row-count inc)) ordered-cols viz-settings')
         metadata))))) | |
| (mu/defn- streaming-result-fn :- fn?
  [results-writer   :- (lib.schema.common/instance-of-class metabase.query_processor.streaming.interface.StreamingResultsWriter)
   ^OutputStream os :- (lib.schema.common/instance-of-class OutputStream)]
  (fn result [result]
    (when (= (:status result) :completed)
      (log/debug "Finished writing results; closing results writer.")
      (try
        (qp.si/finish! results-writer result)
        (catch EofException _e
          (log/warn "Client closed connection prematurely")))
      (u/ignore-exceptions
        (.flush os)
        (.close os)))
    (qp.pipeline/default-result-handler result))) | |
| Context to pass to the QP to streaming results as  (with-open [os ...] (qp.streaming/do-with-streaming-rff :csv os (fn [rff] (qp/process-query query rff)))) | (defn do-with-streaming-rff
  [export-format os f]
  (let [results-writer (qp.si/streaming-results-writer export-format os)
        rff            (streaming-rff results-writer)]
    (binding [qp.pipeline/*result* (streaming-result-fn results-writer os)]
      (f rff)))) | 
| Impl for [[streaming-response]]. | (defn -streaming-response
  ^StreamingResponse [export-format filename-prefix f]
  (streaming-response/streaming-response (qp.si/stream-options export-format filename-prefix) [os canceled-chan]
    (do-with-streaming-rff
     export-format os
     (^:once fn* [rff]
       (let [result (try
                      (binding [qp.pipeline/*canceled-chan* canceled-chan]
                        (f rff))
                      (catch Throwable e
                        e))]
         (assert (some? result) "QP unexpectedly returned nil.")
        ;; if you see this, it's because it's old code written before the changes in #35465... rework the code in
        ;; question to return a response directly instead of a core.async channel
         (assert (not (instance? ManyToManyChannel result)) "QP should not return a core.async channel.")
         (when (or (instance? Throwable result)
                   (= (:status result) :failed))
           (streaming-response/write-error! os result export-format))))))) | 
| Return results of processing a query as a streaming response. This response implements the appropriate Ring/Compojure
protocols, so return or  Typical example: (api.macros/defendpoint :get "/whatever" [] (qp.streaming/streaming-response [rff :json] (qp/process-query (qp/userland-query-with-default-constraints query) rff))) Handles either async or sync QP results, but you should prefer returning sync results so we can handle query cancelations properly. | (defmacro streaming-response
  {:style/indent 1}
  [[map-binding export-format filename-prefix] & body]
  `(-streaming-response ~export-format ~filename-prefix (^:once fn* [~map-binding] ~@body))) | 
| Set of valid streaming response formats. Currently,  | (defn export-formats [] (set (keys (methods qp.si/stream-options)))) |