(ns metabase.task.search-index
  (:require
   [clojurewerkz.quartzite.jobs :as jobs]
   [clojurewerkz.quartzite.schedule.simple :as simple]
   [clojurewerkz.quartzite.triggers :as triggers]
   [metabase.analytics.prometheus :as prometheus]
   [metabase.search.core :as search]
   [metabase.task :as task]
   [metabase.util :as u]
   [metabase.util.log :as log])
  (:import
   (java.time Instant)
   (java.util Date)
   (org.quartz DisallowConcurrentExecution JobDetail Trigger)))
(set! *warn-on-reflection* true)
(def ^:private init-stem "metabase.task.search-index.init")
(def ^:private reindex-stem "metabase.task.search-index.reindex")
(def ^:private update-stem "metabase.task.search-index.update")

Key used to define and trigger a job that ensures there is an active index.

(def init-job-key
  (jobs/key (str init-stem ".job")))

Key used to define and trigger a job that rebuilds the entire index from scratch.

(def reindex-job-key
  (jobs/key (str reindex-stem ".job")))

Key used to define and trigger a job that makes incremental updates to the search index.

(def update-job-key
  (jobs/key (str update-stem ".job")))

We define the job bodies outside the defrecord, so that we can redefine them live from the REPL

(defn- report->prometheus! [duration report]
  (prometheus/inc! :metabase-search/index-ms duration)
  (doseq [[model cnt] report]
    (prometheus/inc! :metabase-search/index {:model model} cnt)))

Create a new index, if necessary

(defn init!
  []
  (when (search/supports-index?)
    (let [timer    (u/start-timer)
          report   (search/init-index! {:force-reset? false, :re-populate? false})
          duration (u/since-ms timer)]
      (if (seq report)
        (do (report->prometheus! duration report)
            (log/infof "Done indexing in %.0fms %s" duration (sort-by (comp - val) report))
            true)
        (log/info "Found existing search index, and using it.")))))

Reindex the whole AppDB

(defn reindex!
  []
  (when (search/supports-index?)
    (log/info "Reindexing searchable entities")
    (let [timer    (u/start-timer)
          report   (search/reindex!)
          duration (u/since-ms timer)]
      (report->prometheus! duration report)
      (log/infof "Done reindexing in %.0fms %s" duration (sort-by (comp - val) report))
      report)))
(defn- update-index! []
  (when (search/supports-index?)
    (while true
      (let [timer    (u/start-timer)
            report   (search/process-next-batch! Long/MAX_VALUE 100)
            duration (u/since-ms timer)]
        (when (seq report)
          (report->prometheus! duration report)
          (log/debugf "Indexed search entries in %.0fms %s" duration (sort-by (comp - val) report)))))))
(defn- force-scheduled-task! [^JobDetail job ^Trigger trigger]
  ;; For some reason, using the schedule-task! with a non-durable job causes it to only fire on the first trigger.
  #_(task/schedule-task! job trigger)
  (task/delete-task! (.getKey job) (.getKey trigger))
  (task/add-job! job)
  (task/add-trigger! trigger))

Ensure a Search Index exists

(jobs/defjob 
  SearchIndexInit [_ctx]
  (init!))

Populate a new Search Index

(jobs/defjob ^{DisallowConcurrentExecution true
               :doc                        }
  SearchIndexReindex [_ctx]
  (reindex!))

Keep Search Index updated

(jobs/defjob ^{DisallowConcurrentExecution true
               :doc                        }
  SearchIndexUpdate [_ctx]
  (update-index!))
(defmethod task/init! ::SearchIndexInit [_]
  (let [job (jobs/build
             (jobs/of-type SearchIndexInit)
             (jobs/store-durably)
             (jobs/with-identity init-job-key))]
    (task/add-job! job)
    (task/trigger-now! init-job-key)))
(defmethod task/init! ::SearchIndexReindex [_]
  (let [job         (jobs/build
                     (jobs/of-type SearchIndexReindex)
                     (jobs/store-durably)
                     (jobs/with-identity reindex-job-key))
        trigger-key (triggers/key (str reindex-stem ".trigger"))
        trigger     (triggers/build
                     (triggers/with-identity trigger-key)
                     (triggers/for-job reindex-job-key)
                     (triggers/start-at (Date/from (.plusSeconds (Instant/now) 3600)))
                     (triggers/with-schedule
                      (simple/schedule
                       (simple/with-interval-in-hours 1)
                       (simple/repeat-forever))))]
    (task/schedule-task! job trigger)))
(defmethod task/init! ::SearchIndexUpdate [_]
  (let [job         (jobs/build
                     (jobs/of-type SearchIndexUpdate)
                     (jobs/store-durably)
                     (jobs/with-identity update-job-key))
        trigger-key (triggers/key (str update-stem ".trigger"))
        trigger     (triggers/build
                     (triggers/with-identity trigger-key)
                     (triggers/for-job update-job-key)
                     (triggers/start-now)
                     ;; This schedule is only here to restart the task if it dies for some reason.
                     (triggers/with-schedule (simple/schedule (simple/with-interval-in-seconds 1))))]
    (force-scheduled-task! job trigger)))
(comment
  (task/job-exists? reindex-job-key)
  (task/job-exists? update-job-key))