A background worker that makes sure the queries for all active cards have been analyzed, and that the analysis is up-to-date. | (ns metabase.query-analysis.task.sweep-query-analysis (:require [clojurewerkz.quartzite.jobs :as jobs] [clojurewerkz.quartzite.schedule.cron :as cron] [clojurewerkz.quartzite.triggers :as triggers] [metabase.public-settings :as public-settings] [metabase.query-analysis.core :as query-analysis] [metabase.task :as task] [metabase.util :as u] [metabase.util.log :as log] [toucan2.core :as t2] [toucan2.realize :as t2.realize]) (:import (org.quartz DisallowConcurrentExecution))) |
;; Ask the compiler to warn wherever it has to fall back to reflection in this namespace.
(set! *warn-on-reflection* true)
(def ^:private has-run?
  "Has the sweeper been run before, in this process?
  Starts out `false`; the zero-arity `sweep-query-analysis-loop!` flips it to `true` once a sweep has
  finished."
  (atom false))
(def ^:private max-delete-batch-size
  "Maximum number of QueryAnalysis ids removed by a single `t2/delete!` call in
  `delete-orphan-analysis!`. This number has not been chosen scientifically."
  1000)
(defn- run-realized!
  "Walk `reducible` purely for side effects: every row is first passed through
  `t2.realize/realize` and the realized row is then handed to `f`.
  Returns nil, like `run!`."
  [f reducible]
  (run! (fn [row]
          (f (t2.realize/realize row)))
        reducible))
(defn- analyze-cards-without-complete-analysis!
  "Call `analyze-fn` on every non-archived card that has no `query_analysis` row with status
  \"complete\". With no arguments, `query-analysis/queue-analysis!` is used as the `analyze-fn`."
  ([]
   (analyze-cards-without-complete-analysis! query-analysis/queue-analysis!))
  ([analyze-fn]
   (let [card-columns     [:model/Card :id :dataset_query :entity_id :collection_id :name :created_at]
         ;; the left join only matches *complete* analysis rows, so a nil `qa.id` means the card has none
         lacking-analysis {:left-join [[:query_analysis :qa]
                                       [:and
                                        [:= :qa.card_id :report_card.id]
                                        [:= :qa.status "complete"]]]
                           :where     [:and
                                       [:not :report_card.archived]
                                       [:= :qa.id nil]]}]
     (->> (t2/reducible-select card-columns lacking-analysis)
          (run-realized! analyze-fn)))))
(defn- analyze-stale-cards!
  "Call `analyze-fn` on every card whose analysis may be stale. Until staleness can actually be
  detected this is simply every card: the select has no filter, so archived cards are included.
  With no arguments, `query-analysis/queue-analysis!` is used as the `analyze-fn`."
  ([]
   ;; Delegate to *this* function's one-arg arity. The previous code called
   ;; `analyze-cards-without-complete-analysis!` here, so the zero-arity never touched stale cards.
   (analyze-stale-cards! query-analysis/queue-analysis!))
  ([analyze-fn]
   ;; TODO once we are storing the hash of the query used for analysis, we'll be able to filter this properly.
   (let [cards (t2/reducible-select [:model/Card :id :dataset_query :entity_id :collection_id :name :created_at])]
     (run-realized! analyze-fn cards))))
(defn- delete-orphan-analysis!
  "Delete every QueryAnalysis row that belongs to an archived card, issuing deletes in batches of at
  most `max-delete-batch-size` ids. Returns the total number of ids submitted for deletion."
  []
  (let [archived-analysis (t2/reducible-select [:model/QueryAnalysis :id]
                                               {:join  [[:report_card :c] [:= :c.id :query_analysis.card_id]]
                                                :where :c.archived})
        ;; rows -> ids -> vectors of up to `max-delete-batch-size` ids
        xf-id-batches     (comp (map :id)
                                (partition-all max-delete-batch-size))
        delete-batch!     (fn
                            ;; completion arity: the accumulated count is the result
                            ([total] total)
                            ;; step arity: delete one batch and add its size to the count
                            ([total id-batch]
                             (t2/delete! :model/QueryAnalysis :id [:in id-batch])
                             (+ total (count id-batch))))]
    (transduce xf-id-batches delete-batch! 0 archived-analysis)))
