Tasks related to running Pulses.

SendPulse job will send a pulse to all channels that are scheduled to run at the same time. For example if you have an Alert that has scheduled to send to both slack and emails at 6am, this job will be triggered and send the pulse to both channels.

(ns metabase.pulse.task.send-pulses
   [clojure.set :as set]
   [clojure.string :as str]
   [clojurewerkz.quartzite.conversion :as qc]
   [clojurewerkz.quartzite.jobs :as jobs]
   [clojurewerkz.quartzite.schedule.cron :as cron]
   [clojurewerkz.quartzite.triggers :as triggers]
   [metabase.driver :as driver]
   [metabase.models.task-history :as task-history]
   [metabase.pulse.models.pulse :as models.pulse]
   [metabase.pulse.send :as pulse.send]
   [metabase.query-processor.timezone :as qp.timezone]
   [metabase.task :as task]
   [metabase.util.cron :as u.cron]
   [metabase.util.log :as log]
   [metabase.util.malli :as mu]
   [toucan2.core :as t2])
   (java.util TimeZone)
   (org.quartz CronTrigger TriggerKey)))
(set! *warn-on-reflection* true)

------------------------------------------------ Job: SendPulse ----------------------------------------------------

(def ^:private send-pulse-job-key (jobs/key "metabase.task.send-pulses.send-pulse.job"))
(defn- send-pulse-trigger-key
  ^TriggerKey [pulse-id schedule-map]
  (triggers/key (format "metabase.task.send-pulse.trigger.%d.%s"
                        pulse-id (-> schedule-map
                                     (str/replace " " "_")))))
