This namespace is responsible for subscribing to events which should update the view log and view counts. | (ns metabase.events.view-log (:require [java-time.api :as t] [metabase.api.common :as api] [metabase.events :as events] [metabase.models.audit-log :as audit-log] [metabase.models.query.permissions :as query-perms] [metabase.premium-features.core :as premium-features] [metabase.util :as u] [metabase.util.grouper :as grouper] [metabase.util.log :as log] [metabase.util.malli :as mu] [methodical.core :as m] [steffan-westcott.clj-otel.api.trace.span :as span] [toucan2.core :as t2])) |
(defn- group-by-frequency
  "Given a list of items, returns a map of occurrence count -> the items that occur that many times.

    (group-by-frequency [:a :a :b :b :c :c :c])
    ;; => {2 [:a :b] 3 [:c]}"
  [items]
  (->> (frequencies items)
       ;; each entry is [item count]; invert it so the count becomes the key
       (reduce (fn [freq->items entry]
                 (update freq->items (val entry) u/conjv (key entry)))
               {})))
(defn- increment-view-counts!*
  "Handler passed to `grouper/start!` for the view-count queue. `items` is a collection of
  `{:model <toucan model> :id <instance id>}` maps; the same model/id pair may appear several times.

  Issues one UPDATE per model. Within that UPDATE, ids are bucketed by how many times they were seen, so
  every row's `view_count` is incremented by its own number of views. Any `Exception` is logged and
  swallowed rather than rethrown."
  [items]
  (log/debugf "Increment view counts of %d items" (count items))
  (try
    (let [model->ids (reduce (fn [acc item]
                               (update acc (:model item) conj (:id item)))
                             {}
                             items)]
      (doseq [[model ids] model->ids
              :let [cnt->ids  (group-by-frequency ids)
                    ;; CASE WHEN id IN (<ids seen n times>) THEN n ... END
                    increment (into [:case]
                                    (mapcat (fn [[cnt ids]]
                                              [[:in :id ids] cnt]))
                                    cnt->ids)]]
        (t2/query {:update (t2/table-name model)
                   :set    {:view_count [:+ :view_count increment]}
                   ;; restrict to the ids covered by the CASE, which has no ELSE branch
                   :where  [:in :id (mapcat val cnt->ids)]})))
    (catch Exception e
      (log/error e "Failed to increment view counts"))))
;; Interval, in seconds, handed (as milliseconds) to the view-count grouper below.
(def ^:private increment-view-count-interval-seconds 20)

;; Lazily started grouper that feeds submitted items to `increment-view-counts!*`.
;; Wrapped in a `delay` so nothing is started until the queue is first dereferenced.
(defonce ^:private
  increase-view-count-queue
  (delay
    (grouper/start!
     increment-view-counts!*
     :capacity 500
     :interval (* 1000 increment-view-count-interval-seconds))))
(defn- increment-view-counts!
  "Increment the view count of the given `model` instance with ID `model-id`.

  The increment is not applied directly: a `{:model model :id model-id}` item is submitted to
  `increase-view-count-queue`, whose handler is `increment-view-counts!*`."
  [model model-id]
  (-> @increase-view-count-queue
      (grouper/submit! {:model model
                        :id    model-id})))
(mu/defn ^:private record-views!
  "Simple base function for recording one or more views. `view-or-views` is a single view-log row (a map)
  or a sequence of such rows. Rows are inserted into `:model/ViewLog` only when
  `premium-features/log-enabled?` returns truthy; otherwise nothing is inserted."
  [view-or-views :- [:or :map [:sequential :map]]]
  (span/with-span! {:name "record-view!"}
    (let [logging-on? (premium-features/log-enabled?)]
      (when logging-on?
        (t2/insert! :model/ViewLog view-or-views)))))
