Utility functions and macros to abstract away some common patterns and operations across the sync processes, such as logging start/end messages. | (ns metabase.sync.util (:require [clojure.math.numeric-tower :as math] [clojure.string :as str] [java-time.api :as t] [medley.core :as m] [metabase.driver :as driver] [metabase.driver.util :as driver.u] [metabase.events :as events] [metabase.models.interface :as mi] [metabase.models.task-history :as task-history] [metabase.query-processor.interface :as qp.i] [metabase.sync.interface :as i] [metabase.util :as u] [metabase.util.date-2 :as u.date] [metabase.util.log :as log] [metabase.util.malli :as mu] [metabase.util.malli.registry :as mr] [metabase.util.malli.schema :as ms] [toucan2.core :as t2] [toucan2.realize :as t2.realize]) (:import (java.time.temporal Temporal))) |
(set! *warn-on-reflection* true) | |
(derive ::event :metabase/event) | |
(def ^:private sync-event-topics
#{:event/sync-begin
:event/sync-end
:event/analyze-begin
:event/analyze-end
:event/refingerprint-begin
:event/refingerprint-end
:event/cache-field-values-begin
:event/cache-field-values-end
:event/sync-metadata-begin
:event/sync-metadata-end}) | |
(doseq [topic sync-event-topics] (derive topic ::event)) | |
(def ^:private Topic
[:and
events/Topic
[:fn
{:error/message "Sync event deriving from :metabase.sync.util/event"}
#(isa? % ::event)]]) | |
+----------------------------------------------------------------------------------------------------------------+ | SYNC OPERATION "MIDDLEWARE" | +----------------------------------------------------------------------------------------------------------------+ | |
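When using the `sync-operation` macro below, its body is executed in the context of the functions in this section, which handle things like duplicate-operation prevention, event publishing, logging, and error handling. These operate in a middleware pattern: each step takes a function and returns a new function that executes the original in whatever context, or with whatever side effects, are appropriate for that step. | |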
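As a rough illustration of the middleware pattern described above, here is a minimal sketch. The wrappers `with-logging` and `with-timing` are hypothetical stand-ins, not functions from this namespace; each takes a thunk and returns a new thunk.

```clojure
;; Hypothetical wrappers, for illustration only; not part of metabase.sync.util.
(defn with-logging
  "Returns a thunk that prints start/finish messages around `f`."
  [label f]
  (fn []
    (println "STARTING:" label)
    (let [result (f)]
      (println "FINISHED:" label)
      result)))

(defn with-timing
  "Returns a thunk that prints how long `f` took, in nanoseconds."
  [f]
  (fn []
    (let [start  (System/nanoTime)
          result (f)]
      (println "took" (- (System/nanoTime) start) "ns")
      result)))

;; Nothing runs until the outermost thunk is invoked.
(def wrapped (with-logging "demo" (with-timing (fn [] (+ 1 2)))))

(wrapped) ; runs the original thunk inside both wrappers and returns its result
```

Because every wrapper has the same shape (thunk in, thunk out), wrappers can be nested in any order, which is how `do-sync-operation` stacks its steps.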
This looks something like {:sync #{1 2}, :cache #{2 3}} when populated.
Keys are types of sync operation, e.g. `:sync` or `:cache`; values are sets of IDs of the Databases currently undergoing that operation. TODO - as @salsakran mentioned it would be nice to do this via the DB so we could better support multi-instance setups in the future | (defonce ^:private operation->db-ids (atom {})) |
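Returns a function that runs `f` unless `operation` is already in flight for `database-or-id`, in which case the call does nothing and returns nil. This prevents duplicate sync-like operations from running simultaneously for a given Database: only one is allowed at any given moment, and duplicates are ignored. | (defn with-duplicate-ops-prevented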
{:style/indent [:form]}
[operation database-or-id f]
(fn []
(when-not (contains? (@operation->db-ids operation) (u/the-id database-or-id))
(try
;; mark this database as currently syncing so we can prevent duplicate sync attempts (#2337)
(swap! operation->db-ids update operation #(conj (or % #{}) (u/the-id database-or-id)))
(log/debug "Sync operations in flight:" (m/filter-vals seq @operation->db-ids))
;; do our work
(f)
;; always take the ID out of the set when we are through
(finally
(swap! operation->db-ids update operation #(disj % (u/the-id database-or-id)))))))) |
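The behaviour of the duplicate guard can be sketched in isolation. This is a simplified standalone copy, assuming plain integer IDs instead of `u/the-id`; the names `in-flight` and `with-duplicate-ops-prevented*` are hypothetical.

```clojure
;; Standalone sketch; `in-flight` plays the role of `operation->db-ids`.
(def in-flight (atom {}))

(defn with-duplicate-ops-prevented*
  "Returns a thunk that runs `f` only if `operation` is not already running for `db-id`."
  [operation db-id f]
  (fn []
    (when-not (contains? (get @in-flight operation) db-id)
      (try
        ;; mark the operation as in flight for this ID
        (swap! in-flight update operation (fnil conj #{}) db-id)
        (f)
        (finally
          ;; always clear the mark, even if `f` throws
          (swap! in-flight update operation disj db-id))))))

;; An attempt made while the same operation is already running is skipped.
(def inner (with-duplicate-ops-prevented* :sync 1 (fn [] :inner-ran)))
(def outer (with-duplicate-ops-prevented* :sync 1 (fn [] [:outer-ran (inner)])))

(outer) ; => [:outer-ran nil]
(inner) ; => :inner-ran, since nothing is in flight any more
```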
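Publish events related to beginning and ending a sync-like process, e.g. `:event/sync-begin` and `:event/sync-end`, for `database-or-id`. `f` is executed between the publishing of the two events. | (mu/defn- with-sync-events
{:style/indent [:form]}
;; we can do everyone a favor and infer the names of the individual begin and end events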
([event-name-prefix database-or-id f]
(letfn [(event-keyword [prefix suffix]
(keyword (or (namespace event-name-prefix) "event")
(str (name prefix) suffix)))]
(with-sync-events
(event-keyword event-name-prefix "-begin")
(event-keyword event-name-prefix "-end")
database-or-id
f)))
([begin-event-name :- Topic
end-event-name :- Topic
database-or-id
f]
(fn []
(let [start-time (System/nanoTime)
tracking-hash (str (random-uuid))]
(events/publish-event! begin-event-name {:database_id (u/the-id database-or-id), :custom_id tracking-hash})
(let [return (f)
total-time-ms (int (/ (- (System/nanoTime) start-time)
1000000.0))]
(events/publish-event! end-event-name {:database_id (u/the-id database-or-id)
:custom_id tracking-hash
:running_time total-time-ms})
return))))) |
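The name inference in the three-argument arity can be tried on its own. The sketch below lifts the local `event-keyword` helper into a top-level function purely for illustration.

```clojure
;; Same logic as the `letfn` helper above, written as a standalone function.
(defn event-keyword
  "Builds a begin/end topic from a prefix; un-namespaced prefixes default to the `event` namespace."
  [event-name-prefix suffix]
  (keyword (or (namespace event-name-prefix) "event")
           (str (name event-name-prefix) suffix)))

(event-keyword :sync-metadata "-begin") ; => :event/sync-metadata-begin
(event-keyword :sync-metadata "-end")   ; => :event/sync-metadata-end
(event-keyword :my.ns/custom "-end")    ; => :my.ns/custom-end
```

The resulting keywords still have to satisfy the `Topic` schema, so a prefix only works if the matching topics derive from `::event`.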
Logs start/finish messages using `log-fn`, timing how long it takes to run `f`. | (defn- with-start-and-finish-logging*
{:style/indent [:form]}
[log-fn message f]
(let [start-time (System/nanoTime)
_ (log-fn (u/format-color 'magenta "STARTING: %s" message))
result (f)]
(log-fn (u/format-color 'magenta "FINISHED: %s (%s)"
message
(u/format-nanoseconds (- (System/nanoTime) start-time))))
result)) |
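Returns a function that logs `message` about a process starting, runs `f`, and then logs `message` again when it finishes, including how long `f` took to run. | (defn- with-start-and-finish-logging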
{:style/indent [:form]}
[message f]
(fn []
(with-start-and-finish-logging* #(log/info %) message f))) |
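Similar to `with-start-and-finish-logging`, except that it invokes `f` immediately and returns its result instead of returning a wrapper function. Despite the name, messages are currently logged with `log/info`. | (defn- do-with-start-and-finish-debug-logging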
{:style/indent [:form]}
[message f]
(with-start-and-finish-logging* #(log/info %) message f)) |
Disable all QP and DB logging when running BODY. (This should be done for all sync-like processes to avoid cluttering the logs.) | (defn- with-db-logging-disabled
{:style/indent [:form]}
[f]
(fn []
(binding [qp.i/*disable-qp-logging* true]
(f)))) |
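Pass the sync operation defined by `f` to the driver for `database` via `driver/sync-in-context`, so the driver can do any setup it needs around the operation. | (defn- sync-in-context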
[database f]
(fn []
(driver/sync-in-context (driver.u/database->driver database) database
f))) |
TODO: future, expand this to | (doseq [klass [java.net.ConnectException
java.net.NoRouteToHostException
java.net.UnknownHostException
com.mchange.v2.resourcepool.CannotAcquireResourceException
javax.net.ssl.SSLHandshakeException]]
(derive klass ::exception-class-not-to-retry)) |
Whether to log exceptions during a sync step and proceed with the rest of the sync process. This is the default behavior. You can disable this for debugging or test purposes. | (def ^:dynamic *log-exceptions-and-continue?* true) |
Internal implementation of [[with-error-handling]]; use that instead of calling this directly. | (defn do-with-error-handling
([f]
(do-with-error-handling "Error running sync step" f))
([message f]
(try
(f)
(catch Throwable e
(if *log-exceptions-and-continue?*
(do
(log/warn e message)
e)
(throw e)))))) |
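Execute `body`, catching and logging any exception thrown along with `message`. When `*log-exceptions-and-continue?*` is true the caught exception is returned instead of rethrown. The exception classes deriving from `::exception-class-not-to-retry` mark failures after which the rest of the sync is abandoned (see `abandon-sync?`). | (defmacro with-error-handling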
{:style/indent 1}
[message & body]
`(do-with-error-handling ~message (fn [] ~@body))) |
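A minimal sketch of the catch-and-continue behaviour, assuming no logging and using the hypothetical names `*continue?*` and `do-with-error-handling*` in place of the real ones:

```clojure
;; Standalone sketch; `*continue?*` stands in for `*log-exceptions-and-continue?*`.
(def ^:dynamic *continue?* true)

(defn do-with-error-handling*
  "Runs `f`; on failure returns the exception when `*continue?*` is true, otherwise rethrows."
  [f]
  (try
    (f)
    (catch Throwable e
      (if *continue?*
        e
        (throw e)))))

;; Default: the exception comes back as a value, so the caller can carry on.
(do-with-error-handling* #(/ 1 0)) ; returns an ArithmeticException instance

;; With the flag off (e.g. in tests), the exception propagates.
(binding [*continue?* false]
  (try
    (do-with-error-handling* #(/ 1 0))
    (catch ArithmeticException _ :rethrown)))
```

Returning the exception as a value means callers have to check the result with something like `(instance? Throwable result)`, as `run-step-with-metadata` does.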
Internal implementation of [[sync-operation]]; use that instead of calling this directly. | (mu/defn do-sync-operation
[operation :- :keyword ; something like `:sync-metadata` or `:refingerprint`
database :- (ms/InstanceOf :model/Database)
message :- ms/NonBlankString
f :- fn?]
((with-duplicate-ops-prevented
operation database
(with-sync-events
operation database
(with-start-and-finish-logging
message
(with-db-logging-disabled
(sync-in-context database
(partial do-with-error-handling (format "Error in sync step %s" message) f)))))))) |
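Perform the operations in `body` as a sync operation on `database`. The body is wrapped with duplicate-operation prevention, event publishing, start/finish logging, disabled QP logging, the driver's sync context, and error handling. | (defmacro sync-operation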
{:style/indent 3}
[operation database message & body]
`(do-sync-operation ~operation ~database ~message (fn [] ~@body))) |
+----------------------------------------------------------------------------------------------------------------+ | EMOJI PROGRESS METER | +----------------------------------------------------------------------------------------------------------------+ | |
This is primarily provided because it makes sync more fun to look at. The functions below make it fairly simple to log a progress bar with a corresponding emoji when iterating over a sequence of objects during sync, e.g. syncing all the Tables in a given Database. | |
(def ^:private ^:const ^Integer emoji-meter-width 50) | |
(def ^:private progress-emoji
["😱" ; face screaming in fear
"😢" ; crying face
"😞" ; disappointed face
"😒" ; unamused face
"😕" ; confused face
"😐" ; neutral face
"😬" ; grimacing face
"😌" ; relieved face
"😏" ; smirking face
"😋" ; face savouring delicious food
"😊" ; smiling face with smiling eyes
"😍" ; smiling face with heart shaped eyes
"😎"]) ; smiling face with sunglasses | |
(defn- percent-done->emoji [percent-done] (progress-emoji (int (math/round (* percent-done (dec (count progress-emoji))))))) | |
Create a string that shows progress for something, e.g. a database sync process. Returns nil unless `completed` is a multiple of `log-every-n` or equals `total`. (emoji-progress-bar 10 40 10) -> "[************······································] 😒  25%" | (defn emoji-progress-bar
[completed total log-every-n]
(let [percent-done (float (/ completed total))
filleds (int (* percent-done emoji-meter-width))
blanks (- emoji-meter-width filleds)]
(when (or (zero? (mod completed log-every-n))
(= completed total))
(str "["
(str/join (repeat filleds "*"))
(str/join (repeat blanks "·"))
(format "] %s %3.0f%%" (u/emoji (percent-done->emoji percent-done)) (* percent-done 100.0)))))) |
Run BODY with access to a function that makes using our amazing emoji-progress-bar easy like Sunday morning. Calling the function will return the appropriate string output for logging and automatically increment an internal counter as needed. (with-emoji-progress-bar [progress-bar 10] (dotimes [i 10] (println (progress-bar)))) | (defmacro with-emoji-progress-bar
{:style/indent 1}
[[emoji-progress-fn-binding total-count] & body]
`(let [finished-count# (atom 0)
total-count# ~total-count
log-every-n# (Math/ceil (/ total-count# 10))
~emoji-progress-fn-binding (fn [] (emoji-progress-bar (swap! finished-count# inc) total-count# log-every-n#))]
~@body)) |
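The bar arithmetic can be checked without the emoji or the logging throttle. The following is a plain-text sketch under those assumptions; `meter-width` and `plain-progress-bar` are hypothetical names.

```clojure
(require '[clojure.string :as str])

;; Hypothetical plain-text variant: same fill arithmetic, no emoji, no `log-every-n`.
(def meter-width 50)

(defn plain-progress-bar
  "Returns a fixed-width bar with `completed`/`total` of the cells filled."
  [completed total]
  (let [percent-done (double (/ completed total))
        filleds      (int (* percent-done meter-width)) ; truncates, e.g. 12.5 -> 12
        blanks       (- meter-width filleds)]
    (str "["
         (str/join (repeat filleds "*"))
         (str/join (repeat blanks "·"))
         (format "] %3.0f%%" (* percent-done 100.0)))))

(plain-progress-bar 10 40) ; 12 filled cells, 38 blank cells, ends with " 25%"
```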
+----------------------------------------------------------------------------------------------------------------+ | INITIAL SYNC STATUS | +----------------------------------------------------------------------------------------------------------------+ | |
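If this is the first sync of a database, we need to update the `initial_sync_status` of each table as it finishes syncing, and of the database itself once the sync as a whole is done, so that progress can be reflected in the UI. | |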
Marks initial sync as complete for this table so that it becomes usable in the UI, if not already set | (defn set-initial-table-sync-complete!
[table]
(when (not= (:initial_sync_status table) "complete")
(t2/update! :model/Table (u/the-id table) {:initial_sync_status "complete"}))) |
(def ^:private sync-tables-kv-args
{:active true
:visibility_type nil}) | |
Marks initial sync for all tables in `database-or-id` as complete | (defn set-initial-table-sync-complete-for-db!
[database-or-id]
(t2/update! :model/Table (merge sync-tables-kv-args {:db_id (u/the-id database-or-id)})
{:initial_sync_status "complete"})) |
Marks initial sync as complete for this database so that this is reflected in the UI, if not already set | (defn set-initial-database-sync-complete!
[database]
(when (not= (:initial_sync_status database) "complete")
(t2/update! :model/Database (u/the-id database) {:initial_sync_status "complete"}))) |
Marks initial sync as aborted for this database so that an error can be displayed on the UI | (defn set-initial-database-sync-aborted!
[database]
(when (not= (:initial_sync_status database) "complete")
(t2/update! :model/Database (u/the-id database) {:initial_sync_status "aborted"}))) |
+----------------------------------------------------------------------------------------------------------------+ | OTHER SYNC UTILITY FUNCTIONS | +----------------------------------------------------------------------------------------------------------------+ | |
Returns a clause that can be used inside a HoneySQL :where clause to select all the Tables that should be synced | (def sync-tables-clause
(into [:and] (for [[k v] sync-tables-kv-args]
[:= k v]))) |
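For reference, this is what the clause evaluates to. The sketch repeats the private `sync-tables-kv-args` map so that it runs on its own.

```clojure
;; Copy of the private map above, repeated so the example is self-contained.
(def sync-tables-kv-args
  {:active          true
   :visibility_type nil})

;; Each key/value pair becomes an equality condition under a single :and.
(def sync-tables-clause
  (into [:and] (for [[k v] sync-tables-kv-args]
                 [:= k v])))

sync-tables-clause
;; => [:and [:= :active true] [:= :visibility_type nil]]
```

Keeping the conditions in one map lets the same criteria be used both as key-value arguments (as in `set-initial-table-sync-complete-for-db!`) and as a `:where` clause.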
Returns a reducible of all the Tables that should go through the sync processes for `database-or-id`, optionally restricted to `schema-names` and `table-names`. | (defn reducible-sync-tables
[database-or-id & {:keys [schema-names table-names]}]
(eduction (map t2.realize/realize)
(t2/reducible-select :model/Table
:db_id (u/the-id database-or-id)
{:where [:and sync-tables-clause
(when (seq schema-names) [:in :schema schema-names])
(when (seq table-names) [:in :name table-names])]}))) |
The count of all tables that should be synced for `database-or-id`. | (defn sync-tables-count
[database-or-id]
(t2/count :model/Table :db_id (u/the-id database-or-id) {:where sync-tables-clause})) |
A reducible collection of all the Tables that should go through the sync processes for `database-or-id`, ordered by the earliest `last_analyzed` timestamp among each Table's Fields. | (defn refingerprint-reducible-sync-tables
[database-or-id]
(eduction (map t2.realize/realize)
(t2/reducible-select :model/Table
{:select [:t.*]
:from [[(t2/table-name :model/Table) :t]]
:left-join [[{:select [:table_id
[[:min :last_analyzed] :earliest_last_analyzed]]
:from [(t2/table-name :model/Field)]
:group-by [:table_id]} :sub]
[:= :t.id :sub.table_id]]
:where [:and sync-tables-clause [:= :t.db_id (u/the-id database-or-id)]]
:order-by [[:sub.earliest_last_analyzed :asc]]}))) |
Returns all the Schemas that have their metadata sync'd for `database-or-id`. | (defn sync-schemas
[database-or-id]
(vec (map :schema (t2/query {:select-distinct [:schema]
:from [:metabase_table]
:where [:and sync-tables-clause [:= :db_id (u/the-id database-or-id)]]})))) |
Return an appropriate string for logging an object in sync logging messages. Should be something like "postgres Database 'test-data'" This function is used all over the sync code to make sure we have easy access to consistently formatted descriptions of various objects. | (defmulti name-for-logging
{:arglists '([instance])}
mi/model) |
(defmethod name-for-logging :model/Database
[{database-name :name, id :id, engine :engine}]
(format "%s Database %s ''%s''" (name engine) (str (or id "")) database-name)) | |
Return an appropriate string for logging a table in sync logging messages. | (defn table-name-for-logging
[& {:keys [id schema name]}]
(format "Table %s ''%s''" (or (str id) "") (str (when (seq schema) (str schema ".")) name))) |
(defmethod name-for-logging :model/Table [table] (table-name-for-logging table)) | |
Return an appropriate string for logging a field in sync logging messages. | (defn field-name-for-logging
[& {:keys [id name]}]
(format "Field %s ''%s''" (or (str id) "") name)) |
(defmethod name-for-logging :model/Field [field] (field-name-for-logging field)) | |
this is used for result metadata stuff. | (defmethod name-for-logging :default [{field-name :name}]
(format "Field ''%s''" field-name)) |
(mu/defn calculate-duration-str :- :string "Given two datetimes, calculate the time between them and return the result as a string" [begin-time :- (ms/InstanceOfClass Temporal) end-time :- (ms/InstanceOfClass Temporal)] (u/format-nanoseconds (.toNanos (t/duration begin-time end-time)))) | |
Metadata common to both sync steps and an entire sync/analyze operation run | (def ^:private TimedSyncMetadata
[:map
[:start-time (ms/InstanceOfClass Temporal)]
[:end-time {:optional true} (ms/InstanceOfClass Temporal)]]) |
(mr/def ::StepRunMetadata
[:merge
TimedSyncMetadata
[:map
[:log-summary-fn [:maybe [:=> [:cat [:ref ::StepRunMetadata]] :string]]]]]) | |
Map with metadata about the step. Contains both generic information like `:start-time` and `:end-time` and step-specific information. | (def ^:private StepRunMetadata [:ref ::StepRunMetadata]) |
(mr/def ::StepNameWithMetadata
[:tuple
;; step name
:string
;; step metadata
StepRunMetadata]) | |
Pair with the step name and metadata about the completed step run | (def StepNameWithMetadata [:ref ::StepNameWithMetadata]) |
Timing and step information for the entire sync or analyze run | (def ^:private SyncOperationMetadata
[:merge
TimedSyncMetadata
[:map
[:steps [:maybe [:sequential StepNameWithMetadata]]]]]) |
A log summary function takes a `StepRunMetadata` and returns a string with a step-specific log message. | (def ^:private LogSummaryFunction [:=> [:cat StepRunMetadata] :string]) |
Defines a step. | (def ^:private StepDefinition [:map [:sync-fn [:=> [:cat StepRunMetadata] i/DatabaseInstance]] [:step-name :string] [:log-summary-fn [:maybe LogSummaryFunction]]]) |
Creates and returns a step suitable for `run-step-with-metadata`. See `StepDefinition` for more info. | (defn create-sync-step
([step-name sync-fn]
(create-sync-step step-name sync-fn nil))
([step-name sync-fn log-summary-fn]
{:sync-fn sync-fn
:step-name step-name
:log-summary-fn (when log-summary-fn
(comp str log-summary-fn))})) |
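A short usage sketch of the step map. `create-sync-step` is copied verbatim so the example runs standalone; the step name, the fake sync function, and the `{:name "demo"}` database map are made up for illustration.

```clojure
;; Verbatim copy of the function above, so the example is self-contained.
(defn create-sync-step
  ([step-name sync-fn]
   (create-sync-step step-name sync-fn nil))
  ([step-name sync-fn log-summary-fn]
   {:sync-fn        sync-fn
    :step-name      step-name
    :log-summary-fn (when log-summary-fn
                      (comp str log-summary-fn))}))

;; Hypothetical step: the sync fn returns step-specific metadata,
;; and the summary fn turns that metadata into a log line.
(def count-tables-step
  (create-sync-step "count-tables"
                    (fn [_database] {:tables 3})
                    (fn [{:keys [tables]}] (format "Found %d tables" tables))))

(let [{:keys [sync-fn log-summary-fn]} count-tables-step]
  (log-summary-fn (sync-fn {:name "demo"})))
;; => "Found 3 tables"
```

Wrapping the summary function in `(comp str ...)` guarantees a string result even if the supplied function returns something else.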
(mu/defn- run-step-with-metadata :- StepNameWithMetadata
"Runs `step` on `database` returning metadata from the run"
[database :- i/DatabaseInstance
{:keys [step-name sync-fn log-summary-fn] :as _step} :- StepDefinition]
(let [start-time (t/zoned-date-time)
results (do-with-start-and-finish-debug-logging
(format "step ''%s'' for %s"
step-name
(name-for-logging database))
(fn [& args]
(try
(task-history/with-task-history
{:task step-name
:db_id (u/the-id database)
:on-success-info (fn [update-map result]
(if (instance? Throwable result)
(throw result)
(assoc update-map :task_details (dissoc result :start-time :end-time :log-summary-fn))))}
(apply sync-fn database args))
(catch Throwable e
(if *log-exceptions-and-continue?*
(do
(log/warnf e "Error running step ''%s'' for %s" step-name (name-for-logging database))
{:throwable e})
(throw e))))))
end-time (t/zoned-date-time)]
[step-name (assoc results
:start-time start-time
:end-time end-time
:log-summary-fn log-summary-fn)])) | |
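The logging logic from `log-sync-summary`, separated out so the summary string can be built without depending on whether debug logging is enabled. | (mu/defn- make-log-sync-summary-str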
[operation :- :string
database :- i/DatabaseInstance
{:keys [start-time end-time steps]} :- SyncOperationMetadata]
(str
(apply format
(str "\n#################################################################\n"
"# %s\n"
"# %s\n"
"# %s\n"
"# %s\n")
[(format "Completed %s on %s" operation (:name database))
(format "Start: %s" (u.date/format start-time))
(format "End: %s" (u.date/format end-time))
(format "Duration: %s" (calculate-duration-str start-time end-time))])
(apply str (for [[step-name {:keys [start-time end-time log-summary-fn] :as step-info}] steps]
(apply format (str "# ---------------------------------------------------------------\n"
"# %s\n"
"# %s\n"
"# %s\n"
"# %s\n"
(when log-summary-fn
(format "# %s\n" (log-summary-fn step-info))))
[(format "Completed step ''%s''" step-name)
(format "Start: %s" (u.date/format start-time))
(format "End: %s" (u.date/format end-time))
(format "Duration: %s" (calculate-duration-str start-time end-time))])))
"#################################################################\n")) |
Log a sync/analyze summary message with info from each step | (mu/defn- log-sync-summary
[operation :- :string
database :- i/DatabaseInstance
sync-metadata :- SyncOperationMetadata]
;; Note this needs to either stay nested in the `debug` macro call or be guarded by a log/enabled?
;; call. Constructing the log below requires some work; no need to incur that cost if debug logging isn't enabled
(log/debug (make-log-sync-summary-str operation database sync-metadata))) |
(defn- do-not-retry-exception? [e]
(or (isa? (class e) ::exception-class-not-to-retry)
(some-> (ex-cause e) recur))) | |
Given the results of a sync step, returns truthy if a non-recoverable exception occurred | (defn abandon-sync?
[step-results]
(when-let [caught-exception (:throwable step-results)]
(do-not-retry-exception? caught-exception))) |
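The cause-chain check relies on `derive` working on Java classes. Here is a small sketch of the same idea, using the hypothetical keyword `::no-retry` and function name `no-retry?`:

```clojure
;; Register one class under a hypothetical marker keyword, as the `doseq` above does.
(derive java.net.ConnectException ::no-retry)

(defn no-retry?
  "Truthy if `e`, or any exception in its cause chain, is an instance of a class deriving from ::no-retry."
  [e]
  (or (isa? (class e) ::no-retry)
      (some-> (ex-cause e) recur)))

;; A wrapped connection failure is still detected through the cause chain.
(no-retry? (ex-info "wrapped" {} (java.net.ConnectException. "refused"))) ; truthy

;; An unrelated exception with no cause yields nil.
(no-retry? (ex-info "something else" {}))
```

Walking the cause chain matters because connection failures usually surface wrapped in other exception types by the time they reach the sync code.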
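Run `sync-steps` in order against `database`, stopping early if a step hits a non-recoverable exception, then log a summary message and return the collected timing metadata. | (mu/defn run-sync-operation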
[operation :- :string
database :- i/DatabaseInstance
sync-steps :- [:maybe [:sequential StepDefinition]]]
(task-history/with-task-history {:task operation
:db_id (u/the-id database)}
(let [start-time (t/zoned-date-time)
step-metadata (loop [[step-defn & rest-defns] sync-steps
result []]
(let [[step-name r] (run-step-with-metadata database step-defn)
new-result (conj result [step-name r])]
(cond (abandon-sync? r) new-result
(not (seq rest-defns)) new-result
:else (recur rest-defns new-result))))
end-time (t/zoned-date-time)
sync-metadata {:start-time start-time
:end-time end-time
:steps step-metadata}]
(log-sync-summary operation database sync-metadata)
sync-metadata))) |
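Similar to a 2-arg call to `map`, but adds up all the numbers that result from invoking `f` on each item, ignoring non-numeric results. | (defn sum-numbers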
[f coll]
(reduce + (for [item coll
:let [result (f item)]
:when (number? result)]
result))) |
Impl for the `sum-for` macro; see its docstring. | (defn sum-for* [results] (reduce + (filter number? results))) |
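Basically the same as `for`, except that it sums the results of each iteration of `body` that returned a number. As an added bonus, unlike normal `for`, this wraps `body` in an implicit `do`, so you can have more than one form inside the loop. | (defmacro sum-for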
{:style/indent 1}
[[item-binding coll & more-for-bindings] & body]
`(sum-for* (for [~item-binding ~coll
~@more-for-bindings]
(do ~@body)))) |
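A quick usage sketch of `sum-for`. The macro and its helper are copied from above so the example runs standalone; the input collections are made up.

```clojure
;; Copies of the definitions above, so the example is self-contained.
(defn sum-for* [results]
  (reduce + (filter number? results)))

(defmacro sum-for
  [[item-binding coll & more-for-bindings] & body]
  `(sum-for* (for [~item-binding ~coll
                   ~@more-for-bindings]
               (do ~@body))))

;; Non-numeric results (here, nil for the nil item) are skipped rather than causing an error.
(sum-for [x [1 2 nil 3]]
  (when x
    (* x 2)))
;; => 12

;; Extra `for` bindings such as :when are passed straight through.
(sum-for [x (range 10)
          :when (even? x)]
  x)
;; => 20
```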
Can this type be a category or list? | (defn can-be-category-or-list?
[base-type semantic-type]
(not
(or (isa? base-type :type/Temporal)
(isa? base-type :type/Collection)
(isa? base-type :type/Float)
;; Don't let IDs become list Fields (they already can't become categories, because they already have a semantic
;; type). It just doesn't make sense to cache a sequence of numbers since they aren't inherently meaningful
(isa? semantic-type :type/PK)
(isa? semantic-type :type/FK)))) |