Tasks related to running
| (ns metabase.pulse.task.send-pulses (:require [clojure.set :as set] [clojure.string :as str] [clojurewerkz.quartzite.conversion :as qc] [clojurewerkz.quartzite.jobs :as jobs] [clojurewerkz.quartzite.schedule.cron :as cron] [clojurewerkz.quartzite.triggers :as triggers] [metabase.driver :as driver] [metabase.models.task-history :as task-history] [metabase.pulse.models.pulse :as models.pulse] [metabase.pulse.send :as pulse.send] [metabase.query-processor.timezone :as qp.timezone] [metabase.task :as task] [metabase.util.cron :as u.cron] [metabase.util.log :as log] [metabase.util.malli :as mu] [toucan2.core :as t2]) (:import (java.util TimeZone) (org.quartz CronTrigger TriggerKey))) |
(set! *warn-on-reflection* true) | |
------------------------------------------------ Job: SendPulse ---------------------------------------------------- | |
(def ^:private send-pulse-job-key (jobs/key "metabase.task.send-pulses.send-pulse.job")) | |
(defn- send-pulse-trigger-key
^TriggerKey [pulse-id schedule-map]
(triggers/key (format "metabase.task.send-pulse.trigger.%d.%s"
pulse-id (-> schedule-map
u.cron/schedule-map->cron-string
(str/replace " " "_"))))) | |
(defn- send-pulse-trigger-key->info
[trigger-key]
(let [[_ pulse-id schedule-str] (re-matches #"metabase\.task\.send-pulse\.trigger\.(\d+)\.(.*)" trigger-key)]
{:pulse-id (parse-long pulse-id)
:schedule-map (-> schedule-str
(str/replace "_" " ")
u.cron/cron-string->schedule-map)})) | |
(defn- send-pulse!
[pulse-id channel-ids]
(try
(task-history/with-task-history {:task "send-pulse"
:task_details {:pulse-id pulse-id
:channel-ids (seq channel-ids)}}
(when-let [pulse (models.pulse/retrieve-notification pulse-id :archived false)]
(log/debugf "Starting Pulse Execution: %d" pulse-id)
(pulse.send/send-pulse! pulse :channel-ids channel-ids :async? true)
(log/debugf "Finished Pulse Execution: %d" pulse-id)
:done))
(catch Throwable e
(log/errorf e "Error sending Pulse %d to channel ids: %s" pulse-id (str/join ", " channel-ids))))) | |
(defn- send-trigger-timezone
[]
(or (driver/report-timezone)
(qp.timezone/system-timezone-id)
"UTC")) | |
Build a Quartz trigger to send a pulse to a list of channel-ids. | (mu/defn- send-pulse-trigger
^CronTrigger
([pulse-id :- pos-int?
schedule-map :- u.cron/ScheduleMap
pc-ids :- [:set pos-int?]
timezone :- :string]
(send-pulse-trigger pulse-id schedule-map pc-ids timezone 6))
([pulse-id :- pos-int?
schedule-map :- u.cron/ScheduleMap
pc-ids :- [:set pos-int?]
timezone :- :string
priority :- pos-int?]
(triggers/build
(triggers/with-identity (send-pulse-trigger-key pulse-id schedule-map))
(triggers/for-job send-pulse-job-key)
(triggers/using-job-data {"pulse-id" pulse-id
"channel-ids" pc-ids})
(triggers/with-schedule
(cron/schedule
(cron/cron-schedule (u.cron/schedule-map->cron-string schedule-map))
(cron/in-time-zone (TimeZone/getTimeZone ^String timezone))
;; If the trigger is misfired, fire it immediately and proceed with the next scheduled time.
;; TODO: upon testing, look like re-firing on startup is not working as expected
;;
;; See https://www.nurkiewicz.com/2012/04/quartz-scheduler-misfire-instructions.html for more info
(cron/with-misfire-handling-instruction-fire-and-proceed)))
(triggers/with-priority priority)))) |
Delete PulseChannels that have no recipients and no channel set for a pulse, returns the channel ids that were deleted. | (defn- clear-pulse-channels-no-recipients!
[pulse-id]
(when-let [ids-to-delete (seq
(for [channel (t2/select [:model/PulseChannel :id :details :channel_id :channel_type]
:pulse_id pulse-id
:id [:not-in {:select [[:pulse_channel_id :id]]
:from :pulse_channel_recipient
:group-by [:pulse_channel_id]
:having [:>= :%count.* [:raw 1]]}])
:when (case (:channel_type channel)
:email
(empty? (get-in channel [:details :emails]))
:slack
(empty? (get-in channel [:details :channel]))
:http
(nil? (:channel_id channel)))]
(:id channel)))]
(log/infof "Deleting %d PulseChannels with id: %s due to having no recipients" (count ids-to-delete) (str/join ", " ids-to-delete))
(t2/delete! :model/PulseChannel :id [:in ids-to-delete])
(set ids-to-delete))) |
Do several things: - Clear PulseChannels that have no recipients and no channel set for a pulse - Send a pulse to a list of channels | (defn- send-pulse!*
[pulse-id channel-ids]
(let [cleared-channel-ids (clear-pulse-channels-no-recipients! pulse-id)
to-send-channel-ids (set/difference channel-ids cleared-channel-ids)
to-send-enabled-channel-ids (t2/select-pks-set :model/PulseChannel :id [:in to-send-channel-ids] :enabled true)]
(if (seq to-send-enabled-channel-ids)
(send-pulse! pulse-id to-send-enabled-channel-ids)
(log/infof "Skip sending pulse %d because all channels have no recipients" pulse-id)))) |
Update the timezone of all SendPulse triggers if the report timezone changes. called in [driver/report-timezone] setter | (defn update-send-pulse-triggers-timezone!
[]
(let [triggers (-> send-pulse-job-key task/job-info :triggers)
new-timezone (send-trigger-timezone)]
(doseq [trigger triggers
:when (not= new-timezone (:timezone trigger))] ; skip if timezone is the same
(let [trigger-key (:key trigger)
channel-ids (get-in trigger [:data "channel-ids"])
{:keys [pulse-id
schedule-map]} (send-pulse-trigger-key->info trigger-key)]
(log/infof "Updating timezone of trigger %s to %s. Was: %s" trigger-key new-timezone (:timezone trigger))
(task/reschedule-trigger! (send-pulse-trigger pulse-id schedule-map channel-ids new-timezone (:priority trigger))))))) |
Triggers that send a pulse to a list of channels at a specific time | (jobs/defjob
SendPulse
[context]
(let [{:strs [pulse-id channel-ids]} (qc/from-job-data context)]
(send-pulse!* pulse-id channel-ids))) |
(declare update-send-pulse-trigger-if-needed!) | |
Update send pulse triggers for all active pulses. Called once when Metabase starts up to create triggers for all existing PulseChannels | (defn init-dashboard-subscription-triggers!
[]
(assert (task/scheduler) "Scheduler must be started before initializing SendPulse triggers")
(task/delete-all-triggers-of-job! send-pulse-job-key)
(let [trigger-slot->pc-ids (as-> (t2/select :model/PulseChannel
{:select [:pc.*]
:from [[:pulse_channel :pc]]
:left-join [[:pulse :p] [:= :pc.pulse_id :p.id]
[:report_dashboard :d] [:= :p.dashboard_id :d.id]]
:where [:and
[:= :pc.enabled true]
;; only do this for dashboard subscriptions, alert has been
;; migrated to notifications
[:not= :p.dashboard_id nil]
[:= :d.archived false]]})
results
(group-by #(select-keys % [:pulse_id :schedule_type :schedule_day :schedule_hour :schedule_frame]) results)
(update-vals results #(map :id %)))]
(doseq [[{:keys [pulse_id] :as schedule-map} pc-ids] trigger-slot->pc-ids]
(update-send-pulse-trigger-if-needed! pulse_id schedule-map :add-pc-ids (set pc-ids))))) |
--------------------------------------------- Helpers ------------------------------------------- | |
Update send pulse trigger of a pulse for a specific schedule map with new pulse channel ids. Send Pulse triggers are grouped by pulse id and schedule time, meaning PulseChannels of a Pulse that scheduled to run at the same time of will be send together. This function will updates the corresponding trigger if PulseChannels changes.
called by PulseChannel hooks | (defn update-send-pulse-trigger-if-needed!
[pulse-id schedule-map & {:keys [add-pc-ids remove-pc-ids]}]
(let [schedule-map (update-vals schedule-map
#(if (keyword? %)
(name %)
%))
trigger-key (send-pulse-trigger-key pulse-id schedule-map)
;; there should be at most one existing trigger
existing-trigger (->> (-> send-pulse-job-key task/job-info :triggers)
(filter #(= (:key %) (.getName ^TriggerKey trigger-key)))
first)
existing-pc-ids (some-> existing-trigger :data (get "channel-ids") set)
new-pc-ids (if (some? existing-pc-ids)
(cond-> existing-pc-ids
(some? add-pc-ids) (set/union existing-pc-ids (set add-pc-ids))
(some? remove-pc-ids) (set/difference (set remove-pc-ids)))
(set add-pc-ids))]
(cond
;; no op when new-pc-ids doesnt't change
(= new-pc-ids existing-pc-ids) nil
;; delete if no new pc-ids and there is an existing trigger
(and (empty? new-pc-ids)
(some? existing-pc-ids))
(do
(log/infof "Deleting trigger %s for pulse %d" trigger-key pulse-id)
(task/delete-trigger! trigger-key))
;; delete then create if pc ids changes
(and (seq new-pc-ids)
(not= new-pc-ids existing-pc-ids))
(do
(log/infof "Updating Send Pulse trigger %s for pulse %d with new pc-ids: %s, was: %s " trigger-key pulse-id new-pc-ids existing-pc-ids)
(task/delete-trigger! trigger-key)
(task/add-trigger! (send-pulse-trigger pulse-id schedule-map new-pc-ids (send-trigger-timezone))))))) |
Find all active Dashboard Subscriptino channels, group them by pulse-id and schedule time and create a trigger for each. Do this every startup to make sure all active pulse channels are triggered correctly. | (jobs/defjob InitSendPulseTriggers [_context] (log/info "Initializing SendPulse triggers for dashboard subscriptions") (init-dashboard-subscription-triggers!)) |
-------------------------------------------------- Task init ------------------------------------------------ | |
(defmethod task/init! ::SendPulses [_]
(let [send-pulse-job (jobs/build
(jobs/with-identity send-pulse-job-key)
(jobs/with-description "Send Pulse")
(jobs/of-type SendPulse)
(jobs/store-durably))
init-job (jobs/build
(jobs/of-type InitSendPulseTriggers)
(jobs/with-identity (jobs/key "metabase.task.send-pulses.init-send-pulse-triggers.job"))
(jobs/store-durably))
init-trigger (triggers/build
;; run once on startup
(triggers/with-identity (triggers/key "metabase.task.send-pulses.init-send-pulse-triggers.trigger"))
(triggers/start-now))]
(task/add-job! send-pulse-job)
(task/schedule-task! init-job init-trigger))) | |