(ns metabase.query-processor.pipeline (:require [clojure.core.async :as a] [metabase.driver :as driver] [metabase.driver.util :as driver.u] [metabase.query-processor.error-type :as qp.error-type] [metabase.util :as u] [metabase.util.i18n :as i18n] [metabase.util.log :as log])) | |
If this channel is bound, you can send it a message to cancel the query. You can check if it has received a message to see if the query has been canceled. This should be bound to a [[clojure.core.async/promise-chan]] so it can be polled freely without messages being consumed. | (def ^:dynamic ^clojure.core.async.impl.channels.ManyToManyChannel *canceled-chan* nil) |
Whether the current query execution has been canceled. This is usually triggered by an HTTP connection closing when running queries from the REST API; you should check this before or while doing something expensive (such as before running the query against a data warehouse) to avoid doing work for queries that have been canceled. | (defn canceled? [] (some-> *canceled-chan* a/poll!)) |
Default implementation for result. | (defn default-result-handler [result] (if (instance? Throwable result) (throw result) result)) |
Called exactly once with the final result, which is the result of either [[reduce]] (if query completed successfully), or an Exception (if it did not). | (defn ^:dynamic *result* [result] (default-result-handler result)) |
Called by [[run]] to have driver run query. By default, [[metabase.driver/execute-reducible-query]]. (respond results-metadata reducible-rows) The implementation should call | (defn ^:dynamic *execute* [driver query respond] (when-not (canceled?) ;; the context map that gets passed to [[driver/execute-reducible-query]] is for backwards compatibility for ;; pre-#35465 code (let [context {:canceled-chan *canceled-chan*}] (driver/execute-reducible-query driver query context respond)))) |
Called by [[run]] (inside the | (defn ^:dynamic *reduce* [rff metadata reducible-rows] (when-not (canceled?) (let [[status rf-or-e] (try [::ready-to-reduce (rff metadata)] (catch Throwable e [::error (ex-info (i18n/tru "Error building query results reducing function: {0}" (ex-message e)) {:type qp.error-type/qp, :rff rff} e)])) [status result] (case status ::ready-to-reduce (try [::success (transduce (fn [rf] (fn wrapper ([] (rf)) ([acc] (some-> *canceled-chan* a/close!) (rf acc)) ([acc row] (rf acc row)))) rf-or-e reducible-rows)] (catch Throwable e [::error (ex-info (i18n/tru "Error reducing result rows: {0}" (ex-message e)) {:type qp.error-type/qp} e)])) ::error [status rf-or-e])] (case status ::success (*result* result) ::error (throw result))))) |
If Throwable | (defn- interrupted-exception? [e] (or (instance? InterruptedException e) (some-> (ex-cause e) interrupted-exception?))) |
Function for running the query. Calls [[execute]], then [[reduce]] on the results. | (defn ^:dynamic *run* [query rff] (when-not (canceled?) (letfn [(respond [metadata reducible-rows] (*reduce* rff metadata reducible-rows))] (try (*execute* driver/*driver* query respond) (catch Throwable e ;; rethrow e if it's not an InterruptedException, we're not interested in it. (when-not (interrupted-exception? e) (throw e)) ;; ok, at this point we know it's an InterruptedException. (log/trace e "Caught InterruptedException when executing query, this means the query was canceled. Ignoring exception.") ;; just to be extra safe and sure that the canceled chan has gotten a message. It's a promise channel so ;; duplicate messages don't matter (some-> *canceled-chan* (a/>!! ::cancel)) ::cancel))))) |
Maximum amount of time query is allowed to run, in ms. | (def ^:dynamic ^Long *query-timeout-ms* (u/minutes->ms (driver.u/db-query-timeout-minutes))) |