(ns metabase.search.appdb.index
  (:require
   [clojure.string :as str]
   [honey.sql :as sql]
   [honey.sql.helpers :as sql.helpers]
   [metabase.config :as config]
   [metabase.db :as mdb]
   [metabase.models.search-index-metadata :as search-index-metadata]
   [metabase.search.appdb.specialization.api :as specialization]
   [metabase.search.appdb.specialization.h2 :as h2]
   [metabase.search.appdb.specialization.postgres :as postgres]
   [metabase.search.config :as search.config]
   [metabase.search.engine :as search.engine]
   [metabase.search.spec :as search.spec]
   [metabase.util :as u]
   [metabase.util.json :as json]
   [metabase.util.log :as log]
   [toucan2.core :as t2])
  (:import
   (clojure.lang ExceptionInfo)
   (org.postgresql.util PSQLException)))
;; Reference a var from each specialization namespace so that the requires above are used.
;; NOTE(review): presumably these namespaces are loaded for their side effects (e.g. method registrations) — confirm.
(comment
  h2/keep-me
  postgres/keep-me)
(set! *warn-on-reflection* true)
;; Number of documents grouped into each batch by `consume!` before they are upserted.
(def ^:private insert-batch-size 150)
;; How long the tracking atoms are considered fresh: 5 minutes, expressed in nanoseconds to match `System/nanoTime`.
(def ^:private sync-tracking-period (long (* 5 #_minutes 60e9)))
;; Identifier used when reading and writing index metadata: the build hash in prod, otherwise a random UUID string.
(defonce ^:dynamic ^:private *index-version-id*
  (if config/is-prod?
    (:hash config/mb-version-info)
    (str (random-uuid))))
;; `System/nanoTime` reading after which the tracking atoms should be refreshed; nil until the first refresh.
(defonce ^:private next-sync-at (atom nil))
;; Cached map of index status (e.g. :active, :pending) to table-name keyword.
(defonce ^:dynamic ^:private *indexes* (atom {:active nil, :pending nil}))
;; When true, `*indexes*` is treated as the only source of truth and the index metadata is not consulted.
(def ^:private ^:dynamic *mocking-tables* false)
;; Forget every tracked index table by clearing the tracking atom.
(defmethod search.engine/reset-tracking! :search.engine/appdb
  [_engine]
  (reset! *indexes* nil))
(declare exists?)
(defn- sync-tracking-atoms!
  "Rebuild the tracking atom from the index metadata for the current version and return its new value.
  Entries whose table cannot be found are kept under a `<status>/not-found` key."
  []
  (let [->tracked (fn [[status index-name]]
                    (let [table-kw (keyword index-name)]
                      (if (exists? index-name)
                        [status table-kw]
                        ;; For debugging, make it clear why we are not tracking the given metadata.
                        [(keyword (name status) "not-found") table-kw])))
        metadata  (search-index-metadata/indexes :appdb *index-version-id*)]
    (reset! *indexes* (into {} (map ->tracked) metadata))))

This exists only to be mocked.

(defn- now
  "Current `System/nanoTime` reading. Kept as its own function so that it can be mocked."
  []
  (System/nanoTime))
(defn- sync-tracking-atoms-if-stale!
  "Refresh the tracking atoms when they have never been synced or the sync deadline has passed.
  Does nothing while tables are being mocked."
  []
  (when (and (not *mocking-tables*)
             (or (not @next-sync-at)
                 (< @next-sync-at (now))))
    ;; Push the deadline out first, then refresh.
    (reset! next-sync-at (+ (now) sync-tracking-period))
    (sync-tracking-atoms!)))

The table against which we should currently make search queries.

(defn active-table
  "The index table that search queries should currently be run against, or nil if none is tracked."
  []
  (sync-tracking-atoms-if-stale!)
  (get @*indexes* :active))

A partially populated table that will take over from [[active-table]] when it is done.

(defn- pending-table
  "The partially populated index table that is next in line to become active, or nil if none is tracked."
  []
  (sync-tracking-atoms-if-stale!)
  (get @*indexes* :pending))

Generate a unique table name to use as a search index table.

(defn gen-table-name
  "Build a fresh, unique keyword to name a search index table: `search_index__` followed by a random UUID
  with its dashes turned into underscores."
  []
  (-> (str "search_index__" (random-uuid))
      (str/replace #"-" "_")
      keyword))
(defn- table-name
  "The name of `kw` as a string, upper-cased when the application database is H2."
  [kw]
  (let [raw-name (name kw)]
    (if (= :h2 (mdb/db-type))
      (u/upper-case-en raw-name)
      raw-name)))
(defn- exists?
  "Whether `information_schema.tables` lists a table with the given name. Returns nil when `table` is falsey."
  [table]
  (when table
    (let [db-name (table-name table)]
      (t2/exists? :information_schema.tables :table_name db-name))))
(defn- drop-table!
  "Drop `table` if it is non-nil and exists. Returns the drop query's result coerced to a boolean, or false
  when nothing was attempted."
  [table]
  (if (and table (exists? table))
    (let [target (keyword (table-name table))]
      (boolean (t2/query (sql.helpers/drop-table target))))
    false))
(defn- orphan-indexes
  "Return the names (as lower-cased keywords) of search index tables in the current schema that no appdb
  metadata row refers to. Covers both generated `search_index__*` tables and the legacy fixed table names."
  []
  (let [lowered-name [:lower :table_name]
        ;; legacy table names
        legacy-names ["search_index" "search_index_next" "search_index_retired"]
        ;; Rendered ahead of time so that it can be spliced in as a raw subquery.
        tracked-sql  (first (sql/format {:select [:index_name]
                                         :from   [[(t2/table-name :model/SearchIndexMetadata) :metadata]]
                                         :where  [:= :metadata.engine [:inline "appdb"]]}))
        candidates   (t2/query {:select [:table_name]
                                :from   :information_schema.tables
                                :where  [:and
                                         [:= :table_schema :%current_schema]
                                         [:or
                                          [:like lowered-name [:inline "search\\_index\\_\\_%"]]
                                          [:in lowered-name (mapv (fn [n] [:inline n]) legacy-names)]]
                                         [:not-in lowered-name [:raw (str "(" tracked-sql ")")]]]})]
    (map (comp keyword u/lower-case-en :table_name) candidates)))
(defn- delete-obsolete-tables!
  "Delete obsolete index metadata, then drop every index table that is no longer referenced, logging how
  many tables were dropped."
  []
  ;; Delete metadata around indexes that are no longer needed.
  (search-index-metadata/delete-obsolete! *index-version-id*)
  ;; Drop any indexes that are no longer referenced, counting the successful drops.
  (let [dropped (reduce (fn [n table]
                          (try
                            (t2/query (sql.helpers/drop-table table))
                            (inc n)
                            ;; Deletion could fail if it races with other instances
                            (catch ExceptionInfo _
                              n)))
                        0
                        (orphan-indexes))]
    (log/infof "Dropped %d stale indexes" dropped)))
(defn- ->db-type
  "Translate a search attribute type into a column type. `:pk` and `:timestamp` are remapped; anything else
  is returned unchanged."
  [t]
  (case t
    :pk        :int
    :timestamp :timestamp-with-time-zone
    t))
(defn- ->db-column
  "Translate a search attribute key into its index column keyword. `:id`, `:created-at` and `:updated-at` get
  a `model_` prefix; every other key is converted to snake_case."
  [c]
  (case c
    :id         :model_id
    :created-at :model_created_at
    :updated-at :model_updated_at
    (keyword (u/->snake_case_en (name c)))))
;; Attribute keys whose generated columns are declared NOT NULL in `base-schema`.
(def ^:private not-null
  #{:archived :name})
;; Default values for generated columns in `base-schema`, keyed by attribute key.
(def ^:private default
  {:archived false})

If this assertion fails, we'll need to increase the size of the :model column below.

;; The longest search model name must fit within the 32 characters allotted to the :model column.
(assert (<= (apply max 0 (map (comp count name) search.config/all-models))
            32))
;; Column definitions shared by every index table: a fixed set of bookkeeping columns followed by one column
;; per typed search attribute. Attributes without a type are skipped.
(def ^:private base-schema
  (let [fixed-columns [[:model [:varchar 32] :not-null]
                       [:display_data :text :not-null]
                       [:legacy_input :text :not-null]
                       ;; useful for tracking the speed and age of the index
                       [:created_at :timestamp-with-time-zone
                        [:default [:raw "CURRENT_TIMESTAMP"]]
                        :not-null]
                       [:updated_at :timestamp-with-time-zone :not-null]]
        attr->column  (fn [[attr attr-type]]
                        (when attr-type
                          (cond-> [(->db-column attr) (->db-type attr-type)]
                            (not-null attr)        (conj :not-null)
                            ;; `some?` so that a default of `false` is still emitted
                            (some? (default attr)) (conj [:default (default attr)]))))]
    (into fixed-columns (keep attr->column) search.spec/attr-types)))

Create an index table with the given name. Should fail if it already exists.

(defn create-table!
  "Create an index table called `table-name` using the specialized schema, then run the specialization's
  post-create statements for it. The create statement has no if-not-exists guard."
  [table-name]
  (let [columns (specialization/table-schema base-schema)
        ddl     (sql.helpers/with-columns (sql.helpers/create-table table-name) columns)]
    (t2/query ddl))
  (let [table-str (name table-name)]
    (run! #(t2/query %)
          (specialization/post-create-statements table-str table-str))))

Create a search index table if one doesn't exist. Record and return the name of the table, regardless.

(defn maybe-create-pending!
  "Create a pending search index table if one doesn't exist, record it, and return its name.

  While tables are being mocked the tracking atom is the only source of truth. Otherwise the index metadata
  decides: if another instance wins the race to register a pending index, this returns whatever pending table
  is tracked after re-syncing."
  []
  (if *mocking-tables*
    ;; The atoms are the only source of truth, create a new table if necessary.
    (or (pending-table)
        (let [table-name (gen-table-name)]
          (create-table! table-name)
          (swap! *indexes* assoc :pending table-name)
          ;; `swap!` returns the whole tracking map; return the table name like every other branch does.
          table-name))
    ;; The database is the source of truth
    (let [{:keys [pending]} (sync-tracking-atoms!)]
      (or pending
          (let [table-name (gen-table-name)]
            ;; We may fail to insert a new metadata row if we lose a race with another instance.
            (when (search-index-metadata/create-pending! :appdb *index-version-id* table-name)
              (create-table! table-name))
            (:pending (sync-tracking-atoms!)))))))

Make the pending index active if it exists. Returns true if it did so.

(defn activate-table!
  "Promote the pending index, if there is one, to be the active index. Returns true if a promotion happened,
  false otherwise. Outside of mocking this also prunes obsolete metadata and tables."
  []
  (if-not *mocking-tables*
    ;; Ensure the metadata is updated and pruned.
    (let [pending-index (:pending (sync-tracking-atoms!))]
      (when pending-index
        (let [activated (search-index-metadata/active-pending! :appdb *index-version-id*)]
          (reset! *indexes* {:pending nil
                             :active  (keyword activated)})))
      ;; Clean up while we're here
      (delete-obsolete-tables!)
      ;; Did *we* do a rotation?
      (boolean pending-index))
    ;; The atoms are the only source of truth, we must not update the metadata.
    (if-let [pending-index (:pending @*indexes*)]
      (do
        (reset! *indexes* {:pending nil, :active pending-index})
        true)
      false)))
(defn- document->entry
  "Convert a search document into a row for the index table: keep the directly stored columns, JSON-encode
  the display and legacy payloads, copy the aliased id and timestamp fields into their `model_` columns, and
  merge in any specialization-specific fields."
  [entity]
  (let [;; attrs that get aliased are stored under different column names below
        aliased?     #{:id :created_at :updated_at :native_query}
        stored-keys  (remove aliased?
                             (conj search.spec/attr-columns :model :display_data :legacy_input))
        encoded      (-> (select-keys entity stored-keys)
                         (update :display_data json/encode)
                         (update :legacy_input json/encode))
        with-aliases (assoc encoded
                            :updated_at       :%now
                            :model_id         (:id entity)
                            :model_created_at (:created_at entity)
                            :model_updated_at (:updated_at entity))]
    (merge with-aliases (specialization/extra-entry-fields entity))))

Remove any entries corresponding directly to a given model instance.

(defn delete!
  "Delete the index entries for the model instance `id` under any of `search-models`, from both the active
  and the pending table. Does nothing when `search-models` is empty."
  [id search-models]
  ;; In practice, we expect this to be 1-1, but the data model does not preclude it.
  (when (seq search-models)
    (let [tracked-tables (filter identity [(active-table) (pending-table)])]
      (doseq [index-table tracked-tables]
        (t2/delete! index-table :model_id id :model [:in search-models])))))
(defn- safe-batch-upsert!
  "Upsert `entries` into `table-name`, returning whatever the specialization's batch upsert returns.

  Returns nil without doing anything when `table-name` is nil. If the upsert fails with a `PSQLException` as
  its cause, the failure is logged and suppressed, the tracking atoms are refreshed, and nil is returned so
  that callers do not mistake the failed batch for a successful update. Any other exception is rethrown."
  [table-name entries]
  ;; For convenience, no-op if we are not tracking any table.
  (when table-name
    (try
      (specialization/batch-upsert! table-name entries)
      (catch Exception e
        ;; TODO we should handle the H2, MySQL, and MariaDB flavors here too
        (if (instance? PSQLException (ex-cause e))
          ;; Suppress database errors, which are likely due to stale tracking data.
          (do
            (log/warnf e "Suppressed failure while updating search index table %s; refreshing tracking data"
                       table-name)
            (sync-tracking-atoms!)
            ;; The refreshed tracking map is always truthy, so return nil explicitly: nothing was written.
            nil)
          (throw e))))))

Create the given search index entries in bulk.

(defn- batch-update!
  "Upsert one batch of documents into the active and pending index tables. Returns a map from model to the
  number of entries in the batch when at least one upsert reported a truthy result, otherwise nil."
  [documents]
  ;; Protect against tests that nuke the appdb
  (when config/is-test?
    (when-let [table (active-table)]
      (when-not (exists? table)
        (log/warnf "Unable to find table %s and no longer tracking it as active", table)
        (swap! *indexes* assoc :active nil)))
    (when-let [table (pending-table)]
      (when-not (exists? table)
        (log/warnf "Unable to find table %s and no longer tracking it as pending", table)
        (swap! *indexes* assoc :pending nil))))
  (let [entries         (map document->entry documents)
        ;; Optimization idea: if the updates are coming from the re-indexing worker, skip updating the active table.
        ;;                    this should give a close to 2x speed-up as insertion is the bottleneck, and most of the
        ;;                    updates will be no-ops in any case.
        active-result   (safe-batch-upsert! (active-table) entries)
        pending-result  (safe-batch-upsert! (pending-table) entries)]
    (if (or active-result pending-result)
      (frequencies (map :model entries))
      nil)))
;; Index the documents in batches of `insert-batch-size`, summing the per-model counts reported by each batch.
(defmethod search.engine/consume! :search.engine/appdb
  [_engine document-reducible]
  (let [batch-xform  (comp (partition-all insert-batch-size)
                           (map batch-update!))
        merge-counts (partial merge-with +)]
    (transduce batch-xform merge-counts document-reducible)))

Query fragment for all models corresponding to a query parameter :search-term.

(defn search-query
  "Build the query fragment matching `search-term` against the active index table. Selects `:model_id` and
  `:model` unless `select-items` is given. Returns nil when there is no active table."
  ([search-term search-ctx]
   (search-query search-term search-ctx [:model_id :model]))
  ([search-term search-ctx select-items]
   (let [index-table (active-table)]
     (when index-table
       (specialization/base-query index-table search-term search-ctx select-items)))))

Use the index table to search for records.

(defn search
  "Use the index table to search for records matching `search-term`, returning a sequence of `[model name]`
  pairs. Returns an empty sequence when there is no active index table to query."
  [search-term & [search-ctx]]
  ;; `search-query` returns nil when no index is active; don't hand a nil query to the database layer.
  (if-let [query (search-query search-term search-ctx [:model :name])]
    (map (juxt :model :name)
         (t2/query query))
    ()))

Ensure we have a blank slate; in case the table schema or stored data format has changed.

(defn reset-index!
  "Start over with a blank index: stop tracking any pending table, create a fresh pending table, and make it
  active. Returns the result of the activation."
  []
  ;; stop tracking any pending table
  (when-let [stale-pending (pending-table)]
    ;; The metadata is only touched when we are not mocking.
    (when (not *mocking-tables*)
      (search-index-metadata/delete-index! :appdb *index-version-id* stale-pending))
    (swap! *indexes* assoc :pending nil))
  (maybe-create-pending!)
  (activate-table!))

Ensure the index is ready to be populated. Returns a falsey value (nil) if it was already ready.

(defn ensure-ready!
  "Make sure there is an active index table to populate, resetting the index when `force-reset?` is truthy or
  the active table does not exist. Returns the result of the reset, or nil when no reset was needed."
  [& {:keys [force-reset?]}]
  ;; Be extra careful against races on initializing the setting
  (locking *indexes*
    ;; Outside of mocking, refresh the tracking atoms if we are not tracking an active table.
    (when (and (not *mocking-tables*)
               (nil? (active-table)))
      (sync-tracking-atoms!))
    (let [missing? (not (exists? (active-table)))]
      (when (or force-reset? missing?)
        (reset-index!)))))

Create a temporary index table for the duration of the body.

;; Run `body` against a freshly created, uniquely named index table that is tracked as the active table.
;; Mocking is switched on for the duration, and the table is dropped afterwards even if `body` throws.
#_{:clj-kondo/ignore [:metabase/test-helpers-use-non-thread-safe-functions]}
(defmacro with-temp-index-table
  [& body]
  `(let [table-name# (gen-table-name)]
     ;; Rebind the tracking atom so the temporary table never leaks into the real tracking state.
     (binding [*mocking-tables* true
               *indexes*        (atom {:active table-name#})]
       (try
         (create-table! table-name#)
         ~@body
         (finally
           ;; `drop-table!` is private, so it is reached through its var.
           (#'drop-table! table-name#))))))