Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle null Timestamp values #175

Merged
merged 2 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions python/vegafusion/tests/test_pretransform.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pandas as pd
from pandas import Timestamp, NaT
import vegafusion as vf
import json
from datetime import date
Expand Down Expand Up @@ -664,6 +665,136 @@ def period_in_col_name_spec():
}
"""


def nat_bar_spec():
    """Return a Vega v5 spec (raw JSON string) for a clipped bar chart over
    the ``NULL_TEST`` column of an inline ``dataframe`` dataset.

    The spec parses ``NULL_TEST`` as a date, stacks ``SALES`` grouped by
    ``NULL_TEST`` into ``SALES_start``/``SALES_end``, and then filters out
    rows whose ``NULL_TEST`` or ``SALES`` is not a valid finite date/number —
    this filter is what exercises null (NaT) Timestamp handling in
    ``test_nat_values``.
    """
    return r"""
{
  "$schema": "https://vega.github.io/schema/vega/v5.json",
  "background": "white",
  "padding": 5,
  "width": 200,
  "height": 200,
  "style": "cell",
  "data": [
    {
      "name": "dataframe",
      "url": "vegafusion+dataset://dataframe",
      "format": {"type": "json", "parse": {"NULL_TEST": "date"}},
      "transform": [
        {
          "type": "stack",
          "groupby": ["NULL_TEST"],
          "field": "SALES",
          "sort": {"field": [], "order": []},
          "as": ["SALES_start", "SALES_end"],
          "offset": "zero"
        },
        {
          "type": "filter",
          "expr": "(isDate(datum[\"NULL_TEST\"]) || (isValid(datum[\"NULL_TEST\"]) && isFinite(+datum[\"NULL_TEST\"]))) && isValid(datum[\"SALES\"]) && isFinite(+datum[\"SALES\"])"
        }
      ]
    }
  ],
  "marks": [
    {
      "name": "layer_0_marks",
      "type": "rect",
      "clip": true,
      "style": ["bar"],
      "from": {"data": "dataframe"},
      "encode": {
        "update": {
          "tooltip": {
            "signal": "{\"NULL_TEST\": timeFormat(datum[\"NULL_TEST\"], '%b %d, %Y'), \"SALES\": format(datum[\"SALES\"], \"\")}"
          },
          "fill": {"value": "#4c78a8"},
          "ariaRoleDescription": {"value": "bar"},
          "description": {
            "signal": "\"NULL_TEST: \" + (timeFormat(datum[\"NULL_TEST\"], '%b %d, %Y')) + \"; SALES: \" + (format(datum[\"SALES\"], \"\"))"
          },
          "xc": {"scale": "x", "field": "NULL_TEST"},
          "width": {"value": 5},
          "y": {"scale": "y", "field": "SALES_end"},
          "y2": {"scale": "y", "field": "SALES_start"}
        }
      }
    }
  ],
  "scales": [
    {
      "name": "x",
      "type": "time",
      "domain": {"data": "dataframe", "field": "NULL_TEST"},
      "range": [0, {"signal": "width"}],
      "padding": 5
    },
    {
      "name": "y",
      "type": "linear",
      "domain": {"data": "dataframe", "fields": ["SALES_start", "SALES_end"]},
      "range": [{"signal": "height"}, 0],
      "nice": true,
      "zero": true
    }
  ],
  "axes": [
    {
      "scale": "x",
      "orient": "bottom",
      "gridScale": "y",
      "grid": true,
      "tickCount": {"signal": "ceil(width/40)"},
      "domain": false,
      "labels": false,
      "aria": false,
      "maxExtent": 0,
      "minExtent": 0,
      "ticks": false,
      "zindex": 0
    },
    {
      "scale": "y",
      "orient": "left",
      "gridScale": "x",
      "grid": true,
      "tickCount": {"signal": "ceil(height/40)"},
      "domain": false,
      "labels": false,
      "aria": false,
      "maxExtent": 0,
      "minExtent": 0,
      "ticks": false,
      "zindex": 0
    },
    {
      "scale": "x",
      "orient": "bottom",
      "grid": false,
      "title": "NULL_TEST",
      "labelFlush": true,
      "labelOverlap": true,
      "tickCount": {"signal": "ceil(width/40)"},
      "zindex": 0
    },
    {
      "scale": "y",
      "orient": "left",
      "grid": false,
      "title": "SALES",
      "labelOverlap": true,
      "tickCount": {"signal": "ceil(height/40)"},
      "zindex": 0
    }
  ],
  "config": {
    "range": {"ramp": {"scheme": "yellowgreenblue"}},
    "axis": {"domain": false}
  }
}
"""


def test_pre_transform_multi_partition():
n = 4050
order_items = pd.DataFrame({
Expand Down Expand Up @@ -789,3 +920,49 @@ def test_period_in_column_name():

dataset = datasets[0]
assert dataset.to_dict("records") == [{"normal": 1, "a.b": 2}]


def test_nat_values():
    """Pre-transform the NaT bar spec against a dataframe whose NULL_TEST
    column mixes valid Timestamps with NaT, and verify that the transform
    completes without warnings and that the first output record matches.
    """
    march = Timestamp('2011-03-01 00:00:00')
    april = Timestamp('2011-04-01 00:00:00')
    june = Timestamp('2011-06-01 00:00:00')

    rows = [
        (date(2011, 3, 1), 457.568, march),
        (date(2011, 3, 1), 376.509, march),
        (date(2011, 3, 1), 362.25, march),
        (date(2011, 3, 1), 129.552, march),
        (date(2011, 3, 1), 18.84, NaT),
        (date(2011, 4, 1), 66.96, april),
        (date(2011, 4, 1), 6.24, NaT),
        (date(2011, 6, 1), 881.93, june),
        (date(2011, 6, 1), 166.72, june),
        (date(2011, 6, 1), 25.92, NaT),
    ]
    dataframe = pd.DataFrame(rows, columns=['ORDER_DATE', 'SALES', 'NULL_TEST'])

    datasets, warnings = vf.runtime.pre_transform_datasets(
        nat_bar_spec(),
        ["dataframe"],
        "UTC",
        inline_datasets={"dataframe": dataframe},
    )
    assert len(warnings) == 0
    assert len(datasets) == 1

    # Only the first output record is pinned down here; presumably the NaT
    # rows are dropped by the spec's filter transform — confirm if extending.
    first_record = datasets[0].to_dict("records")[0]
    assert first_record == {
        'NULL_TEST': '2011-03-01T00:00:00.000',
        'ORDER_DATE': Timestamp('2011-03-01 00:00:00'),
        'SALES': 457.568,
        'SALES_end': 457.568,
        'SALES_start': 0.0,
    }
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use datafusion_expr::{
use std::str::FromStr;
use std::sync::Arc;
use vegafusion_core::arrow::array::{ArrayRef, TimestampMillisecondArray};
use vegafusion_core::arrow::compute::{cast, unary};
use vegafusion_core::arrow::compute::cast;
use vegafusion_core::arrow::datatypes::{DataType, TimeUnit};
use vegafusion_core::data::scalar::ScalarValue;

Expand Down Expand Up @@ -69,30 +69,40 @@ pub fn convert_timezone(
millis_array: &TimestampMillisecondArray,
tz: chrono_tz::Tz,
) -> TimestampMillisecondArray {
unary(millis_array, |v| {
// Build naive datetime for time
let seconds = v / 1000;
let milliseconds = v % 1000;
let nanoseconds = (milliseconds * 1_000_000) as u32;
let naive_local_datetime = NaiveDateTime::from_timestamp(seconds, nanoseconds);
TimestampMillisecondArray::from(
millis_array
.iter()
.map(|v| {
v.map(|v| {
// Build naive datetime for time
let seconds = v / 1000;
let milliseconds = v % 1000;
let nanoseconds = (milliseconds * 1_000_000) as u32;
let naive_local_datetime = NaiveDateTime::from_timestamp(seconds, nanoseconds);

// Get UTC offset when the naive datetime is considered to be in local time
let local_datetime = if let Some(local_datetime) =
tz.from_local_datetime(&naive_local_datetime).earliest()
{
local_datetime
} else {
// Try adding 1 hour to handle daylight savings boundaries
let hour = naive_local_datetime.hour();
let new_naive_local_datetime = naive_local_datetime.with_hour(hour + 1).unwrap();
tz.from_local_datetime(&new_naive_local_datetime)
.earliest()
.unwrap_or_else(|| panic!("Failed to convert {:?}", naive_local_datetime))
};
// Get UTC offset when the naive datetime is considered to be in local time
let local_datetime = if let Some(local_datetime) =
tz.from_local_datetime(&naive_local_datetime).earliest()
{
local_datetime
} else {
// Try adding 1 hour to handle daylight savings boundaries
let hour = naive_local_datetime.hour();
let new_naive_local_datetime =
naive_local_datetime.with_hour(hour + 1).unwrap();
tz.from_local_datetime(&new_naive_local_datetime)
.earliest()
.unwrap_or_else(|| {
panic!("Failed to convert {:?}", naive_local_datetime)
})
};

// Get timestamp millis (in UTC)
local_datetime.timestamp_millis()
})
// Get timestamp millis (in UTC)
local_datetime.timestamp_millis()
})
})
.collect::<Vec<Option<_>>>(),
)
}

pub fn to_timestamp_ms(array: &ArrayRef) -> Result<ArrayRef, DataFusionError> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use datafusion_expr::{
use std::str::FromStr;
use std::sync::Arc;
use vegafusion_core::arrow::array::{ArrayRef, TimestampMillisecondArray};
use vegafusion_core::arrow::compute::unary;
use vegafusion_core::arrow::datatypes::{DataType, TimeUnit};
use vegafusion_core::data::scalar::ScalarValue;

Expand Down Expand Up @@ -39,19 +38,27 @@ pub fn make_timestamptz_to_timestamp() -> ScalarUDF {
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();

let timestamp_array: TimestampMillisecondArray = unary(timestamp_array, |v| {
// Build naive datetime for time
let seconds = v / 1000;
let milliseconds = v % 1000;
let nanoseconds = (milliseconds * 1_000_000) as u32;
let naive_utc_datetime = NaiveDateTime::from_timestamp(seconds, nanoseconds);
let timestamp_array = TimestampMillisecondArray::from(
timestamp_array
.iter()
.map(|v| {
v.map(|v| {
// Build naive datetime for time
let seconds = v / 1000;
let milliseconds = v % 1000;
let nanoseconds = (milliseconds * 1_000_000) as u32;
let naive_utc_datetime =
NaiveDateTime::from_timestamp(seconds, nanoseconds);

// Create local datetime, interpreting the naive datetime as utc
let local_datetime = tz.from_utc_datetime(&naive_utc_datetime);
let naive_local_datetime = local_datetime.naive_local();
// Create local datetime, interpreting the naive datetime as utc
let local_datetime = tz.from_utc_datetime(&naive_utc_datetime);
let naive_local_datetime = local_datetime.naive_local();

naive_local_datetime.timestamp_millis()
});
naive_local_datetime.timestamp_millis()
})
})
.collect::<Vec<Option<_>>>(),
);

let timestamp_array = Arc::new(timestamp_array) as ArrayRef;

Expand Down