Impls for JSON-based QP streaming response types. | (ns metabase.query-processor.streaming.json (:require [java-time.api :as t] [medley.core :as m] [metabase.formatter :as formatter] [metabase.models.visualization-settings :as mb.viz] [metabase.query-processor.pivot.postprocess :as qp.pivot.postprocess] [metabase.query-processor.streaming.common :as common] [metabase.query-processor.streaming.interface :as qp.si] [metabase.util.date-2 :as u.date] [metabase.util.json :as json]) (:import (com.fasterxml.jackson.core JsonGenerator) (java.io BufferedWriter OutputStream OutputStreamWriter) (java.nio.charset StandardCharsets))) |
;; Emit reflection warnings so the Java interop below (JsonGenerator, BufferedWriter, etc.)
;; stays fully type-hinted and avoids slow reflective calls on the hot streaming path.
(set! *warn-on-reflection* true)
(defmethod qp.si/stream-options :json
  ;; HTTP response options for a downloadable JSON export: JSON content type plus a
  ;; Content-Disposition header naming the file `<prefix>_<timestamp>.json`.
  ([_]
   (qp.si/stream-options :json "query_result"))
  ([_ filename-prefix]
   (let [prefix      (or filename-prefix "query_result")
         timestamp   (u.date/format (t/zoned-date-time))
         disposition (format "attachment; filename=\"%s_%s.json\"" prefix timestamp)]
     {:content-type "application/json; charset=utf-8"
      :status       200
      :headers      {"Content-Disposition" disposition}})))
