(ns metabase.query-analysis.task.sweep-query-analysis
  "A background worker that makes sure the queries of all active cards have been analyzed, and that the analysis is up-to-date."
  (:require
   [clojurewerkz.quartzite.jobs :as jobs]
   [clojurewerkz.quartzite.schedule.cron :as cron]
   [clojurewerkz.quartzite.triggers :as triggers]
   [metabase.public-settings :as public-settings]
   [metabase.query-analysis.core :as query-analysis]
   [metabase.task :as task]
   [metabase.util :as u]
   [metabase.util.log :as log]
   [toucan2.core :as t2]
   [toucan2.realize :as t2.realize])
  (:import
   (org.quartz DisallowConcurrentExecution)))
(set! *warn-on-reflection* true)
(def ^:private has-run?
  "Has the sweeper been run before, in this process?"
  (atom false))
(def ^:private max-delete-batch-size
  "This number has not been chosen scientifically."
  1000)
(defn- run-realized!
  "Realize each row of `reducible` and call `f` on it, for side effects."
  [f reducible]
  (run! (comp f t2.realize/realize) reducible))
(defn- analyze-cards-without-complete-analysis!
  "Run `analyze-fn` on every unarchived card that does not yet have a complete analysis record."
  ([]
   (analyze-cards-without-complete-analysis! query-analysis/queue-analysis!))
  ([analyze-fn]
   (let [cards (t2/reducible-select [:model/Card :id :dataset_query :entity_id :collection_id :name :created_at]
                                    {:left-join [[:query_analysis :qa]
                                                 [:and
                                                  [:= :qa.card_id :report_card.id]
                                                  [:= :qa.status "complete"]]]
                                     :where     [:and
                                                 [:not :report_card.archived]
                                                 [:= :qa.id nil]]})]
     (run-realized! analyze-fn cards))))
(defn- analyze-stale-cards!
  "Run `analyze-fn` on every card, since any existing analysis may be stale."
  ([]
   (analyze-stale-cards! query-analysis/queue-analysis!))
  ([analyze-fn]
   ;; TODO once we are storing the hash of the query used for analysis, we'll be able to filter this properly.
   (let [cards (t2/reducible-select [:model/Card :id :dataset_query :entity_id :collection_id :name :created_at])]
     (run-realized! analyze-fn cards))))
(defn- delete-orphan-analysis!
  "Delete analysis records belonging to archived cards, in batches. Returns the number of records deleted."
  []
  (transduce
   (comp (map :id)
         (partition-all max-delete-batch-size))
   (fn
     ([final-count] final-count)
     ([running-count ids]
      (t2/delete! :model/QueryAnalysis :id [:in ids])
      (+ running-count (count ids))))
   0
   (t2/reducible-select [:model/QueryAnalysis :id]
                        {:join  [[:report_card :c] [:= :c.id :query_analysis.card_id]]
                         :where :c.archived})))
(defn- sweep-query-analysis-loop!
  ([]
   (sweep-query-analysis-loop! (not @has-run?))
   (reset! has-run? true))
  ([first-time?]
   (sweep-query-analysis-loop! first-time?
                               (fn [card-or-id]
                                 (log/debugf "Queueing card %s for query analysis" (u/the-id card-or-id))
                                 (query-analysis/queue-analysis! card-or-id))))
  ([first-time? analyze-fn]
   ;; prioritize cards that are missing analysis
   (log/info "Calculating analysis for cards without any")
   (analyze-cards-without-complete-analysis! analyze-fn)
   ;; we run through all the existing analysis on our first run, as it may be stale due to an old macaw version, etc.
   (when first-time?
     (log/info "Recalculating potentially stale analysis")
     ;; this will repeat the cards we've just back-filled, but in the steady state there should be none of those.
     ;; in the future, we will track versions, hashes, and timestamps to reduce the cost of this operation.
     (analyze-stale-cards! analyze-fn))
   ;; empty out useless records
   (log/info "Deleting analysis for archived cards")
   (log/infof "Deleted analysis for %s cards" (delete-orphan-analysis!))))
(jobs/defjob ^{DisallowConcurrentExecution true
               :doc                        "Backfill QueryField for cards created earlier. Runs once per instance."}
  SweepQueryAnalysis [_ctx]
  (when (public-settings/query-analysis-enabled)
    (sweep-query-analysis-loop!)))
(defmethod task/init! ::SweepQueryAnalysis [_]
  (let [job-key (jobs/key "metabase.task.backfill-query-fields.job")
        job     (jobs/build
                 (jobs/of-type SweepQueryAnalysis)
                 (jobs/with-identity job-key)
                 (jobs/store-durably))
        trigger (triggers/build
                 (triggers/with-identity (triggers/key "metabase.task.backfill-query-fields.trigger"))
                 (triggers/with-schedule
                   (cron/schedule
                    (cron/cron-schedule
                     ;; run every 4 hours at a random minute:
                     (format "0 %d 0/4 1/1 * ? *" (rand-int 60)))
                    (cron/with-misfire-handling-instruction-do-nothing))))]
    (task/schedule-task! job trigger)
    (task/trigger-now! job-key)))
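;; Illustrative sketch of the schedule above: with, say, (rand-int 60) => 37 the generated Quartz cron
;; expression is "0 37 0/4 1/1 * ? *", i.e. at second 0 of minute 37, every 4 hours starting from hour 0,
;; every day. The job can also be fired on demand from the REPL:
(comment
  (task/trigger-now! (jobs/key "metabase.task.backfill-query-fields.job")))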