(ns metabase-enterprise.task.cache (:require [clojurewerkz.quartzite.jobs :as jobs] [clojurewerkz.quartzite.schedule.cron :as cron] [clojurewerkz.quartzite.triggers :as triggers] [java-time.api :as t] [metabase.public-settings.premium-features :as premium-features :refer [defenterprise]] [metabase.query-processor :as qp] [metabase.task :as task] [metabase.util.log :as log] [toucan2.core :as t2]) (:import (java.util.concurrent Callable ExecutorService ThreadPoolExecutor TimeUnit SynchronousQueue) (org.apache.commons.lang3.concurrent BasicThreadFactory$Builder) (org.quartz.spi MutableTrigger))) | |
;; Report reflective Java interop calls in this namespace at compile time.
(set! clojure.core/*warn-on-reflection* true)
;;; ------------------------------------------- Preemptive Caching ----------------------------------------------------
;; Lazily-created executor for preemptive cache refresh tasks. It keeps no core threads, creates up to 10 threads on
;; demand, reclaims threads idle for 100 seconds, and hands work directly to a thread via a `SynchronousQueue`
;; (there is no waiting queue).
(defonce ^:private pool
  (delay
    (let [thread-factory (-> (BasicThreadFactory$Builder.)
                             (.namingPattern "preemptive-caching-thread-pool-%d")
                             (.build))]
      (ThreadPoolExecutor. 0                  ; core pool size
                           10                 ; max pool size (upper limit)
                           100                ; keep-alive time for idle threads...
                           TimeUnit/SECONDS   ; ...measured in seconds
                           (SynchronousQueue.) ; direct handoff
                           thread-factory))))
(def ^{:dynamic true} *parameterized-queries-to-rerun-per-card*
  "Number of query variations (e.g. with different parameters) to run for a single cached card."
  10)
(def ^{:dynamic true} *run-cache-refresh-async*
  "Should cache refresh jobs be run asynchronously? Defaults to true, can be set to false for testing."
  true)
(defn- submit-refresh-task-async!
  "Submits a job to the thread pool to run a sequence of queries for a single card or dashboard being refreshed. This
  is best-effort; we try each query once and discard failures.

  `pool` hands work straight to a worker thread through a `SynchronousQueue` and has a bounded number of threads, so
  a submission made while every thread is busy is rejected. Rejections are logged and the refresh is skipped rather
  than letting the exception propagate into (and abort) the calling job. Returns the task's `Future`, or nil if the
  task was rejected."
  [refresh-task-fn]
  (try
    (.submit ^ExecutorService @pool ^Callable refresh-task-fn)
    (catch java.util.concurrent.RejectedExecutionException _
      (log/warn "Preemptive caching thread pool is saturated; skipping cache refresh task")
      nil)))
(defn- refresh-task
  "Returns a zero-arg function that serially reruns the queries described by `refresh-defs`, a sequence of maps with
  `:card-id`, `:dashboard-id` and `:queries` keys. Every query is run with cached results ignored, so fresh results
  are produced. Each query is tried once; failures are logged at debug level and skipped."
  [refresh-defs]
  (fn []
    (doseq [{:keys [card-id dashboard-id queries]} refresh-defs]
      (doseq [query queries]
        (try
          (let [uncached-query (assoc-in query [:middleware :ignore-cached-results?] true)
                execution-info {:executed-by  nil
                                :context      :cache-refresh
                                :card-id      card-id
                                :dashboard-id dashboard-id}]
            (qp/process-query (qp/userland-query uncached-query execution-info)))
          (catch Exception e
            (log/debugf "Error refreshing cache for card %s: %s" card-id (ex-message e))))))))
(defn- duration-ago
  "Returns the current `OffsetDateTime` minus `duration` units of `unit`, e.g. `{:duration 30 :unit \"days\"}`.
  `unit` is passed through `keyword`, so either a string or a keyword works."
  [{amount :duration, unit :unit}]
  (let [now (t/offset-date-time)]
    (t/minus now (t/duration amount (keyword unit)))))
(defn- duration-queries-to-rerun-honeysql
  "HoneySQL query for selecting query definitions that should be rerun, given a map from `:duration` cache configs to
  the card IDs they cover. Executed twice, once to find parameterized queries (`parameterized?` true) and once to
  find non-parameterized queries.

  One grouped subquery is built per cache config and the subqueries are UNIONed. Each resulting row carries the
  query definition (`:query`), its card ID (`:card-id`) and a `:count` of matching rows for that query."
  [configs->card-ids parameterized?]
  (let [config->subquery
        (fn [[{:keys [config]} card-ids]]
          (let [rerun-cutoff   (duration-ago config)
                ;; This is a safety check so that we don't scan all of query_execution -- if a query has not been
                ;; executed at all in the last month (including cache hits) we won't bother refreshing it again.
                recently-run   [:>= :qe.started_at (duration-ago {:duration 30 :unit "days"})]
                variant-filter (if parameterized?
                                 [:and
                                  [:= :qe.parameterized true]
                                  ;; Only rerun a parameterized query if it's had a cache hit within the last caching window
                                  [:= :qe.cache_hit true]
                                  ;; Don't factor the last cache refresh into whether we should rerun a parameterized query
                                  [:not= :qe.context (name :cache-refresh)]]
                                 [:= :qe.parameterized false])]
            {:nest {:select   [[:q.query :query]
                               [:qe.card_id :card-id]
                               [[:count :q.query_hash] :count]]
                    :from     [[(t2/table-name :model/Query) :q]]
                    :join     [[(t2/table-name :model/QueryExecution) :qe] [:= :qe.hash :q.query_hash]
                               [(t2/table-name :model/QueryCache) :qc] [:= :qc.query_hash :qe.cache_hash]]
                    :where    [:and
                               [:in :qe.card_id (set card-ids)]
                               [:<= :qc.updated_at rerun-cutoff]
                               recently-run
                               [:= :qe.error nil]
                               [:= :qe.is_sandboxed false]
                               variant-filter]
                    :group-by [:q.query_hash :q.query :qe.card_id]}}))]
    {:select [:u.query :u.card-id :u.count]
     :from   [[{:union (map config->subquery configs->card-ids)} :u]]}))
(defn- cache-configs->card-ids
  "Takes a list of cache configs and returns a map from each cache config to the IDs of the cards it covers:
  - `\"question\"` configs cover the single card `model_id`;
  - `\"dashboard\"` configs cover every card on the dashboard;
  - configs for any other model cover no cards.

  Dashcards whose `card_id` is nil (presumably dashcards not backed by a card, such as text cards -- confirm) are
  skipped, and a card that appears on a dashboard several times is listed only once."
  [cache-configs]
  (let [dashboard-ids          (->> cache-configs
                                    (filter #(= (:model %) "dashboard"))
                                    (map :model_id))
        dashboards             (when (seq dashboard-ids)
                                 (-> (t2/select :model/Dashboard {:where [:in :id dashboard-ids]})
                                     (t2/hydrate :dashcards)))
        dashboard-id->card-ids (into {}
                                     (map (fn [dashboard]
                                            ;; `keep` drops nil card IDs: downstream a nil becomes
                                            ;; `[:= :qe.card_id nil]` (rendered as `IS NULL`), which would match
                                            ;; executions not tied to any card.
                                            [(:id dashboard)
                                             (into [] (comp (keep :card_id) (distinct)) (:dashcards dashboard))]))
                                     dashboards)]
    (into {}
          (map (fn [{:keys [model model_id] :as config}]
                 [config (case model
                           "dashboard" (get dashboard-id->card-ids model_id [])
                           "question"  [model_id]
                           [])]))
          cache-configs)))
(defn- select-parameterized-queries
  "Given parameterized query definitions from the Query table, each with additional `:count` and `:card-id` keys,
  keeps the `*parameterized-queries-to-rerun-per-card*` queries with the highest `:count` for each card ID and returns
  them all as one flat sequence."
  [parameterized-queries]
  ;; Read the dynamic limit now, so the lazily-evaluated mapcat below uses the value bound at call time.
  (let [per-card-limit *parameterized-queries-to-rerun-per-card*]
    (mapcat (fn [[_card-id card-queries]]
              (take per-card-limit (sort-by :count > card-queries)))
            (group-by :card-id parameterized-queries))))
(defn- duration-queries-to-rerun
  "Returns the query definitions (with `:card-id` and `:count`) to preemptively rerun for `:duration` cache configs
  that have `refresh_automatically` set: every matching non-parameterized query, followed by the most common
  parameterized variants for each card. Returns nil when no such cache configs exist."
  []
  (when-let [cache-configs (seq (t2/select :model/CacheConfig :strategy :duration :refresh_automatically true))]
    (let [configs->card-ids (cache-configs->card-ids cache-configs)
          fetch-queries     (fn [parameterized?]
                              (t2/select :model/Query
                                         (duration-queries-to-rerun-honeysql configs->card-ids parameterized?)))
          base-queries      (fetch-queries false)
          parameterized     (fetch-queries true)]
      (concat base-queries (select-parameterized-queries parameterized)))))
(defn- maybe-refresh-duration-caches!
  "Reruns the queries returned by `duration-queries-to-rerun`, grouped into one refresh definition per card ID. The
  work is submitted to the preemptive caching pool when `*run-cache-refresh-async*` is true, and run on the calling
  thread otherwise. Does nothing when there is nothing to rerun."
  []
  (when-let [queries (seq (duration-queries-to-rerun))]
    (let [task (refresh-task
                (for [[card-id card-queries] (group-by :card-id queries)]
                  {:card-id card-id
                   :queries (map :query card-queries)}))]
      (if *run-cache-refresh-async*
        (submit-refresh-task-async! task)
        (task)))))
(defn- scheduled-base-query-to-rerun-honeysql
  "HoneySQL query for finding the base query definition we should run for a card ID (i.e. the unparameterized query):
  the query of the most recently started successful, non-sandboxed, non-parameterized execution of `card-id` within
  the last 30 days."
  [card-id]
  (let [query-table     (t2/table-name :model/Query)
        execution-table (t2/table-name :model/QueryExecution)
        ;; Was the query executed at least once in the last month?
        ;; This is a safety check so that we don't scan all of query_execution -- if a query has not been executed at
        ;; all in the last month (including cache hits) we won't bother refreshing it again.
        month-ago       (duration-ago {:duration 30 :unit "days"})]
    {:select   [:q.query [:qe.card_id :card-id]]
     :from     [[query-table :q]]
     :join     [[execution-table :qe] [:= :qe.hash :q.query_hash]]
     :where    [:and
                [:= :qe.card_id card-id]
                [:= :qe.parameterized false]
                [:= :qe.error nil]
                [:= :qe.is_sandboxed false]
                [:>= :qe.started_at month-ago]]
     :order-by [[:qe.started_at :desc]]
     :limit    1}))
(defn- scheduled-parameterized-queries-to-rerun-honeysql
  "HoneySQL query for the parameterized query definitions of `card-id` to rerun for a `:schedule` cache. It selects
  successful, non-sandboxed, parameterized executions started at or after `rerun-cutoff`, excluding cache refresh
  runs. Results are grouped per query, ordered by execution count (descending, ties broken by earliest execution) and
  limited to `*parameterized-queries-to-rerun-per-card*` rows."
  [card-id rerun-cutoff]
  {:select   [:q.query [:qe.card_id :card-id]]
   :from     [[(t2/table-name :model/Query) :q]]
   :join     [[(t2/table-name :model/QueryExecution) :qe] [:= :qe.hash :q.query_hash]]
   :where    [:and
              [:= :qe.card_id card-id]
              [:>= :qe.started_at rerun-cutoff]
              ;; Don't factor the last cache refresh into whether we should rerun a parameterized query
              [:not= :qe.context (name :cache-refresh)]
              ;; Qualified with the `qe` alias, like the other queries in this namespace, so the column reference
              ;; cannot become ambiguous across the join.
              [:= :qe.parameterized true]
              [:= :qe.error nil]
              [:= :qe.is_sandboxed false]]
   :group-by [:q.query_hash :q.query :qe.card_id]
   :order-by [[[:count :q.query_hash] :desc]
              [[:min :qe.started_at] :asc]]
   :limit    *parameterized-queries-to-rerun-per-card*})
(defn- scheduled-queries-to-rerun
  "Returns the distinct query definitions to preemptively rerun for a card that uses `:schedule` caching. The card's
  most recent non-parameterized query comes first (when one exists), followed by its most common parameterized
  queries executed since `rerun-cutoff`."
  [card-id rerun-cutoff]
  (let [base-query    (t2/select-one :model/Query (scheduled-base-query-to-rerun-honeysql card-id))
        parameterized (t2/select :model/Query (scheduled-parameterized-queries-to-rerun-honeysql card-id rerun-cutoff))]
    (distinct (map :query (cond->> parameterized
                            base-query (cons base-query))))))
(defn- refresh-schedule-cache!
  "Given a cache config with the `:schedule` strategy, preemptively rerun the query (and a fixed number of
  parameterized variants) for every card it covers so that fresh results are cached. Parameterized variants are
  chosen from executions since the config's `last_run_at`, or its `created_at` if `last_run_at` is nil.

  Nil card IDs are ignored, and each card is looked up and refreshed once even if it is listed several times. The
  work is submitted to the preemptive caching pool when `*run-cache-refresh-async*` is true, and run on the calling
  thread otherwise."
  [{model :model model-id :model_id strategy :strategy last-run-at :last_run_at created-at :created_at :as cache-config}]
  (assert (= strategy :schedule))
  (let [rerun-cutoff (or last-run-at created-at)
        dashboard-id (when (= model "dashboard") model-id)
        ;; Remove nil IDs: a nil would become `[:= :qe.card_id nil]` (rendered as `IS NULL`) and match executions that
        ;; are not tied to any card. Remove duplicates up front so each card's queries are looked up only once.
        card-ids     (into [] (comp cat (remove nil?) (distinct)) (vals (cache-configs->card-ids [cache-config])))
        refresh-defs (map (fn [card-id]
                            {:dashboard-id dashboard-id
                             :card-id      card-id
                             :queries      (scheduled-queries-to-rerun card-id rerun-cutoff)})
                          card-ids)
        task         (refresh-task refresh-defs)]
    (if *run-cache-refresh-async*
      (submit-refresh-task-async! task)
      (task))))
;;; ------------------------------------------- Cache invalidation task ------------------------------------------------
(defn- select-ready-to-run
  "Fetch the cache configs for a given `strategy` that are due to run: those whose `next_run_at` is nil, or is now or
  already in the past."
  [strategy]
  (let [due? [:or
              [:= :next_run_at nil]
              [:<= :next_run_at (t/offset-date-time)]]]
    (t2/select :model/CacheConfig :strategy strategy {:where due?})))
(defn- calc-next-run
  "Calculate when the next run should happen for the cron expression `schedule`: the trigger's first fire time after
  `now`, converted to an `OffsetDateTime` at `(t/zone-offset)`."
  [^String schedule now]
  (let [^MutableTrigger cron     (cron/finalize (cron/cron-schedule schedule))
        ^java.util.Date now-date (t/java-date now)]
    ;; Synchronize the cron runner to our app time (which may be mocked in tests). The start time is anchored to the
    ;; `now` passed in rather than to a separate clock reading, so the result depends only on `schedule` and `now`.
    (.setStartTime cron now-date)
    ;; NOTE(review): `getFireTimeAfter` can return nil for a schedule with no future fire times; that case is not
    ;; handled here.
    (-> (.getFireTimeAfter cron now-date)
        (t/offset-date-time (t/zone-offset)))))
(defn- refresh-cache-configs!
  "Update every `:schedule` cache config that is due to run: set its `next_run_at` from its cron schedule and its
  `invalidated_at` to now. When preemptive caching is enabled, also rerun the queries of due configs that have
  `refresh_automatically` set, then refresh `:duration` caches.

  Each config is handled independently. An exception while processing one config (e.g. an unparseable cron
  schedule, or a failed refresh submission) is logged and the remaining configs are still processed. Otherwise a
  single bad config would abort the whole run every time. Returns the number of due configs found."
  []
  (let [now         (t/offset-date-time)
        preemptive? (premium-features/enable-preemptive-caching?)
        due-configs (select-ready-to-run :schedule)]
    (doseq [{:keys [id config refresh_automatically] :as cache-config} due-configs]
      (try
        (t2/update! :model/CacheConfig {:id id}
                    {:next_run_at    (calc-next-run (:schedule config) now)
                     :invalidated_at now})
        (when (and preemptive? refresh_automatically)
          (refresh-schedule-cache! cache-config))
        (catch Exception e
          (log/errorf e "Error refreshing cache config %s" id))))
    (when preemptive?
      (maybe-refresh-duration-caches!))
    (count due-configs)))
;; Quartz job that runs `refresh-cache-configs!`; `DisallowConcurrentExecution` prevents overlapping runs.
(jobs/defjob ^{:doc "Refresh 'schedule' caches"
               org.quartz.DisallowConcurrentExecution true}
  Cache
  [_job-context]
  (refresh-cache-configs!))
(def ^:private cache-job
  "Durable Quartz job definition of type `Cache`, identified by the key \"metabase-enterprise.cache.job\"."
  (jobs/build
   (jobs/with-identity (jobs/key "metabase-enterprise.cache.job"))
   (jobs/of-type Cache)
   (jobs/with-description "Schedule Caches refresh task")
   (jobs/store-durably)))
(def ^:private cache-trigger
  "Quartz trigger for `cache-job`: starts immediately and fires on the cron expression `0 * * * * ? *` (second 0 of
  every minute)."
  (triggers/build
   (triggers/with-identity (triggers/key "metabase-enterprise.cache.trigger"))
   (triggers/with-schedule (cron/cron-schedule "0 * * * * ? *"))
   (triggers/start-now)))
(defenterprise init-cache-task!
  "Inits periodical task checking for cache expiration"
  :feature :cache-granular-controls
  []
  ;; Register the cache job together with its every-minute trigger.
  (task/schedule-task! cache-job cache-trigger))