(ns metabase-enterprise.task.cache (:require [clojurewerkz.quartzite.jobs :as jobs] [clojurewerkz.quartzite.schedule.cron :as cron] [clojurewerkz.quartzite.triggers :as triggers] [java-time.api :as t] [metabase-enterprise.cache.strategies :as strategies] [metabase.premium-features.core :as premium-features :refer [defenterprise]] [metabase.query-processor :as qp] [metabase.task :as task] [metabase.util.log :as log] [toucan2.core :as t2]) (:import (java.util.concurrent Callable ExecutorService ThreadPoolExecutor TimeUnit SynchronousQueue) (org.apache.commons.lang3.concurrent BasicThreadFactory$Builder) (org.quartz.spi MutableTrigger))) | |
(set! *warn-on-reflection* true) | |
------------------------------------------- Preemptive Caching ---------------------------------------------------- | |
;; Lazily-created executor on which preemptive cache refresh tasks run. It starts with no threads, grows to at
;; most 10, and retires threads that have been idle for 100 seconds. The `SynchronousQueue` hands each task
;; directly to a thread instead of queueing it.
(defonce ^:private pool
  (delay
    (let [thread-factory (-> (BasicThreadFactory$Builder.)
                             (.namingPattern "preemptive-caching-thread-pool-%d")
                             (.build))]
      (ThreadPoolExecutor.
       0                    ; core pool size
       10                   ; maximum pool size
       100 TimeUnit/SECONDS ; keep-alive for idle threads
       (SynchronousQueue.)  ; direct handoff, no task queue
       thread-factory))))
(def ^:dynamic *parameterized-queries-to-rerun-per-card*
  "Maximum number of query variations (e.g. the same card run with different parameters) that are rerun for a
  single cached card."
  10)
(def ^:dynamic *run-cache-refresh-async*
  "When true (the default), cache refresh tasks are submitted to the thread pool; when false they run inline on
  the calling thread, which is useful in tests."
  true)
(defn- submit-refresh-task-async!
  "Submits `refresh-task-fn` (a zero-argument function that reruns the queries for the cards or dashboards being
  refreshed) to the preemptive-caching thread pool. This is best-effort; we try each query once and discard
  failures.

  Returns the `Future` for the submitted task, or nil if the pool refused it. The pool hands tasks off directly
  and has a bounded number of threads, so it rejects new work while every thread is busy; in that case the
  refresh is skipped and logged rather than failing the caller."
  [refresh-task-fn]
  (try
    (.submit ^ExecutorService @pool ^Callable refresh-task-fn)
    ;; Fully qualified so the block does not depend on an extra namespace import.
    (catch java.util.concurrent.RejectedExecutionException e
      (log/warnf "Preemptive caching thread pool rejected a cache refresh task, skipping it: %s" (ex-message e))
      nil)))
(defn discarding-rff
  "Given result `metadata`, returns a reducing function that throws away every row: its initial value is
  `{:data metadata}` and both the reducing step and the completing step return the accumulator unchanged."
  [metadata]
  (let [initial {:data metadata}]
    (fn discarding-rf
      ([] initial)
      ([acc] acc)
      ([acc _ignored-row] acc))))
