Non-identifying fingerprinters for various field types. | (ns metabase.analyze.fingerprint.fingerprinters (:require [bigml.histogram.core :as hist] [java-time.api :as t] [kixi.stats.core :as stats] [kixi.stats.math :as math] [medley.core :as m] [metabase.analyze.classifiers.name :as classifiers.name] [metabase.sync.util :as sync-util] [metabase.util :as u] [metabase.util.date-2 :as u.date] [metabase.util.log] [metabase.util.performance :as perf] [redux.core :as redux]) (:import (com.bigml.histogram Histogram) (com.clearspring.analytics.stream.cardinality HyperLogLogPlus) (java.time ZoneOffset) (java.time.chrono ChronoLocalDateTime ChronoZonedDateTime) (java.time.temporal Temporal))) |
;; Turn on reflection warnings while this namespace is compiled, so interop calls that are missing a type hint
;; are reported.
(set! *warn-on-reflection* true)
(defn col-wise
  "Build a reducing function that applies the reducing functions `rfs` column-wise to a sequence of rows: the
  i-th function in `rfs` receives the i-th element of every row and keeps its own accumulator.

  - 0-arity: a vector holding each rf's initial accumulator.
  - 1-arity: completes every accumulator with its rf (unwrapping `reduced` first).
  - 2-arity: steps every rf whose accumulator is not already `reduced`. When every accumulator was already
    `reduced` on entry, the vector is returned wrapped in `reduced` so the overall reduction can stop."
  [& rfs]
  (let [rfs (vec rfs)]
    (fn
      ([]
       (perf/mapv (fn [rf] (rf)) rfs))
      ([accs]
       (perf/mapv (fn [rf acc] (rf (unreduced acc))) rfs accs))
      ([accs row]
       (let [any-active? (volatile! false)
             step        (fn [rf acc x]
                           (if (reduced? acc)
                             ;; this column is finished; carry its accumulator through untouched
                             acc
                             (do
                               (vreset! any-active? true)
                               (rf acc x))))
             stepped     (perf/mapv step rfs accs row)]
         (if @any-active?
           stepped
           (reduced stepped)))))))
(defn constant-fingerprinter
  "Reducing function that ignores its input and always yields `init`. The init (0-arity) and step (2-arity)
  calls return `init` wrapped in `reduced`; the completing (1-arity) call returns `init` itself."
  [init]
  (letfn [(halt [] (reduced init))]
    (fn
      ([] (halt))
      ([_acc] init)
      ([_acc _x] (halt)))))
(defn- cardinality
  "Reducing function that sketches cardinality using HyperLogLog++.
  https://research.google.com/pubs/pub40671.html

  - 0-arity: a fresh `HyperLogLogPlus` constructed with arguments 14 and 25.
  - 1-arity: the sketch's `.cardinality` estimate.
  - 2-arity: offers `x` to the sketch and returns the (mutated) sketch."
  ([]
   (HyperLogLogPlus. 14 25))
  ([^HyperLogLogPlus sketch]
   (.cardinality sketch))
  ([^HyperLogLogPlus sketch x]
   (doto sketch
     (.offer x))))
