(ns metabase.notification.send
  (:require
   [java-time.api :as t]
   [metabase.analytics.prometheus :as prometheus]
   [metabase.channel.core :as channel]
   [metabase.config :as config]
   [metabase.models.notification :as models.notification]
   [metabase.models.setting :as setting]
   [metabase.models.task-history :as task-history]
   [metabase.notification.payload.core :as notification.payload]
   [metabase.util :as u]
   [metabase.util.log :as log]
   [metabase.util.malli :as mu]
   [metabase.util.retry :as retry]
   [toucan2.core :as t2])
  (:import
   (java.util.concurrent Callable Executors ExecutorService)
   (org.apache.commons.lang3.concurrent BasicThreadFactory$Builder)))
(set! *warn-on-reflection* true)
(defn- handler->channel-name
  [{:keys [channel_type channel_id]}]
  (if channel_id
    (str (u/qualified-name channel_type) " " channel_id)
    (u/qualified-name channel_type)))

The size of the thread pool used to send notifications.

(setting/defsetting notification-thread-pool-size
  :default    3
  :export?    false
  :type       :integer
  :visibility :internal)
(defonce ^:private pool
  (delay (Executors/newFixedThreadPool
          (notification-thread-pool-size)
          (.build
           (doto (BasicThreadFactory$Builder.)
             (.namingPattern "send-notification-thread-pool-%d"))))))
(def ^:private default-retry-config
  {:max-attempts            (if config/is-dev? 2 7)
   :initial-interval-millis 500
   :multiplier              2.0
   :randomization-factor    0.1
   :max-interval-millis     30000})
(defn- should-retry-sending?
  [exception channel-type]
  (not (and (= :channel/slack channel-type)
            (contains? (:errors (ex-data exception)) :slack-token))))
(defn- channel-send-retrying!
  [notification-id payload-type handler message]
  (let [channel      (or (:channel handler)
                         {:type (:channel_type handler)})
        channel-type (:type channel)]
    (try
      (let [#_notification-id #_(:notification_id handler)
            retry-config    default-retry-config
            retry-errors    (volatile! [])
            retry-report    (fn []
                              {:attempted_retries (count @retry-errors)
                               ;; we want the last retry to be the most recent
                               :retry_errors       (reverse @retry-errors)})
            send!           (fn []
                              (try
                                (channel/send! channel message)
                                (catch Exception e
                                  (when (should-retry-sending? e (:type channel))
                                    (vswap! retry-errors conj {:message   (u/strip-error e)
                                                               :timestamp (t/offset-date-time)})
                                    (log/warnf e "[Notification %d] Failed to send to channel %s , retrying..."
                                               notification-id (handler->channel-name handler))
                                    (throw e)))))
            retrier         (retry/make retry-config)]
        (log/debugf "[Notification %d] Sending a message to channel %s" notification-id (handler->channel-name handler))
        (task-history/with-task-history {:task            "channel-send"
                                         :on-success-info (fn [update-map _result]
                                                            (cond-> update-map
                                                              (seq @retry-errors)
                                                              (update :task_details merge (retry-report))))
                                         :on-fail-info    (fn [update-map _result]
                                                            (update update-map :task_details merge (retry-report)))
                                         :task_details    {:retry_config      retry-config
                                                           :channel_id        (:id channel)
                                                           :channel_type      (:type channel)
                                                           :template_id       (:template_id handler)
                                                           :notification_id   notification-id
                                                           :notification_type payload-type
                                                           :recipient_ids     (map :id (:recipients handler))}}
          (retrier send!)
          (log/debugf "[Notification %d] Sent to channel %s with %d retries"
                      notification-id (handler->channel-name handler) (count @retry-errors))))
      (prometheus/inc! :metabase-notification/channel-send-ok {:payload-type payload-type
                                                               :channel-type channel-type})
      (catch Throwable e
        (prometheus/inc! :metabase-notification/channel-send-error {:payload-type payload-type
                                                                    :channel-type channel-type})
        (log/errorf e "[Notification %d] Error sending notification!" notification-id)))))
(defn- hydrate-notification
  [notification-info]
  (case (:payload_type notification-info)
    (:notification/system-event :notification/testing :notification/card)
    (cond-> notification-info
      (t2/instance? notification-info)
      models.notification/hydrate-notification)
    ;; :notification/dashboard is still on pulse, so we expect it to self-contained. see [[metabase.pulse.send]]
    notification-info))