(defn- refresh-task
  "Returns a zero-argument function that serially reruns the queries described by `refresh-defs`, a sequence of
  maps with `:card-id`, `:dashboard-id` and `:queries`.

  When invoked, the function loads the referenced cards, resolves the cache strategy for each card/dashboard
  pair, and runs every query through the query processor with `:ignore-cached-results?` set, discarding the
  result rows. A query that throws is logged at debug level and skipped, so one failure does not stop the
  remaining queries. Returns nil."
  [refresh-defs]
  (fn []
    ;; With nothing to refresh there are no card IDs, so skip the card lookup entirely rather than issuing a
    ;; query with an empty `IN` list.
    (when (seq refresh-defs)
      (let [card-ids    (into #{} (map :card-id) refresh-defs)
            cards-by-id (t2/select-pk->fn identity :model/Card :id [:in card-ids])]
        (doseq [{:keys [card-id dashboard-id queries]} refresh-defs]
          ;; Annotate the query with its cache strategy in the format expected by the QP
          (let [cache-strategy (strategies/cache-strategy (get cards-by-id card-id) dashboard-id)]
            (doseq [query queries]
              (try
                (qp/process-query
                 (qp/userland-query
                  (-> query
                      (assoc-in [:middleware :ignore-cached-results?] true)
                      (assoc :cache-strategy cache-strategy))
                  {:executed-by  nil
                   :context      :cache-refresh
                   :card-id      card-id
                   :dashboard-id dashboard-id})
                 discarding-rff)
                (catch Exception e
                  (log/debugf "Error refreshing cache for card %s: %s" card-id (ex-message e)))))))))))
(defn- duration-ago
  "Returns the offset date-time that lies `duration` `unit`s before the current time. `unit` is converted to a
  keyword before being passed to `t/duration`, so it may be given as a string such as \"days\"."
  [{:keys [duration unit]}]
  (let [now (t/offset-date-time)]
    (t/minus now (t/duration duration (keyword unit)))))
(defn- duration-queries-to-rerun-honeysql
  "Builds the HoneySQL query that selects the query definitions to rerun for a list of `:duration` cache
  configs. One nested subselect is generated per cache config and the subselects are combined with a union.
  Executed twice by the caller: once with `parameterized?` true to find parameterized queries and once with it
  false to find non-parameterized queries."
  [cache-configs parameterized?]
  (let [parameterized-filter
        (if parameterized?
          [:and
           [:= :qe.parameterized true]
           ;; Only rerun a parameterized query if it's had a cache hit within the last caching window
           [:= :qe.cache_hit true]
           ;; Don't factor the last cache refresh into whether we should rerun a parameterized query
           [:not= :qe.context (name :cache-refresh)]]
          [:= :qe.parameterized false])

        subselects
        (for [{:keys [model model_id config]} cache-configs
              :let [stale-before (duration-ago config)
                    model-filter (case model
                                   "question"  [:= :qe.card_id model_id]
                                   "dashboard" [:= :qe.dashboard_id model_id])
                    ;; This is a safety check so that we don't scan all of query_execution -- if a query has
                    ;; not been executed at all in the last month (including cache hits) we won't bother
                    ;; refreshing it again.
                    active-since (duration-ago {:duration 30 :unit "days"})]]
          {:nest
           {:select   [[:q.query :query]
                       [:qc.query_hash :cache-hash]
                       [:qe.card_id :card-id]
                       [:qe.dashboard_id :dashboard-id]
                       [[:count :q.query_hash] :count]]
            :from     [[(t2/table-name :model/Query) :q]]
            :join     [[(t2/table-name :model/QueryExecution) :qe] [:= :qe.hash :q.query_hash]
                       [(t2/table-name :model/QueryCache) :qc] [:= :qc.query_hash :qe.cache_hash]]
            :where    [:and
                       model-filter
                       [:<= :qc.updated_at stale-before]
                       [:>= :qe.started_at active-since]
                       [:= :qe.error nil]
                       [:= :qe.is_sandboxed false]
                       parameterized-filter]
            :group-by [:q.query_hash :q.query :qc.query_hash :qe.card_id :qe.dashboard_id]}})]
    {:select [:u.query :u.cache-hash :u.card-id :u.dashboard-id :u.count]
     :from   [[{:union subselects} :u]]}))
(defn- select-parameterized-queries
  "Given parameterized query definitions from the Query table that carry additional `:count` and `:card-id`
  keys, keeps for each card ID only the most frequently seen queries, up to
  [[*parameterized-queries-to-rerun-per-card*]] per card, and returns them as one flat sequence."
  [parameterized-queries]
  (let [most-common (fn [card-queries]
                      (take *parameterized-queries-to-rerun-per-card*
                            (sort-by :count > card-queries)))
        per-card    (vals (group-by :card-id parameterized-queries))]
    ;; `mapv` keeps the per-card selection eager, so the limit is read while the caller's binding is in effect.
    (apply concat (mapv most-common per-card))))
(defn- duration-queries-to-rerun
  "Returns the query definitions that should be rerun for `:duration` cache configs that have
  `refresh_automatically` enabled: all matching non-parameterized queries followed by the selected
  parameterized ones. Returns nil when there are no such cache configs."
  []
  (when-let [cache-configs (seq (t2/select :model/CacheConfig :strategy :duration :refresh_automatically true))]
    (let [fetch         (fn [parameterized?]
                          (t2/select :model/Query (duration-queries-to-rerun-honeysql cache-configs parameterized?)))
          plain         (fetch false)
          parameterized (fetch true)]
      (concat plain (select-parameterized-queries parameterized)))))
(defn- clear-caches-for-queries!
  "Deletes the existing cache entries (matched on each query's `:cache-hash`) for queries that are about to be
  rerun, so that subsequent tasks don't also try to rerun them before the cache has been refreshed."
  [queries]
  (let [cache-hashes (map :cache-hash queries)]
    (t2/delete! :model/QueryCache :query_hash [:in cache-hashes])))
(defn- maybe-refresh-duration-caches!
  "Finds caches with strategy=duration that are eligible for refreshing, clears their cache entries and reruns
  their queries (on the thread pool, or inline when [[*run-cache-refresh-async*]] is false). Returns the number
  of refresh jobs generated, i.e. the number of distinct card/dashboard pairs, each of which may have several
  queries; returns 0 when nothing is eligible."
  []
  (let [stale-queries (seq (duration-queries-to-rerun))]
    (if-not stale-queries
      0
      (let [refresh-defs (for [[[card-id dashboard-id] grouped] (group-by (juxt :card-id :dashboard-id) stale-queries)]
                           {:card-id      card-id
                            :dashboard-id dashboard-id
                            :queries      (map :query grouped)})
            run-refresh! (refresh-task refresh-defs)]
        ;; Clear first so that a later run does not pick the same queries up again while they are refreshing.
        (clear-caches-for-queries! stale-queries)
        (if *run-cache-refresh-async*
          (submit-refresh-task-async! run-refresh!)
          (run-refresh!))
        (count refresh-defs)))))
(defn- scheduled-base-query-to-rerun-honeysql
  "HoneySQL query for finding the base query definition we should run for a card ID (i.e. the unparameterized
  query): the most recently started, error-free, non-sandboxed, non-parameterized execution of the card."
  [card-id]
  (let [query-table     (t2/table-name :model/Query)
        execution-table (t2/table-name :model/QueryExecution)
        ;; Was the query executed at least once in the last month?
        ;; This is a safety check so that we don't scan all of query_execution -- if a query has not been
        ;; executed at all in the last month (including cache hits) we won't bother refreshing it again.
        active-since    (duration-ago {:duration 30 :unit "days"})]
    {:select   [:q.query [:qe.card_id :card-id]]
     :from     [[query-table :q]]
     :join     [[execution-table :qe] [:= :qe.hash :q.query_hash]]
     :where    [:and
                [:= :qe.card_id card-id]
                [:= :qe.parameterized false]
                [:= :qe.error nil]
                [:= :qe.is_sandboxed false]
                [:>= :qe.started_at active-since]]
     :order-by [[:qe.started_at :desc]]
     :limit    1}))
