(ns metabase.util.queue
  "Queue utilities built on java.util.concurrent: a bounded transfer queue, and helpers for batching items off a
  `DelayQueue`."
  (:import
   (java.time Duration Instant)
   (java.util.concurrent ArrayBlockingQueue DelayQueue Delayed SynchronousQueue TimeUnit)))

;; Surface any reflective interop calls in this namespace at compile time.
(set! *warn-on-reflection* true)
(defprotocol BoundedTransferQueue
  "A queue with a fixed capacity that supports both best-effort (droppable) puts and blocking puts."
  (maybe-put! [queue msg]
    "Put a message on the queue if there is space for it, otherwise drop it.
     Returns whether the item was enqueued.")
  (blocking-put! [queue timeout msg]
    "Put a message on the queue. If necessary, block for up to `timeout` milliseconds until it can be accepted.
     Returns whether the item was accepted within the timeout.")
  (blocking-take! [queue timeout]
    "Take a message off the queue, blocking for roughly up to `timeout` milliseconds if necessary.
     Returns nil if no message became available in time.")
  (clear! [queue]
    "Discard all messages on the given queue."))
;; Similar to java.util.concurrent.LinkedTransferQueue, but bounded.
;;
;; Droppable messages go through the bounded `async-queue`; blocking puts are handed off through `sync-queue`.
;; `block-ms` is how long one round of `blocking-take!` waits on `sync-queue`, and `sleep-ms` is how long it then
;; sleeps before checking the queues again. Both are in milliseconds.
(deftype ^:private ArrayTransferQueue [^ArrayBlockingQueue async-queue
                                       ^SynchronousQueue sync-queue
                                       ^long block-ms
                                       ^long sleep-ms]
  BoundedTransferQueue
  (maybe-put! [_ msg]
    ;; Non-blocking offer: returns false (and the message is dropped) when the bounded queue is full.
    (.offer async-queue msg))
  (blocking-put! [_ timeout msg]
    ;; Waits up to `timeout` ms for a taker to receive the message; returns whether the hand-off happened.
    (.offer sync-queue msg timeout TimeUnit/MILLISECONDS))
  (blocking-take! [_ timeout]
    (loop [time-remaining (long timeout)]
      (when (pos? time-remaining)
        ;; Async messages are given higher priority, as sync messages will never be dropped.
        ;; `if-some` rather than `or`, so that a message whose value is `false` is returned instead of being lost.
        (if-some [msg (.poll async-queue)]
          msg
          ;; Never wait longer than the time we have left, so short timeouts are honoured.
          (let [poll-ms (min block-ms time-remaining)]
            (if-some [msg (.poll sync-queue poll-ms TimeUnit/MILLISECONDS)]
              msg
              (let [nap-ms (min sleep-ms (- time-remaining poll-ms))]
                (when (pos? nap-ms)
                  (Thread/sleep nap-ms))
                ;; This is an underestimate, as the thread may have taken a while to wake up. That's OK.
                (recur (- time-remaining poll-ms nap-ms)))))))))
  (clear! [_]
    (.clear sync-queue)
    (.clear async-queue)))
(defn bounded-transfer-queue
  "Create a bounded transfer queue, specialized based on the high-level options.

  `capacity` is the size of the backing `ArrayBlockingQueue`.

  Options (both in milliseconds, both defaulting to 100):
    :block-ms  passed to the queue as its per-round wait on the synchronous hand-off queue
    :sleep-ms  passed to the queue as its sleep between polling rounds"
  [capacity & {:keys [block-ms sleep-ms]
               :or   {block-ms 100
                      sleep-ms 100}}]
  (->ArrayTransferQueue (ArrayBlockingQueue. capacity)
                        (SynchronousQueue.)
                        block-ms
                        sleep-ms))
;; Wraps `value` so that a DelayQueue only releases it once the wall clock reaches `ready-at`.
(defrecord DelayValue [value ^Instant ready-at]
  Delayed
  (getDelay [_ unit]
    ;; Remaining time until `ready-at`, converted from milliseconds into the requested unit.
    (.convert unit (- (.toEpochMilli ready-at) (System/currentTimeMillis)) TimeUnit/MILLISECONDS))
  (compareTo [this other]
    (if (instance? DelayValue other)
      ;; Compare the fixed deadlines directly. Going through `getDelay` reads the clock once per operand, so the
      ;; result could change between the two reads and break the ordering contract.
      (.compareTo ready-at ^Instant (:ready-at other))
      ;; Foreign Delayed implementations only expose their remaining delay.
      (Long/compare (.getDelay this TimeUnit/MILLISECONDS)
                    (.getDelay ^Delayed other TimeUnit/MILLISECONDS)))))
(defn delay-queue
  "Return an unbounded queue that returns each item only after some specified delay."
  ^DelayQueue []
  (DelayQueue.))
(defn put-with-delay!
  "Put an item on the delay queue, with a delay given in milliseconds.

  `value` is wrapped in a `DelayValue` whose ready time is now + `delay-ms`. Returns the result of the underlying
  `DelayQueue.offer` call."
  [^DelayQueue queue delay-ms value]
  (.offer queue (->DelayValue value (.plus (Instant/now) (Duration/ofMillis delay-ms)))))
(defn- take-delayed-batch*
  "Keep polling `queue`, conj-ing the `:value` of each item onto `acc`, until `acc` holds `max-items` entries or a
  poll comes back empty.

  When `poll-ms` is positive each poll waits up to that many milliseconds; otherwise polls return immediately.
  Returns `acc` once it is full, and otherwise the accumulated items, or nil if there are none."
  [^DelayQueue queue max-items ^long poll-ms acc]
  (loop [acc acc]
    (if (>= (count acc) max-items)
      acc
      (if-let [item (if (pos? poll-ms)
                      (.poll queue poll-ms TimeUnit/MILLISECONDS)
                      (.poll queue))]
        (recur (conj acc (:value item)))
        (not-empty acc)))))
(defn take-delayed-batch!
  "Get up to `max-items` of the ready items off the given delay queue.

  With two arguments, never waits: takes whatever is ready right now.
  With four arguments, waits up to `max-first-ms` milliseconds for the first item, and then up to `max-next-ms`
  milliseconds for each subsequent item. Returns nil if no first item arrives in time."
  ([queue max-items]
   (take-delayed-batch* queue max-items 0 []))
  ([^DelayQueue queue max-items ^long max-first-ms ^long max-next-ms]
   (when-let [fst (.poll queue max-first-ms TimeUnit/MILLISECONDS)]
     (take-delayed-batch* queue max-items max-next-ms [(:value fst)]))))