(ns metabase.search.task.search-index (:require [clojurewerkz.quartzite.jobs :as jobs] [clojurewerkz.quartzite.schedule.simple :as simple] [clojurewerkz.quartzite.triggers :as triggers] [metabase.analytics.core :as analytics] [metabase.search.core :as search] [metabase.task :as task] [metabase.util :as u] [metabase.util.log :as log]) (:import (java.time Instant) (java.util Date) (org.quartz DisallowConcurrentExecution JobExecutionContext))) | |
(set! *warn-on-reflection* true)

;; Name prefixes from which the Quartz job keys (and, for the scheduled tasks,
;; trigger keys) of each search-index task are derived.
(def ^:private init-stem
  "metabase.task.search-index.init")

(def ^:private reindex-stem
  "metabase.task.search-index.reindex")

(def ^:private update-stem
  "metabase.task.search-index.update")
(def init-job-key
  "Key used to define and trigger the job that ensures there is an active search index."
  (-> init-stem (str ".job") jobs/key))

(def reindex-job-key
  "Key used to define and trigger the job that rebuilds the entire search index from scratch."
  (-> reindex-stem (str ".job") jobs/key))

(def update-job-key
  "Key used to define and trigger the job that makes incremental updates to the search index."
  (-> update-stem (str ".job") jobs/key))
;; The job bodies are defined as plain functions outside the `jobs/defjob` forms, so that they can be redefined live from the REPL.
(defn- report->prometheus!
  "Record indexing metrics via `analytics/inc!`.

  Increments `:metabase-search/index-ms` by `duration`, then, for every
  `[model count]` entry in `report`, increments `:metabase-search/index`
  (labelled with `{:model model}`) by that count. Returns nil."
  [duration report]
  (analytics/inc! :metabase-search/index-ms duration)
  (run! (fn [[model-name n]]
          (analytics/inc! :metabase-search/index {:model model-name} n))
        report))
(defn init!
  "Create a new search index, if necessary.

  Does nothing (returns nil) when `search/supports-index?` is false. Otherwise
  calls `search/init-index!` with `{:force-reset? false, :re-populate? false}`
  and times the call. If the returned report is empty, it logs that the existing
  index is reused. If the report is non-empty, it records metrics, logs the
  per-model counts (largest first) and returns `true`.

  Any exception increments `:metabase-search/index-error` and is rethrown."
  []
  (when (search/supports-index?)
    (try
      (let [started  (u/start-timer)
            counts   (search/init-index! {:force-reset? false, :re-populate? false})
            elapsed  (u/since-ms started)]
        (if (empty? counts)
          (log/info "Found existing search index, and using it.")
          (do
            (report->prometheus! elapsed counts)
            (log/infof "Done indexing in %.0fms %s" elapsed (sort-by val > counts))
            true)))
      (catch Exception e
        (analytics/inc! :metabase-search/index-error)
        (throw e)))))
(defn reindex!
  "Reindex the whole AppDB.

  Does nothing (returns nil) when `search/supports-index?` is false. Otherwise
  runs `search/reindex!`, records metrics for the resulting report, logs the
  elapsed time and per-model counts (largest first), and returns the report.

  Any exception increments `:metabase-search/index-error` and is rethrown."
  []
  (when (search/supports-index?)
    (try
      (log/info "Reindexing searchable entities")
      (let [started (u/start-timer)
            result  (search/reindex!)
            elapsed (u/since-ms started)]
        (report->prometheus! elapsed result)
        (log/infof "Done reindexing in %.0fms %s" elapsed (sort-by val > result))
        result)
      (catch Exception e
        (analytics/inc! :metabase-search/index-error)
        (throw e)))))
(defn- update-index!
  "Body of the `SearchIndexUpdate` job: a worker loop that never finishes on its own.

  When `search/supports-index?` is true, it repeatedly takes the next batch from
  `search/get-next-batch!` and ingests it with `search/bulk-ingest!`. For each
  non-empty report it records metrics and logs the per-model counts. The batch
  call gets `Long/MAX_VALUE` and `100`, presumably a wait timeout and a maximum
  batch size (TODO confirm against `metabase.search.core`).

  Any exception increments `:metabase-search/index-error` and is rethrown to
  `task/rerun-on-error`."
  [^JobExecutionContext ctx]
  (when (search/supports-index?)
    (log/info "Starting Realtime Search Index Update worker")
    (task/rerun-on-error ctx
      (loop []
        (try
          (let [batch (search/get-next-batch! Long/MAX_VALUE 100)]
            (log/trace "Processing batch" batch)
            ;; The timer only covers ingestion, not the wait for the batch.
            (let [started (u/start-timer)
                  result  (search/bulk-ingest! batch)
                  elapsed (u/since-ms started)]
              (when (seq result)
                (report->prometheus! elapsed result)
                (log/debugf "Indexed search entries in %.0fms %s" elapsed (sort-by val > result)))))
          (catch Exception e
            (analytics/inc! :metabase-search/index-error)
            (throw e)))
        ;; `recur` sits outside the `try`, as Clojure requires.
        (recur)))))
(jobs/defjob ^{:doc "Ensure a Search Index exists, by delegating to `init!`."}
  SearchIndexInit [_ctx]
  (init!))
;; The metadata map on the record name carries both the docstring and the
;; `DisallowConcurrentExecution` class key. Clojure turns class-keyed metadata on a
;; defrecord name into an annotation, so Quartz will not run two instances of this
;; job at the same time. Every key in the map needs a value.
(jobs/defjob ^{DisallowConcurrentExecution true
               :doc                        "Populate a new Search Index by rebuilding it from scratch via `reindex!`.
  Concurrent executions of this job are disallowed."}
  SearchIndexReindex [_ctx]
  (reindex!))
(jobs/defjob ^{:doc "Keep the Search Index updated by running the `update-index!` worker loop."}
  SearchIndexUpdate [ctx]
  (update-index! ctx))
(defmethod task/init! ::SearchIndexInit [_]
  ;; Register a durable job (stored without a trigger of its own), then fire it once, immediately.
  (task/add-job! (jobs/build
                  (jobs/of-type SearchIndexInit)
                  (jobs/store-durably)
                  (jobs/with-identity init-job-key)))
  (task/trigger-now! init-job-key))
(defmethod task/init! ::SearchIndexReindex [_]
  ;; Schedule a full reindex: first run one hour from now, then every hour, forever.
  (let [first-run (Date/from (.plusSeconds (Instant/now) 3600))]
    (task/schedule-task!
     (jobs/build
      (jobs/of-type SearchIndexReindex)
      (jobs/store-durably)
      (jobs/with-identity reindex-job-key))
     (triggers/build
      (triggers/with-identity (triggers/key (str reindex-stem ".trigger")))
      (triggers/for-job reindex-job-key)
      (triggers/start-at first-run)
      (triggers/with-schedule
        (simple/schedule
         (simple/with-interval-in-hours 1)
         (simple/repeat-forever)))))))
(defmethod task/init! ::SearchIndexUpdate [_]
  ;; A single immediate trigger with no repeat schedule: the job body itself loops.
  (task/schedule-task!
   (jobs/build
    (jobs/of-type SearchIndexUpdate)
    (jobs/with-identity update-job-key))
   (triggers/build
    (triggers/with-identity (triggers/key (str update-stem ".trigger")))
    (triggers/for-job update-job-key)
    (triggers/start-now))))
(comment
  ;; REPL helpers: check whether the scheduled search-index jobs are registered.
  (task/job-exists? reindex-job-key)
  (task/job-exists? update-job-key))