Utility functions for core.async-based async logic. | (ns metabase.util.async (:require [clojure.core.async :as a] [metabase.util.log :as log]) (:import (clojure.core.async.impl.buffers PromiseBuffer) (clojure.core.async.impl.channels ManyToManyChannel) (java.util.concurrent ThreadPoolExecutor))) |
(set! *warn-on-reflection* true) | |
TODO - most of this stuff can be removed now that we have the new-new reducible/async QP implementation of early 2020. No longer needed | |
Is core.async | (defn promise-chan? [chan] (and (instance? ManyToManyChannel chan) (instance? PromiseBuffer (.buf ^ManyToManyChannel chan)))) |
Exactly like 1) the result channel is a promise channel instead of a regular channel 2) Closing the result channel early will cancel the async thread call. | (defn cancelable-thread-call [thunk] ;; create two channels: ;; * `done-chan` will always get closed immediately after `(f)` is finished ;; * `result-chan` will get the result of `(f)`, *after* `done-chan` is closed (let [done-chan (a/promise-chan) result-chan (a/promise-chan) thunk* (^:once fn [] (let [result (try (thunk) (catch Throwable e (log/trace e "cancelable-thread-call: caught exception in f") e))] (a/close! done-chan) (when (some? result) (a/>!! result-chan result))) (a/close! result-chan)) futur (.submit ^ThreadPoolExecutor @#'a/thread-macro-executor ^Runnable (bound-fn* thunk*))] ;; if `result-chan` gets a result/closed *before* `done-chan`, it means it was closed by the caller, so we should ;; cancel the thread running `thunk*` (a/go (let [[_ port] (a/alts! [done-chan result-chan] :priority true)] (when (= port result-chan) (log/trace "cancelable-thread-call: result channel closed before f finished; canceling thread") (future-cancel futur)))) result-chan)) |
Exactly like 1) the result channel is a promise channel instead of a regular channel 2) Closing the result channel early will cancel the async thread call. | (defmacro cancelable-thread {:style/indent 0} [& body] `(cancelable-thread-call (^:once fn* [] ~@body))) |