The background worker which performs the analysis of queries, and updates the database in accordance. Restricts the CPU and database load corresponding to this analysis via a crude rate limiting algorithm that puts the worker to sleep such that it is active at most [[max-cpu-usage-fraction]] of the time. | (ns metabase.task.analyze-queries (:require [clojurewerkz.quartzite.jobs :as jobs] [clojurewerkz.quartzite.schedule.simple :as simple] [clojurewerkz.quartzite.triggers :as triggers] [metabase.public-settings :as public-settings] [metabase.query-analysis :as query-analysis] [metabase.query-analysis.failure-map :as failure-map] [metabase.task :as task] [metabase.util :as u] [metabase.util.log :as log]) (:import (org.quartz DisallowConcurrentExecution))) |
(set! *warn-on-reflection* true) | |
(def ^:private max-cpu-usage-fraction 0.2) | |
(def ^:private wait-ratio (/ (- 1 max-cpu-usage-fraction) max-cpu-usage-fraction)) | |
(def ^:private fail-wait-ms (* 2 1000)) | |
(def ^:private min-wait-ms 5) | |
(def ^:private max-wait-ms (* 10 1000)) | |
(defn- wait-proportional ^long [time-taken-ms]
(->> time-taken-ms
(* wait-ratio)
(max min-wait-ms)
(min max-wait-ms))) | |
(defn- wait-fail ^long [time-taken-ms] (max fail-wait-ms (wait-proportional time-taken-ms))) | |
(defn- analyzer-loop* [stop-after next-card-id-fn]
(try
(loop [remaining stop-after]
(let [card-or-id (next-card-id-fn)
card-id (u/the-id card-or-id)
timer (u/start-timer)
card (query-analysis/->analyzable card-or-id)]
(when (public-settings/query-analysis-enabled)
(if (failure-map/non-retryable? card)
(log/warnf "Skipping analysis of Card %s as its query has caused failures in the past." card-id)
(try
(if (:error (query-analysis/analyze!* card))
(failure-map/track-failure! card)
(failure-map/track-success! card))
(let [taken-ms (Math/ceil (u/since-ms timer))
sleep-ms (wait-proportional taken-ms)]
(log/debugf "Query analysis for Card %s took %sms (incl. persisting)" card-id taken-ms)
(log/debugf "Waiting %sms before analysing further cards" sleep-ms)
(Thread/sleep sleep-ms))
(catch Exception e
(log/errorf e "Error analysing and updating query for Card %s" card-id)
(failure-map/track-failure! card)
(Thread/sleep (wait-fail (u/since-ms timer))))))
(cond
(nil? remaining) (recur nil)
(> remaining 1) (recur (dec remaining))))))
(catch Exception e
(log/error e "Unhandled error when attempting to analyse the next card in the queue")))) | |
(defn- analyzer-loop! ([] (analyzer-loop! nil)) ([stop-after] (analyzer-loop* stop-after query-analysis/next-card-or-id!)) ([stop-after queue] (analyzer-loop! stop-after queue Long/MAX_VALUE)) ([stop-after queue timeout] (analyzer-loop* stop-after (partial query-analysis/next-card-or-id! queue timeout)))) | |
Analyze | (jobs/defjob ^{DisallowConcurrentExecution true
:doc }
QueryAnalyzer [_ctx]
(analyzer-loop!)) |
(defmethod task/init! ::BackfillQueryField [_]
(let [job (jobs/build
(jobs/of-type QueryAnalyzer)
(jobs/with-identity (jobs/key "metabase.task.analyze-queries.job")))
trigger (triggers/build
(triggers/with-identity (triggers/key "metabase.task.analyze-queries.trigger"))
(triggers/with-schedule
(simple/schedule (simple/with-interval-in-minutes 1)))
(triggers/start-now))]
(task/schedule-task! job trigger))) | |