(ns metabase.query-processor.middleware.cache.impl
  (:require
   [flatland.ordered.map :as ordered-map]
   [metabase.public-settings :as public-settings]
   [metabase.util :as u]
   [metabase.util.i18n :refer [trs]]
   [metabase.util.log :as log]
   [taoensso.nippy :as nippy])
  (:import
   (java.io BufferedInputStream BufferedOutputStream ByteArrayOutputStream DataInputStream DataOutputStream
            EOFException FilterOutputStream InputStream OutputStream)
   (java.util.zip GZIPInputStream GZIPOutputStream)))
;; fail fast on reflective Java interop in this namespace (it is on a hot serialization path)
(set! *warn-on-reflection* true)
(defn- max-bytes-output-stream
  "Wrap `os` in an OutputStream that counts every byte written and throws an `ex-info` with `:type ::max-bytes` as
  soon as the running total exceeds `max-bytes`. Used to abort caching when serialized results grow too large."
  ^OutputStream [max-bytes ^OutputStream os]
  (let [byte-count  (atom 0)
        check-total (fn [current-total]
                      (when (> current-total max-bytes)
                        (log/info "Results are too large to cache." (u/emoji "😫"))
                        (throw (ex-info (trs "Results are too large to cache.") {:type ::max-bytes}))))]
    (proxy [FilterOutputStream] [os]
      (write
        ([x]
         (if (int? x)
           ;; single-byte write
           (do
             (check-total (swap! byte-count inc))
             (.write os ^int x))
           ;; whole-array write
           (do
             (check-total (swap! byte-count + (alength ^bytes x)))
             (.write os ^bytes x))))
        ([^bytes ba ^Integer off ^Integer len]
         (check-total (swap! byte-count + len))
         (.write os ba off len))))))
;; flatland.ordered.map.OrderedMap gets encoded and decoded incorrectly, for some reason. See #25915
;; Work around it by freezing an OrderedMap as a vector of entries, and rebuilding the ordered map on thaw.
(nippy/extend-freeze flatland.ordered.map.OrderedMap :flatland/ordered-map
  [x data-output]
  (nippy/freeze-to-out! data-output (vec x)))

(nippy/extend-thaw :flatland/ordered-map
  [data-input]
  (ordered-map/ordered-map-reader-clj (nippy/thaw-from-in! data-input)))
(defn- freeze!
  "Serialize a single `obj` to `os` with Nippy, flushing afterwards so each object is pushed through the stream
  immediately."
  [^OutputStream os obj]
  (log/tracef "Freezing %s" (pr-str obj))
  (nippy/freeze-to-out! os obj)
  (.flush os))
(defn do-with-serialization
  "Create output streams for serializing QP results and invoke `f` as

    (f in-fn result-fn)

  `in-fn` serializes one object to the (gzipped, size-capped) stream; `result-fn` returns the final serialized byte
  array. When you have serialized all objects, call `result-fn` to get the result:

    (do-with-serialization
     (fn [in result]
       (doseq [obj objects]
         (in obj))
       (result)))"
  ([f]
   (do-with-serialization f {:max-bytes (* (public-settings/query-caching-max-kb) 1024)}))

  ([f {:keys [max-bytes]}]
   (with-open [bos (ByteArrayOutputStream.)]
     (let [os    (-> (max-bytes-output-stream max-bytes bos)
                     BufferedOutputStream.
                     (GZIPOutputStream. true) ; syncFlush = true so `freeze!`'s `.flush` actually pushes bytes through
                     DataOutputStream.)
           error (atom nil)]
       (try
         (f (fn in* [obj]
              ;; once an error is captured, further writes become no-ops; the error is rethrown by `result*`
              (when-not @error
                (try
                  (freeze! os obj)
                  (catch Throwable e
                    (log/trace e "Caught error when freezing object")
                    (reset! error e))))
              nil)
            (fn result* []
              ;; surface any error captured during freezing instead of returning a partial byte array
              (when @error
                (throw @error))
              (log/trace "Getting result byte array")
              (.toByteArray bos)))
         ;; this is done manually instead of `with-open` because it might throw an Exception when we close it if it's
         ;; past the byte limit; that's fine and we can ignore it
         (finally
           (u/ignore-exceptions (.close os))))))))
(defn- thaw!
  "Read the next Nippy-serialized object from `is`, returning `::eof` once the end of the stream is reached."
  [^InputStream is]
  (try
    (nippy/thaw-from-in! is)
    (catch EOFException _e
      ::eof)))
(defn- reducible-rows
  "Return a `clojure.lang.IReduceInit` that thaws successive rows from `is` until end-of-stream."
  [^InputStream is]
  (reify clojure.lang.IReduceInit
    (reduce [_ rf init]
      (loop [acc init]
        ;; NORMALLY we would be checking whether `acc` is `reduced?` here and stop reading from the database if it was,
        ;; but since we currently store the final metadata at the very end of the database entry as a special pseudo-row
        ;; we actually have to keep reading the whole thing until we get to that last result. Don't worry, the reducing
        ;; functions can just throw out everything we don't need. See
        ;; [[metabase.query-processor.middleware.cache/cache-version]] for a description of our caching format.
        (let [row (thaw! is)]
          (if (= row ::eof)
            acc
            (recur (rf acc row))))))))
(defn do-reducible-deserialized-results
  "Impl for [[with-reducible-deserialized-results]]."
  [^InputStream is f]
  (with-open [is (DataInputStream. (GZIPInputStream. (BufferedInputStream. is)))]
    (let [metadata (thaw! is)]
      (if (= metadata ::eof)
        ;; nothing could be read from the stream: invoke `f` with no metadata/rows
        (f nil)
        (f [metadata (reducible-rows is)])))))
(defmacro with-reducible-deserialized-results
  "Fetches metadata and reducible rows from an InputStream `is`:

    (with-reducible-deserialized-results [[metadata reducible-rows] is]
      ...)"
  [[metadata-rows-binding is] & body]
  `(do-reducible-deserialized-results
    ~is
    (fn [~metadata-rows-binding]
      ~@body)))