(ns metabase.notification.send (:require [java-time.api :as t] [metabase.analytics.prometheus :as prometheus] [metabase.channel.core :as channel] [metabase.config :as config] [metabase.models.notification :as models.notification] [metabase.models.setting :as setting] [metabase.models.task-history :as task-history] [metabase.notification.payload.core :as notification.payload] [metabase.util :as u] [metabase.util.log :as log] [metabase.util.malli :as mu] [metabase.util.retry :as retry] [toucan2.core :as t2]) (:import (java.util.concurrent Callable Executors ExecutorService) (org.apache.commons.lang3.concurrent BasicThreadFactory$Builder))) | |
(set! *warn-on-reflection* true) | |
(defn- handler->channel-name [{:keys [channel_type channel_id]}] (if channel_id (str (u/qualified-name channel_type) " " channel_id) (u/qualified-name channel_type))) | |
The size of the thread pool used to send notifications. | (setting/defsetting notification-thread-pool-size :default 3 :export? false :type :integer :visibility :internal) |
(defonce ^:private pool (delay (Executors/newFixedThreadPool (notification-thread-pool-size) (.build (doto (BasicThreadFactory$Builder.) (.namingPattern "send-notification-thread-pool-%d")))))) | |
(def ^:private default-retry-config {:max-attempts (if config/is-dev? 2 7) :initial-interval-millis 500 :multiplier 2.0 :randomization-factor 0.1 :max-interval-millis 30000}) | |
(defn- should-retry-sending? [exception channel-type] (not (and (= :channel/slack channel-type) (contains? (:errors (ex-data exception)) :slack-token)))) | |
(defn- channel-send-retrying! [notification-id payload-type handler message] (let [channel (or (:channel handler) {:type (:channel_type handler)}) channel-type (:type channel)] (try (let [#_notification-id #_(:notification_id handler) retry-config default-retry-config retry-errors (volatile! []) retry-report (fn [] {:attempted_retries (count @retry-errors) ;; we want the last retry to be the most recent :retry_errors (reverse @retry-errors)}) send! (fn [] (try (channel/send! channel message) (catch Exception e (when (should-retry-sending? e (:type channel)) (vswap! retry-errors conj {:message (u/strip-error e) :timestamp (t/offset-date-time)}) (log/warnf e "[Notification %d] Failed to send to channel %s , retrying..." notification-id (handler->channel-name handler)) (throw e))))) retrier (retry/make retry-config)] (log/debugf "[Notification %d] Sending a message to channel %s" notification-id (handler->channel-name handler)) (task-history/with-task-history {:task "channel-send" :on-success-info (fn [update-map _result] (cond-> update-map (seq @retry-errors) (update :task_details merge (retry-report)))) :on-fail-info (fn [update-map _result] (update update-map :task_details merge (retry-report))) :task_details {:retry_config retry-config :channel_id (:id channel) :channel_type (:type channel) :template_id (:template_id handler) :notification_id notification-id :notification_type payload-type :recipient_ids (map :id (:recipients handler))}} (retrier send!) (log/debugf "[Notification %d] Sent to channel %s with %d retries" notification-id (handler->channel-name handler) (count @retry-errors)))) (prometheus/inc! :metabase-notification/channel-send-ok {:payload-type payload-type :channel-type channel-type}) (catch Throwable e (prometheus/inc! :metabase-notification/channel-send-error {:payload-type payload-type :channel-type channel-type}) (log/errorf e "[Notification %d] Error sending notification!" notification-id))))) | |
(defn- hydrate-notification [notification-info] (case (:payload_type notification-info) (:notification/system-event :notification/testing :notification/card) (cond-> notification-info (t2/instance? notification-info) models.notification/hydrate-notification) ;; :notification/dashboard is still on pulse, so we expect it to self-contained. see [[metabase.pulse.send]] notification-info)) | |
Performs post-notification actions based on the notification type. | (defmulti do-after-notification-sent {:arglists '([notification-info notification-payload])} (fn [notification-info _notification-payload] (:payload_type notification-info))) |
(defmethod do-after-notification-sent :default [_notification-info _notification-payload] nil) | |
(def ^:private payload-labels (for [payload-type (keys (methods notification.payload/payload))] {:payload-type payload-type})) (def ^:private payload-channel-labels (for [[channel-type payload-type] (keys (methods channel/render-notification))] {:payload-type payload-type :channel-type channel-type})) | |
(defmethod prometheus/known-labels :metabase-notification/send-ok [_] payload-labels) (defmethod prometheus/known-labels :metabase-notification/send-error [_] payload-labels) (defmethod prometheus/known-labels :metabase-notification/channel-send-ok [_] payload-channel-labels) (defmethod prometheus/known-labels :metabase-notification/channel-send-error [_] payload-channel-labels) | |
(defn- since-trigger-ms [notification-info] (some-> notification-info meta :notification/triggered-at-ns u/since-ms)) | |
Send the notification to all handlers synchronously. Do not use this directly, use send-notification! instead. | (mu/defn send-notification-sync! [{:keys [id payload_type] :as notification-info} :- ::notification.payload/Notification] (u/with-timer-ms [duration-ms-fn] (when-let [wait-time (since-trigger-ms notification-info)] (prometheus/observe! :metabase-notification/wait-duration-ms {:payload-type payload_type} wait-time)) (try (log/infof "[Notification %d] Sending" id) (prometheus/inc! :metabase-notification/concurrent-tasks) (let [hydrated-notification (hydrate-notification notification-info) handlers (:handlers hydrated-notification)] (task-history/with-task-history {:task "notification-send" :task_details {:notification_id id :notification_handlers (map #(select-keys % [:id :channel_type :channel_id :template_id]) handlers)}} (let [notification-payload (notification.payload/notification-payload (dissoc hydrated-notification :handlers))] (if (notification.payload/should-send-notification? notification-payload) (do (log/debugf "[Notification %d] Found %d handlers" id (count handlers)) (doseq [handler handlers] (try (let [channel-type (:channel_type handler) messages (channel/render-notification channel-type notification-payload (:template handler) (:recipients handler))] (log/debugf "[Notification %d] Got %d messages for channel %s with template %d" id (count messages) (handler->channel-name handler) (-> handler :template :id)) (doseq [message messages] (log/infof "[Notification %d] Sending message to channel %s" id (:channel_type handler)) (channel-send-retrying! id payload_type handler message))) (catch Exception e (log/warnf e "[Notification %d] Error sending to channel %s" id (handler->channel-name handler)))))) (log/infof "[Notification %d] Skipping" id)) (do-after-notification-sent notification-info notification-payload) (log/infof "[Notification %d] Sent successfully" id) (prometheus/inc! :metabase-notification/send-ok {:payload-type payload_type})))) (catch Exception e (log/errorf e "[Notification %d] Failed to send" id) (prometheus/inc! :metabase-notification/send-error {:payload-type payload_type}) (throw e)) (finally (prometheus/dec! :metabase-notification/concurrent-tasks))) (prometheus/observe! :metabase-notification/send-duration-ms {:payload-type payload_type} (duration-ms-fn)) (when-let [total-time (since-trigger-ms notification-info)] (prometheus/observe! :metabase-notification/total-duration-ms {:payload-type payload_type} total-time)) nil)) |
Send a notification asynchronously. | (mu/defn send-notification-async! [notification :- ::notification.payload/Notification] (.submit ^ExecutorService @pool ^Callable (fn [] (send-notification-sync! notification))) nil) |