The background worker which performs the analysis of queries, and updates the database in accordance. Restricts the CPU and database load corresponding to this analysis via a crude rate limiting algorithm that puts the worker to sleep such that it is active at most [[max-cpu-usage-fraction]] of the time. | (ns metabase.query-analysis.task.analyze-queries (:require [clojurewerkz.quartzite.jobs :as jobs] [clojurewerkz.quartzite.schedule.simple :as simple] [clojurewerkz.quartzite.triggers :as triggers] [metabase.public-settings :as public-settings] [metabase.query-analysis.core :as query-analysis] [metabase.query-analysis.failure-map :as failure-map] [metabase.task :as task] [metabase.util :as u] [metabase.util.log :as log]) (:import (org.quartz DisallowConcurrentExecution))) |
(set! *warn-on-reflection* true) | |
(def ^:private max-cpu-usage-fraction 0.2) | |
(def ^:private wait-ratio (/ (- 1 max-cpu-usage-fraction) max-cpu-usage-fraction)) | |
(def ^:private fail-wait-ms (* 2 1000)) | |
(def ^:private min-wait-ms 5) | |
(def ^:private max-wait-ms (* 10 1000)) | |
(defn- wait-proportional ^long [time-taken-ms] (->> time-taken-ms (* wait-ratio) (max min-wait-ms) (min max-wait-ms))) | |
(defn- wait-fail ^long [time-taken-ms] (max fail-wait-ms (wait-proportional time-taken-ms))) | |
(defn- analyzer-loop* [stop-after next-card-id-fn] (try (loop [remaining stop-after] (let [card-or-id (next-card-id-fn) card-id (u/the-id card-or-id) timer (u/start-timer) card (query-analysis/->analyzable card-or-id)] (when (public-settings/query-analysis-enabled) (if (failure-map/non-retryable? card) (log/warnf "Skipping analysis of Card %s as its query has caused failures in the past." card-id) (try (if (:error (query-analysis/analyze!* card)) (failure-map/track-failure! card) (failure-map/track-success! card)) (let [taken-ms (Math/ceil (u/since-ms timer)) sleep-ms (wait-proportional taken-ms)] (log/debugf "Query analysis for Card %s took %sms (incl. persisting)" card-id taken-ms) (log/debugf "Waiting %sms before analysing further cards" sleep-ms) (Thread/sleep sleep-ms)) (catch Exception e (log/errorf e "Error analysing and updating query for Card %s" card-id) (failure-map/track-failure! card) (Thread/sleep (wait-fail (u/since-ms timer)))))) (cond (nil? remaining) (recur nil) (> remaining 1) (recur (dec remaining)))))) (catch Exception e (log/error e "Unhandled error when attempting to analyse the next card in the queue")))) | |
(defn- analyzer-loop! ([] (analyzer-loop! nil)) ([stop-after] (analyzer-loop* stop-after query-analysis/next-card-or-id!)) ([stop-after queue] (analyzer-loop! stop-after queue Long/MAX_VALUE)) ([stop-after queue timeout] (analyzer-loop* stop-after (partial query-analysis/next-card-or-id! queue timeout)))) | |
Analyze | (jobs/defjob ^{DisallowConcurrentExecution true :doc } QueryAnalyzer [_ctx] (analyzer-loop!)) |
(defmethod task/init! ::BackfillQueryField [_] (let [job (jobs/build (jobs/of-type QueryAnalyzer) (jobs/with-identity (jobs/key "metabase.task.analyze-queries.job"))) trigger (triggers/build (triggers/with-identity (triggers/key "metabase.task.analyze-queries.trigger")) (triggers/with-schedule (simple/schedule (simple/with-interval-in-minutes 1))) (triggers/start-now))] (task/schedule-task! job trigger))) | |