This namespace is responsible for subscribing to events which should update the view log and view counts.

(ns metabase.events.view-log
  (:require
   [java-time.api :as t]
   [metabase.api.common :as api]
   [metabase.events :as events]
   [metabase.models.audit-log :as audit-log]
   [metabase.models.query.permissions :as query-perms]
   [metabase.public-settings.premium-features :as premium-features]
   [metabase.util :as u]
   [metabase.util.grouper :as grouper]
   [metabase.util.log :as log]
   [metabase.util.malli :as mu]
   [methodical.core :as m]
   [steffan-westcott.clj-otel.api.trace.span :as span]
   [toucan2.core :as t2]))

Given a list of items, returns a map of frequencies to items. (group-by-frequency [:a :a :b :b :c :c :c]) ;; => {2 [:a :b] 3 [:c]}

(defn- group-by-frequency
  [items]
  (reduce (fn [acc [item cnt]]
            (update acc cnt u/conjv item))
          {}
          (frequencies items)))
(defn- increment-view-counts!*
  [items]
  (log/debugf "Increment view counts of %d items" (count items))
  (try
    (let [model->ids (reduce (fn [acc {:keys [id model]}]
                               (update acc model conj id))
                             {}
                             items)]
      (doseq [[model ids] model->ids]
        (let [cnt->ids (group-by-frequency ids)]
          (t2/query {:update (t2/table-name model)
                     :set    {:view_count [:+ :view_count (into [:case]
                                                                (mapcat (fn [[cnt ids]]
                                                                          [[:in :id ids] cnt])
                                                                        cnt->ids))]}
                     :where  [:in :id (apply concat (vals cnt->ids))]}))))
    (catch Exception e
      (log/error e "Failed to increment view counts"))))
(def ^:private increment-view-count-interval-seconds 20)
(defonce ^:private
  increase-view-count-queue
  (delay (grouper/start!
          increment-view-counts!*
          :capacity 500
          :interval (* increment-view-count-interval-seconds 1000))))

Increment the view count of the given model and model-id.

(defn- increment-view-counts!
  [model model-id]
  (grouper/submit! @increase-view-count-queue {:model model :id model-id}))

Simple base function for recording a view of a given model and model-id by a certain user.

(mu/defn ^:private record-views!
  [view-or-views :- [:or :map [:sequential :map]]]
  (span/with-span!
    {:name "record-view!"}
    (when (premium-features/log-enabled?)
      (t2/insert! :model/ViewLog view-or-views))))

Generates a view, given an event map. The event map either has an object or a model and object-id.

(defn- generate-view
  [& {:keys [model object-id object user-id has-access context]
      :or   {has-access true}}]
  {:model      (u/lower-case-en (audit-log/model-name (or model object)))
   :user_id    (or user-id api/*current-user-id*)
   :model_id   (or object-id (u/id object))
   :has_access has-access
   :context    context})
(derive ::card-read-event :metabase/event)
(derive :event/card-read ::card-read-event)
(m/defmethod events/publish-event! ::card-read-event
  "Handle processing for a generic read event notification"
  [topic {:keys [object-id user-id] :as event}]
  (span/with-span!
    {:name    "view-log-card-read"
     :topic   topic
     :user-id user-id}
    (try
      (increment-view-counts! :model/Card object-id)
      (record-views! (generate-view :model :model/Card event))
      (catch Throwable e
        (log/warnf e "Failed to process view event. %s" topic)))))
(derive ::dashboard-queried :metabase/event)
(derive :event/dashboard-queried ::dashboard-queried)
(def ^:private update-dashboard-last-viewed-at-interval-seconds 20)
(defn- update-dashboard-last-viewed-at!* [dashboard-id-timestamps]
  (let [dashboard-id->timestamp (update-vals (group-by :id dashboard-id-timestamps)
                                             (fn [xs] (apply t/max (map :timestamp xs))))]
    (try
      (t2/update! :model/Dashboard :id [:in (keys dashboard-id->timestamp)]
                  {:last_viewed_at (into [:case]
                                         (mapcat (fn [[id timestamp]]
                                                   [[:= :id id] [:greatest [:coalesce :last_viewed_at (t/offset-date-time 0)] timestamp]])
                                                 dashboard-id->timestamp))})
      (catch Exception e
        (log/error e "Failed to update dashboard last_viewed_at")))))
(def ^:private update-dashboard-last-viewed-at-queue
  (delay (grouper/start!
          update-dashboard-last-viewed-at!*
          :capacity 500
          :interval (* update-dashboard-last-viewed-at-interval-seconds 1000))))

Update the last_used_at of a dashboard asynchronously

(defn- update-dashboard-last-viewed-at!
  [dashboard-id]
  (let [now (t/offset-date-time)]
    (grouper/submit! @update-dashboard-last-viewed-at-queue {:id dashboard-id
                                                             :timestamp now})))
(m/defmethod events/publish-event! ::dashboard-queried
  "Handle processing for a dashboard query being run"
  [topic {:keys [object-id] :as _event}]
  (try
    (update-dashboard-last-viewed-at! object-id)
    (catch Throwable e
      (log/warnf e "Failed to process dashboard query event. %s" topic))))
(derive ::collection-read-event :metabase/event)
(derive :event/collection-read ::collection-read-event)
(m/defmethod events/publish-event! ::collection-read-event
  "Handle processing for a generic read event notification"
  [topic event]
  (try
    (-> event
        generate-view
        record-views!)
    (catch Throwable e
      (log/warnf e "Failed to process view event. %s" topic))))
(derive ::read-permission-failure :metabase/event)
(derive :event/read-permission-failure ::read-permission-failure)
(m/defmethod events/publish-event! ::read-permission-failure
  "Handle processing for a generic read event notification"
  [topic {:keys [object] :as event}]
  (try
    ;; Only log permission check failures for Cards and Dashboards. This set can be expanded if we add view logging of
    ;; other models.
    (when (#{:model/Card :model/Dashboard} (t2/model object))
      (-> event
          generate-view
          record-views!))
    (catch Throwable e
      (log/warnf e "Failed to process view event. %s" topic))))
(derive ::dashboard-read :metabase/event)
(derive :event/dashboard-read ::dashboard-read)
(m/defmethod events/publish-event! ::dashboard-read
  "Handle processing for the dashboard read event. Logs the dashboard view. Card views are logged separately."
  [topic {:keys [object-id user-id] :as event}]
  (span/with-span!
    {:name "view-log-dashboard-read"
     :topic topic
     :user-id user-id}
    (try
      (increment-view-counts! :model/Dashboard object-id)
      (record-views! (generate-view :model :model/Dashboard event))
      (catch Throwable e
        (log/warnf e "Failed to process view event. %s" topic)))))
(derive ::table-read :metabase/event)
(derive :event/table-read ::table-read)
(m/defmethod events/publish-event! ::table-read
  "Handle processing for the table read event. Does a basic permissions check to see if the the user has data perms for
  the table."
  [topic {:keys [object user-id] :as event}]
  (span/with-span!
    {:name "view-log-table-read"
     :topic topic
     :user-id user-id}
    (try
      (increment-view-counts! :model/Table (:id object))
      (let [table-id    (u/id object)
            database-id (:db_id object)
            has-access? (when (= api/*current-user-id* user-id)
                          (query-perms/can-query-table? database-id table-id))]
        (-> event
            (assoc :has-access has-access?)
            generate-view
            record-views!))
      (catch Throwable e
        (log/warnf e "Failed to process view event. %s" topic)))))