Utility functions and macros to abstract away some common patterns and operations across the sync processes, such as logging start/end messages.

(ns metabase.sync.util
  (:require
   [clojure.math.numeric-tower :as math]
   [clojure.string :as str]
   [java-time.api :as t]
   [medley.core :as m]
   [metabase.driver :as driver]
   [metabase.driver.util :as driver.u]
   [metabase.events :as events]
   [metabase.models.interface :as mi]
   [metabase.models.task-history :as task-history]
   [metabase.query-processor.interface :as qp.i]
   [metabase.sync.interface :as i]
   [metabase.util :as u]
   [metabase.util.date-2 :as u.date]
   [metabase.util.log :as log]
   [metabase.util.malli :as mu]
   [metabase.util.malli.registry :as mr]
   [metabase.util.malli.schema :as ms]
   [toucan2.core :as t2]
   [toucan2.realize :as t2.realize])
  (:import
   (java.time.temporal Temporal)))
(set! *warn-on-reflection* true)
(derive ::event :metabase/event)
(def ^:private sync-event-topics
  #{:event/sync-begin
    :event/sync-end
    :event/analyze-begin
    :event/analyze-end
    :event/refingerprint-begin
    :event/refingerprint-end
    :event/cache-field-values-begin
    :event/cache-field-values-end
    :event/sync-metadata-begin
    :event/sync-metadata-end})
(doseq [topic sync-event-topics]
  (derive topic ::event))
(def ^:private Topic
  [:and
   events/Topic
   [:fn
    {:error/message "Sync event deriving from :metabase.sync.util/event"}
    #(isa? % ::event)]])

+----------------------------------------------------------------------------------------------------------------+
|                                          SYNC OPERATION "MIDDLEWARE"                                           |
+----------------------------------------------------------------------------------------------------------------+

When using the sync-operation macro below, the BODY of the macro will be executed in the context of several different functions below that do things like prevent duplicate operations from being run simultaneously and take care of things like event publishing, error handling, and logging.

These basically operate in a middleware pattern, where the various different steps take a function, and return a new function that will execute the original in whatever context or with whatever side effects appropriate for that step.
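
As a minimal sketch of this pattern (the names wrap-logging, wrap-flag, and middleware-log are hypothetical and not part of this namespace), each step takes a thunk and returns a new thunk, and nothing runs until the outermost thunk is called:

```clojure
;; Hypothetical illustration of the middleware pattern; none of these names
;; exist in metabase.sync.util.
(def middleware-log
  "Records what each wrapper did, in order, so the nesting is visible."
  (atom []))

(defn wrap-logging
  "Returns a thunk that records a start and a finish entry around `f`."
  [message f]
  (fn []
    (swap! middleware-log conj [:start message])
    (let [result (f)]
      (swap! middleware-log conj [:finish message])
      result)))

(defn wrap-flag
  "Returns a thunk that records that it ran, then calls `f`."
  [flag f]
  (fn []
    (swap! middleware-log conj [:flag flag])
    (f)))

;; Compose from the outside in; the innermost thunk is the actual work.
(def wrapped
  (wrap-logging "demo"
                (wrap-flag :inner
                           (fn [] 42))))

;; Calling the outermost thunk runs every layer and returns the inner result.
(def wrapped-result (wrapped))
```

The outermost wrapper's side effects happen first and last, which is why the order of nesting in do-sync-operation matters.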

This looks something like {:sync #{1 2}, :cache #{2 3}} when populated. Key is a type of sync operation, e.g. :sync or :cache; vals are sets of DB IDs undergoing that operation.

TODO - as @salsakran mentioned it would be nice to do this via the DB so we could better support multi-instance setups in the future

(defonce ^:private operation->db-ids (atom {}))

Wrap f so that it cannot run more than once at the same time for a single database and a given operation. This prevents duplicate sync-like operations from taking place for a given DB, e.g. if a user hits the Sync button in the admin panel multiple times.

;; Only one sync-db! for database-id will be allowed at any given moment; duplicates will be ignored
(with-duplicate-ops-prevented :sync database-id #(sync-db! database-id))

(defn with-duplicate-ops-prevented
  {:style/indent [:form]}
  [operation database-or-id f]
  (fn []
    (when-not (contains? (@operation->db-ids operation) (u/the-id database-or-id))
      (try
        ;; mark this database as currently syncing so we can prevent duplicate sync attempts (#2337)
        (swap! operation->db-ids update operation #(conj (or % #{}) (u/the-id database-or-id)))
        (log/debug "Sync operations in flight:" (m/filter-vals seq @operation->db-ids))
        ;; do our work
        (f)
        ;; always take the ID out of the set when we are through
        (finally
          (swap! operation->db-ids update operation #(disj % (u/the-id database-or-id))))))))
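
The function above checks the set and then updates it in two separate steps. As a hedged sketch of a variant that claims the ID in a single atomic step, the following uses swap-vals! and inspects the previous value. This is an assumption of the sketch, not what the function above does, and in-flight and run-once are hypothetical names:

```clojure
;; Hypothetical, self-contained variant; `in-flight` and `run-once` are
;; illustration-only names.
(def in-flight
  "Map of operation keyword -> set of IDs currently running that operation."
  (atom {}))

(defn run-once
  "Calls `f` unless `id` is already registered for `operation`.
  Returns :skipped when the call is ignored as a duplicate."
  [operation id f]
  ;; swap-vals! returns [old new], so we can tell whether the ID was already
  ;; present before our own update added it.
  (let [[old _new] (swap-vals! in-flight update operation (fnil conj #{}) id)]
    (if (contains? (get old operation) id)
      :skipped
      (try
        (f)
        (finally
          ;; always release the ID, even if `f` throws
          (swap! in-flight update operation disj id))))))
```

A nested call such as (run-once :sync 1 #(run-once :sync 1 (fn [] :inner))) returns :skipped, because the inner call sees the ID already registered by the outer one.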

Publish events related to beginning and ending a sync-like process, e.g. :sync-database or :cache-values, for a database-id. f is executed between the publishing of the two events.

(mu/defn- with-sync-events
  {:style/indent [:form]}
  ;; we can do everyone a favor and infer the name of the individual begin and end events
  ([event-name-prefix database-or-id f]
   (letfn [(event-keyword [prefix suffix]
             (keyword (or (namespace event-name-prefix) "event")
                      (str (name prefix) suffix)))]
     (with-sync-events
      (event-keyword event-name-prefix "-begin")
      (event-keyword event-name-prefix "-end")
      database-or-id
      f)))
  ([begin-event-name :- Topic
    end-event-name   :- Topic
    database-or-id
    f]
   (fn []
     (let [start-time    (System/nanoTime)
           tracking-hash (str (random-uuid))]
       (events/publish-event! begin-event-name {:database_id (u/the-id database-or-id), :custom_id tracking-hash})
       (let [return        (f)
             total-time-ms (int (/ (- (System/nanoTime) start-time)
                                   1000000.0))]
         (events/publish-event! end-event-name {:database_id  (u/the-id database-or-id)
                                                :custom_id    tracking-hash
                                                :running_time total-time-ms})
         return)))))
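
The name inference in the three-argument arity can be tried in isolation. This sketch copies the keyword-building logic into a standalone function; infer-event-keyword is a hypothetical name used only here:

```clojure
;; Hypothetical standalone copy of the keyword inference, for illustration.
(defn infer-event-keyword
  "Builds an event keyword from `prefix` and `suffix`, defaulting the
  namespace to \"event\" when `prefix` is not namespaced."
  [prefix suffix]
  (keyword (or (namespace prefix) "event")
           (str (name prefix) suffix)))

(infer-event-keyword :sync-metadata "-begin") ; => :event/sync-metadata-begin
(infer-event-keyword :my.ns/op "-end")        ; => :my.ns/op-end
```

An unqualified prefix such as :sync-metadata therefore maps onto the :event/sync-metadata-begin and :event/sync-metadata-end topics declared at the top of this namespace.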

Logs start/finish messages using log-fn, timing f

(defn- with-start-and-finish-logging*
  {:style/indent [:form]}
  [log-fn message f]
  (let [start-time (System/nanoTime)
        _          (log-fn (u/format-color 'magenta "STARTING: %s" message))
        result     (f)]
    (log-fn (u/format-color 'magenta "FINISHED: %s (%s)"
                            message
                            (u/format-nanoseconds (- (System/nanoTime) start-time))))
    result))

Log message about a process starting, then run f, and then log a message about it finishing. (The final message includes a summary of how long it took to run f.)

(defn- with-start-and-finish-logging
  {:style/indent [:form]}
  [message f]
  (fn []
    (with-start-and-finish-logging* #(log/info %) message f)))

Similar to with-start-and-finish-logging, except that it invokes f immediately and returns its result, and logs at the debug level.

(defn- do-with-start-and-finish-debug-logging
  {:style/indent [:form]}
  [message f]
  (with-start-and-finish-logging* #(log/debug %) message f))

Disable all QP and DB logging when running f. (This should be done for all sync-like processes to avoid cluttering the logs.)

(defn- with-db-logging-disabled
  {:style/indent [:form]}
  [f]
  (fn []
    (binding [qp.i/*disable-qp-logging* true]
      (f))))

Pass the sync operation defined by f to the database's driver's implementation of sync-in-context. This method is used to do things like establish a connection or other driver-specific steps needed for sync operations.

(defn- sync-in-context
  [database f]
  (fn []
    (driver/sync-in-context (driver.u/database->driver database) database
                            f)))

TODO: future, expand this to driver level, where the drivers themselves can add to the list of exception classes (like, driver-specific exceptions)

(doseq [klass [java.net.ConnectException
               java.net.NoRouteToHostException
               java.net.UnknownHostException
               com.mchange.v2.resourcepool.CannotAcquireResourceException
               javax.net.ssl.SSLHandshakeException]]
  (derive klass ::exception-class-not-to-retry))

Whether to log exceptions during a sync step and proceed with the rest of the sync process. This is the default behavior. You can disable this for debugging or test purposes.

(def ^:dynamic *log-exceptions-and-continue?*
  true)

Internal implementation of [[with-error-handling]]; use that instead of calling this directly.

(defn do-with-error-handling
  ([f]
   (do-with-error-handling "Error running sync step" f))
  ([message f]
   (try
     (f)
     (catch Throwable e
       (if *log-exceptions-and-continue?*
         (do
           (log/warn e message)
           e)
         (throw e))))))

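Execute body in a way that catches and logs any Exceptions thrown, and returns the caught exception instead of rethrowing it (unless *log-exceptions-and-continue?* is false, in which case it is rethrown). Pass a message to help provide information about what failed for the log message.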

The exception classes deriving from :metabase.sync.util/exception-class-not-to-retry are a list of classes tested against exceptions thrown. If there is a match found, the sync is aborted as that error is not considered recoverable for this sync run.

(defmacro with-error-handling
  {:style/indent 1}
  [message & body]
  `(do-with-error-handling ~message (fn [] ~@body)))
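
A minimal sketch of the "catch, then return the exception as a value" behavior, assuming nothing beyond clojure.core. The names *continue-on-error?* and run-guarded are hypothetical stand-ins, and logging is omitted:

```clojure
;; Hypothetical stand-ins for *log-exceptions-and-continue?* and
;; do-with-error-handling; logging is left out to keep this self-contained.
(def ^:dynamic *continue-on-error?* true)

(defn run-guarded
  "Calls `f`. If it throws, returns the Throwable as a value when
  *continue-on-error?* is true, otherwise rethrows it."
  [f]
  (try
    (f)
    (catch Throwable e
      (if *continue-on-error?*
        e
        (throw e)))))

;; By default the caller receives the exception as an ordinary return value
;; and can test for it with (instance? Throwable result).
(def guarded-result (run-guarded #(/ 1 0)))

;; With the flag rebound to false, the exception propagates instead.
(def rethrown-result
  (binding [*continue-on-error?* false]
    (try
      (run-guarded #(/ 1 0))
      (catch ArithmeticException _ :rethrown))))
```

Because the exception comes back as a value, callers have to inspect the result explicitly rather than rely on a nil check.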

Internal implementation of [[sync-operation]]; use that instead of calling this directly.

(mu/defn do-sync-operation
  [operation :- :keyword                ; something like `:sync-metadata` or `:refingerprint`
   database  :- (ms/InstanceOf :model/Database)
   message   :- ms/NonBlankString
   f         :- fn?]
  ((with-duplicate-ops-prevented
    operation database
    (with-sync-events
     operation database
     (with-start-and-finish-logging
      message
      (with-db-logging-disabled
       (sync-in-context database
                        (partial do-with-error-handling (format "Error in sync step %s" message) f))))))))

Perform the operations in body as a sync operation, which wraps the code in several middleware functions that do things like error handling, logging, duplicate operation prevention, and event publishing. Intended for use with the various top-level sync operations, such as sync-metadata or analyze.

(defmacro sync-operation
  {:style/indent 3}
  [operation database message & body]
  `(do-sync-operation ~operation ~database ~message (fn [] ~@body)))

+----------------------------------------------------------------------------------------------------------------+
|                                              EMOJI PROGRESS METER                                              |
+----------------------------------------------------------------------------------------------------------------+

This is primarily provided because it makes sync more fun to look at. The functions below make it fairly simple to log a progress bar with a corresponding emoji when iterating over a sequence of objects during sync, e.g. syncing all the Tables in a given Database.

(def ^:private ^:const ^Integer emoji-meter-width 50)
(def ^:private progress-emoji
  ["😱"   ; face screaming in fear
   "😢"   ; crying face
   "😞"   ; disappointed face
   "😒"   ; unamused face
   "😕"   ; confused face
   "😐"   ; neutral face
   "😬"   ; grimacing face
   "😌"   ; relieved face
   "😏"   ; smirking face
   "😋"   ; face savouring delicious food
   "😊"   ; smiling face with smiling eyes
   "😍"   ; smiling face with heart shaped eyes
   "😎"]) ; smiling face with sunglasses
(defn- percent-done->emoji [percent-done]
  (progress-emoji (int (math/round (* percent-done (dec (count progress-emoji)))))))

Create a string that shows progress for something, e.g. a database sync process.

(emoji-progress-bar 10 40 10) -> "[************······································] 😒   25%"

(defn emoji-progress-bar
  [completed total log-every-n]
  (let [percent-done (float (/ completed total))
        filleds      (int (* percent-done emoji-meter-width))
        blanks       (- emoji-meter-width filleds)]
    (when (or (zero? (mod completed log-every-n))
              (= completed total))
      (str "["
           (str/join (repeat filleds "*"))
           (str/join (repeat blanks "·"))
           (format "] %s  %3.0f%%" (u/emoji (percent-done->emoji percent-done)) (* percent-done 100.0))))))
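
The arithmetic behind the bar can be checked without the emoji helpers. In this sketch, progress-index, plain-progress-bar, and example-bar-width are hypothetical names, and Math/round stands in for the numeric-tower round used above:

```clojure
(require '[clojure.string :as str])

;; Hypothetical, emoji-free sketch of the same arithmetic.
(def example-bar-width 50)

(defn progress-index
  "Index into a vector of `n` faces for a completion fraction in [0.0, 1.0]."
  [fraction n]
  (int (Math/round (double (* fraction (dec n))))))

(defn plain-progress-bar
  "Renders `completed` out of `total` as a fixed-width bar plus a percentage."
  [completed total]
  (let [fraction (double (/ completed total))
        ;; truncate, so a bar is only completely filled at 100%
        filled   (int (* fraction example-bar-width))
        blanks   (- example-bar-width filled)]
    (str "["
         (str/join (repeat filled "*"))
         (str/join (repeat blanks "·"))
         (format "] %3.0f%%" (* fraction 100.0)))))

(progress-index 0.25 13) ; => 3, the fourth of thirteen faces
```

For 10 of 40 items the bar contains 12 filled and 38 blank cells, since 0.25 × 50 = 12.5 is truncated to 12.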

Run BODY with access to a function that makes using our amazing emoji-progress-bar easy like Sunday morning. Calling the function will return the appropriate string output for logging and automatically increment an internal counter as needed.

(with-emoji-progress-bar [progress-bar 10] (dotimes [i 10] (println (progress-bar))))

(defmacro with-emoji-progress-bar
  {:style/indent 1}
  [[emoji-progress-fn-binding total-count] & body]
  `(let [finished-count#            (atom 0)
         total-count#               ~total-count
         log-every-n#               (Math/ceil (/ total-count# 10))
         ~emoji-progress-fn-binding (fn [] (emoji-progress-bar (swap! finished-count# inc) total-count# log-every-n#))]
     ~@body))
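
The macro logs roughly every tenth of the way through, and the progress function returns nil on the calls in between. This sketch lists the call counts that would produce output for a given total; logged-counts is a hypothetical helper, and it divides by 10.0 so that Math/ceil always receives a double:

```clojure
;; Hypothetical helper showing which calls to the progress function would
;; return a string rather than nil.
(defn logged-counts
  "Returns the values of the internal counter at which a bar is produced."
  [total]
  (let [log-every-n (Math/ceil (/ total 10.0))]
    (filter (fn [completed]
              (or (zero? (mod completed log-every-n))
                  (= completed total)))
            (range 1 (inc total)))))

(logged-counts 25) ; => (3 6 9 12 15 18 21 24 25)
```

For totals of ten or fewer, every call produces a bar, because the interval rounds up to 1.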

+----------------------------------------------------------------------------------------------------------------+
|                                              INITIAL SYNC STATUS                                               |
+----------------------------------------------------------------------------------------------------------------+

If this is the first sync of a database, we need to update the initial_sync_status field on individual tables when they have finished syncing, as well as the corresponding field on the database itself when the entire sync is complete (excluding analysis). This powers a UX that displays the progress of the initial sync to the admin who added the database, and enables individual tables when they become usable for queries.

Marks initial sync as complete for this table so that it becomes usable in the UI, if not already set

(defn set-initial-table-sync-complete!
  [table]
  (when (not= (:initial_sync_status table) "complete")
    (t2/update! :model/Table (u/the-id table) {:initial_sync_status "complete"})))
(def ^:private sync-tables-kv-args
  {:active          true
   :visibility_type nil})

Marks initial sync for all tables in db as complete so that they become usable in the UI, if not already set.

(defn set-initial-table-sync-complete-for-db!
  [database-or-id]
  (t2/update! :model/Table (merge sync-tables-kv-args {:db_id (u/the-id database-or-id)})
              {:initial_sync_status "complete"}))

Marks initial sync as complete for this database so that this is reflected in the UI, if not already set

(defn set-initial-database-sync-complete!
  [database]
  (when (not= (:initial_sync_status database) "complete")
    (t2/update! :model/Database (u/the-id database) {:initial_sync_status "complete"})))

Marks initial sync as aborted for this database so that an error can be displayed on the UI

(defn set-initial-database-sync-aborted!
  [database]
  (when (not= (:initial_sync_status database) "complete")
    (t2/update! :model/Database (u/the-id database) {:initial_sync_status "aborted"})))

+----------------------------------------------------------------------------------------------------------------+
|                                          OTHER SYNC UTILITY FUNCTIONS                                          |
+----------------------------------------------------------------------------------------------------------------+

Returns a clause that can be used inside a HoneySQL :where clause to select all the Tables that should be synced

(def sync-tables-clause
  (into [:and] (for [[k v] sync-tables-kv-args]
                 [:= k v])))
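
To make the shape of the clause concrete, this sketch repeats the same construction on a local copy of the key/value map. The example- names are hypothetical:

```clojure
;; Local copy of the key/value arguments, for illustration only.
(def example-kv-args
  {:active          true
   :visibility_type nil})

;; Each entry becomes an equality test, all joined under :and.
(def example-clause
  (into [:and] (for [[k v] example-kv-args]
                 [:= k v])))

example-clause
;; => [:and [:= :active true] [:= :visibility_type nil]]
```

Keeping the conditions in one map lets the same criteria be reused both as keyword arguments for t2/update! and as a :where clause.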

Returns a reducible of all the Tables that should go through the sync processes for database-or-id.

(defn reducible-sync-tables
  [database-or-id & {:keys [schema-names table-names]}]
  (eduction (map t2.realize/realize)
            (t2/reducible-select :model/Table
                                 :db_id (u/the-id database-or-id)
                                 {:where [:and sync-tables-clause
                                          (when (seq schema-names) [:in :schema schema-names])
                                          (when (seq table-names) [:in :name table-names])]})))

The count of all tables that should be synced for database-or-id.

(defn sync-tables-count
  [database-or-id]
  (t2/count :model/Table :db_id (u/the-id database-or-id) {:where sync-tables-clause}))

A reducible collection of all the Tables that should go through the sync processes for database-or-id, in the order they should be refingerprinted (by earliest last_analyzed timestamp).

(defn refingerprint-reducible-sync-tables
  [database-or-id]
  (eduction (map t2.realize/realize)
            (t2/reducible-select :model/Table
                                 {:select    [:t.*]
                                  :from      [[(t2/table-name :model/Table) :t]]
                                  :left-join [[{:select   [:table_id
                                                           [[:min :last_analyzed] :earliest_last_analyzed]]
                                                :from     [(t2/table-name :model/Field)]
                                                :group-by [:table_id]} :sub]
                                              [:= :t.id :sub.table_id]]
                                  :where     [:and sync-tables-clause [:= :t.db_id (u/the-id database-or-id)]]
                                  :order-by  [[:sub.earliest_last_analyzed :asc]]})))

Returns all the Schemas that have their metadata sync'd for database-or-id. Assumes the database supports schemas.

(defn sync-schemas
  [database-or-id]
  (vec (map :schema (t2/query {:select-distinct [:schema]
                               :from            [:metabase_table]
                               :where           [:and sync-tables-clause [:= :db_id (u/the-id database-or-id)]]}))))

Return an appropriate string for logging an object in sync logging messages. Should be something like

"postgres Database 'test-data'"

This function is used all over the sync code to make sure we have easy access to consistently formatted descriptions of various objects.

(defmulti name-for-logging
  {:arglists '([instance])}
  mi/model)
(defmethod name-for-logging :model/Database
  [{database-name :name, id :id, engine :engine}]
  (format "%s Database %s ''%s''" (name engine) (str (or id "")) database-name))

Return an appropriate string for logging a table in sync logging messages.

(defn table-name-for-logging
  [& {:keys [id schema name]}]
  (format "Table %s ''%s''" (or (str id) "") (str (when (seq schema) (str schema ".")) name)))
(defmethod name-for-logging :model/Table [table]
  (table-name-for-logging table))

Return an appropriate string for logging a field in sync logging messages.

(defn field-name-for-logging
  [& {:keys [id name]}]
  (format "Field %s ''%s''" (or (str id) "") name))
(defmethod name-for-logging :model/Field [field]
  (field-name-for-logging field))

This default implementation is used for result metadata.

(defmethod name-for-logging :default [{field-name :name}]
  (format "Field ''%s''" field-name))
(mu/defn calculate-duration-str :- :string
  "Given two datetimes, calculate the time between them and return the result as a string"
  [begin-time :- (ms/InstanceOfClass Temporal)
   end-time   :- (ms/InstanceOfClass Temporal)]
  (u/format-nanoseconds (.toNanos (t/duration begin-time end-time))))
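
A sketch of the underlying computation using plain java.time interop, assuming that t/duration applied to two temporals behaves like Duration/between. The name nanos-between is hypothetical, and the formatting step done by u/format-nanoseconds is left out:

```clojure
(import '(java.time Duration Instant))

;; Hypothetical helper: elapsed nanoseconds between two temporals.
(defn nanos-between
  [begin end]
  (.toNanos (Duration/between begin end)))

(nanos-between (Instant/ofEpochSecond 0) (Instant/ofEpochSecond 2))
;; => 2000000000
```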

Metadata common to both sync steps and an entire sync/analyze operation run

(def ^:private TimedSyncMetadata
  [:map
   [:start-time                  (ms/InstanceOfClass Temporal)]
   [:end-time   {:optional true} (ms/InstanceOfClass Temporal)]])
(mr/def ::StepRunMetadata
  [:merge
   TimedSyncMetadata
   [:map
    [:log-summary-fn [:maybe [:=> [:cat [:ref ::StepRunMetadata]] :string]]]]])

Map with metadata about the step. Contains both generic information like start-time and end-time and step specific information

(def ^:private StepRunMetadata
  [:ref ::StepRunMetadata])
(mr/def ::StepNameWithMetadata
  [:tuple
   ;; step name
   :string
   ;; step metadata
   StepRunMetadata])

Pair with the step name and metadata about the completed step run

(def StepNameWithMetadata
  [:ref ::StepNameWithMetadata])

Timing and step information for the entire sync or analyze run

(def ^:private SyncOperationMetadata
  [:merge
   TimedSyncMetadata
   [:map
    [:steps [:maybe [:sequential StepNameWithMetadata]]]]])

A log summary function takes a StepRunMetadata and returns a string with a step-specific log message

(def ^:private LogSummaryFunction
  [:=> [:cat StepRunMetadata] :string])

Defines a step. :sync-fn runs the step, returns a map that contains step specific metadata. log-summary-fn takes that metadata and turns it into a string for logging

(def ^:private StepDefinition
  [:map
   [:sync-fn        [:=> [:cat StepRunMetadata] i/DatabaseInstance]]
   [:step-name      :string]
   [:log-summary-fn [:maybe LogSummaryFunction]]])

Creates and returns a step suitable for run-step-with-metadata. See StepDefinition for more info.

(defn create-sync-step
  ([step-name sync-fn]
   (create-sync-step step-name sync-fn nil))
  ([step-name sync-fn log-summary-fn]
   {:sync-fn        sync-fn
    :step-name      step-name
    :log-summary-fn (when log-summary-fn
                      (comp str log-summary-fn))}))
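
As a usage sketch, here is how a step might be built and its pieces exercised by hand. The make-step function is a local copy of the three-argument arity so the example is self-contained, and the "count-tables" step with its :total-tables key is entirely hypothetical:

```clojure
;; Local copy of the three-argument arity, so this sketch runs on its own.
(defn make-step
  [step-name sync-fn log-summary-fn]
  {:sync-fn        sync-fn
   :step-name      step-name
   :log-summary-fn (when log-summary-fn
                     (comp str log-summary-fn))})

;; Hypothetical step: the sync fn receives the database and returns
;; step-specific metadata; the summary fn turns that metadata into a string.
(def count-tables-step
  (make-step "count-tables"
             (fn [_database] {:total-tables 3})
             (fn [{:keys [total-tables]}]
               (format "Found %d tables" total-tables))))

;; Run the step by hand and summarize its result.
(def step-summary
  ((:log-summary-fn count-tables-step)
   ((:sync-fn count-tables-step) {})))
;; step-summary => "Found 3 tables"
```

Because the summary function is composed with str, a summary function that returns a non-string value still yields a string for logging.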
(mu/defn- run-step-with-metadata :- StepNameWithMetadata
  "Runs `step` on `database` returning metadata from the run"
  [database :- i/DatabaseInstance
   {:keys [step-name sync-fn log-summary-fn] :as _step} :- StepDefinition]
  (let [start-time (t/zoned-date-time)
        results    (do-with-start-and-finish-debug-logging
                    (format "step ''%s'' for %s"
                            step-name
                            (name-for-logging database))
                    (fn [& args]
                      (try
                        (task-history/with-task-history
                         {:task            step-name
                          :db_id           (u/the-id database)
                          :on-success-info (fn [update-map result]
                                             (if (instance? Throwable result)
                                               (throw result)
                                               (assoc update-map :task_details (dissoc result :start-time :end-time :log-summary-fn))))}
                          (apply sync-fn database args))
                        (catch Throwable e
                          (if *log-exceptions-and-continue?*
                            (do
                              (log/warnf e "Error running step ''%s'' for %s" step-name (name-for-logging database))
                              {:throwable e})
                            (throw e))))))
        end-time   (t/zoned-date-time)]
    [step-name (assoc results
                      :start-time start-time
                      :end-time end-time
                      :log-summary-fn log-summary-fn)]))

The logging logic from log-sync-summary. Separated for testing purposes as the log/debug macro won't invoke this function unless the logging level is at debug (or higher).

(mu/defn- make-log-sync-summary-str
  [operation :- :string
   database :- i/DatabaseInstance
   {:keys [start-time end-time steps]} :- SyncOperationMetadata]
  (str
   (apply format
          (str "\n#################################################################\n"
               "# %s\n"
               "# %s\n"
               "# %s\n"
               "# %s\n")
          [(format "Completed %s on %s" operation (:name database))
           (format "Start: %s" (u.date/format start-time))
           (format "End: %s" (u.date/format end-time))
           (format "Duration: %s" (calculate-duration-str start-time end-time))])
   (apply str (for [[step-name {:keys [start-time end-time log-summary-fn] :as step-info}] steps]
                (apply format (str "# ---------------------------------------------------------------\n"
                                   "# %s\n"
                                   "# %s\n"
                                   "# %s\n"
                                   "# %s\n"
                                   (when log-summary-fn
                                     (format "# %s\n" (log-summary-fn step-info))))
                       [(format "Completed step ''%s''" step-name)
                        (format "Start: %s" (u.date/format start-time))
                        (format "End: %s" (u.date/format end-time))
                        (format "Duration: %s" (calculate-duration-str start-time end-time))])))
   "#################################################################\n"))

Log a sync/analyze summary message with info from each step

(mu/defn- log-sync-summary
  [operation :- :string
   database :- i/DatabaseInstance
   sync-metadata :- SyncOperationMetadata]
  ;; Note this needs to either stay nested in the `debug` macro call or be guarded by a log/enabled?
  ;; call. Constructing the log below requires some work, so there is no need to incur that cost if debug logging isn't enabled
  (log/debug (make-log-sync-summary-str operation database sync-metadata)))
(defn- do-not-retry-exception? [e]
  (or (isa? (class e) ::exception-class-not-to-retry)
      (some-> (ex-cause e) recur)))

Given the results of a sync step, returns truthy if a non-recoverable exception occurred

(defn abandon-sync?
  [step-results]
  (when-let [caught-exception (:throwable step-results)]
    (do-not-retry-exception? caught-exception)))
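
The check relies on two things: Java classes can be given a keyword parent with derive, and the cause chain is walked until a match is found. A self-contained sketch, where ::example-do-not-retry and fatal-exception? are hypothetical names:

```clojure
;; Hypothetical tag and predicate; the real code uses
;; ::exception-class-not-to-retry and do-not-retry-exception?.
(derive java.net.UnknownHostException ::example-do-not-retry)

(defn fatal-exception?
  "True if `e` or any exception in its cause chain has a class that
  derives from ::example-do-not-retry."
  [e]
  (loop [e e]
    (cond
      (nil? e)                                  false
      (isa? (class e) ::example-do-not-retry)   true
      :else                                     (recur (ex-cause e)))))

;; A wrapped UnknownHostException is still detected through the cause chain.
(fatal-exception?
 (ex-info "wrapper" {} (java.net.UnknownHostException. "db.example")))
;; => true
```

Walking the cause chain matters because drivers often wrap a low-level network exception in a more general one.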

Run sync-steps and log a summary message

(mu/defn run-sync-operation
  [operation :- :string
   database :- i/DatabaseInstance
   sync-steps :- [:maybe [:sequential StepDefinition]]]
  (task-history/with-task-history {:task  operation
                                   :db_id (u/the-id database)}
    (let [start-time    (t/zoned-date-time)
          step-metadata (loop [[step-defn & rest-defns] sync-steps
                               result                   []]
                          (let [[step-name r] (run-step-with-metadata database step-defn)
                                new-result    (conj result [step-name r])]
                            (cond (abandon-sync? r) new-result
                                  (not (seq rest-defns)) new-result
                                  :else (recur rest-defns new-result))))
          end-time      (t/zoned-date-time)
          sync-metadata {:start-time start-time
                         :end-time   end-time
                         :steps      step-metadata}]
      (log-sync-summary operation database sync-metadata)
      sync-metadata)))
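
The step loop stops collecting results as soon as a step reports a non-recoverable failure. As a sketch of that control flow only, the following uses reduce with reduced instead of loop/recur, which is an assumption of this example, and unlike the loop above it returns an empty vector when given no steps. The name run-steps-until-fatal and the [name thunk] step shape are hypothetical:

```clojure
;; Hypothetical sketch of "run steps in order, stop after a fatal one".
(defn run-steps-until-fatal
  "`steps` is a sequence of [step-name thunk] pairs; `fatal?` decides whether
  a step result should end the run. Returns [step-name result] pairs for
  every step that ran, including the fatal one."
  [steps fatal?]
  (reduce (fn [acc [step-name step-fn]]
            (let [result (step-fn)
                  acc    (conj acc [step-name result])]
              (if (fatal? result)
                (reduced acc)
                acc)))
          []
          steps))

(run-steps-until-fatal
 [["a" (fn [] {:ok 1})]
  ["b" (fn [] {:throwable :boom})]
  ["c" (fn [] {:ok 3})]]
 :throwable)
;; => [["a" {:ok 1}] ["b" {:throwable :boom}]]
```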

Similar to a 2-arg call to map, but will add all numbers that result from the invocations of f. Used mainly for logging purposes, such as to count and log the number of Fields updated by a sync operation. See also sum-for, a for-style macro version.

(defn sum-numbers
  [f coll]
  (reduce + (for [item coll
                  :let [result (f item)]
                  :when (number? result)]
              result)))

Impl for sum-for macro; see its docstring.

(defn sum-for*
  [results]
  (reduce + (filter number? results)))

Basically the same as for, but sums the results of each iteration of body that returned a number. See also sum-numbers.

As an added bonus, unlike normal for, this wraps body in an implicit do, so you can have more than one form inside the loop. Nice

(defmacro sum-for
  {:style/indent 1}
  [[item-binding coll & more-for-bindings] & body]
  `(sum-for* (for [~item-binding ~coll
                   ~@more-for-bindings]
               (do ~@body))))
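
The summing behavior shared by sum-numbers and sum-for amounts to mapping, keeping only numeric results, and adding them up. A self-contained sketch, with sum-numeric-results as a hypothetical name:

```clojure
;; Hypothetical equivalent of the "sum only the numeric results" behavior.
(defn sum-numeric-results
  [f coll]
  (reduce + (filter number? (map f coll))))

;; nil results from odd inputs are ignored rather than causing an error.
(sum-numeric-results (fn [x] (when (even? x) x)) [1 2 3 4]) ; => 6
```

Skipping non-numeric results means a step that returns nil or a keyword for some items does not break the running total.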