(defn- send-pulse-trigger-key->info
  (let [[_ pulse-id schedule-str] (re-matches #"metabase\.task\.send-pulse\.trigger\.(\d+)\.(.*)" trigger-key)]
    {:pulse-id     (parse-long pulse-id)
     :schedule-map (-> schedule-str
                       (str/replace "_" " ")
(defn- send-pulse!
  [pulse-id channel-ids]
    (task-history/with-task-history {:task         "send-pulse"
                                     :task_details {:pulse-id    pulse-id
                                                    :channel-ids (seq channel-ids)}}
      (when-let [pulse (models.pulse/retrieve-notification pulse-id :archived false)]
        (log/debugf "Starting Pulse Execution: %d" pulse-id)
        (pulse.send/send-pulse! pulse :channel-ids channel-ids :async? true)
        (log/debugf "Finished Pulse Execution: %d" pulse-id)
    (catch Throwable e
      (log/errorf e "Error sending Pulse %d to channel ids: %s" pulse-id (str/join ", " channel-ids)))))
(defn- send-trigger-timezone
  (or (driver/report-timezone)

Build a Quartz trigger to send a pulse to a list of channel-ids.

(mu/defn- send-pulse-trigger
  ([pulse-id     :- pos-int?
    schedule-map :- u.cron/ScheduleMap
    pc-ids       :- [:set pos-int?]
    timezone     :- :string]
   (send-pulse-trigger pulse-id schedule-map pc-ids timezone 6))
  ([pulse-id     :- pos-int?
    schedule-map :- u.cron/ScheduleMap
    pc-ids       :- [:set pos-int?]
    timezone     :- :string
    priority     :- pos-int?]
    (triggers/with-identity (send-pulse-trigger-key pulse-id schedule-map))
    (triggers/for-job send-pulse-job-key)
    (triggers/using-job-data {"pulse-id"    pulse-id
                              "channel-ids" pc-ids})
      (cron/cron-schedule (u.cron/schedule-map->cron-string schedule-map))
      (cron/in-time-zone (TimeZone/getTimeZone ^String timezone))
      ;; If the trigger is misfired, fire it immediately and proceed with the next scheduled time.
      ;; TODO: upon testing, look like re-firing on startup is not working as expected
      ;; See https://www.nurkiewicz.com/2012/04/quartz-scheduler-misfire-instructions.html for more info
    (triggers/with-priority priority))))

Delete PulseChannels that have no recipients and no channel set for a pulse, returns the channel ids that were deleted.

(defn- clear-pulse-channels-no-recipients!
  (when-let [ids-to-delete (seq
                            (for [channel (t2/select [:model/PulseChannel :id :details :channel_id :channel_type]
                                                     :pulse_id pulse-id
                                                     :id [:not-in {:select   [[:pulse_channel_id :id]]
                                                                   :from     :pulse_channel_recipient
                                                                   :group-by [:pulse_channel_id]
                                                                   :having   [:>= :%count.* [:raw 1]]}])
                                  :when  (case (:channel_type channel)
                                           (empty? (get-in channel [:details :emails]))
                                           (empty? (get-in channel [:details :channel]))
                                           (nil? (:channel_id channel)))]
                              (:id channel)))]
    (log/infof "Deleting %d PulseChannels with id: %s due to having no recipients" (count ids-to-delete) (str/join ", " ids-to-delete))
    (t2/delete! :model/PulseChannel :id [:in ids-to-delete])
    (set ids-to-delete)))

Do several things: - Clear PulseChannels that have no recipients and no channel set for a pulse - Send a pulse to a list of channels

(defn- send-pulse!*
  [pulse-id channel-ids]
  (let [cleared-channel-ids         (clear-pulse-channels-no-recipients! pulse-id)
        to-send-channel-ids         (set/difference channel-ids cleared-channel-ids)
        to-send-enabled-channel-ids (t2/select-pks-set :model/PulseChannel :id [:in to-send-channel-ids] :enabled true)]
    (if (seq to-send-enabled-channel-ids)
      (send-pulse! pulse-id to-send-enabled-channel-ids)
      (log/infof "Skip sending pulse %d because all channels have no recipients" pulse-id))))

Update the timezone of all SendPulse triggers if the report timezone changes.

called in [driver/report-timezone] setter

(defn update-send-pulse-triggers-timezone!
  (let [triggers     (-> send-pulse-job-key task/job-info :triggers)
        new-timezone (send-trigger-timezone)]
    (doseq [trigger triggers
            :when (not= new-timezone (:timezone trigger))] ; skip if timezone is the same
      (let [trigger-key            (:key trigger)
            channel-ids            (get-in trigger [:data "channel-ids"])
            {:keys [pulse-id
                    schedule-map]} (send-pulse-trigger-key->info trigger-key)]
        (log/infof "Updating timezone of trigger %s to %s. Was: %s" trigger-key new-timezone (:timezone trigger))
        (task/reschedule-trigger! (send-pulse-trigger pulse-id schedule-map channel-ids new-timezone (:priority trigger)))))))

Triggers that send a pulse to a list of channels at a specific time

  (let [{:strs [pulse-id channel-ids]} (qc/from-job-data context)]
    (send-pulse!* pulse-id channel-ids)))
(declare update-send-pulse-trigger-if-needed!)

Update send pulse triggers for all active pulses. Called once when Metabase starts up to create triggers for all existing PulseChannels

(defn init-dashboard-subscription-triggers!
  (assert (task/scheduler) "Scheduler must be started before initializing SendPulse triggers")
  (task/delete-all-triggers-of-job! send-pulse-job-key)
  (let [trigger-slot->pc-ids (as-> (t2/select :model/PulseChannel
                                              {:select    [:pc.*]
                                               :from      [[:pulse_channel :pc]]
                                               :left-join [[:pulse :p] [:= :pc.pulse_id :p.id]
                                                           [:report_dashboard :d] [:= :p.dashboard_id :d.id]]
                                               :where     [:and
                                                           [:= :pc.enabled true]
                                                           ;; only do this for dashboard subscriptions, alert has been
                                                           ;; migrated to notifications
                                                           [:not= :p.dashboard_id nil]
                                                           [:= :d.archived false]]})
                               (group-by #(select-keys % [:pulse_id :schedule_type :schedule_day :schedule_hour :schedule_frame]) results)
                               (update-vals results #(map :id %)))]
    (doseq [[{:keys [pulse_id] :as schedule-map} pc-ids] trigger-slot->pc-ids]
      (update-send-pulse-trigger-if-needed! pulse_id schedule-map :add-pc-ids (set pc-ids)))))

--------------------------------------------- Helpers -------------------------------------------

Update send pulse trigger of a pulse for a specific schedule map with new pulse channel ids.

Send Pulse triggers are grouped by pulse id and schedule time, meaning PulseChannels of a Pulse that scheduled to run at the same time of will be send together. This function will updates the corresponding trigger if PulseChannels changes.

  • To add 2 pulse channels to a trigger (update-send-pulse-trigger-if-needed! pulse-id schedule-map :add-pc-ids #{1 2}))

  • To remove 2 pulse channels from a trigger (update-send-pulse-trigger-if-needed! pulse-id schedule-map :remove-pc-ids #{1 2}))

called by PulseChannel hooks

(defn update-send-pulse-trigger-if-needed!
  [pulse-id schedule-map & {:keys [add-pc-ids remove-pc-ids]}]
  (let [schedule-map     (update-vals schedule-map
                                      #(if (keyword? %)
                                         (name %)
        trigger-key      (send-pulse-trigger-key pulse-id schedule-map)
        ;; there should be at most one existing trigger
        existing-trigger (->> (-> send-pulse-job-key task/job-info :triggers)
                              (filter #(= (:key %) (.getName ^TriggerKey trigger-key)))
        existing-pc-ids (some-> existing-trigger :data (get "channel-ids") set)
        new-pc-ids      (if (some? existing-pc-ids)
                          (cond-> existing-pc-ids
                            (some? add-pc-ids)    (set/union existing-pc-ids (set add-pc-ids))
                            (some? remove-pc-ids) (set/difference (set remove-pc-ids)))
                          (set add-pc-ids))]
     ;; no op when new-pc-ids doesnt't change
      (= new-pc-ids existing-pc-ids) nil
     ;; delete if no new pc-ids and there is an existing trigger
      (and (empty? new-pc-ids)
           (some? existing-pc-ids))
        (log/infof "Deleting trigger %s for pulse %d" trigger-key pulse-id)
        (task/delete-trigger! trigger-key))
     ;; delete then create if pc ids changes
      (and (seq new-pc-ids)
           (not= new-pc-ids existing-pc-ids))
        (log/infof "Updating Send Pulse trigger %s for pulse %d with new pc-ids: %s, was: %s " trigger-key pulse-id new-pc-ids existing-pc-ids)
        (task/delete-trigger! trigger-key)
        (task/add-trigger! (send-pulse-trigger pulse-id schedule-map new-pc-ids (send-trigger-timezone)))))))

Find all active Dashboard Subscriptino channels, group them by pulse-id and schedule time and create a trigger for each. Do this every startup to make sure all active pulse channels are triggered correctly.

  (log/info "Initializing SendPulse triggers for dashboard subscriptions")

-------------------------------------------------- Task init ------------------------------------------------

(defmethod task/init! ::SendPulses [_]
  (let [send-pulse-job (jobs/build
                        (jobs/with-identity send-pulse-job-key)
                        (jobs/with-description "Send Pulse")
                        (jobs/of-type SendPulse)
        init-job       (jobs/build
                        (jobs/of-type InitSendPulseTriggers)
                        (jobs/with-identity (jobs/key "metabase.task.send-pulses.init-send-pulse-triggers.job"))
        init-trigger   (triggers/build
                        ;; run once on startup
                        (triggers/with-identity (triggers/key "metabase.task.send-pulses.init-send-pulse-triggers.trigger"))
    (task/add-job! send-pulse-job)
    (task/schedule-task! init-job init-trigger)))