(defmacro robust-map
  "Build a hash map from alternating key/value forms `kvs`, evaluating every value form inside its own
  try-catch block. A value form that throws any `Throwable` contributes `nil` for its key instead of
  propagating the exception."
  [& kvs]
  (let [guarded (mapcat (fn [[k v]]
                          [k `(try ~v (catch Throwable _#))])
                        (partition 2 kvs))]
    `(hash-map ~@guarded)))
(defmacro ^:private do-with-error-handling
  "Evaluate `form`. When `sync-util/*log-exceptions-and-continue?*` is truthy, any `Throwable` thrown by `form`
  is logged as a warning together with `msg`, and the result of calling `action-on-exception` on the exception
  is returned instead. Otherwise `form` is evaluated bare and exceptions propagate.

  This macro and its usage is written in a specific way to ensure that try-catch blocks don't produce closures."
  [form action-on-exception msg]
  `(if-not sync-util/*log-exceptions-and-continue?*
     ~form
     (try
       ~form
       (catch Throwable ex#
         (metabase.util.log/warn ex# ~msg)
         (~action-on-exception ex#)))))
(defn with-error-handling
  "Wrap the reducing function `rf` so that exceptions are handled via `do-with-error-handling`, using `msg` as
  the log message.

  - init / step: on an exception the `Throwable` is returned wrapped in `reduced`, which halts the reduction.
  - completion: an accumulator that is `reduced` or is itself a `Throwable` is returned unwrapped without
    calling `rf`; otherwise `rf`'s completion is called, and if it throws, the `Throwable` is returned as the
    result."
  [rf msg]
  ;; This function is written in a specific way to ensure that try-catch blocks don't produce closures.
  (fn
    ([]
     (do-with-error-handling (rf) reduced msg))
    ([acc]
     (cond
       (reduced? acc)            (unreduced acc)
       (instance? Throwable acc) acc
       :else                     (do-with-error-handling (unreduced (rf acc)) identity msg)))
    ([acc x]
     (do-with-error-handling (rf acc x) reduced msg))))
(defn robust-fuse
  "Like `redux/fuse`, but every reducing function in the map `kfs` is first wrapped in `with-error-handling`
  (with a log message naming its key), and a reducing function whose final result is a `Throwable` yields
  `nil` for its key instead."
  [kfs]
  (letfn [(throwable->nil [result]
            (if (instance? Throwable result)
              nil
              result))
          (guard [k f]
            (redux/post-complete
             (with-error-handling f (format "Error reducing %s" (name k)))
             throwable->nil))]
    (->> kfs
         (m/map-kv-vals guard)
         redux/fuse)))
(defmulti fingerprinter
  "Return a fingerprinter transducer for a given field based on the field's type.

  Dispatches on a three-element vector `[type semantic-type relation-type]` computed from the field map:

  - type: `:type/Integer` when `unit` is one of `u.date/extract-units`; `:type/DateTime` when the effective
    type (falling back to the base type) derives from `:type/Temporal`; otherwise the base type.
  - semantic-type: the field's semantic type when it derives from `:Semantic/*`, else `:Semantic/*`.
  - relation-type: the field's semantic type when it derives from `:Relation/*`, else `:Relation/*`."
  {:arglists '([field])}
  (fn [{:keys [unit]
        base-type      :base_type
        effective-type :effective_type
        semantic-type  :semantic_type}]
    (let [type-key     (cond
                         (u.date/extract-units unit)
                         :type/Integer

                         ;; for historical reasons the Temporal fingerprinter is still called `:type/DateTime`
                         ;; so anything that derives from `Temporal` (such as DATEs and TIMEs) should still
                         ;; use the `:type/DateTime` fingerprinter
                         (isa? (or effective-type base-type) :type/Temporal)
                         :type/DateTime

                         :else
                         base-type)
          semantic-key (if (isa? semantic-type :Semantic/*)
                         semantic-type
                         :Semantic/*)
          relation-key (if (isa? semantic-type :Relation/*)
                         semantic-type
                         :Relation/*)]
      [type-key semantic-key relation-key])))
(defn- global-fingerprinter
  "Fingerprinter computed for fields regardless of type: a distinct-count estimate (`cardinality`) and the share
  of `nil` values. The fused result is returned under the `:global` key."
  []
  (-> {:distinct-count cardinality
       :nil%           (stats/share nil?)}
      robust-fuse
      (redux/post-complete (fn [fingerprint]
                             (hash-map :global fingerprint)))))
;; Fallback: fields with no more specific method only get the global fingerprint.
(defmethod fingerprinter :default
  [_field]
  (global-fingerprinter))

;; Foreign keys only get the global fingerprint.
(defmethod fingerprinter [:type/* :Semantic/* :type/FK]
  [_field]
  (global-fingerprinter))

;; Primary keys get a constant `nil` fingerprint.
(defmethod fingerprinter [:type/* :Semantic/* :type/PK]
  [_field]
  (constant-fingerprinter nil))
;; Resolve ambiguous dispatch: each pair is `[preferred-dispatch-value other-dispatch-value]`. The FK/PK methods
;; win over the Number and Text methods, while the DateTime method wins over the PK/FK methods.
(doseq [[preferred other] [[[:type/* :Semantic/* :type/FK]        [:type/Number :Semantic/* :Relation/*]]
                           [[:type/* :Semantic/* :type/FK]        [:type/Text :Semantic/* :Relation/*]]
                           [[:type/* :Semantic/* :type/PK]        [:type/Number :Semantic/* :Relation/*]]
                           [[:type/* :Semantic/* :type/PK]        [:type/Text :Semantic/* :Relation/*]]
                           [[:type/DateTime :Semantic/* :Relation/*] [:type/* :Semantic/* :type/PK]]
                           [[:type/DateTime :Semantic/* :Relation/*] [:type/* :Semantic/* :type/FK]]]]
  (prefer-method fingerprinter preferred other))
(defn- with-global-fingerprinter
  "Run `fingerprinter` side by side with the global fingerprinter and merge the two resulting maps. The
  type-specific fingerprint is merged last, so its keys win on conflict."
  [fingerprinter]
  (let [side-by-side  (redux/juxt fingerprinter (global-fingerprinter))
        merge-results (fn [[by-type global]]
                        (merge global by-type))]
    (redux/post-complete side-by-side merge-results)))
(defmacro ^:private deffingerprinter
  "Define the `fingerprinter` method for dispatch value `[field-type :Semantic/* :Relation/*]`. The result of
  `transducer` is nested as `{:type {field-type result}}`, combined with the global fingerprint, and the whole
  reducing function is wrapped in `with-error-handling` with a message naming the field."
  [field-type transducer]
  {:pre [(keyword? field-type)]}
  (let [dispatch-value [field-type :Semantic/* :Relation/*]]
    `(defmethod fingerprinter ~dispatch-value
       [field#]
       (-> ~transducer
           (redux/post-complete (fn [fingerprint#]
                                  {:type {~field-type fingerprint#}}))
           with-global-fingerprinter
           (with-error-handling
            (format "Error generating fingerprint for %s" (sync-util/name-for-logging field#)))))))
;; Forward declaration; `->temporal` is the method of the `ITemporalCoerceable` protocol defined further down.
(declare ->temporal)
(defn- earliest
  "Reducing function that keeps the earliest temporal value seen, ignoring `nil`s. Starts from `nil`; on
  completion a non-nil result is passed through `u.date/format`."
  ([] nil)
  ([acc]
   (when (some? acc)
     (u.date/format acc)))
  ([acc candidate]
   (cond
     ;; nothing kept yet: take whatever we were given (possibly nil)
     (not acc)                                     candidate
     (and candidate (t/before? candidate acc))     candidate
     :else                                         acc)))
(defn- latest
  "Reducing function that keeps the latest temporal value seen, ignoring `nil`s. Starts from `nil`; on
  completion a non-nil result is passed through `u.date/format`."
  ([] nil)
  ([acc]
   (when (some? acc)
     (u.date/format acc)))
  ([acc candidate]
   (cond
     ;; nothing kept yet: take whatever we were given (possibly nil)
     (not acc)                                    candidate
     (and candidate (t/after? candidate acc))     candidate
     :else                                        acc)))
(defprotocol ^:private ITemporalCoerceable
  "Protocol for converting objects in resultset to a `java.time` temporal type."
  (->temporal ^java.time.temporal.Temporal [this]
    "Coerce object to a `java.time` temporal type."))
(extend-protocol ITemporalCoerceable
  nil
  (->temporal [_] nil)

  ;; strings are parsed first, then the parsed value is coerced again
  String
  (->temporal [s] (->temporal (u.date/parse s)))

  ;; integral values are turned into instants via `t/instant`
  ;; NOTE(review): presumably interpreted as epoch milliseconds -- confirm against java-time's `instant`
  Long
  (->temporal [n] (->temporal (t/instant n)))

  Integer
  (->temporal [n] (->temporal (t/instant n)))

  ;; local date-times carry no offset, so they are read as UTC
  ChronoLocalDateTime
  (->temporal [local-dt] (.toInstant local-dt ZoneOffset/UTC))

  ChronoZonedDateTime
  (->temporal [zoned-dt] (.toInstant zoned-dt))

  ;; anything that is already a Temporal is returned as-is
  Temporal
  (->temporal [temporal] temporal)

  java.util.Date
  (->temporal [date] (t/instant date)))
;; Temporal fields: coerce every value with `->temporal`, then track the earliest and latest values seen.
(deffingerprinter :type/DateTime
  (let [earliest+latest (robust-fuse {:earliest earliest
                                      :latest   latest})]
    ((map ->temporal) earliest+latest)))
(defn- histogram
  "Reducing function that summarizes numerical data with a histogram: starts from `hist/create`, inserts each
  value with `hist/insert-simple!`, and returns the histogram itself on completion."
  ([]
   (hist/create))
  ([^Histogram h]
   h)
  ([^Histogram h value]
   (hist/insert-simple! h value)))
;; Numeric fields: feed the values that pass `u/real-number?` into a histogram, then read summary statistics
;; off it. Each statistic is computed inside `robust-map`, so one that throws ends up as nil.
(deffingerprinter :type/Number
  (redux/post-complete
   ((filter u/real-number?) histogram)
   (fn [h]
     (let [quartiles (hist/percentiles h 0.25 0.75)
           q1        (get quartiles 0.25)
           q3        (get quartiles 0.75)]
       (robust-map
        :min (hist/minimum h)
        :max (hist/maximum h)
        :avg (hist/mean h)
        :sd  (some-> h hist/variance math/sqrt)
        :q1  q1
        :q3  q3)))))
(defn- valid-serialized-json?
  "Is `x` a serialized JSON dictionary or array? Heuristically recognizes maps and arrays without parsing,
  based on the leading character:

  - `{`: assume valid JSON
  - `[`: assume valid JSON unless it is of the form `[ident]` where `ident` is not a boolean.

  Returns nil for non-strings, and false for strings starting with any other character. Exceptions are
  swallowed by `u/ignore-exceptions`."
  [x]
  (u/ignore-exceptions
    (when (string? x)
      (case (first x)
        \[ (cond
             ;; `[true ...` / `[false ...` is a plausible JSON array
             (re-find #"^\[\s*(?:true|false)" x) true
             ;; `[` followed by any other bare identifier is not
             (re-find #"^\[\s*[a-zA-Z]" x)       false
             :else                               true)
        \{ true
        false))))
;; Text fields: share of values that look like JSON, URLs, emails and states, plus the average length.
(deffingerprinter :type/Text
  (let [text-stats (robust-fuse {:percent-json   (stats/share valid-serialized-json?)
                                 :percent-url    (stats/share u/url?)
                                 :percent-email  (stats/share u/email?)
                                 :percent-state  (stats/share u/state?)
                                 :average-length ((map count) stats/mean)})]
    ;; we cast to str to support `field-literal` type overwriting:
    ;; `[:field-literal "A_NUMBER" :type/Text]` (which still
    ;; returns numbers in the result set)
    ((map str) text-stats)))
(defn fingerprint-fields
  "Return a reducing function for fingerprinting a resultset with fields `fields`: one `fingerprinter` per
  field, combined column-wise with `col-wise`."
  [fields]
  (letfn [(never-analyzed? [field]
            (and (nil? (:semantic_type field))
                 (nil? (:last_analyzed field))))
          (field->fingerprinter [field]
            (fingerprinter
             (if (never-analyzed? field)
               ;; Try to get a better guess of what we're dealing with on first sync
               (assoc field :semantic_type (classifiers.name/infer-semantic-type-by-name field))
               field)))]
    (apply col-wise (map field->fingerprinter fields))))