Background task scheduling via Quartzite. Individual tasks are defined in Regarding Task Initialization:The most appropriate way to initialize tasks in any Quartz JavaDocFind the JavaDoc for Quartz here: http://www.quartz-scheduler.org/api/2.3.0/index.html | (ns metabase.task (:require [clojure.string :as str] [clojurewerkz.quartzite.scheduler :as qs] [environ.core :as env] [metabase.db :as mdb] [metabase.plugins.classloader :as classloader] [metabase.task.bootstrap] [metabase.util :as u] [metabase.util.log :as log] [metabase.util.malli :as mu] [metabase.util.malli.schema :as ms]) (:import (org.quartz CronTrigger JobDetail JobExecutionContext JobExecutionException JobKey JobPersistenceException ObjectAlreadyExistsException Scheduler Trigger TriggerKey))) |
(set! *warn-on-reflection* true) | |
+----------------------------------------------------------------------------------------------------------------+ | SCHEDULER INSTANCE | +----------------------------------------------------------------------------------------------------------------+ | |
Override the global Quartz scheduler by binding this var. | (defonce ^:dynamic *quartz-scheduler* (atom nil)) |
Fetch the instance of our Quartz scheduler. | (defn scheduler ^Scheduler [] @*quartz-scheduler*) |
+----------------------------------------------------------------------------------------------------------------+ | FINDING & LOADING TASKS | +----------------------------------------------------------------------------------------------------------------+ | |
Initialize (i.e., schedule) Job(s) with a given name. All implementations of this method are called once and only
once when the Quartz task scheduler is initialized. Task namespaces ( The dispatch value for this function can be any unique keyword, but by convention is a namespaced keyword version of the name of the Job being initialized; for sake of consistency with the Job name itself, the keyword should be left CamelCased. (defmethod task/init! ::SendPulses [_] (task/schedule-task! my-job my-trigger)) | (defmulti init! {:arglists '([job-name-string])} keyword) |
Call all implementations of | (defn- init-tasks! [] (doseq [[k f] (methods init!)] (try ;; don't bother logging namespace for now, maybe in the future if there's tasks of the same name in multiple ;; namespaces we can log it (log/info "Initializing task" (u/format-color 'green (name k)) (u/emoji "📆")) (f k) (catch Throwable e (log/errorf e "Error initializing task %s" k))))) |
+----------------------------------------------------------------------------------------------------------------+ | STARTING/STOPPING SCHEDULER | +----------------------------------------------------------------------------------------------------------------+ | |
(defn- set-jdbc-backend-properties! [] (metabase.task.bootstrap/set-jdbc-backend-properties! (mdb/db-type))) | |
Delete any jobs that have been scheduled but whose class is no longer available. | (defn- delete-jobs-with-no-class! [] (when-let [scheduler (scheduler)] (doseq [job-key (.getJobKeys scheduler nil)] (try (qs/get-job scheduler job-key) (catch JobPersistenceException e (when (instance? ClassNotFoundException (.getCause e)) (log/warnf "Deleting job %s due to class not found (%s)" (.getName ^JobKey job-key) (ex-message (.getCause e))) (qs/delete-job scheduler job-key))))))) |
Initialize our Quartzite scheduler which allows jobs to be submitted and triggers to scheduled. Puts scheduler in standby mode. Call [[start-scheduler!]] to begin running scheduled tasks. | (defn- init-scheduler! [] (classloader/the-classloader) (when-not @*quartz-scheduler* (set-jdbc-backend-properties!) (let [new-scheduler (qs/initialize)] (when (compare-and-set! *quartz-scheduler* nil new-scheduler) (qs/standby new-scheduler) (log/info "Task scheduler initialized into standby mode.") (delete-jobs-with-no-class!) (init-tasks!))))) |
this is a function mostly to facilitate testing. | (defn- disable-scheduler? [] (some-> (env/env :mb-disable-scheduler) Boolean/parseBoolean)) |
Start the task scheduler. Tasks do not run before calling this function. | (defn start-scheduler! [] (if (disable-scheduler?) (log/warn "Metabase task scheduler disabled. Scheduled tasks will not be ran.") (do (init-scheduler!) (qs/start (scheduler)) (log/info "Task scheduler started")))) |
Stop our Quartzite scheduler and shutdown any running executions. | (defn stop-scheduler! [] (let [[old-scheduler] (reset-vals! *quartz-scheduler* nil)] (when old-scheduler (qs/shutdown old-scheduler)))) |
+----------------------------------------------------------------------------------------------------------------+ | SCHEDULING/DELETING TASKS | +----------------------------------------------------------------------------------------------------------------+ | |
Assuming that [[job]] is already registered, ensure that [[new-trigger]] is scheduled to trigger it. | (mu/defn- reschedule-task! [job :- (ms/InstanceOfClass JobDetail) new-trigger :- (ms/InstanceOfClass Trigger)] (try (when-let [scheduler (scheduler)] (let [job-key (.getKey ^JobDetail job) new-trigger-key (.getKey ^Trigger new-trigger) triggers (try (qs/get-triggers-of-job scheduler job-key) (catch Exception _)) matching-trigger (first (filter (comp #{new-trigger-key} #(.getKey ^Trigger %)) triggers)) replaced-trigger (or matching-trigger (first triggers))] (log/debugf "Rescheduling job %s" (.getName job-key)) (if-not replaced-trigger (.scheduleJob scheduler new-trigger) (let [replaced-key (.getKey ^Trigger replaced-trigger)] (when-not matching-trigger (log/warnf "Replacing trigger %s with trigger %s%s" (.getName replaced-key) (.getName new-trigger-key) (when (> (count triggers) 1) ;; We probably want more intuitive rescheduling semantics for multi-trigger jobs... ;; Ideally we would pass *all* the new triggers at once, so we can match them up atomically. ;; The current behavior is especially confounding if replacing N triggers with M ones. (str " (chosen randomly from " (count triggers) " existing ones)")))) (.rescheduleJob scheduler replaced-key new-trigger))))) (catch Throwable e (log/error e "Error rescheduling job")))) |
Reschedule a trigger with the same key as the given trigger. Used to update trigger properties like priority. | (mu/defn reschedule-trigger! [trigger :- (ms/InstanceOfClass Trigger)] (when-let [scheduler (scheduler)] (.rescheduleJob scheduler (.getKey ^Trigger trigger) trigger))) |
Add a given job and trigger to our scheduler. | (mu/defn schedule-task! [job :- (ms/InstanceOfClass JobDetail) trigger :- (ms/InstanceOfClass Trigger)] (when-let [scheduler (scheduler)] (try (qs/schedule scheduler job trigger) (catch ObjectAlreadyExistsException _ (log/debug "Job already exists:" (-> ^JobDetail job .getKey .getName)) (reschedule-task! job trigger))))) |
Immediately trigger execution of task | (mu/defn trigger-now! [job-key :- (ms/InstanceOfClass JobKey)] (try (when-let [scheduler (scheduler)] (.triggerJob scheduler job-key)) (catch Throwable e (log/errorf e "Failed to trigger immediate execution of task %s" job-key)))) |
Delete a task from the scheduler | (mu/defn delete-task! [job-key :- (ms/InstanceOfClass JobKey) trigger-key :- (ms/InstanceOfClass TriggerKey)] (when-let [scheduler (scheduler)] (qs/delete-trigger scheduler trigger-key) (qs/delete-job scheduler job-key))) |
Add a job separately from a trigger, replace if the job is already there | (mu/defn add-job! [job :- (ms/InstanceOfClass JobDetail)] (when-let [scheduler (scheduler)] (qs/add-job scheduler job true))) |
Add a trigger. Assumes the trigger is already associated to a job (i.e. | (mu/defn add-trigger! [trigger :- (ms/InstanceOfClass Trigger)] (when-let [scheduler (scheduler)] (qs/add-trigger scheduler trigger))) |
Remove | (mu/defn delete-trigger! [trigger-key :- (ms/InstanceOfClass TriggerKey)] (when-let [scheduler (scheduler)] (qs/delete-trigger scheduler trigger-key))) |
Delete all triggers for a given job key. | (mu/defn delete-all-triggers-of-job! [job-key :- (ms/InstanceOfClass JobKey)] (when-let [scheduler (scheduler)] (qs/delete-triggers scheduler (map #(.getKey ^Trigger %) (qs/get-triggers-of-job scheduler job-key))))) |
+----------------------------------------------------------------------------------------------------------------+ | Scheduler Info | +----------------------------------------------------------------------------------------------------------------+ | |
(defn- job-detail->info [^JobDetail job-detail] {:key (-> (.getKey job-detail) .getName) :class (-> (.getJobClass job-detail) .getCanonicalName) :description (.getDescription job-detail) :concurrent-execution-disallowed? (.isConcurrentExectionDisallowed job-detail) :durable? (.isDurable job-detail) :requests-recovery? (.requestsRecovery job-detail)}) | |
(defmulti ^:private trigger->info {:arglists '([trigger])} class) | |
(defmethod trigger->info Trigger [^Trigger trigger] {:description (.getDescription trigger) :end-time (.getEndTime trigger) :final-fire-time (.getFinalFireTime trigger) :key (-> (.getKey trigger) .getName) :state (some->> (.getKey trigger) (.getTriggerState (scheduler)) str) :next-fire-time (.getNextFireTime trigger) :previous-fire-time (.getPreviousFireTime trigger) :priority (.getPriority trigger) :start-time (.getStartTime trigger) :may-fire-again? (.mayFireAgain trigger) :data (into {} (.getJobDataMap trigger))}) | |
(defmethod trigger->info CronTrigger [^CronTrigger trigger] (assoc ((get-method trigger->info Trigger) trigger) :schedule (.getCronExpression trigger) :timezone (.getID (.getTimeZone trigger)) :misfire-instruction ;; not 100% sure why `case` doesn't work here... (condp = (.getMisfireInstruction trigger) CronTrigger/MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY "IGNORE_MISFIRE_POLICY" CronTrigger/MISFIRE_INSTRUCTION_SMART_POLICY "SMART_POLICY" CronTrigger/MISFIRE_INSTRUCTION_FIRE_ONCE_NOW "FIRE_ONCE_NOW" CronTrigger/MISFIRE_INSTRUCTION_DO_NOTHING "DO_NOTHING" (format "UNKNOWN: %d" (.getMisfireInstruction trigger))))) | |
(defn- ->job-key ^JobKey [x] (cond (instance? JobKey x) x (string? x) (JobKey. ^String x))) | |
Check whether there is a Job with the given key. | (defn job-exists? [job-key] (boolean (let [s (scheduler)] (when (and s (not (.isShutdown s))) (qs/get-job s (->job-key job-key)))))) |
Get info about a specific Job ( (task/job-info "metabase.task.sync-and-analyze.job") | (defn job-info [job-key] (when-let [scheduler (scheduler)] (let [job-key (->job-key job-key)] (try (assoc (job-detail->info (qs/get-job scheduler job-key)) :triggers (for [trigger (sort-by #(-> ^Trigger % .getKey .getName) (qs/get-triggers-of-job scheduler job-key))] (trigger->info trigger))) (catch ClassNotFoundException _ (log/infof "Class not found for Quartz Job %s. This probably means that this job was removed or renamed." (.getName job-key))) (catch Throwable e (log/warnf e "Error fetching details for Quartz Job: %s" (.getName job-key))))))) |
(defn- jobs-info [] (->> (some-> (scheduler) (.getJobKeys nil)) (sort-by #(.getName ^JobKey %)) (map job-info) (filter some?))) | |
Get the existing triggers for a job by key name, if it exists. | (defn existing-triggers [job-key trigger-key] (filter #(= (:key %) (.getName ^TriggerKey trigger-key)) (:triggers (job-info job-key)))) |
Return raw data about all the scheduler and scheduled tasks (i.e. Jobs and Triggers). Primarily for debugging purposes. | (defn scheduler-info [] {:scheduler (some-> (scheduler) .getMetaData .getSummary str/split-lines) :jobs (jobs-info)}) |
Retry the current Job if an exception is thrown by the enclosed code. | (defmacro rerun-on-error {:style/indent 1} [^JobExecutionContext ctx & body] `(let [msg# (str (.getName (.getKey (.getJobDetail ~ctx))) " failed, but we will try it again.")] (try ~@body (catch Exception e# (log/error e# msg#) (throw (JobExecutionException. msg# e# true)))))) |