(ns metabase.query-processor.streaming.csv (:require [clojure.data.csv] [clojure.string :as str] [java-time.api :as t] [medley.core :as m] [metabase.formatter :as formatter] [metabase.models.visualization-settings :as mb.viz] [metabase.public-settings :as public-settings] [metabase.query-processor.pivot.postprocess :as qp.pivot.postprocess] [metabase.query-processor.streaming.common :as common] [metabase.query-processor.streaming.interface :as qp.si] [metabase.util :as u] [metabase.util.date-2 :as u.date] [metabase.util.performance :as perf]) (:import (java.io BufferedWriter OutputStream OutputStreamWriter) (java.nio.charset StandardCharsets))) | |
(set! *warn-on-reflection* true) | |
(defmethod qp.si/stream-options :csv ([_] (qp.si/stream-options :csv "query_result")) ([_ filename-prefix] {:content-type "text/csv" :status 200 :headers {"Content-Disposition" (format "attachment; filename=\"%s_%s.csv\"" (or filename-prefix "query_result") (u.date/format (t/zoned-date-time)))} :write-keepalive-newlines? false})) | |
Flag to enable/disable export post-processing of pivot tables. Disabled by default and should remain disabled until Issue #44556 is resolved and a clear plan is made. As a first step towards hollistically solving this issue: https://github.com/metabase/metabase/issues/44556
(which is basically that very large pivot tables can crash the export process),
The post processing is disabled completely.
This should remain | (def ^:dynamic *pivot-export-post-processing-enabled* false) |
Custom implementation of | (defn- write-csv [writer data] (let [separator \, quote \" quote? (fn [^String s] (let [n (.length s)] (loop [i 0] (if (>= i n) false (let [ch (.charAt s (unchecked-int i))] (if (or (= ch \,) ;; separator (= ch \") ;; quote (= ch \return) (= ch \newline)) true (recur (unchecked-inc i)))))))) newline "\n"] (#'clojure.data.csv/write-csv* writer data separator quote quote? newline))) |
Rebind write-cell to avoid using clojure.core/escape. Instead, use String.replace with known arguments (we never change quote symbol anyway). | (.bindRoot #'clojure.data.csv/write-cell (fn [^java.io.Writer writer obj _ _ quote?] (let [^String string (str obj) must-quote (quote? string)] (when must-quote (.write writer "\"")) (.write writer (if must-quote (.replace string "\"" "\"\"") string)) (when must-quote (.write writer "\""))))) |
(defn- col->aggregation-fn-key [{agg-name :name source :source}] (when (= :aggregation source) (let [agg-name (u/lower-case-en agg-name)] (cond (str/starts-with? agg-name "sum") :sum (str/starts-with? agg-name "avg") :avg (str/starts-with? agg-name "min") :min (str/starts-with? agg-name "max") :max (str/starts-with? agg-name "count") :count (str/starts-with? agg-name "stddev") :stddev)))) | |
(defmethod qp.si/streaming-results-writer :csv [_ ^OutputStream os] (let [writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8)) ordered-formatters (volatile! nil) pivot-data (atom nil)] (reify qp.si/StreamingResultsWriter (begin! [_ {{:keys [ordered-cols results_timezone format-rows? pivot-export-options pivot?] :or {format-rows? true pivot? false}} :data} viz-settings] (let [col-names (vec (common/column-titles ordered-cols (::mb.viz/column-settings viz-settings) format-rows?)) opts (when (and pivot? pivot-export-options) (-> (merge {:pivot-rows [] :pivot-cols [] :measures (mapv col->aggregation-fn-key ordered-cols)} pivot-export-options) (assoc :column-titles col-names) (qp.pivot.postprocess/add-totals-settings viz-settings) qp.pivot.postprocess/add-pivot-measures)) pivot-grouping-key (qp.pivot.postprocess/pivot-grouping-key col-names)] ;; initialize the pivot-data ;; If exporting pivoted, init the pivot data structure ;; Otherwise, just store the pivot-grouping key index (when (and pivot? pivot-export-options) (reset! pivot-data (qp.pivot.postprocess/init-pivot opts))) (when pivot-grouping-key (swap! pivot-data assoc :pivot-grouping pivot-grouping-key)) (vreset! ordered-formatters (mapv #(formatter/create-formatter results_timezone % viz-settings format-rows?) ordered-cols)) ;; write the column names for non-pivot tables (when (or (not opts) (not (public-settings/enable-pivoted-exports))) (let [header (m/remove-nth (or pivot-grouping-key (inc (count col-names))) col-names)] (write-csv writer [header]) (.flush writer))))) (write-row! [_ row _row-num _ {:keys [output-order]}] (let [ordered-row (if output-order (let [row-v (into [] row)] (into [] (for [i output-order] (row-v i)))) row) {:keys [pivot-grouping]} (or (:config @pivot-data) @pivot-data) group (get ordered-row pivot-grouping)] (if (and (contains? @pivot-data :config) (public-settings/enable-pivoted-exports)) ;; if we're processing a pivot result, we don't write it out yet, just aggregate it ;; so that we can post process the data in finish! (when (= qp.pivot.postprocess/NON_PIVOT_ROW_GROUP (int group)) (swap! pivot-data (fn [pivot-data] (qp.pivot.postprocess/add-row pivot-data ordered-row)))) (if group (when (= qp.pivot.postprocess/NON_PIVOT_ROW_GROUP (int group)) (let [formatted-row (->> (perf/mapv (fn [formatter r] (formatter (common/format-value r))) @ordered-formatters ordered-row) (m/remove-nth pivot-grouping))] (write-csv writer [formatted-row]) (.flush writer))) (let [formatted-row (perf/mapv (fn [formatter r] (formatter (common/format-value r))) @ordered-formatters ordered-row)] (write-csv writer [formatted-row]) (.flush writer)))))) (finish! [_ _] ;; TODO -- not sure we need to flush both (when (and (contains? @pivot-data :config) (public-settings/enable-pivoted-exports)) (doseq [xf-row (qp.pivot.postprocess/build-pivot-output @pivot-data @ordered-formatters)] (write-csv writer [xf-row]))) (.flush writer) (.flush os) (.close writer))))) | |