Background task scheduling via Quartzite. Individual tasks are defined in …
Regarding Task Initialization: The most appropriate way to initialize tasks in any …
Quartz JavaDoc: Find the JavaDoc for Quartz here: http://www.quartz-scheduler.org/api/2.3.0/index.html | (ns metabase.task (:require [clojure.string :as str] [clojurewerkz.quartzite.scheduler :as qs] [environ.core :as env] [metabase.db :as mdb] [metabase.plugins.classloader :as classloader] [metabase.util :as u] [metabase.util.log :as log] [metabase.util.malli :as mu] [metabase.util.malli.schema :as ms]) (:import (org.quartz CronTrigger JobDetail JobKey JobPersistenceException ObjectAlreadyExistsException Scheduler Trigger TriggerKey))) |
(set! *warn-on-reflection* true) | |
+----------------------------------------------------------------------------------------------------------------+ | SCHEDULER INSTANCE | +----------------------------------------------------------------------------------------------------------------+ | |
Override the global Quartz scheduler by binding this var. | (defonce ^:dynamic *quartz-scheduler* (atom nil)) |
Fetch the instance of our Quartz scheduler. | (defn- scheduler ^Scheduler [] @*quartz-scheduler*) |
+----------------------------------------------------------------------------------------------------------------+ | FINDING & LOADING TASKS | +----------------------------------------------------------------------------------------------------------------+ | |
Initialize (i.e., schedule) Job(s) with a given name. All implementations of this method are called once and only
once when the Quartz task scheduler is initialized. Task namespaces (…) implement this method. The dispatch value for this function can be any unique keyword, but by convention it is a namespaced keyword version of the name of the Job being initialized; for the sake of consistency with the Job name itself, the keyword should be left CamelCased. For example: (defmethod task/init! ::SendPulses [_] (task/schedule-task! my-job my-trigger)) | (defmulti init! {:arglists '([job-name-string])} keyword) |
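The dispatch function is `keyword`. As a result, an implementation registered under a namespaced keyword is selected whether the caller passes that keyword or the equivalent `"ns/Name"` string. Below is a minimal, self-contained sketch of that dispatch. `init-demo!` and the `my.tasks` namespace are hypothetical stand-ins for `init!` and a real task namespace, and each method only records that it ran instead of scheduling a job.

```clojure
;; Hypothetical stand-in for `init!`: same dispatch function (`keyword`),
;; but each method only records that it ran instead of scheduling a job.
(def initialized (atom []))

(defmulti init-demo! {:arglists '([job-name-string])} keyword)

;; By convention the dispatch value is a namespaced, CamelCased keyword named after the Job.
;; The method receives the caller's original argument, so normalize it with `keyword`.
(defmethod init-demo! :my.tasks/SendPulses [k]
  (swap! initialized conj (keyword k)))

;; `keyword` is the identity on keywords, and turns "ns/Name" into the
;; same namespaced keyword, so both calls select the method above.
(init-demo! :my.tasks/SendPulses)
(init-demo! "my.tasks/SendPulses")
```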
Search the classpath for namespaces whose names start with "metabase.task." and require each one, logging (rather than rethrowing) any load errors. | (defn- find-and-load-task-namespaces! [] (doseq [ns-symb u/metabase-namespace-symbols :when (.startsWith (name ns-symb) "metabase.task.")] (try (log/debug "Loading tasks namespace:" (u/format-color 'blue ns-symb)) (classloader/require ns-symb) (catch Throwable e (log/errorf e "Error loading tasks namespace %s" ns-symb))))) |
Call all implementations of init!, passing each one its own dispatch keyword and logging (rather than rethrowing) any errors. | (defn- init-tasks! [] (doseq [[k f] (methods init!)] (try ;; don't bother logging namespace for now, maybe in the future if there's tasks of the same name in multiple ;; namespaces we can log it (log/info "Initializing task" (u/format-color 'green (name k)) (u/emoji "📆")) (f k) (catch Throwable e (log/errorf e "Error initializing task %s" k))))) |
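Each call in `init-tasks!` gets its own `try`. A single failing `init!` implementation is therefore logged, and the remaining tasks still initialize. Below is a core-only sketch of that loop. It assumes a hypothetical `demo-init!` multimethod and uses `println` in place of Metabase's logging. Unlike the original, it also returns the failed keys, so the behavior is easy to check.

```clojure
;; Hypothetical multimethod standing in for `init!`.
(defmulti demo-init! {:arglists '([k])} keyword)
(defmethod demo-init! :demo/Good [k] (println "initialized" k))
(defmethod demo-init! :demo/Broken [_] (throw (ex-info "boom" {})))

(defn init-all!
  "Call every implementation of `demo-init!` with its own dispatch value.
  Each call has its own try, so one failure doesn't stop the others.
  Returns the set of keys whose initialization threw."
  []
  (reduce (fn [failed [k f]]
            (try
              (f k)
              failed
              (catch Throwable e
                (println "Error initializing task" k (ex-message e))
                (conj failed k))))
          #{}
          (methods demo-init!)))
```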
+----------------------------------------------------------------------------------------------------------------+ | Quartz Scheduler Connection Provider | +----------------------------------------------------------------------------------------------------------------+ | |
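Custom ConnectionProvider implementation that hands Quartz connections from our application DB connection pool. | |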
(defrecord ^:private ConnectionProvider [] org.quartz.utils.ConnectionProvider (initialize [_]) (getConnection [_] ;; get a connection from our application DB connection pool. Quartz will close it (i.e., return it to the pool) ;; when it's done ;; ;; very important! Fetch a new connection from the connection pool rather than using currently bound Connection if ;; one already exists -- because Quartz will close this connection when done, we don't want to screw up the ;; calling block ;; ;; in a perfect world we could just check whether we're creating a new Connection or not, and if using an existing ;; Connection, wrap it in a delegating proxy wrapper that makes `.close()` a no-op but forwards all other methods. ;; Now that would be a useful macro! (.getConnection (mdb/app-db))) (shutdown [_])) | |
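The closing comment in `ConnectionProvider` wishes for a delegating wrapper whose `.close()` is a no-op. One macro-free way to build that is a JDK dynamic proxy (`java.lang.reflect.Proxy`). This is a hedged sketch of the idea, not what this namespace does: the code above fetches a fresh pooled connection instead. `non-closing-connection` is a hypothetical name.

```clojure
(import '(java.lang.reflect InvocationHandler InvocationTargetException Method Proxy)
        '(java.sql Connection))

(defn non-closing-connection
  "Hypothetical helper: wrap `conn` in a JDK dynamic proxy that forwards every
  Connection method to `conn` except `close`, which becomes a no-op."
  ^Connection [^Connection conn]
  (Proxy/newProxyInstance
   (.getContextClassLoader (Thread/currentThread))
   (into-array Class [Connection])
   (reify InvocationHandler
     (invoke [_ _proxy method args]
       (if (= "close" (.getName ^Method method))
         nil ; swallow close(): the enclosing block still owns the real connection
         (try
           (.invoke ^Method method conn ^objects args)
           ;; Method.invoke wraps whatever the target threw; rethrow the original.
           (catch InvocationTargetException e
             (throw (.getCause e)))))))))
```

A caller that already holds a connection could hand Quartz the wrapped version. Quartz's `.close()` would then leave the real connection open for the calling block.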
(when-not *compile-files* (System/setProperty "org.quartz.dataSource.db.connectionProvider.class" (.getName ConnectionProvider))) | |
+----------------------------------------------------------------------------------------------------------------+ | Quartz Scheduler Class Load Helper | +----------------------------------------------------------------------------------------------------------------+ | |
(defn- load-class ^Class [^String class-name] (Class/forName class-name true (classloader/the-classloader))) | |
(defrecord ^:private ClassLoadHelper [] org.quartz.spi.ClassLoadHelper (initialize [_]) (getClassLoader [_] (classloader/the-classloader)) (loadClass [_ class-name] (load-class class-name)) (loadClass [_ class-name _] (load-class class-name))) | |
(when-not *compile-files* (System/setProperty "org.quartz.scheduler.classLoadHelper.class" (.getName ClassLoadHelper))) | |
+----------------------------------------------------------------------------------------------------------------+ | STARTING/STOPPING SCHEDULER | +----------------------------------------------------------------------------------------------------------------+ | |
Set the appropriate system properties needed so Quartz can connect to the JDBC backend. (Since we don't know our DB
connection properties ahead of time, we need to set these at runtime rather than setting them in the ….) Currently,
this only selects Quartz's PostgreSQL driver delegate when the application DB is Postgres.
| (defn- set-jdbc-backend-properties! [] (when (= (mdb/db-type) :postgres) (System/setProperty "org.quartz.jobStore.driverDelegateClass" "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate"))) |
Delete any jobs that have been scheduled but whose class is no longer available. | (defn- delete-jobs-with-no-class! [] (when-let [scheduler (scheduler)] (doseq [job-key (.getJobKeys scheduler nil)] (try (qs/get-job scheduler job-key) (catch JobPersistenceException e (when (instance? ClassNotFoundException (.getCause e)) (log/infof "Deleting job %s due to class not found" (.getName ^JobKey job-key)) (qs/delete-job scheduler job-key))))))) |
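The check in `delete-jobs-with-no-class!` relies on the missing class showing up as the *cause* of the `JobPersistenceException`, not as the thrown exception itself. Below is a Quartz-free sketch of the same test. `ex-info` stands in for `JobPersistenceException`, and `load-job` is a hypothetical loader that fails for one key.

```clojure
(defn load-job
  "Hypothetical loader: for the job named \"stale\", throws a wrapper exception
  whose cause is a ClassNotFoundException; otherwise returns a job map."
  [job-key]
  (if (= job-key "stale")
    (throw (ex-info "Couldn't retrieve job" {:job-key job-key}
                    (ClassNotFoundException. "my.tasks.RemovedJob")))
    {:key job-key}))

(defn jobs-to-delete
  "Return the keys whose job fails to load because its class is missing."
  [job-keys]
  (filterv (fn [k]
             (try
               (load-job k)
               false
               (catch clojure.lang.ExceptionInfo e
                 ;; Only a missing class marks the job for deletion;
                 ;; other failures are ignored, as in the original.
                 (instance? ClassNotFoundException (ex-cause e)))))
           job-keys))
```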
Initialize our Quartzite scheduler, which allows jobs to be submitted and triggers to be scheduled. Puts the scheduler in standby mode. Call [[start-scheduler!]] to begin running scheduled tasks. | (defn- init-scheduler! [] (classloader/the-classloader) (when-not @*quartz-scheduler* (set-jdbc-backend-properties!) (let [new-scheduler (qs/initialize)] (when (compare-and-set! *quartz-scheduler* nil new-scheduler) (find-and-load-task-namespaces!) (qs/standby new-scheduler) (log/info "Task scheduler initialized into standby mode.") (delete-jobs-with-no-class!) (init-tasks!))))) |
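`init-scheduler!` checks the atom and then uses `compare-and-set!`. If several threads race, only the one that installs its scheduler runs the one-time setup (loading task namespaces, calling the `init!` methods). Below is a core-only sketch of that guard. It uses a plain map in place of the Quartz scheduler, and `init-once!` is hypothetical.

```clojure
;; Hypothetical stand-ins: a map plays the role of the Quartz scheduler.
(defonce demo-scheduler-atom (atom nil))
(def setup-runs (atom 0))

(defn init-once!
  "Install a new (fake) scheduler unless one already exists. Only the caller
  whose compare-and-set! succeeds performs the one-time setup."
  []
  (when-not @demo-scheduler-atom
    (let [candidate {:id (java.util.UUID/randomUUID)}]
      (when (compare-and-set! demo-scheduler-atom nil candidate)
        ;; one-time setup (loading namespaces, init! calls) would go here
        (swap! setup-runs inc))))
  @demo-scheduler-atom)
```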
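Whether the task scheduler is disabled via the MB_DISABLE_SCHEDULER environment variable (read through environ as :mb-disable-scheduler). This is a function mostly to facilitate testing. | (defn- disable-scheduler? [] (some-> (env/env :mb-disable-scheduler) Boolean/parseBoolean)) |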
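`Boolean/parseBoolean` returns `true` only for the string `"true"` (ignoring case). `some->` short-circuits when the variable is unset, so an unset variable yields `nil`, and values such as `"1"` or `"yes"` leave the scheduler enabled. Below is a sketch with a hypothetical `disabled?` that takes the raw value directly instead of reading it through environ.

```clojure
(defn disabled?
  "Hypothetical stand-in for disable-scheduler?: takes the raw setting value
  (a string, or nil when unset) instead of reading it through environ."
  [raw-value]
  ;; nil short-circuits to nil; otherwise only \"true\" (any case) parses as true
  (some-> raw-value Boolean/parseBoolean))
```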
Start the task scheduler. Tasks do not run before calling this function. | (defn start-scheduler! [] (if (disable-scheduler?) (log/warn "Metabase task scheduler disabled. Scheduled tasks will not be run.") (do (init-scheduler!) (qs/start (scheduler)) (log/info "Task scheduler started")))) |
Stop our Quartzite scheduler and shut down any running executions. | (defn stop-scheduler! [] (let [[old-scheduler] (reset-vals! *quartz-scheduler* nil)] (when old-scheduler (qs/shutdown old-scheduler)))) |
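`reset-vals!` swaps in `nil` and returns the previous value in one atomic step. Two concurrent or repeated calls to `stop-scheduler!` therefore cannot both see the old scheduler and shut it down twice, which a separate deref followed by `reset!` would allow. Below is a core-only sketch with a hypothetical `stop-demo!` that counts shutdowns instead of calling Quartz.

```clojure
;; Hypothetical stand-ins for *quartz-scheduler* and qs/shutdown.
(def demo-scheduler (atom nil))
(def shutdowns (atom 0))

(defn stop-demo!
  "Atomically detach the current value and 'shut it down'. A later call finds
  nil and does nothing, so the old value is shut down at most once."
  []
  (let [[old-value] (reset-vals! demo-scheduler nil)]
    (when old-value
      (swap! shutdowns inc))))
```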
+----------------------------------------------------------------------------------------------------------------+ | SCHEDULING/DELETING TASKS | +----------------------------------------------------------------------------------------------------------------+ | |
Assuming that [[job]] is already registered, ensure that [[new-trigger]] is scheduled to trigger it. | (mu/defn- reschedule-task! [job :- (ms/InstanceOfClass JobDetail) new-trigger :- (ms/InstanceOfClass Trigger)] (try (when-let [scheduler (scheduler)] (let [job-key (.getKey ^JobDetail job) new-trigger-key (.getKey ^Trigger new-trigger) triggers (qs/get-triggers-of-job scheduler job-key) matching-trigger (first (filter (comp #{new-trigger-key} #(.getKey ^Trigger %)) triggers)) replaced-trigger (or matching-trigger (first triggers))] (when replaced-trigger (log/debugf "Rescheduling job %s" (.getName job-key)) (let [replaced-key (.getKey ^Trigger replaced-trigger)] (when-not matching-trigger (log/warnf "Replacing trigger %s with trigger %s%s" (.getName replaced-key) (.getName new-trigger-key) (when (> (count triggers) 1) ;; We probably want more intuitive rescheduling semantics for multi-trigger jobs... ;; Ideally we would pass *all* the new triggers at once, so we can match them up atomically. ;; The current behavior is especially confounding if replacing N triggers with M ones. (str " (chosen randomly from " (count triggers) " existing ones)")))) ;; reschedule the replaced trigger's key: `matching-trigger` is nil when we fell back to the first trigger (.rescheduleJob scheduler replaced-key new-trigger))))) (catch Throwable e (log/error e "Error rescheduling job")))) |
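`reschedule-task!` chooses which trigger to replace as follows:

1. It prefers an existing trigger whose key equals the new trigger's key.
2. Otherwise it falls back to the job's first trigger, with a warning.
3. It does nothing when the job has no triggers.

Below is a pure-data sketch of that rule. It assumes triggers are represented as maps with a `:key` entry; `pick-trigger-to-replace` is hypothetical.

```clojure
(defn pick-trigger-to-replace
  "Hypothetical helper. Given a job's existing triggers (maps with :key) and the
  key of the new trigger, return [trigger-to-replace exact-match?], or nil if
  the job has no triggers."
  [existing-triggers new-key]
  (let [matching (first (filter #(= new-key (:key %)) existing-triggers))
        ;; fall back to the first trigger when no key matches
        replaced (or matching (first existing-triggers))]
    (when replaced
      [replaced (some? matching)])))
```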
Reschedule a trigger with the same key as the given trigger. Used to update trigger properties like priority. | (mu/defn reschedule-trigger! [trigger :- (ms/InstanceOfClass Trigger)] (when-let [scheduler (scheduler)] (.rescheduleJob scheduler (.getKey ^Trigger trigger) trigger))) |
Add a given job and trigger to our scheduler. If the job already exists, reschedule it with the given trigger instead. | (mu/defn schedule-task! [job :- (ms/InstanceOfClass JobDetail) trigger :- (ms/InstanceOfClass Trigger)] (when-let [scheduler (scheduler)] (try (qs/schedule scheduler job trigger) (catch ObjectAlreadyExistsException _ (log/debug "Job already exists:" (-> ^JobDetail job .getKey .getName)) (reschedule-task! job trigger))))) |
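`schedule-task!` behaves like an upsert. It attempts to schedule, and when Quartz reports `ObjectAlreadyExistsException` it falls back to rescheduling. Below is a sketch of that try-then-fallback shape against a hypothetical in-memory registry. An `ex-info` with `:type :already-exists` stands in for the Quartz exception, and `store-job!` and `schedule-or-reschedule!` are hypothetical names.

```clojure
;; Hypothetical in-memory "scheduler": job key -> trigger.
(def registry (atom {}))

(defn store-job!
  "Hypothetical store that refuses duplicates, loosely mirroring how scheduling
  an existing job fails with ObjectAlreadyExistsException in Quartz."
  [job-key trigger]
  (when (contains? @registry job-key)
    (throw (ex-info "Job already exists" {:type :already-exists, :job-key job-key})))
  (swap! registry assoc job-key trigger))

(defn schedule-or-reschedule!
  "Try to add the job; if it already exists, replace its trigger instead.
  Any other failure is rethrown."
  [job-key trigger]
  (try
    (store-job! job-key trigger)
    (catch clojure.lang.ExceptionInfo e
      (if (= :already-exists (:type (ex-data e)))
        (swap! registry assoc job-key trigger)
        (throw e)))))
```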
Immediately trigger execution of the task with the given job key. Errors are logged rather than thrown. | (mu/defn trigger-now! [job-key :- (ms/InstanceOfClass JobKey)] (try (when-let [scheduler (scheduler)] (.triggerJob scheduler job-key)) (catch Throwable e (log/errorf e "Failed to trigger immediate execution of task %s" job-key)))) |
Delete a task from the scheduler: first its trigger, then its job. | (mu/defn delete-task! [job-key :- (ms/InstanceOfClass JobKey) trigger-key :- (ms/InstanceOfClass TriggerKey)] (when-let [scheduler (scheduler)] (qs/delete-trigger scheduler trigger-key) (qs/delete-job scheduler job-key))) |
Add a job separately from a trigger, replacing the job if it already exists. | (mu/defn add-job! [job :- (ms/InstanceOfClass JobDetail)] (when-let [scheduler (scheduler)] (qs/add-job scheduler job true))) |
Add a trigger. Assumes the trigger is already associated with a job (i.e. …). | (mu/defn add-trigger! [trigger :- (ms/InstanceOfClass Trigger)] (when-let [scheduler (scheduler)] (qs/add-trigger scheduler trigger))) |
Remove the trigger with the given trigger key from the scheduler. | (mu/defn delete-trigger! [trigger-key :- (ms/InstanceOfClass TriggerKey)] (when-let [scheduler (scheduler)] (qs/delete-trigger scheduler trigger-key))) |
+----------------------------------------------------------------------------------------------------------------+ | Scheduler Info | +----------------------------------------------------------------------------------------------------------------+ | |
(defn- job-detail->info [^JobDetail job-detail] {:key (-> (.getKey job-detail) .getName) :class (-> (.getJobClass job-detail) .getCanonicalName) :description (.getDescription job-detail) :concurrent-execution-disallowed? (.isConcurrentExectionDisallowed job-detail) :durable? (.isDurable job-detail) :requests-recovery? (.requestsRecovery job-detail)}) | |
(defmulti ^:private trigger->info {:arglists '([trigger])} class) | |
(defmethod trigger->info Trigger [^Trigger trigger] {:description (.getDescription trigger) :end-time (.getEndTime trigger) :final-fire-time (.getFinalFireTime trigger) :key (-> (.getKey trigger) .getName) :state (some->> (.getKey trigger) (.getTriggerState (scheduler)) str) :next-fire-time (.getNextFireTime trigger) :previous-fire-time (.getPreviousFireTime trigger) :priority (.getPriority trigger) :start-time (.getStartTime trigger) :may-fire-again? (.mayFireAgain trigger) :data (into {} (.getJobDataMap trigger))}) | |
(defmethod trigger->info CronTrigger [^CronTrigger trigger] (assoc ((get-method trigger->info Trigger) trigger) :schedule (.getCronExpression trigger) :timezone (.getID (.getTimeZone trigger)) :misfire-instruction ;; `case` won't work here: its test constants are not evaluated, so a static-field symbol like CronTrigger/MISFIRE_INSTRUCTION_SMART_POLICY would be matched as a literal symbol rather than as its int value (condp = (.getMisfireInstruction trigger) CronTrigger/MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY "IGNORE_MISFIRE_POLICY" CronTrigger/MISFIRE_INSTRUCTION_SMART_POLICY "SMART_POLICY" CronTrigger/MISFIRE_INSTRUCTION_FIRE_ONCE_NOW "FIRE_ONCE_NOW" CronTrigger/MISFIRE_INSTRUCTION_DO_NOTHING "DO_NOTHING" (format "UNKNOWN: %d" (.getMisfireInstruction trigger))))) | |
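`case` can't be used in the misfire mapping because its test constants are never evaluated. A static-field symbol is matched as a literal symbol, not as the field's value, whereas `condp =` evaluates each test. Below is a self-contained demonstration that uses `Long/MAX_VALUE` in place of the Quartz constants.

```clojure
;; `case` does not evaluate its test constants: `Long/MAX_VALUE` here is the
;; literal symbol, so a numeric argument can never match it.
(defn via-case [x]
  (case x
    Long/MAX_VALUE :max
    :no-match))

;; `condp =` evaluates each test expression, so the static field's value is used.
(defn via-condp [x]
  (condp = x
    Long/MAX_VALUE :max
    :no-match))
```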
(defn- ->job-key ^JobKey [x] (cond (instance? JobKey x) x (string? x) (JobKey. ^String x))) | |
Check whether there is a Job with the given key. | (defn job-exists? [job-key] (boolean (let [s (scheduler)] (when (and s (not (.isShutdown s))) (qs/get-job s (->job-key job-key)))))) |
Get info about a specific Job (job-key may be either a String or a JobKey; see ->job-key). For example: (task/job-info "metabase.task.sync-and-analyze.job") | (defn job-info [job-key] (when-let [scheduler (scheduler)] (let [job-key (->job-key job-key)] (try (assoc (job-detail->info (qs/get-job scheduler job-key)) :triggers (for [trigger (sort-by #(-> ^Trigger % .getKey .getName) (qs/get-triggers-of-job scheduler job-key))] (trigger->info trigger))) (catch ClassNotFoundException _ (log/infof "Class not found for Quartz Job %s. This probably means that this job was removed or renamed." (.getName job-key))) (catch Throwable e (log/warnf e "Error fetching details for Quartz Job: %s" (.getName job-key))))))) |
(defn- jobs-info [] (->> (some-> (scheduler) (.getJobKeys nil)) (sort-by #(.getName ^JobKey %)) (map job-info) (filter some?))) | |
Get the info maps for a job's triggers whose name matches trigger-key, if the job exists. | (defn existing-triggers [job-key trigger-key] (filter #(= (:key %) (.getName ^TriggerKey trigger-key)) (:triggers (job-info job-key)))) |
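`existing-triggers` is a plain filter over the `:triggers` of `job-info`. When `job-info` returns `nil` (for example, when the scheduler isn't running), the result is simply an empty sequence. Below is a sketch with a hypothetical `triggers-named` that takes the info map directly.

```clojure
(defn triggers-named
  "Hypothetical stand-in for existing-triggers that takes the job-info map
  directly: keep the trigger info maps whose :key equals trigger-name."
  [job-info trigger-name]
  ;; (:triggers nil) is nil, and filtering nil yields an empty sequence
  (filter #(= (:key %) trigger-name) (:triggers job-info)))
```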
Return raw data about the scheduler and all scheduled tasks (i.e. Jobs and Triggers). Primarily for debugging purposes. | (defn scheduler-info [] {:scheduler (some-> (scheduler) .getMetaData .getSummary str/split-lines) :jobs (jobs-info)}) |