Skip to content

Commit

Permalink
Handle null Timestamp values (#175)
Browse files Browse the repository at this point in the history
* Handle NaT Timestamp values in timestamp_to_timestamptz
* Handle NaT Timestamp values in timestamptz_to_timestamp
  • Loading branch information
jonmmease authored Nov 29, 2022
1 parent 2f40054 commit ed3a760
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 35 deletions.
177 changes: 177 additions & 0 deletions python/vegafusion/tests/test_pretransform.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pandas as pd
from pandas import Timestamp, NaT
import vegafusion as vf
import json
from datetime import date
Expand Down Expand Up @@ -664,6 +665,136 @@ def period_in_col_name_spec():
}
"""


def nat_bar_spec():
    """Return a Vega (v5) bar-chart spec used by the NaT handling tests.

    The spec parses the ``NULL_TEST`` column as dates, stacks ``SALES``
    grouped by ``NULL_TEST``, and filters out rows whose date or sales
    value is invalid — exercising null-Timestamp handling during
    pre-transformation.
    """
    spec = r"""
{
  "$schema": "https://vega.github.io/schema/vega/v5.json",
  "background": "white",
  "padding": 5,
  "width": 200,
  "height": 200,
  "style": "cell",
  "data": [
    {
      "name": "dataframe",
      "url": "vegafusion+dataset://dataframe",
      "format": {"type": "json", "parse": {"NULL_TEST": "date"}},
      "transform": [
        {
          "type": "stack",
          "groupby": ["NULL_TEST"],
          "field": "SALES",
          "sort": {"field": [], "order": []},
          "as": ["SALES_start", "SALES_end"],
          "offset": "zero"
        },
        {
          "type": "filter",
          "expr": "(isDate(datum[\"NULL_TEST\"]) || (isValid(datum[\"NULL_TEST\"]) && isFinite(+datum[\"NULL_TEST\"]))) && isValid(datum[\"SALES\"]) && isFinite(+datum[\"SALES\"])"
        }
      ]
    }
  ],
  "marks": [
    {
      "name": "layer_0_marks",
      "type": "rect",
      "clip": true,
      "style": ["bar"],
      "from": {"data": "dataframe"},
      "encode": {
        "update": {
          "tooltip": {
            "signal": "{\"NULL_TEST\": timeFormat(datum[\"NULL_TEST\"], '%b %d, %Y'), \"SALES\": format(datum[\"SALES\"], \"\")}"
          },
          "fill": {"value": "#4c78a8"},
          "ariaRoleDescription": {"value": "bar"},
          "description": {
            "signal": "\"NULL_TEST: \" + (timeFormat(datum[\"NULL_TEST\"], '%b %d, %Y')) + \"; SALES: \" + (format(datum[\"SALES\"], \"\"))"
          },
          "xc": {"scale": "x", "field": "NULL_TEST"},
          "width": {"value": 5},
          "y": {"scale": "y", "field": "SALES_end"},
          "y2": {"scale": "y", "field": "SALES_start"}
        }
      }
    }
  ],
  "scales": [
    {
      "name": "x",
      "type": "time",
      "domain": {"data": "dataframe", "field": "NULL_TEST"},
      "range": [0, {"signal": "width"}],
      "padding": 5
    },
    {
      "name": "y",
      "type": "linear",
      "domain": {"data": "dataframe", "fields": ["SALES_start", "SALES_end"]},
      "range": [{"signal": "height"}, 0],
      "nice": true,
      "zero": true
    }
  ],
  "axes": [
    {
      "scale": "x",
      "orient": "bottom",
      "gridScale": "y",
      "grid": true,
      "tickCount": {"signal": "ceil(width/40)"},
      "domain": false,
      "labels": false,
      "aria": false,
      "maxExtent": 0,
      "minExtent": 0,
      "ticks": false,
      "zindex": 0
    },
    {
      "scale": "y",
      "orient": "left",
      "gridScale": "x",
      "grid": true,
      "tickCount": {"signal": "ceil(height/40)"},
      "domain": false,
      "labels": false,
      "aria": false,
      "maxExtent": 0,
      "minExtent": 0,
      "ticks": false,
      "zindex": 0
    },
    {
      "scale": "x",
      "orient": "bottom",
      "grid": false,
      "title": "NULL_TEST",
      "labelFlush": true,
      "labelOverlap": true,
      "tickCount": {"signal": "ceil(width/40)"},
      "zindex": 0
    },
    {
      "scale": "y",
      "orient": "left",
      "grid": false,
      "title": "SALES",
      "labelOverlap": true,
      "tickCount": {"signal": "ceil(height/40)"},
      "zindex": 0
    }
  ],
  "config": {
    "range": {"ramp": {"scheme": "yellowgreenblue"}},
    "axis": {"domain": false}
  }
}
"""
    return spec


def test_pre_transform_multi_partition():
n = 4050
order_items = pd.DataFrame({
Expand Down Expand Up @@ -789,3 +920,49 @@ def test_period_in_column_name():

dataset = datasets[0]
assert dataset.to_dict("records") == [{"normal": 1, "a.b": 2}]


def test_nat_values():
    """Pre-transform a bar spec over data that contains pandas NaT values.

    Regression test for NaT (null Timestamp) handling: rows whose
    NULL_TEST entry is NaT must not cause warnings during
    pre-transformation, and the remaining valid rows must be stacked and
    serialized as expected.
    """
    # Build the input table column-wise; NULL_TEST mixes Timestamps with NaT.
    source = pd.DataFrame({
        'ORDER_DATE': (
            [date(2011, 3, 1)] * 5
            + [date(2011, 4, 1)] * 2
            + [date(2011, 6, 1)] * 3
        ),
        'SALES': [
            457.568, 376.509, 362.25, 129.552, 18.84,
            66.96, 6.24, 881.93, 166.72, 25.92,
        ],
        'NULL_TEST': (
            [Timestamp('2011-03-01 00:00:00')] * 4
            + [NaT, Timestamp('2011-04-01 00:00:00'), NaT]
            + [Timestamp('2011-06-01 00:00:00')] * 2
            + [NaT]
        ),
    })

    datasets, warnings = vf.runtime.pre_transform_datasets(
        nat_bar_spec(),
        ["dataframe"],
        "UTC",
        inline_datasets=dict(dataframe=source),
    )

    # The transform should complete cleanly and yield exactly one dataset.
    assert len(warnings) == 0
    assert len(datasets) == 1

    # Spot-check the first stacked record of the pre-transformed output.
    first_record = datasets[0].to_dict("records")[0]
    assert first_record == {
        'NULL_TEST': '2011-03-01T00:00:00.000',
        'ORDER_DATE': Timestamp('2011-03-01 00:00:00'),
        'SALES': 457.568,
        'SALES_end': 457.568,
        'SALES_start': 0.0,
    }
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use datafusion_expr::{
use std::str::FromStr;
use std::sync::Arc;
use vegafusion_core::arrow::array::{ArrayRef, TimestampMillisecondArray};
use vegafusion_core::arrow::compute::{cast, unary};
use vegafusion_core::arrow::compute::cast;
use vegafusion_core::arrow::datatypes::{DataType, TimeUnit};
use vegafusion_core::data::scalar::ScalarValue;

Expand Down Expand Up @@ -69,30 +69,40 @@ pub fn convert_timezone(
millis_array: &TimestampMillisecondArray,
tz: chrono_tz::Tz,
) -> TimestampMillisecondArray {
unary(millis_array, |v| {
// Build naive datetime for time
let seconds = v / 1000;
let milliseconds = v % 1000;
let nanoseconds = (milliseconds * 1_000_000) as u32;
let naive_local_datetime = NaiveDateTime::from_timestamp(seconds, nanoseconds);
TimestampMillisecondArray::from(
millis_array
.iter()
.map(|v| {
v.map(|v| {
// Build naive datetime for time
let seconds = v / 1000;
let milliseconds = v % 1000;
let nanoseconds = (milliseconds * 1_000_000) as u32;
let naive_local_datetime = NaiveDateTime::from_timestamp(seconds, nanoseconds);

// Get UTC offset when the naive datetime is considered to be in local time
let local_datetime = if let Some(local_datetime) =
tz.from_local_datetime(&naive_local_datetime).earliest()
{
local_datetime
} else {
// Try adding 1 hour to handle daylight savings boundaries
let hour = naive_local_datetime.hour();
let new_naive_local_datetime = naive_local_datetime.with_hour(hour + 1).unwrap();
tz.from_local_datetime(&new_naive_local_datetime)
.earliest()
.unwrap_or_else(|| panic!("Failed to convert {:?}", naive_local_datetime))
};
// Get UTC offset when the naive datetime is considered to be in local time
let local_datetime = if let Some(local_datetime) =
tz.from_local_datetime(&naive_local_datetime).earliest()
{
local_datetime
} else {
// Try adding 1 hour to handle daylight savings boundaries
let hour = naive_local_datetime.hour();
let new_naive_local_datetime =
naive_local_datetime.with_hour(hour + 1).unwrap();
tz.from_local_datetime(&new_naive_local_datetime)
.earliest()
.unwrap_or_else(|| {
panic!("Failed to convert {:?}", naive_local_datetime)
})
};

// Get timestamp millis (in UTC)
local_datetime.timestamp_millis()
})
// Get timestamp millis (in UTC)
local_datetime.timestamp_millis()
})
})
.collect::<Vec<Option<_>>>(),
)
}

pub fn to_timestamp_ms(array: &ArrayRef) -> Result<ArrayRef, DataFusionError> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use datafusion_expr::{
use std::str::FromStr;
use std::sync::Arc;
use vegafusion_core::arrow::array::{ArrayRef, TimestampMillisecondArray};
use vegafusion_core::arrow::compute::unary;
use vegafusion_core::arrow::datatypes::{DataType, TimeUnit};
use vegafusion_core::data::scalar::ScalarValue;

Expand Down Expand Up @@ -39,19 +38,27 @@ pub fn make_timestamptz_to_timestamp() -> ScalarUDF {
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();

let timestamp_array: TimestampMillisecondArray = unary(timestamp_array, |v| {
// Build naive datetime for time
let seconds = v / 1000;
let milliseconds = v % 1000;
let nanoseconds = (milliseconds * 1_000_000) as u32;
let naive_utc_datetime = NaiveDateTime::from_timestamp(seconds, nanoseconds);
let timestamp_array = TimestampMillisecondArray::from(
timestamp_array
.iter()
.map(|v| {
v.map(|v| {
// Build naive datetime for time
let seconds = v / 1000;
let milliseconds = v % 1000;
let nanoseconds = (milliseconds * 1_000_000) as u32;
let naive_utc_datetime =
NaiveDateTime::from_timestamp(seconds, nanoseconds);

// Create local datetime, interpreting the naive datetime as utc
let local_datetime = tz.from_utc_datetime(&naive_utc_datetime);
let naive_local_datetime = local_datetime.naive_local();
// Create local datetime, interpreting the naive datetime as utc
let local_datetime = tz.from_utc_datetime(&naive_utc_datetime);
let naive_local_datetime = local_datetime.naive_local();

naive_local_datetime.timestamp_millis()
});
naive_local_datetime.timestamp_millis()
})
})
.collect::<Vec<Option<_>>>(),
);

let timestamp_array = Arc::new(timestamp_array) as ArrayRef;

Expand Down

0 comments on commit ed3a760

Please sign in to comment.