(ns metabase.async.streaming-response (:require [clojure.core.async :as a] [clojure.walk :as walk] [compojure.response] [metabase.async.streaming-response.thread-pool :as thread-pool] [metabase.async.util :as async.u] [metabase.server.protocols :as server.protocols] [metabase.util :as u] [metabase.util.json :as json] [metabase.util.log :as log] [potemkin.types :as p.types] [pretty.core :as pretty] [ring.util.jakarta.servlet :as servlet] [ring.util.response :as response]) (:import (jakarta.servlet AsyncContext) (jakarta.servlet.http HttpServletResponse) (java.io BufferedWriter OutputStream OutputStreamWriter) (java.nio ByteBuffer) (java.nio.channels ClosedChannelException SocketChannel) (java.nio.charset StandardCharsets) (java.util.zip GZIPOutputStream) (org.eclipse.jetty.io EofException SocketChannelEndPoint) (org.eclipse.jetty.server Request))) | |
;; Emit a compiler warning for any Java interop call in this namespace that cannot be resolved without reflection.
(set! *warn-on-reflection* true)
(defn- write-to-output-stream!
  "Write to the `OutputStream` `os`.

  * 2-arity: when `x` is an integer, write it as a single byte; otherwise treat `x` as a byte array and write all
    of it.
  * 4-arity: write `len` bytes of the byte array `ba`, starting at `offset`."
  ([^OutputStream os x]
   (if-not (int? x)
     (.write os ^bytes x)
     (.write os ^int x)))
  ([^OutputStream os ^bytes ba ^Integer offset ^Integer len]
   (.write os ba offset len)))
(defn- ex-status-code
  "Walk the cause chain of `e`, starting with `e` itself. Return the first truthy `:status-code` (or, failing that,
  `:status`) found in an exception's `ex-data`. Return 500 when no exception in the chain carries one."
  [e]
  (let [status-of (fn [ex]
                    (let [data (ex-data ex)]
                      (or (:status-code data) (:status data))))]
    (loop [ex e]
      (if (nil? ex)
        500
        (if-let [code (status-of ex)]
          code
          (recur (ex-cause ex)))))))
(defn- format-exception
  "Convert the exception `e` to a map with `Throwable->map`, adding its HTTP status code (as computed by
  `ex-status-code`) under the `:_status` key."
  [e]
  (-> e
      Throwable->map
      (assoc :_status (ex-status-code e))))
