(ns metabase.models.pulse-channel (:require [clojure.set :as set] [medley.core :as m] [metabase.config :as config] [metabase.models.interface :as mi] [metabase.plugins.classloader :as classloader] [metabase.util :as u] [metabase.util.i18n :refer [tru]] [methodical.core :as methodical] [toucan2.core :as t2])) | |
Static Definitions | |
(def days-of-week
  "Vector of the days of the week, each a map with a short string `:id` and a display `:name`.

  NOTE: order is important here!! We use the same ordering as clj-time, i.e. the vector starts at Monday and ends at
  Sunday."
  [{:id "mon", :name "Mon"},
   {:id "tue", :name "Tue"},
   {:id "wed", :name "Wed"},
   {:id "thu", :name "Thu"},
   {:id "fri", :name "Fri"},
   {:id "sat", :name "Sat"},
   {:id "sun", :name "Sun"}])
(def ^{:arglists '([day])} day-of-week?
  "Is `day` the `:id` of one of the entries in [[days-of-week]] (e.g. `\"mon\"`)?"
  (let [valid-day-ids (into #{} (map :id) days-of-week)]
    (fn [day]
      (contains? valid-day-ids day))))
(defn hour-of-day?
  "Is `hour` a valid hour of the day, i.e. an integer between 0 and 23 (inclusive)?"
  [hour]
  (if (integer? hour)
    (<= 0 hour 23)
    false))
(def ^:private schedule-frames
  "Set of possible schedule-frames allowed for a PulseChannel."
  #{:first :mid :last})
(defn schedule-frame?
  "Is `frame` one of the allowed schedule frames (`:first`, `:mid` or `:last`)?"
  [frame]
  ;; the set holds only keywords, so a successful lookup is always truthy
  (boolean (schedule-frames frame)))
(def ^:private schedule-types
  "Set of the possible schedule-types allowed for a PulseChannel."
  #{:hourly :daily :weekly :monthly})
(defn schedule-type?
  "Is `schedule-type` one of the allowed schedule types (`:hourly`, `:daily`, `:weekly` or `:monthly`)?"
  [schedule-type]
  ;; the set holds only keywords, so a successful lookup is always truthy
  (boolean (schedule-types schedule-type)))
(defn valid-schedule?
  "Is this combination of scheduling choices valid?

  Returns `true` or `false`. Any `schedule-type` other than `:hourly`, `:daily`, `:weekly` or `:monthly` is invalid."
  [schedule-type schedule-hour schedule-day schedule-frame]
  (case schedule-type
    ;; hourly schedule does not care about other inputs
    :hourly  true
    ;; daily schedule requires a valid `hour`
    :daily   (hour-of-day? schedule-hour)
    ;; weekly schedule requires a valid `hour` and `day`
    :weekly  (and (hour-of-day? schedule-hour)
                  (day-of-week? schedule-day))
    ;; monthly schedule requires a valid `frame` and `hour`. With frame `:first` or `:last` the `day` is not
    ;; inspected here; with frame `:mid` the `day` must be nil.
    :monthly (and (schedule-frame? schedule-frame)
                  (hour-of-day? schedule-hour)
                  (if (= :mid schedule-frame)
                    (nil? schedule-day)
                    (contains? #{:first :last} schedule-frame)))
    ;; anything else is not a schedule type we know about
    false))
(def channel-types
  "Map which contains the definitions for each type of pulse channel we allow. Each key is a channel type with a map
  which contains any other relevant information for defining the channel. E.g.

    {:email {:name \"Email\", :allows_recipients true}
     :slack {:name \"Slack\", :allows_recipients false}}"
  {:email {:type              "email"
           :name              "Email"
           :allows_recipients true
           :recipients        ["user" "email"]
           :schedules         [:hourly :daily :weekly :monthly]}
   :slack {:type              "slack"
           :name              "Slack"
           :allows_recipients false
           :schedules         [:hourly :daily :weekly :monthly]
           :fields            [{:name        "channel"
                                :type        "select"
                                :displayName "Post to"
                                :options     []
                                :required    true}]}
   :http  {:type              "http"
           :name              "Webhook"
           :allows_recipients false
           :schedules         [:hourly :daily :weekly :monthly]}})
(defn channel-type?
  "Is `channel-type` one of the keys of [[channel-types]] (e.g. `:email`)?"
  [channel-type]
  ;; `contains?` on a map checks its keys, so there is no need to build a set of them first
  (contains? channel-types channel-type))
(defn supports-recipients?
  "Does the given `channel` type allow recipients, according to its `:allows_recipients` entry in [[channel-types]]?
  Unknown channel types yield `false`."
  [channel]
  (boolean (get-in channel-types [channel :allows_recipients])))
Entity | |
(def PulseChannel
  "Used to be the toucan1 model name defined using [[toucan.models/defmodel]], now it's a reference to the toucan2 model
  name. We'll keep this till we replace all these symbols in our codebase."
  :model/PulseChannel)
;; `:model/PulseChannel` is backed by the `pulse_channel` table.
(methodical/defmethod t2/table-name :model/PulseChannel
  [_model]
  :pulse_channel)

;; Hydrating the key `:pulse_channel` (from any model) uses `:model/PulseChannel`.
(methodical/defmethod t2/model-for-automagic-hydration [:default :pulse_channel]
  [_original-model _k]
  :model/PulseChannel)
;; Make `:model/PulseChannel` a child of the shared model/hook/policy keywords.
;; NOTE(review): judging by the names, these add timestamp + entity-id handling and make the model readable by
;; everyone but writable only by superusers -- confirm against `metabase.models.interface`.
(doto :model/PulseChannel
  (derive :metabase/model)
  (derive :hook/timestamped?)
  (derive :hook/entity-id)
  (derive ::mi/read-policy.always-allow)
  (derive ::mi/write-policy.superuser))
;; Column transforms: `:details` goes through `mi/transform-json`; the channel type and the schedule type/frame go
;; through `mi/transform-keyword`.
;; NOTE(review): presumably JSON <-> map and string <-> keyword conversion on the way in/out of the DB -- confirm.
(t2/deftransforms :model/PulseChannel
  {:details        mi/transform-json
   :channel_type   mi/transform-keyword
   :schedule_type  mi/transform-keyword
   :schedule_frame mi/transform-keyword})
;; Hydrate `:recipients` for a batch of PulseChannels with a single query. Each channel gets its raw email recipients
;; (from `[:details :emails]`, as `{:email ...}` maps) followed by its active User recipients, ordered by User ID.
(methodical/defmethod t2/batched-hydrate [:default :recipients]
  [_model _k pcs]
  (when (seq pcs)
    (let [active-users     (t2/select [:model/User :id :email :first_name :last_name :pcr.pulse_channel_id]
                                      {:left-join [[:pulse_channel_recipient :pcr] [:= :core_user.id :pcr.user_id]]
                                       :where     [:and
                                                   [:in :pcr.pulse_channel_id (map :id pcs)]
                                                   [:= :core_user.is_active true]]
                                       :order-by  [[:core_user.id :asc]]})
          ;; the join column is only needed for grouping; strip it from the Users we hand back
          pcid->recipients (update-vals (group-by :pulse_channel_id active-users)
                                        (fn [users]
                                          (map #(dissoc % :pulse_channel_id) users)))]
      (map (fn [pc]
             (let [raw-email-recipients (map (fn [email] {:email email})
                                             (get-in pc [:details :emails] []))]
               (assoc pc :recipients (concat raw-email-recipients
                                             (get pcid->recipients (:id pc))))))
           pcs))))
(defn- update-send-pulse-trigger-if-needed!
  "Call `metabase.task.send-pulses/update-send-pulse-trigger-if-needed!` with `args`. The namespace is required and the
  var resolved at call time rather than in the `ns` form.
  NOTE(review): presumably this avoids a circular dependency between the two namespaces -- confirm."
  [& args]
  (classloader/require 'metabase.task.send-pulses)
  (let [update-trigger! (resolve 'metabase.task.send-pulses/update-send-pulse-trigger-if-needed!)]
    (apply update-trigger! args)))
(def ^:dynamic *archive-parent-pulse-when-last-channel-is-deleted*
  "Should we automatically archive a Pulse when its last `PulseChannel` is deleted? Defaults to `true`; rebind to
  `false` to delete channels without archiving the parent Pulse."
  true)
(t2/define-before-delete :model/PulseChannel
  [{pulse-id :pulse_id, pulse-channel-id :id :as pulse-channel}]
  ;; Runs when a `PulseChannel` is about to be deleted. Archives the parent `Pulse` if the channel being deleted is
  ;; its last channel (unless [[*archive-parent-pulse-when-last-channel-is-deleted*]] is bound to false).
  (when *archive-parent-pulse-when-last-channel-is-deleted*
    (let [remaining-channels (t2/count PulseChannel :pulse_id pulse-id, :id [:not= pulse-channel-id])]
      (when-not (pos? remaining-channels)
        (t2/update! :model/Pulse pulse-id {:archived true}))))
  ;; it's best if this is done in after-delete, but toucan2 doesn't support that yet. See toucan2#70
  ;; remove this pulse channel from its existing trigger
  (update-send-pulse-trigger-if-needed! pulse-id pulse-channel :remove-pc-ids #{pulse-channel-id}))
;; we want to load this at the top level so the Setting the namespace defines gets loaded
(def ^:private ^{:arglists '([email-addresses])} validate-email-domains*
  ;; Use the enterprise implementation when EE code is available and the var resolves; otherwise fall back to a
  ;; function that ignores its argument and returns nil.
  (let [ee-validate (when config/ee-available?
                      (classloader/require 'metabase-enterprise.advanced-config.models.pulse-channel)
                      (resolve 'metabase-enterprise.advanced-config.models.pulse-channel/validate-email-domains))]
    (if ee-validate
      ee-validate
      (constantly nil))))
(defn validate-email-domains
  "For channels that are being sent to raw email addresses: check that the domains in the emails are allowed by
  the [[metabase-enterprise.advanced-config.models.pulse-channel/subscription-allowed-domains]] Setting, if set. The
  domain check is a no-op when the enterprise implementation is not available.

  Also checks every User recipient (a recipient with an `:id`): throws a 404 `ex-info` if no such User exists, and a
  403 `ex-info` if an `:email` was supplied that differs from that User's email.

  Returns `pulse-channel` unchanged when everything is valid."
  [{{:keys [emails]} :details, :keys [recipients], :as pulse-channel}]
  ;; Raw email addresses can be in either `[:details :emails]` or in `:recipients`, depending on who is invoking this
  ;; function. Make sure we handle both situations.
  ;;
  ;;    {:details {:emails [\"email@example.com\" ...]}}
  ;;
  ;; The Dashboard Subscription FE currently sends raw email address recipients in this format:
  ;;
  ;;    {:recipients [{:email \"email@example.com\"} ...]}
  ;;
  (let [{user-recipients true, raw-email-recipients false} (group-by (comp boolean :id) recipients)
        all-raw-emails                                     (concat emails (map :email raw-email-recipients))]
    (validate-email-domains* all-raw-emails)
    ;; validate User `:id` & `:email` match up for User recipients. This is mostly to make sure people don't try to
    ;; be sneaky and pass in a valid User ID but different email so they can send test Pulses out to arbitrary email
    ;; addresses
    (when-let [user-ids (not-empty (into #{} (comp (filter some?) (map :id)) user-recipients))]
      (let [user-id->email (t2/select-pk->fn :email :model/User, :id [:in user-ids])]
        (doseq [{:keys [id email]} user-recipients]
          (let [correct-email (get user-id->email id)]
            (cond
              (not correct-email)
              (throw (ex-info (tru "User {0} does not exist." id)
                              {:status-code 404}))

              ;; only validate the email address if it was explicitly specified, which is not explicitly required.
              (and email (not= email correct-email))
              (throw (ex-info (tru "Wrong email address for User {0}." id)
                              {:status-code 403})))))))
    pulse-channel))
;; Reject the insert (by throwing) if any recipient fails [[validate-email-domains]]; otherwise the row is passed
;; through unchanged.
(t2/define-before-insert :model/PulseChannel
  [pulse-channel]
  (validate-email-domains pulse-channel))
;; Once a new PulseChannel has been inserted, add it to the send-pulse trigger -- but only if it is enabled.
;; Always returns the inserted row.
(t2/define-after-insert :model/PulseChannel
  [{:keys [pulse_id id enabled] :as pulse-channel}]
  (when enabled
    (update-send-pulse-trigger-if-needed! pulse_id pulse-channel :add-pc-ids #{id}))
  pulse-channel)
;; Before a PulseChannel is updated:
;;
;; 1. validate the changed recipients/emails (throws on failure, aborting the update);
;; 2. if any schedule column changed, move the channel from the trigger of its old schedule to the new one;
;; 3. if `:enabled` changed, add the channel to / remove it from its trigger.
;;
;; Returns the changes (plus primary key) that were validated.
(t2/define-before-update :model/PulseChannel
  [{:keys [pulse_id id] :as pulse-channel}]
  ;; Validate FIRST: trigger updates are side effects outside of the row being saved, so they must not happen for
  ;; an update that is about to be rejected.
  (let [validated-changes (validate-email-domains (mi/changes-with-pk pulse-channel))
        changes           (t2/changes pulse-channel)]
    ;; It's really best if the trigger bookkeeping is done in after-update, but t2/changes isn't available in
    ;; after-update yet. See toucan2#129
    (when (some #{:schedule_type :schedule_hour :schedule_day :schedule_frame} (keys changes))
      ;; need to remove this PC from the existing trigger (looked up using the ORIGINAL schedule)
      (update-send-pulse-trigger-if-needed! pulse_id (t2/original pulse-channel)
                                            :remove-pc-ids #{id})
      ;; then add this PC to the trigger for the updated schedule
      (update-send-pulse-trigger-if-needed! pulse_id pulse-channel
                                            :add-pc-ids #{id}))
    (when (contains? changes :enabled)
      (if (:enabled changes)
        ;; channel was just enabled
        (update-send-pulse-trigger-if-needed! pulse_id pulse-channel
                                              :add-pc-ids #{id})
        ;; channel was just disabled
        (update-send-pulse-trigger-if-needed! pulse_id (t2/original pulse-channel)
                                              :remove-pc-ids #{id})))
    validated-changes))
Persistence Functions | |
(defn update-recipients!
  "Update the User recipients of the PulseChannel with ID `id` so that they are exactly `user-ids`: rows are inserted
  for newly added Users and deleted for Users no longer in the list. Existing rows for Users that stay are left
  alone.

  `id` must be an integer and `user-ids` a collection of integers."
  [id user-ids]
  {:pre [(integer? id)
         (coll? user-ids)
         (every? integer? user-ids)]}
  (let [;; `select-fn-set` may come back as nil when there are no rows; `set` turns that into an empty set
        current-ids (set (t2/select-fn-set :user_id :model/PulseChannelRecipient, :pulse_channel_id id))
        desired-ids (set user-ids)
        ids-to-add  (set/difference desired-ids current-ids)
        ids-to-drop (set/difference current-ids desired-ids)]
    (when (seq ids-to-add)
      (t2/insert! :model/PulseChannelRecipient
                  (for [user-id ids-to-add]
                    {:pulse_channel_id id, :user_id user-id})))
    (when (seq ids-to-drop)
      (t2/delete! (t2/table-name :model/PulseChannelRecipient)
                  :pulse_channel_id id
                  :user_id          [:in ids-to-drop]))))
(defn update-pulse-channel!
  "Updates an existing PulseChannel (identified by `id`) along with all related data associated with the channel,
  such as its recipients.

  `recipients` is a collection of maps; entries with an `:id` are treated as User recipients and entries with only an
  `:email` as raw email recipients. For channel types that support recipients, the raw emails are stored under
  `[:details :emails]` and the User recipients are synced with [[update-recipients!]].

  Schedule columns that do not apply to `schedule_type` are saved as nil: `schedule_hour` for `:hourly`,
  `schedule_day` for anything but `:weekly`/`:monthly`, and `schedule_frame` for anything but `:monthly`."
  [{:keys [id channel_type enabled details recipients schedule_type schedule_day schedule_hour schedule_frame]
    :or   {details    {}
           recipients []}}]
  {:pre [(integer? id)
         (channel-type? channel_type)
         ;; core `boolean?`, same predicate as `create-pulse-channel!` uses
         (boolean? enabled)
         (schedule-type? schedule_type)
         (valid-schedule? schedule_type schedule_hour schedule_day schedule_frame)
         (coll? recipients)
         (every? map? recipients)]}
  (let [;; {true [user-id ...], false [raw-email ...]}; recipients with neither an `:id` nor an `:email` are dropped
        recipients-by-type (group-by integer? (filter identity (map #(or (:id %) (:email %)) recipients)))]
    (t2/update! PulseChannel id
                {:details        (cond-> details
                                   (supports-recipients? channel_type) (assoc :emails (get recipients-by-type false)))
                 :enabled        enabled
                 :schedule_type  schedule_type
                 :schedule_hour  (when (not= schedule_type :hourly)
                                   schedule_hour)
                 :schedule_day   (when (contains? #{:weekly :monthly} schedule_type)
                                   schedule_day)
                 :schedule_frame (when (= schedule_type :monthly)
                                   schedule_frame)})
    (when (supports-recipients? channel_type)
      ;; an empty list is passed on purpose: it removes all existing User recipients
      (update-recipients! id (or (get recipients-by-type true) [])))))
(defn create-pulse-channel!
  "Create a new PulseChannel for the Pulse with ID `pulse_id`, along with all related data associated with the
  channel, such as its recipients. Returns the ID of the newly created channel.

  `recipients` is a collection of maps; entries with an `:id` are treated as User recipients and entries with only an
  `:email` as raw email recipients. Schedule columns that do not apply to `schedule_type` are saved as nil."
  [{:keys [channel_type channel_id details enabled pulse_id recipients schedule_type schedule_day schedule_hour
           schedule_frame]
    :or   {details    {}
           recipients []}}]
  {:pre [(channel-type? channel_type)
         (integer? pulse_id)
         (boolean? enabled)
         (schedule-type? schedule_type)
         (valid-schedule? schedule_type schedule_hour schedule_day schedule_frame)
         (coll? recipients)
         (every? map? recipients)]}
  (let [;; {true [user-id ...], false [raw-email ...]}; recipients with neither an `:id` nor an `:email` are dropped
        recipients-by-type (->> recipients
                                (map #(or (:id %) (:email %)))
                                (filter identity)
                                (group-by integer?))
        user-ids           (get recipients-by-type true)
        raw-emails         (get recipients-by-type false)
        with-recipients?   (supports-recipients? channel_type)
        new-details        (if with-recipients?
                             (assoc details :emails raw-emails)
                             details)
        new-id             (t2/insert-returning-pk! PulseChannel
                                                    :pulse_id       pulse_id
                                                    :channel_type   channel_type
                                                    :channel_id     channel_id
                                                    :details        new-details
                                                    :enabled        enabled
                                                    :schedule_type  schedule_type
                                                    :schedule_hour  (when-not (= schedule_type :hourly)
                                                                      schedule_hour)
                                                    :schedule_day   (when (#{:weekly :monthly} schedule_type)
                                                                      schedule_day)
                                                    :schedule_frame (when (= schedule_type :monthly)
                                                                      schedule_frame))]
    (when (and with-recipients? (seq user-ids))
      (update-recipients! new-id user-ids))
    ;; return the id of our newly created channel
    new-id))
;; JSON-encode a PulseChannel with `[:details :emails]` removed before delegating to the next method.
(methodical/defmethod mi/to-json PulseChannel
  "Don't include `:emails`, we use that purely internally"
  [pulse-channel json-generator]
  (next-method (m/dissoc-in pulse-channel [:details :emails]) json-generator))