(defn- sweep-query-analysis-loop!
  "Run one sweep over the query analysis data.

  Arities:
  - `[]`                        treats the sweep as a first run iff `has-run?` is still false, and
                                sets `has-run?` to true once the sweep has returned.
  - `[first-time?]`             uses an `analyze-fn` that logs the card id at debug level and then
                                calls `query-analysis/queue-analysis!`.
  - `[first-time? analyze-fn]`  does the work:
      1. calls `analyze-fn` for cards without complete analysis;
      2. when `first-time?`, calls `analyze-fn` for every card, as existing analysis may be stale;
      3. deletes analysis belonging to archived cards and logs how many rows were removed."
  ([]
   (sweep-query-analysis-loop! (not @has-run?))
   (reset! has-run? true))
  ([first-time?]
   (sweep-query-analysis-loop! first-time?
                               (fn [card-or-id]
                                 (log/debugf "Queueing card %s for query analysis" (u/the-id card-or-id))
                                 (query-analysis/queue-analysis! card-or-id))))
  ([first-time? analyze-fn]
   ;; prioritize cards that are missing analysis
   (log/info "Calculating analysis for cards without any")
   (analyze-cards-without-complete-analysis! analyze-fn)

   ;; we run through all the existing analysis on our first run, as it may be stale due to an old macaw version, etc.
   (when first-time?
     (log/info "Recalculating potentially stale analysis")
     ;; this will repeat the cards we've just back-filled, but in the steady state there should be none of those.
     ;; in the future, we will track versions, hashes, and timestamps to reduce the cost of this operation.
     (analyze-stale-cards! analyze-fn))

   ;; empty out useless records
   (log/info "Deleting analysis for archived cards")
   ;; Do the deletion *outside* the logging form: logging macros only evaluate their arguments when the
   ;; level is enabled, so nesting the call inside `log/infof` would skip the delete whenever INFO is off.
   (let [deleted-count (delete-orphan-analysis!)]
     (log/infof "Deleted analysis for %s cards" deleted-count))))
(jobs/defjob ^{DisallowConcurrentExecution true
               :doc "Sweep query analysis: queue analysis for cards that are missing it (and, on the first
                    run in this process, for all cards), then delete analysis for archived cards.
                    Does nothing unless the query-analysis setting is enabled."}
  SweepQueryAnalysis [_ctx]
  ;; the whole sweep is gated on the instance-level setting
  (when (public-settings/query-analysis-enabled)
    (sweep-query-analysis-loop!)))
;; Registers the SweepQueryAnalysis job with the scheduler: builds a durable job and a cron trigger,
;; schedules them, and then fires the job once immediately.
(defmethod task/init! ::SweepQueryAnalysis [_]
  (let [sweep-job-key   (jobs/key "metabase.task.backfill-query-fields.job")
        sweep-job       (jobs/build
                         (jobs/of-type SweepQueryAnalysis)
                         (jobs/with-identity sweep-job-key)
                         (jobs/store-durably))
        ;; run every 4 hours at a random minute:
        cron-expression (format "0 %d 0/4 1/1 * ? *" (rand-int 60))
        sweep-trigger   (triggers/build
                         (triggers/with-identity (triggers/key "metabase.task.backfill-query-fields.trigger"))
                         (triggers/with-schedule
                          (cron/schedule
                           (cron/cron-schedule cron-expression)
                           (cron/with-misfire-handling-instruction-do-nothing))))]
    (task/schedule-task! sweep-job sweep-trigger)
    ;; don't wait for the first cron tick
    (task/trigger-now! sweep-job-key)))