(ns metabase.search.task.search-index
  (:require
   [clojurewerkz.quartzite.jobs :as jobs]
   [clojurewerkz.quartzite.schedule.simple :as simple]
   [clojurewerkz.quartzite.triggers :as triggers]
   [metabase.analytics.core :as analytics]
   [metabase.search.core :as search]
   [metabase.task :as task]
   [metabase.util :as u]
   [metabase.util.log :as log])
  (:import
   (java.time Instant)
   (java.util Date)
   (org.quartz DisallowConcurrentExecution JobExecutionContext)))
;; Have the compiler warn about reflective Java interop calls in this namespace.
(set! *warn-on-reflection* true)
;; Shared name prefixes. The job keys in this namespace are built as `<stem>.job`,
;; and the reindex/update trigger keys as `<stem>.trigger`.
(def ^:private init-stem "metabase.task.search-index.init")
(def ^:private reindex-stem "metabase.task.search-index.reindex")
(def ^:private update-stem "metabase.task.search-index.update")

;; Key used to define and trigger the job that ensures there is an active index.

(def init-job-key
  "Job key for the job that ensures there is an active search index.
  Built from `init-stem` with a `.job` suffix."
  (-> init-stem
      (str ".job")
      jobs/key))

;; Key used to define and trigger the job that rebuilds the entire index from scratch.

(def reindex-job-key
  "Job key for the job that rebuilds the entire search index from scratch.
  Built from `reindex-stem` with a `.job` suffix."
  (-> reindex-stem
      (str ".job")
      jobs/key))

;; Key used to define and trigger the job that makes incremental updates to the search index.

(def update-job-key
  "Job key for the job that applies incremental updates to the search index.
  Built from `update-stem` with a `.job` suffix."
  (-> update-stem
      (str ".job")
      jobs/key))

;; We define the job bodies outside the defrecord, so that we can redefine them live from the REPL.

(defn- report->prometheus!
  "Publish the outcome of one indexing run to the analytics metrics.

  Adds `duration` to `:metabase-search/index-ms`, then for every
  `[model count]` entry of `report` adds that count to
  `:metabase-search/index`, labelled with `{:model model}`."
  [duration report]
  (analytics/inc! :metabase-search/index-ms duration)
  (doseq [entry report
          :let  [[search-model n] entry]]
    (analytics/inc! :metabase-search/index {:model search-model} n)))

;; Create a new index, if necessary.

