(ns metabase.query-processor.middleware.cache.impl (:require [flatland.ordered.map :as ordered-map] [metabase.public-settings :as public-settings] [metabase.util :as u] [metabase.util.i18n :refer [trs]] [metabase.util.log :as log] [taoensso.nippy :as nippy]) (:import (java.io BufferedInputStream BufferedOutputStream ByteArrayOutputStream DataInputStream DataOutputStream EOFException FilterOutputStream InputStream OutputStream) (java.util.zip GZIPInputStream GZIPOutputStream))) | |
(set! *warn-on-reflection* true) | |
(defn- max-bytes-output-stream
  "Wrap `os` in a `FilterOutputStream` proxy that keeps a running total of the bytes handed to its `write` methods.
  The total is updated and checked *before* the bytes are forwarded to `os`; as soon as it exceeds `max-bytes` an
  info-level message is logged and an `ex-info` with `:type ::max-bytes` is thrown, so the write that crosses the
  limit is never forwarded."
  ^OutputStream [max-bytes ^OutputStream os]
  (let [written (atom 0)
        ;; add `n` to the running total, then bail out if the new total is over the limit
        track!  (fn [n]
                  (let [total (swap! written + n)]
                    (when (> total max-bytes)
                      (log/info "Results are too large to cache." (u/emoji "😫"))
                      (throw (ex-info (trs "Results are too large to cache.") {:type ::max-bytes})))))]
    (proxy [FilterOutputStream] [os]
      (write
        ;; proxies dispatch on method name and arity only, so this arity receives both `write(int)` and
        ;; `write(byte[])` calls and has to tell them apart itself
        ([x]
         (if (int? x)
           (do
             (track! 1)
             (.write os ^int x))
           (do
             (track! (alength ^bytes x))
             (.write os ^bytes x))))
        ;; `write(byte[], int, int)`: only `len` bytes are counted
        ([^bytes ba ^Integer off ^Integer len]
         (track! len)
         (.write os ba off len))))))
;; For some reason flatland.ordered.map.OrderedMap is not encoded and decoded correctly by default, so custom Nippy
;; freeze/thaw handlers are registered for it below. See #25915.
;; Custom Nippy freezer for ordered maps: the map is written out as a plain vector of its entries under the custom
;; type ID `:flatland/ordered-map`.
(nippy/extend-freeze flatland.ordered.map.OrderedMap :flatland/ordered-map
  [ordered-m out]
  (nippy/freeze-to-out! out (vec ordered-m)))
;; Custom Nippy thawer for the `:flatland/ordered-map` type ID: read back whatever was frozen for it and hand that
;; to `ordered-map-reader-clj`.
(nippy/extend-thaw :flatland/ordered-map
  [in]
  (-> (nippy/thaw-from-in! in)
      ordered-map/ordered-map-reader-clj))
(defn- freeze!
  "Serialize `obj` to `os` with Nippy and then flush `os`. Logs the object being frozen at trace level. Returns
  `nil`."
  [^OutputStream os obj]
  (log/tracef "Freezing %s" (pr-str obj))
  (doto os
    (nippy/freeze-to-out! obj)
    (.flush))
  ;; `.flush` is a void method, so the function has always returned nil; keep it that way
  nil)
(defn do-with-serialization
  "Create output streams for serializing QP results and invoke `f`, a function of the form

    (f in-fn result-fn)

  `(in-fn object)` serializes one object and always returns `nil`. Anything thrown while serializing is caught and
  remembered rather than rethrown; once an error has been recorded, later calls to `in-fn` do nothing.

  `(result-fn)` rethrows the recorded error if there is one; otherwise it returns the bytes written so far as a byte
  array. Call it when you have serialized all objects:

    (do-with-serialization
      (fn [in result]
        (doseq [obj objects]
          (in obj))
        (result)))

  The optional second argument is an options map with key `:max-bytes`, the limit passed to the byte-counting
  stream; the one-argument arity derives it from `(public-settings/query-caching-max-kb)` times 1024."
  ([f]
   (let [max-kb (public-settings/query-caching-max-kb)]
     (do-with-serialization f {:max-bytes (* max-kb 1024)})))

  ([f {:keys [max-bytes]}]
   (with-open [bos (ByteArrayOutputStream.)]
     (let [limited (max-bytes-output-stream max-bytes bos)
           ;; stream stack, outermost first: data -> gzip (sync flush) -> buffer -> byte limit -> byte array
           os      (DataOutputStream. (GZIPOutputStream. (BufferedOutputStream. limited) true))
           error   (atom nil)
           in*     (fn in* [obj]
                     (when (nil? @error)
                       (try
                         (freeze! os obj)
                         (catch Throwable e
                           (log/trace e "Caught error when freezing object")
                           (reset! error e))))
                     nil)
           result* (fn result* []
                     (when-let [e @error]
                       (throw e))
                     (log/trace "Getting result byte array")
                     (.toByteArray bos))]
       (try
         (f in* result*)
         ;; this is done manually instead of `with-open` because it might throw an Exception when we close it if
         ;; it's past the byte limit; that's fine and we can ignore it
         (finally
           (u/ignore-exceptions (.close os))))))))
(defn- thaw!
  "Read the next Nippy-serialized object from `is`. Returns the sentinel `::eof` instead of throwing when the read
  hits an `EOFException`; any other exception propagates."
  [^InputStream is]
  (try
    (nippy/thaw-from-in! is)
    (catch EOFException _
      ::eof)))
(defn- reducible-rows
  "Return an `IReduceInit` that, when reduced, thaws objects from `is` one at a time and feeds each to the reducing
  function until `thaw!` reports `::eof`, then returns the accumulator."
  [^InputStream is]
  (reify clojure.lang.IReduceInit
    (reduce [_ rf init]
      ;; NORMALLY we would check whether the accumulator is `reduced?` here and stop reading early if it was, but
      ;; since the final metadata is currently stored at the very end of the entry as a special pseudo-row we
      ;; actually have to keep reading the whole thing until we get to that last result. The reducing functions can
      ;; simply throw out everything they don't need. See
      ;; [[metabase.query-processor.middleware.cache/cache-version]] for a description of the caching format.
      (loop [acc init
             row (thaw! is)]
        (if (not= row ::eof)
          ;; `recur` args evaluate left to right: apply `rf` to this row first, then read the next one
          (recur (rf acc row) (thaw! is))
          acc)))))
(defn do-reducible-deserialized-results
  "Impl for [[with-reducible-deserialized-results]].

  Wraps `is` in buffered, GZIP and data input streams, reads the first serialized object (the metadata) and calls
  `f` with a pair `[metadata reducible-rows]`, or with `nil` if the stream ended before any object could be read.
  The wrapping stream is closed when `f` returns or throws, so the rows must be reduced inside `f`."
  [^InputStream is f]
  (with-open [data-in (-> is BufferedInputStream. GZIPInputStream. DataInputStream.)]
    (let [metadata (thaw! data-in)]
      (f (when-not (= metadata ::eof)
           [metadata (reducible-rows data-in)])))))
(defmacro with-reducible-deserialized-results
  "Fetches metadata and reducible rows from an InputStream `is` and executes `body` with them bound to
  `metadata-rows-binding`:

    (with-reducible-deserialized-results [[metadata reducible-rows] is]
      ...)

  Expands to a call to [[do-reducible-deserialized-results]] with `body` wrapped in a one-argument function, so the
  binding form may destructure the value that function is called with."
  [[metadata-rows-binding is] & body]
  (let [callback `(fn [~metadata-rows-binding]
                    ~@body)]
    `(do-reducible-deserialized-results ~is ~callback)))