;; Streaming results writer for the `:json` export format.
;;
;; Wraps `os` in a UTF-8 `BufferedWriter` and returns a `qp.si/StreamingResultsWriter` that emits a JSON
;; array with one object per result row, keyed by column title:
;;
;; * `begin!`     computes column titles and per-column formatters, then writes the opening `[`.
;; * `write-row!` reorders the row by `output-order` (when given), drops the pivot-grouping column, formats
;;                each remaining value and writes the row as a JSON object. Rows whose pivot-grouping value
;;                is present and differs from `qp.pivot.postprocess/NON_PIVOT_ROW_GROUP` are skipped.
;; * `finish!`    writes the closing `]`, flushes, and closes the writer.
(defmethod qp.si/streaming-results-writer :json
  [_ ^OutputStream os]
  (let [writer             (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))
        ;; Column titles with the pivot-grouping column (if any) removed.
        col-names          (volatile! nil)
        ;; One formatter per *emitted* column, i.e. positionally aligned with `col-names`.
        ordered-formatters (volatile! nil)
        ;; Index of the pivot-grouping column within the ordered columns, or nil when there is none.
        pivot-grouping-idx (volatile! nil)
        ;; True once at least one row object has been written. The row separator is driven by this rather
        ;; than by `row-num`, because skipped rows would otherwise leave a leading comma when row 0 is
        ;; skipped.
        wrote-row?         (volatile! false)]
    (reify qp.si/StreamingResultsWriter
      (begin! [_ {{:keys [ordered-cols results_timezone format-rows?]
                   :or   {format-rows? true}} :data} viz-settings]
        (let [cols           (common/column-titles ordered-cols (::mb.viz/column-settings viz-settings) format-rows?)
              pivot-grouping (qp.pivot.postprocess/pivot-grouping-key cols)
              ;; Removes the pivot-grouping position from a per-column collection; no-op without one.
              drop-grouping  (fn [coll]
                               (cond->> coll
                                 pivot-grouping (m/remove-nth pivot-grouping)))
              formatters     (mapv #(formatter/create-formatter results_timezone % viz-settings format-rows?)
                                   ordered-cols)]
          (when pivot-grouping
            (vreset! pivot-grouping-idx pivot-grouping))
          (vreset! col-names (drop-grouping cols))
          ;; `write-row!` pairs formatters with the row *after* the pivot-grouping value has been removed,
          ;; so the matching formatter has to be removed too; otherwise every column after the
          ;; pivot-grouping index would be formatted with its left neighbour's formatter.
          (vreset! ordered-formatters (vec (drop-grouping formatters)))
          (vreset! wrote-row? false)
          (.write writer "[\n")))

      (write-row! [_ row _row-num _ {:keys [output-order]}]
        (let [ordered-row        (vec
                                  (if output-order
                                    (let [row-v (into [] row)]
                                      (for [i output-order]
                                        (row-v i)))
                                    row))
              pivot-grouping-key @pivot-grouping-idx
              ;; nil when there is no pivot-grouping column (`get` with a nil key).
              group              (get ordered-row pivot-grouping-key)
              cleaned-row        (cond->> ordered-row
                                   pivot-grouping-key (m/remove-nth pivot-grouping-key))]
          ;; Only emit rows that have no pivot-grouping value, or whose value equals NON_PIVOT_ROW_GROUP.
          (when (or (not group)
                    (= qp.pivot.postprocess/NON_PIVOT_ROW_GROUP (int group)))
            (when @wrote-row?
              (.write writer ",\n"))
            (json/encode-to
             (zipmap
              @col-names
              (map (fn [formatter r]
                     (let [res (formatter (common/format-value r))]
                       ;; Formatter results that carry a `:num-str` are emitted as that string.
                       (if-some [num-str (:num-str res)]
                         num-str
                         res)))
                   @ordered-formatters
                   cleaned-row))
             writer
             {})
            (vreset! wrote-row? true)
            (.flush writer))))

      (finish! [_ _]
        (.write writer "\n]")
        (.flush writer)
        (.flush os)
        (.close writer)))))