Code related to sending Pulses (Alerts or Dashboard Subscriptions). | (ns metabase.pulse.send (:require [metabase.models.interface :as mi] [metabase.pulse.models.pulse :as models.pulse] [metabase.util.cron :as u.cron] [metabase.util.log :as log] [toucan2.core :as t2])) |
(set! *warn-on-reflection* true) | |
+----------------------------------------------------------------------------------------------------------------+ | Creating Notifications To Send | +----------------------------------------------------------------------------------------------------------------+ | |
(defn- alert-or-pulse
  "Classify `pulse` by whether it is attached to a dashboard: returns `:pulse` when `:dashboard_id` is truthy and
  `:alert` otherwise."
  [pulse]
  (if-not (:dashboard_id pulse)
    :alert
    :pulse))
(defn- channel-recipients
  "Build the notification recipient maps for `pulse-channel`, based on its `:channel_type`:

  - slack: a single raw-value recipient whose value is read from `[:details :channel]`.
  - email: one recipient per entry in `:recipients`; entries with an `:id` become user recipients, entries without
    one become raw-value recipients carrying the entry's `:email`.
  - http: no recipients.

  Any other channel type is logged as a warning and yields no recipients."
  [pulse-channel]
  (let [channel-type (:channel_type pulse-channel)
        raw-value    (fn [v]
                       {:type    :notification-recipient/raw-value
                        :details {:value v}})]
    (condp = (keyword channel-type)
      :slack [(raw-value (get-in pulse-channel [:details :channel]))]
      :email (map (fn [recipient]
                    (if (:id recipient)
                      {:type :notification-recipient/user
                       :user recipient}
                      (raw-value (:email recipient))))
                  (:recipients pulse-channel))
      :http  []
      (do
        (log/warnf "Unknown channel type %s" channel-type)
        []))))
(defn- pc->channel
  "Given a pulse channel, return the channel object. Only HTTP channels are backed by a stored channel: for those
  the `:model/Channel` row whose id equals the pulse channel's `:channel_id` is fetched. For every other channel
  type (e.g. slack, email) a map with only a `:type` key of the form `:channel/<channel-type>` is returned."
  [pulse-channel]
  (let [kind (:channel_type pulse-channel)]
    (if (not= :http (keyword kind))
      {:type (keyword "channel" (name kind))}
      (t2/select-one :model/Channel :id (:channel_id pulse-channel)))))
(defn- get-notification-handler
  "Build the handler map for `pulse-channel`: the resolved channel under `:channel`, that channel's `:type` under
  `:channel_type`, and the recipients computed by `channel-recipients` under `:recipients`."
  [pulse-channel]
  (let [resolved   (pc->channel pulse-channel)
        recipients (channel-recipients pulse-channel)]
    {:channel_type (:type resolved)
     :channel      resolved
     :recipients   recipients}))
(defn- maybe-name
  "Nil-tolerant `name`: returns `(name x)` for a non-nil `x` and nil when `x` is nil."
  [x]
  (when (some? x)
    (name x)))