(defn init!
  "Create a new search index, if necessary.

  Does nothing (returns nil) when `search/supports-index?` is falsey.
  Otherwise calls `search/init-index!` with `:force-reset?` and
  `:re-populate?` both false. A non-empty report means indexing took place:
  the metrics are published, a summary is logged and `true` is returned.
  An empty report is only logged as \"existing index found\".

  Any `Exception` bumps `:metabase-search/index-error` and is rethrown."
  []
  (when (search/supports-index?)
    (try
      (let [started (u/start-timer)
            counts  (search/init-index! {:force-reset? false, :re-populate? false})
            elapsed (u/since-ms started)]
        (if-not (seq counts)
          (log/info "Found existing search index, and using it.")
          (do
            (report->prometheus! elapsed counts)
            ;; Models are listed in descending order of their count.
            (log/infof "Done indexing in %.0fms %s" elapsed (sort-by #(- (val %)) counts))
            true)))
      (catch Exception e
        (analytics/inc! :metabase-search/index-error)
        (throw e)))))

;; Reindex the whole AppDB.

(defn reindex!
  "Reindex the whole AppDB.

  Does nothing (returns nil) when `search/supports-index?` is falsey.
  Otherwise runs `search/reindex!`, publishes the elapsed time and the
  per-model counts as metrics, logs a summary, and returns the report
  produced by `search/reindex!`.

  Any `Exception` bumps `:metabase-search/index-error` and is rethrown."
  []
  (when (search/supports-index?)
    (try
      (log/info "Reindexing searchable entities")
      (let [started (u/start-timer)
            counts  (search/reindex!)
            elapsed (u/since-ms started)]
        (report->prometheus! elapsed counts)
        ;; Models are listed in descending order of their count.
        (log/infof "Done reindexing in %.0fms %s" elapsed (sort-by #(- (val %)) counts))
        counts)
      (catch Exception e
        (analytics/inc! :metabase-search/index-error)
        (throw e)))))
(defn- ingest-next-batch!
  "Fetch the next batch via `search/get-next-batch!` and pass it to
  `search/bulk-ingest!`. When the resulting report is non-empty, publish the
  metrics and log the per-model counts at debug level.

  Any `Exception` bumps `:metabase-search/index-error` and is rethrown."
  []
  (try
    ;; NOTE(review): the meaning of the two arguments (Long/MAX_VALUE, 100) is
    ;; defined by `search/get-next-batch!` and is not visible here - confirm.
    (let [batch (search/get-next-batch! Long/MAX_VALUE 100)]
      (log/trace "Processing batch" batch)
      (let [started (u/start-timer)
            counts  (search/bulk-ingest! batch)
            elapsed (u/since-ms started)]
        (when (seq counts)
          (report->prometheus! elapsed counts)
          (log/debugf "Indexed search entries in %.0fms %s" elapsed (sort-by (comp - val) counts)))))
    (catch Exception e
      (analytics/inc! :metabase-search/index-error)
      (throw e))))

(defn- update-index!
  "Body of the realtime update job: when the search backend supports an index,
  process batches in an endless loop. The loop only ends when a batch throws;
  it runs inside `task/rerun-on-error`, which is handed the job's `ctx`."
  [^JobExecutionContext ctx]
  (when (search/supports-index?)
    (log/info "Starting Realtime Search Index Update worker")
    (task/rerun-on-error ctx
      (while true
        (ingest-next-batch!)))))

;; Ensure a Search Index exists.

;; Quartz job whose execution simply delegates to `init!`.
;; NOTE(review): unlike SearchIndexReindex, this job carries no
;; DisallowConcurrentExecution annotation - confirm that is intentional.
(jobs/defjob ^{:doc "Ensure a Search Index exists"}
  SearchIndexInit [_]
  (init!))

;; Populate a new Search Index.

;; Quartz job whose execution delegates to `reindex!`. The
;; DisallowConcurrentExecution annotation asks Quartz not to run more than one
;; execution of this job at the same time.
;; The metadata map must hold an even number of forms: `:doc` needs a value,
;; otherwise the reader rejects the whole form.
(jobs/defjob ^{DisallowConcurrentExecution true
               :doc                        "Populate a new Search Index"}
  SearchIndexReindex [_ctx]
  (reindex!))

;; Keep Search Index updated.

;; Quartz job whose execution delegates to `update-index!`, passing along the
;; execution context.
;; NOTE(review): unlike SearchIndexReindex, this job carries no
;; DisallowConcurrentExecution annotation - confirm that is intentional.
(jobs/defjob ^{:doc "Keep Search Index updated"}
  SearchIndexUpdate [context]
  (update-index! context))
;; Registers the init job (stored durably, with no trigger of its own) under
;; `init-job-key`, then triggers it immediately.
(defmethod task/init! ::SearchIndexInit [_]
  (task/add-job! (jobs/build
                  (jobs/with-identity init-job-key)
                  (jobs/of-type SearchIndexInit)
                  (jobs/store-durably)))
  (task/trigger-now! init-job-key))
;; Schedules the full-reindex job: first fire 3600 seconds after this method
;; runs, then repeating forever at a one-hour interval.
(defmethod task/init! ::SearchIndexReindex [_]
  (let [reindex-job     (jobs/build
                         (jobs/with-identity reindex-job-key)
                         (jobs/of-type SearchIndexReindex)
                         (jobs/store-durably))
        reindex-trigger (triggers/build
                         (triggers/with-identity (triggers/key (str reindex-stem ".trigger")))
                         (triggers/for-job reindex-job-key)
                         (triggers/start-at (-> (Instant/now)
                                                (.plusSeconds 3600)
                                                Date/from))
                         (triggers/with-schedule
                          (simple/schedule
                           (simple/with-interval-in-hours 1)
                           (simple/repeat-forever))))]
    (task/schedule-task! reindex-job reindex-trigger)))
;; Schedules the incremental-update job with a single trigger that starts now.
(defmethod task/init! ::SearchIndexUpdate [_]
  (let [update-job     (jobs/build
                        (jobs/with-identity update-job-key)
                        (jobs/of-type SearchIndexUpdate))
        update-trigger (triggers/build
                        (triggers/with-identity (triggers/key (str update-stem ".trigger")))
                        (triggers/for-job update-job-key)
                        (triggers/start-now))]
    (task/schedule-task! update-job update-trigger)))
;; REPL scratchpad (never evaluated on load): check whether the reindex and
;; update jobs are registered with the scheduler.
(comment
  (task/job-exists? reindex-job-key)
  (task/job-exists? update-job-key))