;; Streams query results to `os` as a single JSON array of row objects, one object per data
;; row. Column metadata captured in `begin!` (via volatiles) is consumed by every `write-row!`.
(defmethod qp.si/streaming-results-writer :json
  [_ ^OutputStream os]
  (let [writer             (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))
        ;; column display titles used as the JSON object keys; set in `begin!`
        col-names          (volatile! nil)
        ;; one formatter fn per column, aligned with `ordered-cols`; set in `begin!`
        ordered-formatters (volatile! nil)
        ;; if we're processing results from a pivot query, there will be a column 'pivot-grouping' that we don't want to include
        ;; in the final results, so we get the idx into the row in order to remove it
        pivot-grouping-idx (volatile! nil)]
    (reify qp.si/StreamingResultsWriter
      (begin! [_ {{:keys [ordered-cols results_timezone format-rows?]
                   :or   {format-rows? true}} :data} viz-settings]
        (let [cols           (common/column-titles ordered-cols (::mb.viz/column-settings viz-settings) format-rows?)
              pivot-grouping (qp.pivot.postprocess/pivot-grouping-key cols)]
          (when pivot-grouping (vreset! pivot-grouping-idx pivot-grouping))
          ;; drop the pivot-grouping column's title so it never shows up as a JSON key
          (let [names (cond->> cols
                        pivot-grouping (m/remove-nth pivot-grouping))]
            (vreset! col-names names))
          ;; formatters are built over ALL ordered-cols (including pivot-grouping) so they
          ;; stay index-aligned with the raw row before the grouping column is removed
          (vreset! ordered-formatters
                   (mapv #(formatter/create-formatter results_timezone % viz-settings format-rows?) ordered-cols))
          ;; open the top-level JSON array; rows are appended by `write-row!`
          (.write writer "[\n")))
      (write-row! [_ row row-num _ {:keys [output-order]}]
        ;; `output-order`, when present, is a seq of indices used to reorder the incoming row
        (let [ordered-row        (vec
                                  (if output-order
                                    (let [row-v (into [] row)]
                                      (for [i output-order] (row-v i)))
                                    row))
              pivot-grouping-key @pivot-grouping-idx
              group              (get ordered-row pivot-grouping-key)
              cleaned-row        (cond->> ordered-row
                                   pivot-grouping-key (m/remove-nth pivot-grouping-key))]
          ;; when a pivot-grouping col exists, we check its group number. When it's zero,
          ;; we keep it, otherwise don't include it in the results as it's a row representing a subtotal of some kind
          (when (or (not group)
                    (= qp.pivot.postprocess/NON_PIVOT_ROW_GROUP (int group)))
            ;; comma-separate every row after the first
            ;; NOTE(review): this uses the overall `row-num`, so it assumes row 0 is never a
            ;; skipped subtotal row; otherwise the array would start with a bare "," — confirm
            ;; upstream guarantees this for pivot results.
            (when-not (zero? row-num)
              (.write writer ",\n"))
            (json/encode-to
             (zipmap
              @col-names
              (map (fn [formatter r]
                     ;; NOTE: Stringification of formatted values ensures consistency with what is shown in the
                     ;; Metabase UI, especially numbers (e.g. percents, currencies, and rounding). However, this
                     ;; does mean that all JSON values are strings. Any other strategy requires some level of
                     ;; inference to know if we should or should not parse a string (or not stringify an object).
                     (let [res (formatter (common/format-value r))]
                       (if-some [num-str (:num-str res)]
                         num-str
                         res)))
                   @ordered-formatters cleaned-row))
             writer {})
            (.flush writer))))
      (finish! [_ _]
        ;; close the JSON array, then flush both layers and release the writer
        (.write writer "\n]")
        (.flush writer)
        (.flush os)
        (.close writer)))))
(defmethod qp.si/stream-options :api
  ([_]
   (qp.si/stream-options :api nil))
  ([_ _filename-prefix]
   ;; API responses are returned inline, so only the content type is needed —
   ;; no status override and no Content-Disposition/download headers.
   {:content-type "application/json; charset=utf-8"}))
(defn- generate-map-contents
  "Write the key-value pairs of `maplike` to `jgen` WITHOUT emitting the surrounding
  `{`/`}` — the caller manages the enclosing object. Keywords are rendered by stripping
  the leading `:` from their printed form (which, unlike `name`, keeps any namespace).
  Returns `jgen`."
  [^JsonGenerator jgen maplike]
  (doseq [[k v] maplike]
    (let [field-name (if (keyword? k)
                       (subs (str k) 1)
                       (str k))]
      (.writeFieldName jgen ^String field-name)
      (json/generate jgen v json/default-date-format nil nil)))
  jgen)
(defn- make-generator
  "Build a Jackson `JsonGenerator` over `os`, writing UTF-8 through a BufferedWriter
  so output is batched rather than flushed per token."
  ^JsonGenerator [^OutputStream os]
  (let [char-writer (OutputStreamWriter. os StandardCharsets/UTF_8)]
    (json/create-generator (BufferedWriter. char-writer))))
(defmethod qp.si/streaming-results-writer :api
  [_ ^OutputStream os]
  ;; Cheshire supports a custom encoding API that we would like to use for streaming, say by generating JSON on
  ;; `{:data {:rows (a/chan), ...}}` and having a custom encoder for the channel.
  ;; But there's a problem: the metadata that arrives in `finish!` adds new keys to both the `:data` and outer maps,
  ;; and there's no way to handle that mutation with a custom encoder.
  ;; So instead we would like to use Cheshire's streaming generator API. But it is very eager to `(.flush writer)`.
  ;; That results in sending 3 + N packets over the wire, where N is the number of rows in the response! That's a lot
  ;; of TCP overhead, which causes download time slowdowns, especially with complex load balancing etc. See #34795.
  ;; And so that leads to this low-level code that mixes calls to the underlying Jackson Java library with calls to
  ;; Cheshire's streaming API for handling Clojure data. It duplicates some Cheshire logic for generating maps, since
  ;; Cheshire doesn't have an API which generate in key-value pairs of a map without including the `{}`s.
  (let [jgen (make-generator os)]
    (reify qp.si/StreamingResultsWriter
      (begin! [_ _ _]
        ;; emit the opening tokens of `{"data": {"rows": [` — both objects and the rows
        ;; array are left open so rows and late-arriving metadata can be appended later
        (doto jgen
          (.writeStartObject)
          (.writeFieldName "data")
          (.writeStartObject)
          (.writeFieldName "rows")
          (.writeStartArray)))
      (write-row! [_ row _ _ _]
        ;; each row becomes one element of the still-open "rows" array; Jackson inserts
        ;; the separating commas itself, and nothing is flushed per row (see note above)
        (json/generate jgen row json/default-date-format nil nil))
      (finish! [_ {:keys [data], :as metadata}]
        ;; close the "rows" array
        (.writeEndArray jgen)
        ;; write any remaining keys in data
        (when-not (empty? data)
          (generate-map-contents jgen data))
        ;; close data
        (.writeEndObject jgen)
        ;; write any remaining top-level keys
        (when-let [other-metadata (not-empty (dissoc metadata :data))]
          (generate-map-contents jgen other-metadata))
        ;; close top-level map, then flush exactly once and close the generator
        (doto jgen
          (.writeEndObject)
          (.flush)
          (.close))))))