Middleware that returns cached results for queries when applicable. If query caching is enabled, cache strategy has been passed and it's not a The default backend is | (ns metabase.query-processor.middleware.cache (:require [java-time.api :as t] [medley.core :as m] [metabase.config :as config] [metabase.lib.query :as lib.query] [metabase.public-settings :as public-settings] [metabase.query-processor.middleware.cache-backend.db :as backend.db] [metabase.query-processor.middleware.cache-backend.interface :as i] [metabase.query-processor.middleware.cache.impl :as impl] [metabase.query-processor.pipeline :as qp.pipeline] [metabase.query-processor.schema :as qp.schema] [metabase.query-processor.util :as qp.util] [metabase.util :as u] [metabase.util.log :as log] [metabase.util.malli :as mu]) (:import (org.eclipse.jetty.io EofException))) |
(set! *warn-on-reflection* true) | |
(comment backend.db/keep-me) | |
Current serialization format version. Basically [initial-metadata row-1 row-2 ... row-n final-metadata] | (def ^:private cache-version 3) |
Current cache backend. Dynamically rebindable primary for test purposes. | (def ^:dynamic *backend* (i/cache-backend (config/config-kw :mb-qp-cache-backend))) |
------------------------------------------------------ Save ------------------------------------------------------ | |
(defn- purge! [backend]
(try
(log/tracef "Purging cache entries older than %s" (u/format-seconds (public-settings/query-caching-max-ttl)))
(i/purge-old-entries! backend (public-settings/query-caching-max-ttl))
(log/trace "Successfully purged old cache entries.")
:done
(catch Throwable e
(log/errorf e "Error purging old cache entries: %s" (ex-message e))))) | |
The | (def ^:private ^:dynamic *in-fn* nil) |
Add | (defn- add-object-to-cache!
[object]
(when *in-fn*
(*in-fn* (cond-> object
(map? object) (m/update-existing :json_query lib.query/serializable))))) |
The | (def ^:private ^:dynamic *result-fn* nil) |
(defn- serialized-bytes []
(when *result-fn*
(*result-fn*))) | |
Save the final results of a query. | (defn- cache-results!
[query-hash]
(log/infof "Caching results for next time for query with hash %s. %s"
(pr-str (i/short-hex-hash query-hash)) (u/emoji "💾"))
(try
(let [bytez (serialized-bytes)]
(if-not (instance? (Class/forName "[B") bytez)
(log/errorf "Cannot cache results: expected byte array, got %s" (class bytez))
(do
(log/trace "Got serialized bytes; saving to cache backend")
(i/save-results! *backend* query-hash bytez)
(log/debug "Successfully cached results for query.")
(purge! *backend*))))
:done
(catch Throwable e
(if (= (:type (ex-data e)) ::impl/max-bytes)
(log/debugf e "Not caching results: results are larger than %s KB" (public-settings/query-caching-max-kb))
(log/errorf e "Error saving query results to cache: %s" (ex-message e)))))) |
(defn- save-results-xform [start-time-ns metadata query-hash strategy rf]
(let [has-rows? (volatile! false)]
(add-object-to-cache! (assoc metadata
:cache-version cache-version
:last-ran (t/zoned-date-time)))
(fn
([] (rf))
([result]
(add-object-to-cache! (if (map? result)
(m/dissoc-in result [:data :rows])
{}))
(let [duration-ms (/ (- (System/nanoTime) start-time-ns) 1e6)
min-duration-ms (:min-duration-ms strategy 0)
eligible? (and @has-rows?
(> duration-ms min-duration-ms))]
(log/infof "Query %s took %s to run; minimum for cache eligibility is %s; %s"
(i/short-hex-hash query-hash)
(u/format-milliseconds duration-ms)
(u/format-milliseconds min-duration-ms)
(if eligible? "eligible" "not eligible"))
(when eligible?
(cache-results! query-hash))
(rf (cond-> result
(map? result) (update :cache/details assoc :hash query-hash :stored (boolean eligible?))))))
([acc row]
(add-object-to-cache! row)
(vreset! has-rows? true)
(rf acc row))))) | |
----------------------------------------------------- Fetch ------------------------------------------------------ | |
Reducing function for cached results. Merges the final object in the cached results, the | (defn- cached-results-rff
[rff query-hash]
(fn [{:keys [last-ran], :as metadata}]
(let [metadata (dissoc metadata :last-ran :cache-version)
rf (rff metadata)
final-metadata (volatile! nil)]
(fn
([]
(rf))
([result]
(let [normal-format? (and (map? (unreduced result))
(seq (get-in (unreduced result) [:data :cols])))
result* (-> (if normal-format?
(m/deep-merge @final-metadata (unreduced result))
(unreduced result))
(assoc :cache/details {:hash query-hash :cached true :updated_at last-ran}))]
(rf (cond-> result*
(reduced? result) reduced))))
([acc row]
(if (map? row)
(vreset! final-metadata row)
(rf acc row))))))) |
(mu/defn- maybe-reduce-cached-results :- [:tuple
#_status
[:enum ::ok ::miss ::canceled]
#_result
:any]
"Reduces cached results if there is a hit. Otherwise, returns `::miss` directly."
[ignore-cache? query-hash strategy rff]
(try
(or (when-not ignore-cache?
(log/debugf "Looking for cached results for query with hash '%s' satisfying %s"
(i/short-hex-hash query-hash) (pr-str strategy))
(i/with-cached-results *backend* query-hash strategy [is]
(if is
(impl/with-reducible-deserialized-results [[metadata reducible-rows] is]
(log/debugf "Found cached results for hash '%s'. Version: %s"
(i/short-hex-hash query-hash) (pr-str (:cache-version metadata)))
(when (and (= (:cache-version metadata) cache-version)
reducible-rows)
(log/trace "Reducing cached rows...")
(let [result (qp.pipeline/*reduce* (cached-results-rff rff query-hash) metadata reducible-rows)]
(log/trace "All cached rows reduced")
[::ok result])))
(log/debugf "Not found cached results for hash '%s'" (i/short-hex-hash query-hash)))))
[::miss nil])
(catch EofException _
(log/debug "Request is closed; no one to return cached results to")
[::canceled nil])
(catch Throwable e
(log/errorf e "Error attempting to fetch cached results for query with hash %s: %s"
(i/short-hex-hash query-hash)
(ex-message e))
[::miss nil]))) | |
--------------------------------------------------- Middleware --------------------------------------------------- | |
(mu/defn- run-query-with-cache :- :some
[qp {:keys [cache-strategy middleware], :as query} :- ::qp.schema/query
rff :- ::qp.schema/rff]
;; Query will already have `info.hash` if it's a userland query. It's not the same hash, because this is calculated
;; after normalization, instead of before. This is necessary to make caching work properly with sandboxed users, see
;; #14388.
(let [query-hash (qp.util/query-hash query)
[status result] (maybe-reduce-cached-results (:ignore-cached-results? middleware) query-hash cache-strategy rff)]
(case status
::ok
result
::canceled
::canceled
::miss
(let [start-time-ns (System/nanoTime)
orig-reduce qp.pipeline/*reduce*]
(log/trace "Running query and saving cached results (if eligible)...")
(binding [qp.pipeline/*reduce* (fn reduce'
[rff metadata rows]
{:post [(some? %)]}
(impl/do-with-serialization
(fn [in-fn result-fn]
(binding [*in-fn* in-fn
*result-fn* result-fn]
(orig-reduce rff metadata rows)))))]
(qp query
(fn [metadata]
(save-results-xform start-time-ns metadata query-hash cache-strategy (rff metadata))))))))) | |
(defn- is-cacheable? {:arglists '([query])} [{:keys [cache-strategy]}]
(and (public-settings/enable-query-caching)
(some? cache-strategy)
(not= (:type cache-strategy) :nocache))) | |
Middleware for caching results of a query if applicable. In order for a query to be eligible for caching:
| (defn maybe-return-cached-results
[qp]
(fn maybe-return-cached-results* [query rff]
(let [cacheable? (is-cacheable? query)]
(log/tracef "Query is cacheable? %s" (boolean cacheable?))
(if cacheable?
(run-query-with-cache qp query rff)
(qp query rff))))) |