(defn- scheduled-parameterized-queries-to-rerun-honeysql
  "HoneySQL query for finding the parameterized query definitions to rerun for `card-id`: error-free,
  non-sandboxed, parameterized executions started at or after `rerun-cutoff`, excluding executions that were
  themselves cache refreshes. Results are grouped per query, ordered by how often the query was executed (most
  frequent first, earliest first execution breaking ties), and limited to
  [[*parameterized-queries-to-rerun-per-card*]] rows."
  [card-id rerun-cutoff]
  {:select   [:q.query [:qe.card_id :card-id]]
   :from     [[(t2/table-name :model/Query) :q]]
   :join     [[(t2/table-name :model/QueryExecution) :qe] [:= :qe.hash :q.query_hash]]
   :where    [:and
              [:= :qe.card_id card-id]
              [:>= :qe.started_at rerun-cutoff]
              ;; Don't factor the last cache refresh into whether we should rerun a parameterized query
              [:not= :qe.context (name :cache-refresh)]
              ;; Qualified with the `qe` alias like every other column in this query, so the reference stays
              ;; unambiguous across the join.
              [:= :qe.parameterized true]
              [:= :qe.error nil]
              [:= :qe.is_sandboxed false]]
   :group-by [:q.query_hash :q.query :qe.card_id]
   :order-by [[[:count :q.query_hash] :desc]
              [[:min :qe.started_at] :asc]]
   :limit    *parameterized-queries-to-rerun-per-card*})