Performs post-notification actions based on the notification type.

(defmulti do-after-notification-sent
  {:arglists '([notification-info notification-payload])}
  (fn [notification-info _notification-payload]
    (:payload_type notification-info)))
(defmethod do-after-notification-sent :default [_notification-info _notification-payload] nil)
(def ^:private payload-labels         (for [payload-type (keys (methods notification.payload/payload))]
                                        {:payload-type payload-type}))
(def ^:private payload-channel-labels (for [[channel-type payload-type] (keys (methods channel/render-notification))]
                                        {:payload-type payload-type
                                         :channel-type channel-type}))
(defmethod prometheus/known-labels :metabase-notification/send-ok [_] payload-labels)
(defmethod prometheus/known-labels :metabase-notification/send-error [_] payload-labels)
(defmethod prometheus/known-labels :metabase-notification/channel-send-ok [_] payload-channel-labels)
(defmethod prometheus/known-labels :metabase-notification/channel-send-error [_] payload-channel-labels)
(defn- since-trigger-ms
  [notification-info]
  (some-> notification-info meta :notification/triggered-at-ns u/since-ms))

Send the notification to all handlers synchronously. Do not use this directly, use send-notification! instead.

(mu/defn send-notification-sync!
  [{:keys [id payload_type] :as notification-info} :- ::notification.payload/Notification]
  (u/with-timer-ms
    [duration-ms-fn]
    (when-let [wait-time (since-trigger-ms notification-info)]
      (prometheus/observe! :metabase-notification/wait-duration-ms {:payload-type payload_type} wait-time))
    (try
      (log/infof "[Notification %d] Sending" id)
      (prometheus/inc! :metabase-notification/concurrent-tasks)
      (let [hydrated-notification (hydrate-notification notification-info)
            handlers              (:handlers hydrated-notification)]
        (task-history/with-task-history {:task          "notification-send"
                                         :task_details {:notification_id       id
                                                        :notification_handlers (map #(select-keys % [:id :channel_type :channel_id :template_id]) handlers)}}
          (let [notification-payload (notification.payload/notification-payload (dissoc hydrated-notification :handlers))]
            (if (notification.payload/should-send-notification? notification-payload)
              (do
                (log/debugf "[Notification %d] Found %d handlers" id (count handlers))
                (doseq [handler handlers]
                  (try
                    (let [channel-type (:channel_type handler)
                          messages     (channel/render-notification
                                        channel-type
                                        notification-payload
                                        (:template handler)
                                        (:recipients handler))]
                      (log/debugf "[Notification %d] Got %d messages for channel %s with template %d"
                                  id (count messages)
                                  (handler->channel-name handler)
                                  (-> handler :template :id))
                      (doseq [message messages]
                        (log/infof "[Notification %d] Sending message to channel %s"
                                   id (:channel_type handler))
                        (channel-send-retrying! id payload_type handler message)))
                    (catch Exception e
                      (log/warnf e "[Notification %d] Error sending to channel %s"
                                 id (handler->channel-name handler))))))
              (log/infof "[Notification %d] Skipping" id))
            (do-after-notification-sent notification-info notification-payload)
            (log/infof "[Notification %d] Sent successfully" id)
            (prometheus/inc! :metabase-notification/send-ok {:payload-type payload_type}))))
      (catch Exception e
        (log/errorf e "[Notification %d] Failed to send" id)
        (prometheus/inc! :metabase-notification/send-error {:payload-type payload_type})
        (throw e))
      (finally
        (prometheus/dec! :metabase-notification/concurrent-tasks)))
    (prometheus/observe! :metabase-notification/send-duration-ms {:payload-type payload_type} (duration-ms-fn))
    (when-let [total-time (since-trigger-ms notification-info)]
      (prometheus/observe! :metabase-notification/total-duration-ms {:payload-type payload_type} total-time))
    nil))

Send a notification asynchronously.

(mu/defn send-notification-async!
  [notification :- ::notification.payload/Notification]
  (.submit ^ExecutorService @pool ^Callable
           (fn []
             (send-notification-sync! notification)))
  nil)