(ns metabase.task.notification
  (:require
   [clojure.data :refer [diff]]
   [clojurewerkz.quartzite.conversion :as qc]
   [clojurewerkz.quartzite.jobs :as jobs]
   [clojurewerkz.quartzite.schedule.cron :as cron]
   [clojurewerkz.quartzite.triggers :as triggers]
   [metabase.driver :as driver]
   [metabase.models.task-history :as task-history]
   [metabase.notification.core :as notification]
   [metabase.query-processor.timezone :as qp.timezone]
   [metabase.task :as task]
   [metabase.util.log :as log]
   [toucan2.core :as t2])
  (:import
   (java.util TimeZone)
   (org.quartz CronTrigger TriggerKey)))
;; Make the compiler warn whenever Java interop in this namespace falls back to reflection.
(set! *warn-on-reflection* true)
;; Key of the Quartz job that every per-subscription trigger built in this namespace is attached to.
(def ^:private send-notification-job-key (jobs/key "metabase.task.notification.send.job"))
(defn- send-notification-timezone
  "Timezone ID used when scheduling notification triggers: the report timezone if
  one is set, otherwise the system timezone ID, otherwise the literal UTC."
  []
  (if-let [report-tz (driver/report-timezone)]
    report-tz
    (or (qp.timezone/system-timezone-id) "UTC")))
(defn- send-notification-trigger-key
  "Return the Quartz trigger key for the subscription with `subscription-id`. The key
  name is a fixed prefix followed by the numeric ID."
  ^TriggerKey [subscription-id]
  (let [key-name (format "metabase.task.notification.trigger.subscription.%d" subscription-id)]
    (triggers/key key-name)))