(defn write-error!
  "Write an error to the output stream `os` as JSON, formatting it nicely. Closes the output stream afterwards.

  * An `InterruptedException` or Jetty `EofException` is not written at all (only a trace message is logged), and
    `os` is not closed.
  * Any other `Throwable` is first converted to a map with `format-exception`.
  * Unless `export-format` is `:api`, the `:json_query` and `:preprocessed` keys are removed from every nested map.
  * The top-level `:export-format` key is always removed before encoding.

  Failures while writing are not rethrown. An `EofException` is ignored silently; anything else is logged."
  [^OutputStream os obj export-format]
  (cond
    (or (instance? InterruptedException obj)
        (instance? EofException obj))
    (log/trace "Error is an InterruptedException or EofException, not writing to output stream")

    (instance? Throwable obj)
    (recur os (format-exception obj) export-format)

    :else
    (with-open [os os]
      (log/trace (u/pprint-to-str (list 'write-error! obj)))
      (try
        (let [strip-query-keys (fn [form]
                                 (if (map? form)
                                   (dissoc form :json_query :preprocessed)
                                   form))
              cleaned          (if (= :api export-format)
                                 obj
                                 (walk/prewalk strip-query-keys obj))
              payload          (dissoc cleaned :export-format)]
          (with-open [writer (-> os
                                 (OutputStreamWriter. StandardCharsets/UTF_8)
                                 BufferedWriter.)]
            (json/encode-to payload writer {})))
        (catch EofException _)
        (catch Throwable e
          (log/error e "Error writing error to output stream" obj))))))
(defn- do-f*
  "Call `(f os canceled-chan)` and return its result.

  If `f` throws a Jetty `EofException`, put `::jetty-eof` on `canceled-chan`. If it throws an
  `InterruptedException`, put `::thread-interrupted` on `canceled-chan`. In both cases return nil instead of
  rethrowing. Other exceptions propagate to the caller. `_finished-chan` is unused."
  [f ^OutputStream os _finished-chan canceled-chan]
  (letfn [(cancel-with! [reason]
            (a/>!! canceled-chan reason)
            nil)]
    (try
      (f os canceled-chan)
      (catch EofException _
        (cancel-with! ::jetty-eof))
      (catch InterruptedException _
        (cancel-with! ::thread-interrupted)))))
(defn- do-f-async
  "Runs `f` (via `do-f*`) asynchronously on the streaming-response thread pool, writing to `os`, and returns nil
  immediately.

  If `f` throws an unexpected exception, it is logged, `:unexpected-error` is put on `finished-chan`, and the error
  is written to `os` with `write-error!`.

  In every case, once `f` is done:
  1. `:canceled` (if `canceled-chan` already holds a value) or `:completed` is put on `finished-chan`.
  2. Both channels are closed.
  3. `async-context` is completed.

  NOTE(review): `finished-chan` is presumably a promise channel, in which case an earlier `:unexpected-error` wins
  over the later status -- confirm at call sites."
  [^AsyncContext async-context f ^OutputStream os finished-chan canceled-chan]
  {:pre [(some? os)]}
  (letfn [(report-crash! [e]
            (log/error e "Caught unexpected Exception in streaming response body")
            (a/>!! finished-chan :unexpected-error)
            (write-error! os e nil))
          (wrap-up! []
            (let [outcome (if (a/poll! canceled-chan) :canceled :completed)]
              (a/>!! finished-chan outcome))
            (a/close! finished-chan)
            (a/close! canceled-chan)
            (.complete async-context))]
    (let [job (^:once fn* []
                (try
                  (do-f* f os finished-chan canceled-chan)
                  (catch Throwable e
                    (report-crash! e))
                  (finally
                    (wrap-up!))))]
      (.submit (thread-pool/thread-pool) ^Runnable job)
      nil)))
(defn- should-gzip-response?
  "Does the client accept GZIP-encoded responses?

  Inspects the request's `Accept-Encoding` header. Returns truthy when the header has an entry for `gzip`, `x-gzip`,
  or the `*` wildcard whose quality value is not zero.
  * Content-coding names are matched case-insensitively, as required by RFC 9110, so `GZIP` is also accepted.
  * An entry like `gzip;q=0` means gzip is explicitly *not* acceptable, so it does not count.
  Returns nil when the header is missing or has no acceptable entry."
  [{{:strs [accept-encoding]} :headers}]
  (some->> accept-encoding
           ;; each match is [whole-entry coding q-value-or-nil] for an entry at the start of the header or after a comma
           (re-seq #"(?i)(?:^|,)\s*(gzip|x-gzip|\*)\s*(?:;\s*q\s*=\s*([0-9.]+))?")
           (some (fn [[_entry _coding q-value]]
                   ;; an RFC 9110 qvalue is zero only when written as 0, 0., 0.0, 0.00 or 0.000
                   (or (nil? q-value)
                       (nil? (re-matches #"0(?:\.0*)?" q-value)))))))
(defn- output-stream-delay
  "Return a delay that, when dereffed, fetches the servlet `response`'s output stream. When `gzip?` is truthy, the
  stream is wrapped in a `GZIPOutputStream` with sync-flush enabled."
  [gzip? ^HttpServletResponse response]
  (delay
    (let [raw (.getOutputStream response)]
      (if gzip?
        (GZIPOutputStream. raw true)
        raw))))
(defn- delay-output-stream
  "An `OutputStream` proxy that fetches the actual output stream by dereffing `dlay` (a delay or other dereffable)
  on each call, so the real stream is not obtained until it is first used. `close`, `flush`, and both `write`
  arities are forwarded to the dereffed stream."
  [dlay]
  (proxy [OutputStream] []
    (flush []
      (let [^OutputStream target @dlay]
        (.flush target)))
    (close []
      (let [^OutputStream target @dlay]
        (.close target)))
    (write
      ([b]
       (write-to-output-stream! @dlay b))
      ([bs off len]
       (write-to-output-stream! @dlay bs off len)))))
(def ^:private async-cancellation-poll-interval-ms
  "How often, in milliseconds, to check whether the request was canceled by the client. Used by
  `start-async-cancel-loop!` as the wait between cancelation checks."
  1000)
(p.types/defprotocol+ ^:private ChannelProvider
  "Protocol to get a `SocketChannel` from the various types of transports an HTTP request's endpoint may use."
  (^SocketChannel get-channel [transport]
    "Return the `SocketChannel` underlying `transport`, or nil if there isn't one we know how to extract."))
;; `ChannelProvider` implementations:
;; * a `SocketChannel` is its own channel;
;; * a Jetty `SocketChannelEndPoint` exposes its channel via `.getChannel`;
;; * any other transport yields nil (which `canceled?` reports as an unexpected transport).
(extend-type SocketChannel
  ChannelProvider
  (get-channel [self] self))

(extend-type SocketChannelEndPoint
  ChannelProvider
  (get-channel [endpoint] (.getChannel endpoint)))

(extend-type Object
  ChannelProvider
  (get-channel [_] nil))
(def ^:private *reported-types
  "Atom holding the set of transport types that `log-unexpected-transport!` has already seen. A transport type here
  is the `type` of the object returned by `.getTransport` in `canceled?`. Used to avoid logging the same unexpected
  type over and over."
  (atom #{}))
(defn log-unexpected-transport!
  "Log an error when an unexpected transport is encountered, i.e. one `canceled?` cannot get a `SocketChannel` from.

  Each distinct transport type is logged at most once; seen types are recorded in `*reported-types`. The update is
  done with `swap-vals!`, so when several threads hit the same new type concurrently exactly one of them logs it.
  Returns the updated set of reported types."
  [transport]
  (let [transport-type  (type transport)
        [seen-before
         seen-now]      (swap-vals! *reported-types conj transport-type)]
    ;; only the thread whose swap actually added the type logs it
    (when-not (contains? seen-before transport-type)
      (log/errorf "Unexpected transport type encountered in `canceled?`: %s" transport-type))
    seen-now))
(defn- canceled?
  "Check whether the HTTP request has been canceled by the client.

  Attempts to read a single byte from the request's underlying `SocketChannel`. The request counts as canceled when
  the read returns a negative value (end of stream) or throws `ClosedChannelException`.

  Returns false in these cases:
  * the transport has no `SocketChannel` (the transport type is reported via `log-unexpected-transport!`);
  * the check throws `InterruptedException`;
  * any other error occurs (the error is logged).

  Note that a successful read consumes a byte from the connection. That is why streaming responses are sent with
  `Connection: close`."
  [^Request request]
  (try
    (let [transport (.. request getHttpChannel getEndPoint getTransport)
          channel   (get-channel transport)]
      (if (nil? channel)
        (do
          (log-unexpected-transport! transport)
          false)
        (let [read-result (.read channel (ByteBuffer/allocate 1))]
          (log/tracef "Check cancelation status: .read returned %d" read-result)
          (neg? read-result))))
    (catch InterruptedException _
      false)
    (catch ClosedChannelException _
      true)
    (catch Throwable e
      (log/error e "Error determining whether HTTP request was canceled")
      false)))
(def ^:private async-cancellation-poll-timeout-ms
  "How long, in milliseconds, to wait for the cancelation check to complete. It should usually complete immediately
  (see `canceled?`), but if it doesn't, we don't want to block forever. Used by `start-async-cancel-loop!`."
  1000)
(defn- start-async-cancel-loop!
  "Starts an async loop that checks whether the client has canceled the HTTP `request`.

  Every `async-cancellation-poll-interval-ms` it runs `canceled?` on a separate cancelable thread, waiting at most
  `async-cancellation-poll-timeout-ms` for the answer.
  * If the request was canceled, puts `::request-canceled` on `canceled-chan` and stops.
  * If the check said no, or timed out, it keeps polling.
  * The loop stops as soon as `finished-chan` yields a value or closes."
  [request finished-chan canceled-chan]
  (a/go-loop []
    (let [interval-chan (a/timeout async-cancellation-poll-interval-ms)
          [_ woken-by]  (a/alts! [interval-chan finished-chan])]
      (when (identical? woken-by interval-chan)
        (log/tracef "Checking cancelation status after waiting %s"
                    (u/format-milliseconds async-cancellation-poll-interval-ms))
        (let [check-chan           (async.u/cancelable-thread (canceled? request))
              deadline-chan        (a/timeout async-cancellation-poll-timeout-ms)
              [was-canceled? port] (a/alts! [finished-chan check-chan deadline-chan])]
          ;; `check-chan` is closed unconditionally. Closing it kills the underlying thread, which matters when we
          ;; timed out or the request finished before the check answered.
          ;; NOTE(review): when the check already answered, closing is assumed to be harmless -- confirm against
          ;; `async.u/cancelable-thread`.
          (a/close! check-chan)
          (when (identical? port deadline-chan)
            (log/debugf "Check cancelation status timed out after %s"
                        (u/format-milliseconds async-cancellation-poll-timeout-ms)))
          (cond
            (identical? port finished-chan) nil
            was-canceled?                   (a/>! canceled-chan ::request-canceled)
            :else                           (recur)))))))
(defn- respond
  "Start sending a streaming response.

  Steps:
  1. Set the status (default 202).
  2. Set the headers: the options' `:headers` merged with those from `response-map`, plus `Content-Type`,
     `Connection: close`, and `Content-Encoding: gzip` when the client accepts gzip.
  3. Start the cancelation-check loop.
  4. Run `f` asynchronously against a lazily-fetched (possibly gzipped) output stream.

  If anything throws during setup:
  * a 500 error is sent (best effort);
  * `:unexpected-error` is put on `finished-chan`;
  * both channels are closed;
  * the async context is completed."
  [{:keys [^HttpServletResponse response ^AsyncContext async-context request-map response-map request]}
   f
   {:keys [content-type status headers], :as _options}
   finished-chan]
  (let [canceled-chan (a/promise-chan)]
    (try
      (.setStatus response (or status 202))
      (let [gzip?         (should-gzip-response? request-map)
            final-headers (-> (merge headers (:headers response-map))
                              (assoc "Content-Type" content-type
                                     ;; Very important: connections which serve streaming responses SHOULD NOT be
                                     ;; reused by the client because of `start-async-cancel-loop!`. The latter tries
                                     ;; to read a byte from the input stream at some interval, and that may/will
                                     ;; cause corruption of the subsequent requests that come through the reused
                                     ;; connection (see #46071).
                                     "Connection" "close")
                              (cond-> gzip? (assoc "Content-Encoding" "gzip")))]
        (#'servlet/set-headers response final-headers)
        (let [body-os (->> response
                           (output-stream-delay gzip?)
                           delay-output-stream)]
          (start-async-cancel-loop! request finished-chan canceled-chan)
          (do-f-async async-context f body-os finished-chan canceled-chan)))
      (catch Throwable e
        (log/error e "Unexpected exception in do-f-async")
        (try
          (.sendError response 500 (.getMessage e))
          (catch Throwable send-error-e
            (log/error send-error-e "Unexpected exception writing error response")))
        (a/>!! finished-chan :unexpected-error)
        (a/close! finished-chan)
        (a/close! canceled-chan)
        (.complete async-context)))))
;; `render` is defined after `StreamingResponse` (it constructs `StreamingResponse` instances), but the type's
;; `Renderable` implementation calls it, so it must be declared first.
(declare render)
;; A streaming HTTP response. Fields:
;;   `f`        -- body function, eventually called as `(f os canceled-chan)` (see `do-f*`)
;;   `options`  -- response options; `respond` reads `:content-type`, `:status`, and `:headers`
;;   `donechan` -- channel that receives the final status of the response (see `finished-chan`); created as a
;;                 promise channel by `-streaming-response`
(p.types/deftype+ StreamingResponse [f options donechan]
  pretty/PrettyPrintable
  (pretty [_]
    (list `->StreamingResponse f options donechan))

  server.protocols/Respond
  ;; Delegates to this namespace's `respond` fn, using `donechan` as its `finished-chan`.
  (respond [_this context]
    (respond context f options donechan))

  ;; sync responses only (in some cases?)
  compojure.response/Renderable
  ;; `render` here is this namespace's private `render` (declared above), not `compojure.response/render`.
  (render [this request]
    (render this (should-gzip-response? request)))

  ;; async responses only
  compojure.response/Sendable
  (send* [this request respond* _raise]
    (respond* (compojure.response/render this request))))
(defn- render
  "Build a Ring response map for `streaming-response`.
  * Body: the `StreamingResponse` itself, or, when `gzip?` is truthy, a copy whose options include `:gzip? true`.
  * Headers: the options' `:headers` plus `Content-Type` from the options, and `Content-Encoding: gzip` when
    `gzip?` is truthy.
  * Status: the options' `:status`, defaulting to 202."
  [^StreamingResponse streaming-response gzip?]
  (let [options                               (.-options streaming-response)
        {:keys [headers content-type status]} options
        body                                  (if gzip?
                                                (StreamingResponse. (.-f streaming-response)
                                                                    (assoc options :gzip? true)
                                                                    (.-donechan streaming-response))
                                                streaming-response)
        response-headers                      (cond-> (assoc headers "Content-Type" content-type)
                                                gzip? (assoc "Content-Encoding" "gzip"))]
    (-> (response/response body)
        (assoc :headers response-headers
               :status  (or status 202)))))
(defn finished-chan
  "Fetch the promise channel that will get a message when a `StreamingResponse` finishes. The message is
  `:completed`, `:canceled`, or `:unexpected-error`."
  [^StreamingResponse response]
  (.-donechan response))
(defn -streaming-response
  "Impl for [[streaming-response]] macro: wrap the body fn `f` and `options` in a `StreamingResponse`, giving it a
  fresh promise channel used to signal completion."
  [f options]
  (let [done (a/promise-chan)]
    (StreamingResponse. f options done)))
(defmacro streaming-response
  "Create an API response that streams results to an `OutputStream`.

  Minimal example:

    (streaming-response {:content-type \"application/json; charset=utf-8\"} [os canceled-chan]
      (write-something-to-stream! os))

  `os` is bound to the response `OutputStream`. `canceled-chan` is a channel that receives a message if the
  request is canceled, e.g. if the client disconnects. `body` runs with the caller's dynamic bindings conveyed
  (via `bound-fn`).

  Current options:

  * `:content-type` -- value for the `Content-Type` header of the response.
  * `:headers` -- map of other headers to include in the response.
  * `:status` -- HTTP status code of the response; defaults to 202."
  {:style/indent 2, :arglists '([options [os-binding canceled-chan-binding] & body])}
  [options [os-binding canceled-chan-binding :as bindings] & body]
  {:pre [(= (count bindings) 2)]}
  (let [tagged-os (vary-meta os-binding assoc :tag 'java.io.OutputStream)]
    `(-streaming-response
      (bound-fn [~tagged-os ~canceled-chan-binding]
        ~@body)
      ~options)))