Background task scheduling via Quartzite. Individual tasks are defined in Regarding Task Initialization:The most appropriate way to initialize tasks in any Quartz JavaDocFind the JavaDoc for Quartz here: http://www.quartz-scheduler.org/api/2.3.0/index.html | (ns metabase.task
(:require
[clojure.string :as str]
[clojurewerkz.quartzite.scheduler :as qs]
[environ.core :as env]
[metabase.db :as mdb]
[metabase.plugins.classloader :as classloader]
[metabase.task.bootstrap]
[metabase.util :as u]
[metabase.util.log :as log]
[metabase.util.malli :as mu]
[metabase.util.malli.schema :as ms])
(:import
(org.quartz CronTrigger JobDetail JobExecutionContext JobExecutionException JobKey JobPersistenceException
ObjectAlreadyExistsException Scheduler Trigger TriggerKey))) |
(set! *warn-on-reflection* true) | |
+----------------------------------------------------------------------------------------------------------------+ | SCHEDULER INSTANCE | +----------------------------------------------------------------------------------------------------------------+ | |
Override the global Quartz scheduler by binding this var. | (defonce ^:dynamic *quartz-scheduler* (atom nil)) |
Fetch the instance of our Quartz scheduler. | (defn scheduler ^Scheduler [] @*quartz-scheduler*) |
+----------------------------------------------------------------------------------------------------------------+ | FINDING & LOADING TASKS | +----------------------------------------------------------------------------------------------------------------+ | |
Initialize (i.e., schedule) Job(s) with a given name. All implementations of this method are called once and only
once when the Quartz task scheduler is initialized. Task namespaces ( The dispatch value for this function can be any unique keyword, but by convention is a namespaced keyword version of the name of the Job being initialized; for sake of consistency with the Job name itself, the keyword should be left CamelCased. (defmethod task/init! ::SendPulses [_] (task/schedule-task! my-job my-trigger)) | (defmulti init!
{:arglists '([job-name-string])}
keyword) |
Call all implementations of | (defn- init-tasks!
[]
(doseq [[k f] (methods init!)]
(try
;; don't bother logging namespace for now, maybe in the future if there's tasks of the same name in multiple
;; namespaces we can log it
(log/info "Initializing task" (u/format-color 'green (name k)) (u/emoji "📆"))
(f k)
(catch Throwable e
(log/errorf e "Error initializing task %s" k))))) |
+----------------------------------------------------------------------------------------------------------------+ | STARTING/STOPPING SCHEDULER | +----------------------------------------------------------------------------------------------------------------+ | |
(defn- set-jdbc-backend-properties! [] (metabase.task.bootstrap/set-jdbc-backend-properties! (mdb/db-type))) | |
Delete any jobs that have been scheduled but whose class is no longer available. | (defn- delete-jobs-with-no-class!
[]
(when-let [scheduler (scheduler)]
(doseq [job-key (.getJobKeys scheduler nil)]
(try
(qs/get-job scheduler job-key)
(catch JobPersistenceException e
(when (instance? ClassNotFoundException (.getCause e))
(log/warnf "Deleting job %s due to class not found (%s)"
(.getName ^JobKey job-key)
(ex-message (.getCause e)))
(qs/delete-job scheduler job-key))))))) |
Initialize our Quartzite scheduler which allows jobs to be submitted and triggers to scheduled. Puts scheduler in standby mode. Call [[start-scheduler!]] to begin running scheduled tasks. | (defn- init-scheduler!
[]
(classloader/the-classloader)
(when-not @*quartz-scheduler*
(set-jdbc-backend-properties!)
(let [new-scheduler (qs/initialize)]
(when (compare-and-set! *quartz-scheduler* nil new-scheduler)
(qs/standby new-scheduler)
(log/info "Task scheduler initialized into standby mode.")
(delete-jobs-with-no-class!)
(init-tasks!))))) |
this is a function mostly to facilitate testing. | (defn- disable-scheduler? [] (some-> (env/env :mb-disable-scheduler) Boolean/parseBoolean)) |
Start the task scheduler. Tasks do not run before calling this function. | (defn start-scheduler!
[]
(if (disable-scheduler?)
(log/warn "Metabase task scheduler disabled. Scheduled tasks will not be ran.")
(do (init-scheduler!)
(qs/start (scheduler))
(log/info "Task scheduler started")))) |
Stop our Quartzite scheduler and shutdown any running executions. | (defn stop-scheduler!
[]
(let [[old-scheduler] (reset-vals! *quartz-scheduler* nil)]
(when old-scheduler
(qs/shutdown old-scheduler)))) |
+----------------------------------------------------------------------------------------------------------------+ | SCHEDULING/DELETING TASKS | +----------------------------------------------------------------------------------------------------------------+ | |
Assuming that [[job]] is already registered, ensure that [[new-trigger]] is scheduled to trigger it. | (mu/defn- reschedule-task!
[job :- (ms/InstanceOfClass JobDetail)
new-trigger :- (ms/InstanceOfClass Trigger)]
(try
(when-let [scheduler (scheduler)]
(let [job-key (.getKey ^JobDetail job)
new-trigger-key (.getKey ^Trigger new-trigger)
triggers (try (qs/get-triggers-of-job scheduler job-key) (catch Exception _))
matching-trigger (first (filter (comp #{new-trigger-key} #(.getKey ^Trigger %)) triggers))
replaced-trigger (or matching-trigger (first triggers))]
(log/debugf "Rescheduling job %s" (.getName job-key))
(if-not replaced-trigger
(.scheduleJob scheduler new-trigger)
(let [replaced-key (.getKey ^Trigger replaced-trigger)]
(when-not matching-trigger
(log/warnf "Replacing trigger %s with trigger %s%s"
(.getName replaced-key)
(.getName new-trigger-key)
(when (> (count triggers) 1)
;; We probably want more intuitive rescheduling semantics for multi-trigger jobs...
;; Ideally we would pass *all* the new triggers at once, so we can match them up atomically.
;; The current behavior is especially confounding if replacing N triggers with M ones.
(str " (chosen randomly from " (count triggers) " existing ones)"))))
(.rescheduleJob scheduler replaced-key new-trigger)))))
(catch Throwable e
(log/error e "Error rescheduling job")))) |
Reschedule a trigger with the same key as the given trigger. Used to update trigger properties like priority. | (mu/defn reschedule-trigger!
[trigger :- (ms/InstanceOfClass Trigger)]
(when-let [scheduler (scheduler)]
(.rescheduleJob scheduler (.getKey ^Trigger trigger) trigger))) |
Add a given job and trigger to our scheduler. | (mu/defn schedule-task!
[job :- (ms/InstanceOfClass JobDetail) trigger :- (ms/InstanceOfClass Trigger)]
(when-let [scheduler (scheduler)]
(try
(qs/schedule scheduler job trigger)
(catch ObjectAlreadyExistsException _
(log/debug "Job already exists:" (-> ^JobDetail job .getKey .getName))
(reschedule-task! job trigger))))) |
Immediately trigger execution of task | (mu/defn trigger-now!
[job-key :- (ms/InstanceOfClass JobKey)]
(try
(when-let [scheduler (scheduler)]
(.triggerJob scheduler job-key))
(catch Throwable e
(log/errorf e "Failed to trigger immediate execution of task %s" job-key)))) |
Delete a task from the scheduler | (mu/defn delete-task!
[job-key :- (ms/InstanceOfClass JobKey) trigger-key :- (ms/InstanceOfClass TriggerKey)]
(when-let [scheduler (scheduler)]
(qs/delete-trigger scheduler trigger-key)
(qs/delete-job scheduler job-key))) |
Add a job separately from a trigger, replace if the job is already there | (mu/defn add-job!
[job :- (ms/InstanceOfClass JobDetail)]
(when-let [scheduler (scheduler)]
(qs/add-job scheduler job true))) |
Add a trigger. Assumes the trigger is already associated to a job (i.e. | (mu/defn add-trigger!
[trigger :- (ms/InstanceOfClass Trigger)]
(when-let [scheduler (scheduler)]
(qs/add-trigger scheduler trigger))) |
Remove | (mu/defn delete-trigger!
[trigger-key :- (ms/InstanceOfClass TriggerKey)]
(when-let [scheduler (scheduler)]
(qs/delete-trigger scheduler trigger-key))) |
Delete all triggers for a given job key. | (mu/defn delete-all-triggers-of-job!
[job-key :- (ms/InstanceOfClass JobKey)]
(when-let [scheduler (scheduler)]
(qs/delete-triggers scheduler (map #(.getKey ^Trigger %) (qs/get-triggers-of-job scheduler job-key))))) |
+----------------------------------------------------------------------------------------------------------------+ | Scheduler Info | +----------------------------------------------------------------------------------------------------------------+ | |
(defn- job-detail->info [^JobDetail job-detail]
{:key (-> (.getKey job-detail) .getName)
:class (-> (.getJobClass job-detail) .getCanonicalName)
:description (.getDescription job-detail)
:concurrent-execution-disallowed? (.isConcurrentExectionDisallowed job-detail)
:durable? (.isDurable job-detail)
:requests-recovery? (.requestsRecovery job-detail)}) | |
(defmulti ^:private trigger->info
{:arglists '([trigger])}
class) | |
(defmethod trigger->info Trigger
[^Trigger trigger]
{:description (.getDescription trigger)
:end-time (.getEndTime trigger)
:final-fire-time (.getFinalFireTime trigger)
:key (-> (.getKey trigger) .getName)
:state (some->> (.getKey trigger) (.getTriggerState (scheduler)) str)
:next-fire-time (.getNextFireTime trigger)
:previous-fire-time (.getPreviousFireTime trigger)
:priority (.getPriority trigger)
:start-time (.getStartTime trigger)
:may-fire-again? (.mayFireAgain trigger)
:data (into {} (.getJobDataMap trigger))}) | |
(defmethod trigger->info CronTrigger
[^CronTrigger trigger]
(assoc
((get-method trigger->info Trigger) trigger)
:schedule
(.getCronExpression trigger)
:timezone
(.getID (.getTimeZone trigger))
:misfire-instruction
;; not 100% sure why `case` doesn't work here...
(condp = (.getMisfireInstruction trigger)
CronTrigger/MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY "IGNORE_MISFIRE_POLICY"
CronTrigger/MISFIRE_INSTRUCTION_SMART_POLICY "SMART_POLICY"
CronTrigger/MISFIRE_INSTRUCTION_FIRE_ONCE_NOW "FIRE_ONCE_NOW"
CronTrigger/MISFIRE_INSTRUCTION_DO_NOTHING "DO_NOTHING"
(format "UNKNOWN: %d" (.getMisfireInstruction trigger))))) | |
(defn- ->job-key ^JobKey [x]
(cond
(instance? JobKey x) x
(string? x) (JobKey. ^String x))) | |
Check whether there is a Job with the given key. | (defn job-exists?
[job-key]
(boolean
(let [s (scheduler)]
(when (and s (not (.isShutdown s)))
(qs/get-job s (->job-key job-key)))))) |
Get info about a specific Job ( (task/job-info "metabase.task.sync-and-analyze.job") | (defn job-info
[job-key]
(when-let [scheduler (scheduler)]
(let [job-key (->job-key job-key)]
(try
(assoc (job-detail->info (qs/get-job scheduler job-key))
:triggers (for [trigger (sort-by #(-> ^Trigger % .getKey .getName)
(qs/get-triggers-of-job scheduler job-key))]
(trigger->info trigger)))
(catch ClassNotFoundException _
(log/infof "Class not found for Quartz Job %s. This probably means that this job was removed or renamed." (.getName job-key)))
(catch Throwable e
(log/warnf e "Error fetching details for Quartz Job: %s" (.getName job-key))))))) |
(defn- jobs-info []
(->> (some-> (scheduler) (.getJobKeys nil))
(sort-by #(.getName ^JobKey %))
(map job-info)
(filter some?))) | |
Get the existing triggers for a job by key name, if it exists. | (defn existing-triggers [job-key trigger-key] (filter #(= (:key %) (.getName ^TriggerKey trigger-key)) (:triggers (job-info job-key)))) |
Return raw data about all the scheduler and scheduled tasks (i.e. Jobs and Triggers). Primarily for debugging purposes. | (defn scheduler-info
[]
{:scheduler (some-> (scheduler) .getMetaData .getSummary str/split-lines)
:jobs (jobs-info)}) |
Retry the current Job if an exception is thrown by the enclosed code. | (defmacro rerun-on-error
{:style/indent 1}
[^JobExecutionContext ctx & body]
`(let [msg# (str (.getName (.getKey (.getJobDetail ~ctx))) " failed, but we will try it again.")]
(try
~@body
(catch Exception e#
(log/error e# msg#)
(throw (JobExecutionException. msg# e# true)))))) |