(ns metabase.task.notification (:require [clojure.data :refer [diff]] [clojurewerkz.quartzite.conversion :as qc] [clojurewerkz.quartzite.jobs :as jobs] [clojurewerkz.quartzite.schedule.cron :as cron] [clojurewerkz.quartzite.triggers :as triggers] [metabase.driver :as driver] [metabase.models.task-history :as task-history] [metabase.notification.core :as notification] [metabase.query-processor.timezone :as qp.timezone] [metabase.task :as task] [metabase.util.log :as log] [toucan2.core :as t2]) (:import (java.util TimeZone) (org.quartz CronTrigger TriggerKey))) | |
(set! *warn-on-reflection* true)

(def ^:private send-notification-job-key
  "Key of the Quartz job that sends notifications. Every per-subscription trigger built in this namespace is
  attached to this job."
  (jobs/key "metabase.task.notification.send.job"))
(defn- send-notification-timezone
  "Timezone id used when scheduling notification triggers: the value of `(driver/report-timezone)` when it is
  set, else `(qp.timezone/system-timezone-id)`, else the literal \"UTC\"."
  []
  (if-let [report-tz (driver/report-timezone)]
    report-tz
    (if-let [system-tz (qp.timezone/system-timezone-id)]
      system-tz
      "UTC")))
(defn- send-notification-trigger-key
  "Quartz trigger key for the subscription with id `subscription-id`. The id is embedded at the end of the
  key name."
  ^TriggerKey [subscription-id]
  (-> "metabase.task.notification.trigger.subscription.%d"
      (format subscription-id)
      triggers/key))