(defn- scheduled-queries-to-rerun
  "Returns the distinct query definitions that should be preemptively rerun for a card that uses `:schedule`
  caching: the base (unparameterized) query, when one is found, followed by the parameterized queries executed
  since `rerun-cutoff`."
  [card-id rerun-cutoff]
  (let [base-query    (t2/select-one :model/Query (scheduled-base-query-to-rerun-honeysql card-id))
        parameterized (t2/select :model/Query
                                 (scheduled-parameterized-queries-to-rerun-honeysql card-id rerun-cutoff))
        candidates    (if base-query
                        (cons base-query parameterized)
                        parameterized)]
    (distinct (map :query candidates))))
(defn- schedule-cache-config->card-ids
  "Takes a schedule cache config and returns a sequence of card IDs to rerun. For a \"question\" config this is
  just the config's `model_id`; for a \"dashboard\" config it is the distinct, non-nil `card_id`s of the
  dashboard's dashcards. Throws if `model` is anything else."
  [{:keys [model_id model]}]
  (case model
    "question"  [model_id]
    "dashboard" (let [dashboard (-> (t2/select-one :model/Dashboard :id model_id)
                                    (t2/hydrate :dashcards))]
                  ;; Dashcards without a card have a nil `card_id`. A nil ID must not reach the rerun queries,
                  ;; where `[:= :qe.card_id nil]` would match executions that belong to no card at all. A card
                  ;; placed on the dashboard more than once only needs to be looked up once.
                  (->> (:dashcards dashboard)
                       (keep :card_id)
                       distinct))))
(defn- refresh-schedule-cache!
  "Given a cache config with the `:schedule` strategy, preemptively reruns the base query (and a fixed number
  of parameterized variants) of every card the config covers, so that fresh results are cached. Parameterized
  variants are taken from executions since the config's `last_run_at`, falling back to `created_at`. The work
  runs on the thread pool, or inline when [[*run-cache-refresh-async*]] is false."
  [{model       :model
    model-id    :model_id
    strategy    :strategy
    last-run-at :last_run_at
    created-at  :created_at
    :as         cache-config}]
  (assert (= strategy :schedule))
  (let [rerun-cutoff (or last-run-at created-at)
        card-ids     (schedule-cache-config->card-ids cache-config)
        dashboard-id (when (= model "dashboard") model-id)
        ;; Lazy on purpose: the per-card lookups happen when the task first walks the sequence.
        refresh-defs (distinct
                      (for [card-id card-ids]
                        {:dashboard-id dashboard-id
                         :card-id      card-id
                         :queries      (scheduled-queries-to-rerun card-id rerun-cutoff)}))
        run-refresh! (refresh-task refresh-defs)]
    (if-not *run-cache-refresh-async*
      (run-refresh!)
      (submit-refresh-task-async! run-refresh!))))