(defn- notification-info
  "Translate `pulse` plus a single `pulse-channel` into the notification map passed to `send-notification!`.

  Both shapes share `:id`, `:creator_id` and a one-element `:handlers` vector built from `pulse-channel`.

  - Dashboard subscription (pulse has a `:dashboard_id`): `:payload_type` is `:notification/dashboard` and
    `:dashboard_subscription` carries the dashboard id, the pulse's parameters and `:skip_if_empty`, and one
    dashcard entry per pulse card (its id as `:card_id` plus its export/format flags).
  - Alert (no `:dashboard_id`): `:payload_type` is `:notification/card`; `:payload` references the first card and
    derives `:send_once` / `:send_condition` from the alert fields; `:subscriptions` holds one cron subscription
    whose schedule string is derived from the pulse channel's schedule fields."
  [pulse dashboard pulse-channel]
  (let [pulse-id (:id pulse)
        common   {:id         pulse-id
                  :creator_id (:creator_id pulse)}]
    (if (= :pulse (alert-or-pulse pulse))
      (let [dashcards    (for [card (:cards pulse)]
                           (merge {:card_id (:id card)}
                                  (select-keys card [:include_xls :include_csv :pivot_results :format_rows])))
            subscription {:id                               pulse-id
                          :dashboard_id                     (:id dashboard)
                          :parameters                       (:parameters pulse)
                          :skip_if_empty                    (:skip_if_empty pulse)
                          :dashboard_subscription_dashcards dashcards}]
        (assoc common
               :payload_type           :notification/dashboard
               :dashboard_subscription subscription
               :handlers               [(get-notification-handler pulse-channel)]))
      (let [send-condition (if (= "rows" (:alert_condition pulse))
                             :has_result
                             (if (:alert_above_goal pulse)
                               :goal_above
                               :goal_below))
            payload        {:id             pulse-id
                            :card_id        (-> pulse :cards first :id)
                            :send_once      (true? (:alert_first_only pulse))
                            :send_condition send-condition}
            ;; The schedule fields may be keywords or strings; normalise them to strings (or nil) before they
            ;; are handed to the cron conversion.
            schedule-map   (reduce (fn [m k] (update m k maybe-name))
                                   pulse-channel
                                   [:schedule_type :schedule_day :schedule_frame])
            ;; Computed before the handler so the cron conversion still runs ahead of the handler lookup.
            cron-string    (u.cron/schedule-map->cron-string schedule-map)]
        (assoc common
               :payload_type  :notification/card
               :payload       payload
               :subscriptions [{:type          :notification-subscription/cron
                                :cron_schedule cron-string}]
               :handlers      [(get-notification-handler pulse-channel)])))))
(def ^{:private true} send-notification!
  "The var `metabase.notification.core/send-notification!`, looked up with `requiring-resolve` when this namespace
  loads rather than through a static `:require`.
  NOTE(review): presumably this avoids a load-time dependency cycle between the two namespaces -- confirm."
  (requiring-resolve 'metabase.notification.core/send-notification!))
(defn- send-pulse!*
  "Send one notification per pulse channel of `pulse`.

  `channel-ids` (read from `pulse`) is the set of channel ids to send to now: when it is non-empty only the
  channels in `:channels` whose `:id` is in it are used; otherwise every channel in `:channels` is used.

  Each channel is sent inside its own try/catch, so an exception on one channel is logged and sending continues
  with the remaining channels. `async?` is passed on, negated, as the `:notification/sync?` option of
  `send-notification!`. Always returns nil."
  [{:keys [channels channel-ids] :as pulse} dashboard async?]
  (let [;; Build the id set once up front instead of once per channel inside the filter predicate.
        wanted-ids (when (seq channel-ids)
                     (set channel-ids))
        targets    (if wanted-ids
                     (filter (comp wanted-ids :id) channels)
                     channels)]
    (doseq [pulse-channel targets]
      (try
        (send-notification! (notification-info pulse dashboard pulse-channel)
                            :notification/sync? (not async?))
        (catch Exception e
          (log/errorf e "[Pulse %d] Error sending to %s channel" (:id pulse) (:channel_type pulse-channel)))))
    nil))
(defn send-pulse!
  "Execute and send a Pulse (an Alert or a Dashboard Subscription), optionally restricted to specific pulse
  channels.

  `pulse` must be a map with an integer `:creator_id`. It is wrapped as a `:model/Pulse` instance and passed
  through `hydrate-notification` before sending. The dashboard referenced by `:dashboard_id` is looked up, and
  nothing is sent (nil is returned) when that dashboard is archived.

  Options:

  - `:channel-ids` -- ids of the pulse channels to send to now; when omitted, all of the pulse's channels are used.
  - `:async?`      -- defaults to false; its negation is passed to the notification sender as
                      `:notification/sync?`.

  Example:

    (send-pulse! pulse)                    ; Send to all Channels
    (send-pulse! pulse :channel-ids [312]) ; Send only to Channel with :id = 312"
  [{:keys [dashboard_id], :as pulse} & {:keys [channel-ids async?] :or {async? false}}]
  {:pre [(map? pulse) (integer? (:creator_id pulse))]}
  (let [dashboard (t2/select-one :model/Dashboard :id dashboard_id)
        ;; Hydration is usually already done by this step, in the `send-pulses` task which uses `retrieve-pulse`
        ;; to fetch the Pulse.
        hydrated  (models.pulse/hydrate-notification (mi/instance :model/Pulse pulse))
        outgoing  (cond-> hydrated
                    channel-ids (merge {:channel-ids channel-ids}))]
    (when-not (:archived dashboard)
      (send-pulse!* outgoing dashboard async?))))