(defn- generate-view
  "Generates a view-log row, given an event map. The event map either has an `:object`, or a `:model` and
  an `:object-id`; explicit `:model` / `:object-id` values take precedence over ones derived from `:object`.

  `:user-id` falls back to `api/*current-user-id*`. `:has-access` defaults to `true` only when the key is
  absent; an explicit `nil` is preserved."
  [& {:keys [model object-id object user-id has-access context]
      :or   {has-access true}}]
  (let [model-name (u/lower-case-en (audit-log/model-name (or model object)))
        viewer-id  (or user-id api/*current-user-id*)
        ;; `u/id` is only consulted when no explicit object-id was supplied
        target-id  (or object-id (u/id object))]
    {:model      model-name
     :user_id    viewer-id
     :model_id   target-id
     :has_access has-access
     :context    context}))
(derive ::card-read-event :metabase/event)
(derive :event/card-read ::card-read-event)

(m/defmethod events/publish-event! ::card-read-event
  "Handle a Card read event: queue a view-count increment for the Card, then record a view-log row for it.
  Any failure is logged as a warning and never propagated to the publisher."
  [topic event]
  (let [{:keys [object-id user-id]} event]
    ;; NOTE(review): `:topic` and `:user-id` are passed as top-level span options here; confirm that clj-otel
    ;; records them (span attributes are normally supplied under `:attributes`).
    (span/with-span! {:name    "view-log-card-read"
                      :topic   topic
                      :user-id user-id}
      (try
        (increment-view-counts! :model/Card object-id)
        (-> (generate-view :model :model/Card event)
            record-views!)
        (catch Throwable e
          (log/warnf e "Failed to process view event. %s" topic))))))
(derive ::dashboard-queried :metabase/event)
(derive :event/dashboard-queried ::dashboard-queried)

;; Interval, in seconds, handed (as milliseconds) to the dashboard `last_viewed_at` grouper.
(def ^:private update-dashboard-last-viewed-at-interval-seconds
  20)
(defn- update-dashboard-last-viewed-at!*
  "Handler passed to `grouper/start!` for the dashboard `last_viewed_at` queue.

  `dashboard-id-timestamps` is a collection of `{:id <dashboard id> :timestamp <time>}` maps. Entries are
  collapsed to the latest timestamp per dashboard, then a single UPDATE sets each dashboard's
  `last_viewed_at` to the greater of its current value and that timestamp. Any `Exception` raised by the
  update is logged and swallowed."
  [dashboard-id-timestamps]
  (let [latest-of   (fn [entries]
                      (->> entries
                           (map :timestamp)
                           (apply t/max)))
        id->latest  (-> (group-by :id dashboard-id-timestamps)
                        (update-vals latest-of))]
    (try
      (t2/update! :model/Dashboard
                  :id [:in (keys id->latest)]
                  {:last_viewed_at
                   (into [:case]
                         (mapcat (fn [[dashboard-id latest]]
                                   [[:= :id dashboard-id]
                                    ;; a NULL `last_viewed_at` is coalesced to a fixed early timestamp so that
                                    ;; GREATEST always has a non-NULL value to compare against
                                    [:greatest
                                     [:coalesce :last_viewed_at (t/offset-date-time 0)]
                                     latest]]))
                         id->latest)})
      (catch Exception e
        (log/error e "Failed to update dashboard last_viewed_at")))))
;; Lazily started grouper that feeds submitted `{:id ... :timestamp ...}` entries to
;; `update-dashboard-last-viewed-at!*`.
;;
;; Declared with `defonce` (matching `increase-view-count-queue`) so that reloading this namespace keeps
;; the existing delay. With a plain `def`, a reload would rebind the var to a fresh delay and the next
;; deref would call `grouper/start!` again while the previously started grouper is never stopped.
(defonce ^:private
  update-dashboard-last-viewed-at-queue
  (delay (grouper/start!
          update-dashboard-last-viewed-at!*
          :capacity 500
          :interval (* update-dashboard-last-viewed-at-interval-seconds 1000))))
(defn- update-dashboard-last-viewed-at!
  "Update the `last_viewed_at` of the dashboard with ID `dashboard-id` to the current time.

  The write is not performed directly: an `{:id dashboard-id :timestamp <now>}` entry is submitted to
  `update-dashboard-last-viewed-at-queue`, whose handler is `update-dashboard-last-viewed-at!*`."
  [dashboard-id]
  ;; capture the timestamp before touching the queue, so it reflects when the view was reported
  (let [entry {:id        dashboard-id
               :timestamp (t/offset-date-time)}]
    (grouper/submit! @update-dashboard-last-viewed-at-queue entry)))
(m/defmethod events/publish-event! ::dashboard-queried
  "Handle a dashboard query being run: queue an update of the dashboard's `last_viewed_at`.
  Any failure is logged as a warning and never propagated to the publisher."
  [topic event]
  (try
    (let [dashboard-id (:object-id event)]
      (update-dashboard-last-viewed-at! dashboard-id))
    (catch Throwable e
      (log/warnf e "Failed to process dashboard query event. %s" topic))))
(derive ::collection-read-event :metabase/event)
(derive :event/collection-read ::collection-read-event)

(m/defmethod events/publish-event! ::collection-read-event
  "Handle a collection read event: build a view-log row from the event map and record it.
  Any failure is logged as a warning and never propagated to the publisher."
  [topic event]
  (try
    (record-views! (generate-view event))
    (catch Throwable e
      (log/warnf e "Failed to process view event. %s" topic))))
(derive ::read-permission-failure :metabase/event)
(derive :event/read-permission-failure ::read-permission-failure)

(m/defmethod events/publish-event! ::read-permission-failure
  "Handle a read-permission-failure event: record a view-log row built from the event map, but only when
  the event's `:object` is a Card or a Dashboard. Any failure is logged as a warning and never propagated
  to the publisher."
  [topic {:keys [object] :as event}]
  (try
    ;; Only log permission check failures for Cards and Dashboards. This set can be expanded if we add view
    ;; logging of other models.
    (let [logged-models #{:model/Card :model/Dashboard}]
      (when (contains? logged-models (t2/model object))
        (record-views! (generate-view event))))
    (catch Throwable e
      (log/warnf e "Failed to process view event. %s" topic))))
(derive ::dashboard-read :metabase/event)
(derive :event/dashboard-read ::dashboard-read)

(m/defmethod events/publish-event! ::dashboard-read
  "Handle processing for the dashboard read event. Queues a view-count increment for the dashboard and logs
  the dashboard view. Card views are logged separately. Any failure is logged as a warning and never
  propagated to the publisher."
  [topic event]
  (let [{:keys [object-id user-id]} event]
    (span/with-span! {:name    "view-log-dashboard-read"
                      :topic   topic
                      :user-id user-id}
      (try
        (increment-view-counts! :model/Dashboard object-id)
        (-> (generate-view :model :model/Dashboard event)
            record-views!)
        (catch Throwable e
          (log/warnf e "Failed to process view event. %s" topic))))))
(derive ::table-read :metabase/event)
(derive :event/table-read ::table-read)

(m/defmethod events/publish-event! ::table-read
  "Handle processing for the table read event. Queues a view-count increment for the table, then does a
  basic permissions check to see if the user has data perms for the table and records the view with the
  result.

  The permissions check only runs when the event's `user-id` equals `api/*current-user-id*`; otherwise
  `:has-access` is passed on as `nil`. Any failure is logged as a warning and never propagated to the
  publisher."
  [topic {:keys [object user-id] :as event}]
  (span/with-span! {:name    "view-log-table-read"
                    :topic   topic
                    :user-id user-id}
    (try
      (increment-view-counts! :model/Table (:id object))
      (let [table-id    (u/id object)
            database-id (:db_id object)
            has-access? (when (= api/*current-user-id* user-id)
                          (query-perms/can-query-table? database-id table-id))
            ;; an explicit nil here is kept as-is: `generate-view` only defaults a missing `:has-access`
            view        (generate-view (assoc event :has-access has-access?))]
        (record-views! view))
      (catch Throwable e
        (log/warnf e "Failed to process view event. %s" topic)))))