------------------------------------------- Cache invalidation task ------------------------------------------------ | |
(defn- select-ready-to-run
  "Fetches the cache configs with the given `strategy` that are due to run: those whose `next_run_at` is unset
  or is not later than the current time."
  [strategy]
  (let [now (t/offset-date-time)
        due [:or
             [:= :next_run_at nil]
             [:<= :next_run_at now]]]
    (t2/select :model/CacheConfig
               :strategy strategy
               {:where due})))
(defn- calc-next-run
  "Calculates when the next run should happen for the cron expression `schedule`: the first fire time after
  `now`, returned as an offset date-time."
  [^String schedule now]
  (let [^MutableTrigger trigger (cron/finalize (cron/cron-schedule schedule))
        ;; Synchronize cron runner to our app time, which may be mocked in tests
        _                       (.setStartTime trigger (t/java-date))
        next-fire               (.getFireTimeAfter trigger (t/java-date now))]
    (t/offset-date-time next-fire (t/zone-offset))))
(defenterprise refresh-cache-configs!
  "Processes every `:schedule` cache config that is ready to run: sets its `next_run_at` to the next fire time
  of its cron schedule and its `invalidated_at` to the current time. When preemptive caching is enabled, also
  reruns the queries of configs that have `refresh_automatically` set, and refreshes eligible `:duration`
  caches. Returns a map with the counts `:schedule-invalidated`, `:schedule-refreshed` and
  `:duration-refreshed`."
  :feature :none
  []
  (let [now              (t/offset-date-time)
        ready-configs    (select-ready-to-run :schedule)
        process-config   (fn [refreshed-so-far {:keys [id config refresh_automatically] :as cache-config}]
                           ;; Always invalidate and reschedule, whether or not the config is refreshed below.
                           (t2/update! :model/CacheConfig
                                       {:id id}
                                       {:next_run_at    (calc-next-run (:schedule config) now)
                                        :invalidated_at now})
                           (let [refresh? (and (premium-features/enable-preemptive-caching?)
                                               refresh_automatically)]
                             (when refresh?
                               (refresh-schedule-cache! cache-config))
                             (cond-> refreshed-so-far
                               refresh? inc)))
        schedule-count   (reduce process-config 0 ready-configs)
        duration-count   (if (premium-features/enable-preemptive-caching?)
                           (maybe-refresh-duration-caches!)
                           0)]
    {:schedule-invalidated (count ready-configs)
     :schedule-refreshed   schedule-count
     :duration-refreshed   duration-count}))
;; Quartz job that runs one cache refresh pass. `DisallowConcurrentExecution` is set so that a new run does not
;; start while the previous one is still executing.
(jobs/defjob ^{org.quartz.DisallowConcurrentExecution true
               :doc "Refresh 'schedule' caches"}
  Cache [_ctx]
  (refresh-cache-configs!))
;; Durably stored job definition for the `Cache` job.
(def ^:private cache-job
  (jobs/build
   (jobs/with-identity (jobs/key "metabase-enterprise.cache.job"))
   (jobs/with-description "Schedule Caches refresh task")
   (jobs/of-type Cache)
   (jobs/store-durably)))
;; Trigger for the cache job: starts immediately and fires on the cron schedule below (second 0 of every
;; minute).
(def ^:private cache-trigger
  (triggers/build
   (triggers/with-identity (triggers/key "metabase-enterprise.cache.trigger"))
   (triggers/with-schedule (cron/cron-schedule "0 * * * * ? *"))
   (triggers/start-now)))
(defenterprise init-cache-task!
  "Initializes the periodic task that checks for cache expiration by scheduling the cache job with its trigger."
  :feature :cache-granular-controls
  []
  (task/schedule-task! cache-job cache-trigger))