(defn- send-notification-trigger-key->subscription-id
  "Extract the numeric subscription ID from the string form of `trigger-key`.
  Returns nil when the key does not follow the subscription trigger naming scheme
  or when the digits cannot be parsed as a long."
  [trigger-key]
  (some-> (re-matches #"metabase\.task\.notification\.trigger\.subscription\.(\d+)" (str trigger-key))
          second
          parse-long))

Build a Quartz cron trigger for the notification subscription with the given ID and cron schedule.

(defn- build-trigger
  "Build (but do not schedule) the cron trigger for a notification subscription.

  The trigger is attached to the send-notification job, carries the subscription ID
  in its job data under the string key subscription-id, fires according to
  `cron-schedule` in the timezone returned by `send-notification-timezone`, and has
  priority 6."
  ^CronTrigger [subscription-id cron-schedule]
  (let [description  (format "Notification Subscription %d" subscription-id)
        trigger-key  (send-notification-trigger-key subscription-id)
        job-data     {"subscription-id" subscription-id}
        schedule     (cron/schedule
                      (cron/cron-schedule cron-schedule)
                      (cron/in-time-zone (TimeZone/getTimeZone ^String (send-notification-timezone)))
                      ;; We want to fire the trigger once even if the previous triggers missed
                      (cron/with-misfire-handling-instruction-fire-and-proceed))]
    (triggers/build
     (triggers/with-description description)
     (triggers/with-identity trigger-key)
     (triggers/using-job-data job-data)
     (triggers/for-job send-notification-job-key)
     (triggers/start-now)
     (triggers/with-schedule schedule)
     ;; higher than sync
     (triggers/with-priority 6))))
(defn- create-new-trigger!
  "Build a cron trigger from the `:id` and `:cron_schedule` of
  `notification-subscription` and add it to the scheduler."
  [notification-subscription]
  (let [{:keys [id cron_schedule]} notification-subscription]
    (-> (build-trigger id cron_schedule)
        task/add-trigger!)))

Create, reschedule, or delete the trigger for a notification subscription so that it matches the subscription's type and cron schedule; do nothing if it is already up to date.

(defn update-subscription-trigger!
  "Bring the Quartz trigger of `notification-subscription` in line with its current
  `:type` and `:cron_schedule`:

  - not a cron subscription: delete the existing trigger, if any;
  - cron subscription without a trigger: create one;
  - cron subscription whose trigger has a different schedule: delete and recreate it;
  - otherwise: only log that nothing changed."
  [{:keys [id type cron_schedule] :as notification-subscription}]
  (let [trigger-key      (send-notification-trigger-key id)
        existing-trigger (first (task/existing-triggers send-notification-job-key trigger-key))
        cron-type?       (= type :notification-subscription/cron)
        drop-existing!   (fn [] (task/delete-trigger! (-> existing-trigger :key triggers/key)))]
    (if-not cron-type?
      ;; non-cron subscriptions must not have a trigger; nothing to do when there is none
      (when existing-trigger
        (log/infof "Deleting trigger for subscription %d because of type changes" id)
        (drop-existing!))
      (cond
        (not existing-trigger)
        (do
          (log/infof "Creating new trigger for subscription %d with schedule %s" id cron_schedule)
          (create-new-trigger! notification-subscription))

        ;; schedule changed: replace the trigger rather than mutating it
        (not= cron_schedule (:schedule existing-trigger))
        (do
          (log/infof "Rescheduling trigger for subscription %d from %s to %s" id (:schedule existing-trigger) cron_schedule)
          (drop-existing!)
          (create-new-trigger! notification-subscription))

        :else
        (log/infof "No changes to trigger for subscription %d" id)))))

Delete the trigger for a notification subscription.

(defn delete-trigger-for-subscription!
  "Delete the Quartz trigger belonging to the subscription with
  `notification-subscription-id`. Does nothing (and returns nil) when no such
  trigger exists."
  [notification-subscription-id]
  (let [trigger-key (send-notification-trigger-key notification-subscription-id)]
    (when-let [[trigger] (seq (task/existing-triggers send-notification-job-key trigger-key))]
      (log/infof "Deleting trigger for subscription %d" notification-subscription-id)
      (task/delete-trigger! (triggers/key (:key trigger))))))
(defn- send-notification*
  "Send the notification that the subscription with `subscription-id` points to.

  When the notification cannot be found or is not active, nothing is sent and the
  subscription's trigger is deleted instead. When sending fails, the exception is
  logged and rethrown."
  [subscription-id]
  (let [{notification-id :notification_id
         :as             subscription} (t2/select-one :model/NotificationSubscription subscription-id)
        notification                   (t2/select-one :model/Notification notification-id)]
    (cond
      (nil? notification)
      (do
        (log/warnf "Skipping and deleting trigger for subscription %d because it does not exist." subscription-id)
        (delete-trigger-for-subscription! subscription-id))

      (not (:active notification))
      (do
        (log/warnf "Skipping and deleting trigger for subscription %d because the notification is deactivated" subscription-id)
        (delete-trigger-for-subscription! subscription-id))

      :else
      (try
        (log/infof "Sending notification %d for subscription %d" notification-id subscription-id)
        ;; record the run in task history together with what triggered it
        (task-history/with-task-history {:task         "notification-trigger"
                                         :task_details {:trigger_type                 :notification-subscription/cron
                                                        :notification_subscription_id subscription-id
                                                        :cron_schedule                (:cron_schedule subscription)
                                                        :notification_ids             [notification-id]}}
          (notification/send-notification! notification :notification/sync? true))
        (log/infof "Sent notification %d for subscription %d" notification-id subscription-id)
        (catch Exception e
          (log/errorf e "Failed to send notification %d for subscription %d" notification-id subscription-id)
          (throw e))))))

Update the timezone of all SendNotification triggers if the report timezone changes.

Called in the [[driver/report-timezone]] setter.

(defn update-send-notification-triggers-timezone!
  "Reschedule every trigger of the send-notification job whose timezone differs from
  the current `send-notification-timezone`, rebuilding it from the cron schedule of
  its subscription.

  Triggers for which no cron subscription can be found (the key does not parse to a
  subscription ID, or the subscription no longer exists or is no longer of type
  cron) are skipped with a warning. Rebuilding them would pass a nil cron expression
  to `build-trigger`, which would throw and abort the update of the remaining
  triggers."
  []
  (let [triggers              (-> send-notification-job-key task/job-info :triggers)
        new-timezone          (send-notification-timezone)
        subscription-id->cron (t2/select-fn->fn :id :cron_schedule :model/NotificationSubscription :type :notification-subscription/cron)]
    (doseq [trigger triggers
            :when (not= new-timezone (:timezone trigger))] ; skip if timezone is the same
      (let [trigger-key     (:key trigger)
            subscription-id (send-notification-trigger-key->subscription-id trigger-key)
            cron-schedule   (get subscription-id->cron subscription-id)]
        (if cron-schedule
          (do
            (log/infof "Updating timezone of trigger %s to %s. Was: %s" trigger-key new-timezone (:timezone trigger))
            (task/reschedule-trigger! (build-trigger subscription-id cron-schedule)))
          (log/warnf "Not updating timezone of trigger %s: no cron subscription found for it" trigger-key))))))

Quartz job that sends the notification for a subscription; fired by the per-subscription triggers, which pass the subscription ID in the job data.

;; Job body: read the subscription ID that the firing trigger stored in its job data
;; (string key "subscription-id") and send that subscription's notification.
(jobs/defjob
  SendNotification
  [context]
  (let [job-data (qc/from-job-data context)]
    (send-notification* (get job-data "subscription-id"))))

Initialize all notification subscription triggers.

Called when starting the instance.

(defn init-send-notification-triggers!
  "Reconcile the triggers of the send-notification job with the cron subscriptions
  stored in the application database: delete triggers whose subscription is not a
  cron subscription in the database, and create triggers for cron subscriptions
  that have none. Subscriptions that already have a trigger are left untouched.

  Asserts that the scheduler has been started."
  []
  (assert (task/scheduler) "Scheduler must be started before initializing SendNotification triggers")
  (let [existing-triggers        (:triggers (task/job-info send-notification-job-key))
        ;; IDs are collected into SETS on purpose: `clojure.data/diff` compares sequential
        ;; collections index by index, which would yield wrong (and nil-padded) results.
        trigger-subscription-ids (into #{}
                                       (keep #(get-in % [:data "subscription-id"]))
                                       existing-triggers)
        id->subscription         (t2/select-pk->fn identity :model/NotificationSubscription :type :notification-subscription/cron)
        db-subscription-ids      (set (keys id->subscription))
        ;; for sets, diff returns [only-in-first only-in-second in-both], each nil when empty
        [to-delete to-create _to-skip] (diff trigger-subscription-ids db-subscription-ids)]
    (doseq [subscription-id to-delete]
      (delete-trigger-for-subscription! subscription-id))
    (doseq [subscription-id to-create]
      (create-new-trigger! (get id->subscription subscription-id)))))

Find all notification subscriptions with cron schedules and create a trigger for each. Run once on startup.

;; Job body: log the start of the initialization, then reconcile the
;; send-notification triggers. The job context is not used.
(jobs/defjob InitNotificationTriggers [_]
  (log/info "Initializing SendNotification triggers")
  (init-send-notification-triggers!))
;; Task initialization for notifications: registers the durable send-notification job
;; (its per-subscription triggers are added separately) and schedules a durable job,
;; with a start-now trigger, that initializes those triggers.
(defmethod task/init! ::SendNotifications [_]
  (let [send-job     (jobs/build
                      (jobs/with-identity send-notification-job-key)
                      (jobs/with-description "Send Notification")
                      (jobs/of-type SendNotification)
                      (jobs/store-durably))
        init-job-key (jobs/key "metabase.task.notification.init-notification-triggers.job")
        init-job     (jobs/build
                      (jobs/of-type InitNotificationTriggers)
                      (jobs/with-identity init-job-key)
                      (jobs/store-durably))
        init-trigger (triggers/build
                      (triggers/with-identity (triggers/key "metabase.task.notification.init-notification-triggers.trigger"))
                      ;; run once on startup
                      (triggers/start-now))]
    (task/add-job! send-job)
    (task/schedule-task! init-job init-trigger)))