(defn- send-notification-trigger-key->subscription-id
  "Inverse of the trigger-key naming scheme: pull the subscription id back out of a trigger key name.
  `trigger-key` is stringified first; returns the id as a long, or nil when the whole string does not
  match the expected key pattern."
  [trigger-key]
  (some-> (re-matches #"metabase\.task\.notification\.trigger\.subscription\.(\d+)" (str trigger-key))
          ;; element 0 is the whole match, element 1 the captured digits
          second
          parse-long))
(defn- build-trigger
  "Build a Quartz cron trigger for a notification subscription.

  The trigger is keyed by `subscription-id`, carries the id in its job data under \"subscription-id\", is
  attached to the send-notification job and fires according to the cron expression `cron-schedule`,
  evaluated in the timezone returned by `send-notification-timezone`."
  ^CronTrigger [subscription-id cron-schedule]
  (let [description (format "Notification Subscription %d" subscription-id)
        identity    (send-notification-trigger-key subscription-id)
        job-data    {"subscription-id" subscription-id}
        schedule    (cron/schedule
                     (cron/cron-schedule cron-schedule)
                     (cron/in-time-zone (TimeZone/getTimeZone ^String (send-notification-timezone)))
                     ;; We want to fire the trigger once even if the previous triggers missed
                     (cron/with-misfire-handling-instruction-fire-and-proceed))]
    (triggers/build
     (triggers/with-description description)
     (triggers/with-identity identity)
     (triggers/using-job-data job-data)
     (triggers/for-job send-notification-job-key)
     (triggers/start-now)
     (triggers/with-schedule schedule)
     ;; higher than sync
     (triggers/with-priority 6))))
(defn- create-new-trigger!
  "Build a trigger from the `:id` and `:cron_schedule` of a notification subscription and add it to the
  scheduler."
  [{subscription-id :id, cron-expression :cron_schedule}]
  (-> (build-trigger subscription-id cron-expression)
      task/add-trigger!))
(defn update-subscription-trigger!
  "Bring the Quartz trigger of a notification subscription in line with the subscription itself.

  - subscription is not of type `:notification-subscription/cron`: delete its trigger when one exists,
    otherwise do nothing
  - cron subscription without a trigger: create one
  - cron subscription whose trigger has a different schedule: delete the trigger and create a new one
  - otherwise: just log that nothing changed"
  [{:keys [id type cron_schedule] :as notification-subscription}]
  (let [trigger-key     (send-notification-trigger-key id)
        current-trigger (first (task/existing-triggers send-notification-job-key trigger-key))
        cron?           (= type :notification-subscription/cron)
        drop-current!   (fn []
                          (task/delete-trigger! (-> current-trigger :key triggers/key)))]
    (if-not cron?
      ;; non-cron subscriptions must not have a trigger; nothing to do when there is none
      (when current-trigger
        (log/infof "Deleting trigger for subscription %d because of type changes" id)
        (drop-current!))
      (cond
        (not current-trigger)
        (do
          (log/infof "Creating new trigger for subscription %d with schedule %s" id cron_schedule)
          (create-new-trigger! notification-subscription))

        (not= cron_schedule (:schedule current-trigger))
        (do
          (log/infof "Rescheduling trigger for subscription %d from %s to %s"
                     id (:schedule current-trigger) cron_schedule)
          (drop-current!)
          (create-new-trigger! notification-subscription))

        :else
        (log/infof "No changes to trigger for subscription %d" id)))))
(defn delete-trigger-for-subscription!
  "Delete the Quartz trigger belonging to the subscription with id `notification-subscription-id`.
  Does nothing (and returns nil) when no such trigger is registered."
  [notification-subscription-id]
  (let [trigger-key (send-notification-trigger-key notification-subscription-id)]
    (when-let [found (seq (task/existing-triggers send-notification-job-key trigger-key))]
      (let [trigger (first found)]
        (log/infof "Deleting trigger for subscription %d" notification-subscription-id)
        (task/delete-trigger! (-> trigger :key triggers/key))))))
(defn- send-notification*
  "Send the notification that the subscription with id `subscription-id` points at.

  - notification found and `:active`: send it synchronously, recorded as a \"notification-trigger\" task
    history entry; failures are logged and rethrown
  - no notification found (including when the subscription itself is gone): delete the subscription's trigger
  - notification found but not active: delete the subscription's trigger"
  [subscription-id]
  (let [subscription    (t2/select-one :model/NotificationSubscription subscription-id)
        notification-id (:notification_id subscription)
        ;; Only query when there is an id to query by. Without this guard a missing subscription leads to
        ;; `(t2/select-one :model/Notification nil)`.
        ;; NOTE(review): Toucan 2 appears to treat a nil query argument as "no conditions", which would
        ;; return an arbitrary notification instead of nil -- confirm.
        notification    (when (some? notification-id)
                          (t2/select-one :model/Notification notification-id))]
    (cond
      (:active notification)
      (try
        (log/infof "Sending notification %d for subscription %d" notification-id subscription-id)
        (task-history/with-task-history {:task         "notification-trigger"
                                         :task_details {:trigger_type                 :notification-subscription/cron
                                                        :notification_subscription_id subscription-id
                                                        :cron_schedule                (:cron_schedule subscription)
                                                        :notification_ids             [notification-id]}}
          (notification/send-notification! notification :notification/sync? true))
        (log/infof "Sent notification %d for subscription %d" notification-id subscription-id)
        (catch Exception e
          (log/errorf e "Failed to send notification %d for subscription %d" notification-id subscription-id)
          (throw e)))

      (nil? notification)
      (do
        (log/warnf "Skipping and deleting trigger for subscription %d because it does not exist." subscription-id)
        (delete-trigger-for-subscription! subscription-id))

      (not (:active notification))
      (do
        (log/warnf "Skipping and deleting trigger for subscription %d because the notification is deactivated" subscription-id)
        (delete-trigger-for-subscription! subscription-id)))))
(defn update-send-notification-triggers-timezone!
  "Reschedule every send-notification trigger whose timezone differs from the current
  `send-notification-timezone`, rebuilding it from the cron schedule stored on its subscription.
  Called in the [driver/report-timezone] setter.

  Triggers whose key cannot be mapped to an existing cron subscription are skipped with a warning, so that
  one stale trigger does not prevent the remaining triggers from being updated."
  []
  (let [triggers              (-> send-notification-job-key task/job-info :triggers)
        new-timezone          (send-notification-timezone)
        subscription-id->cron (t2/select-fn->fn :id :cron_schedule :model/NotificationSubscription
                                                :type :notification-subscription/cron)]
    (doseq [trigger triggers
            ;; skip if timezone is the same
            :when   (not= new-timezone (:timezone trigger))]
      (let [trigger-key     (:key trigger)
            subscription-id (send-notification-trigger-key->subscription-id trigger-key)
            cron-schedule   (get subscription-id->cron subscription-id)]
        (if (some? cron-schedule)
          (do
            (log/infof "Updating timezone of trigger %s to %s. Was: %s" trigger-key new-timezone (:timezone trigger))
            (task/reschedule-trigger! (build-trigger subscription-id cron-schedule)))
          ;; building a trigger from a nil cron expression would throw and abort the whole loop
          (log/warnf "Skipping timezone update of trigger %s because no cron subscription was found for it"
                     trigger-key))))))
;; Quartz job run by the per-subscription triggers: reads "subscription-id" from the job data of the
;; execution context and sends the notification for that subscription.
(jobs/defjob SendNotification
  [context]
  (-> (qc/from-job-data context)
      (get "subscription-id")
      send-notification*))
(defn init-send-notification-triggers!
  "Initialize all notification subscription triggers. Called when starting the instance.

  Compares the subscription ids that already have a trigger with the ids of the cron subscriptions in the
  application database, deletes the triggers that no longer have a subscription and creates triggers for
  the subscriptions that do not have one yet. Asserts that the scheduler has been started."
  []
  (assert (task/scheduler) "Scheduler must be started before initializing SendPulse triggers")
  ;; Get all existing triggers and subscription IDs
  (let [existing-triggers                  (:triggers (task/job-info send-notification-job-key))
        ;; Both sides must be sets: `clojure.data/diff` compares sequential collections position by
        ;; position, which would yield nils and wrong ids whenever the two lists differ in order or length.
        existing-triggers-subscription-ids (set (map #(get-in % [:data "subscription-id"]) existing-triggers))
        subscription-id->cron              (t2/select-pk->fn identity :model/NotificationSubscription
                                                             :type :notification-subscription/cron)
        db-subscription-ids                (set (keys subscription-id->cron))
        [to-delete to-create _to-skip]     (diff existing-triggers-subscription-ids db-subscription-ids)]
    (doseq [subscription-id to-delete]
      (delete-trigger-for-subscription! subscription-id))
    (doseq [subscription-id to-create]
      (create-new-trigger! (get subscription-id->cron subscription-id)))))
;; Quartz job that finds all notification subscriptions with cron schedules and makes sure each has a
;; trigger. Run once on startup.
(jobs/defjob InitNotificationTriggers
  [_context]
  (do
    (log/info "Initializing SendNotification triggers")
    (init-send-notification-triggers!)))
;; Task initialization: registers the durable send-notification job and schedules a one-off job that
;; creates the per-subscription triggers.
(defmethod task/init! ::SendNotifications
  [_]
  (let [sender-job       (jobs/build
                          (jobs/with-identity send-notification-job-key)
                          (jobs/with-description "Send Notification")
                          (jobs/of-type SendNotification)
                          (jobs/store-durably))
        init-job-key     (jobs/key "metabase.task.notification.init-notification-triggers.job")
        init-job         (jobs/build
                          (jobs/of-type InitNotificationTriggers)
                          (jobs/with-identity init-job-key)
                          (jobs/store-durably))
        init-trigger-key (triggers/key "metabase.task.notification.init-notification-triggers.trigger")
        init-trigger     (triggers/build
                          (triggers/with-identity init-trigger-key)
                          ;; run once on startup
                          (triggers/start-now))]
    (task/add-job! sender-job)
    (task/schedule-task! init-job init-trigger)))