Skip to content

Commit

Permalink
util/parquet: add support for all types
Browse files Browse the repository at this point in the history
This change adds support for the following types families to the `util/parquet`
library:
types.INetFamily, types.JsonFamily, types.FloatFamily, types.BytesFamily,
types.BitFamily, types.EnumFamily, types.Box2DFamily, types.GeographyFamily,
types.GeometryFamily, types.DateFamily, types.TimeFamily, types.TimeTZFamily,
case types.IntervalFamily, types.TimestampTZFamily.

Release note: None

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
  • Loading branch information
jayshrivastava committed Apr 20, 2023
1 parent 42b37b2 commit ebd58ea
Show file tree
Hide file tree
Showing 9 changed files with 1,047 additions and 73 deletions.
78 changes: 76 additions & 2 deletions pkg/sql/sem/tree/datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,20 @@ func MustBeDFloat(e Expr) DFloat {
panic(errors.AssertionFailedf("expected *DFloat, found %T", e))
}

// AsDFloat attempts to retrieve a DFloat from an Expr, returning a DFloat and
// a flag signifying whether the assertion was successful. The function should
// be used instead of direct type assertions wherever a *DFloat wrapped by a
// *DOidWrapper is possible.
func AsDFloat(e Expr) (*DFloat, bool) {
switch t := e.(type) {
case *DFloat:
return t, true
case *DOidWrapper:
return AsDFloat(t.Wrapped)
}
return nil, false
}

// NewDFloat is a helper routine to create a *DFloat initialized from its
// argument.
func NewDFloat(d DFloat) *DFloat {
Expand Down Expand Up @@ -1407,6 +1421,20 @@ func NewDCollatedString(
return &d, nil
}

// AsDCollatedString attempts to retrieve a DString from an Expr, returning a AsDCollatedString and
// a flag signifying whether the assertion was successful. The function should
// be used instead of direct type assertions wherever a *DCollatedString wrapped by a
// *DOidWrapper is possible.
func AsDCollatedString(e Expr) (DCollatedString, bool) {
switch t := e.(type) {
case *DCollatedString:
return *t, true
case *DOidWrapper:
return AsDCollatedString(t.Wrapped)
}
return DCollatedString{}, false
}

// AmbiguousFormat implements the Datum interface.
func (*DCollatedString) AmbiguousFormat() bool { return false }

Expand Down Expand Up @@ -2286,6 +2314,20 @@ func MakeDTime(t timeofday.TimeOfDay) *DTime {
return &d
}

// AsDTime attempts to retrieve a DTime from an Expr, returning a DTimestamp and
// a flag signifying whether the assertion was successful. The function should
// be used instead of direct type assertions wherever a *DTime wrapped by a
// *DOidWrapper is possible.
func AsDTime(e Expr) (DTime, bool) {
switch t := e.(type) {
case *DTime:
return *t, true
case *DOidWrapper:
return AsDTime(t.Wrapped)
}
return DTime(timeofday.FromInt(0)), false
}

// ParseDTime parses and returns the *DTime Datum value represented by the
// provided string, or an error if parsing is unsuccessful.
//
Expand Down Expand Up @@ -2434,6 +2476,20 @@ func NewDTimeTZFromLocation(t timeofday.TimeOfDay, loc *time.Location) *DTimeTZ
return &DTimeTZ{timetz.MakeTimeTZFromLocation(t, loc)}
}

// AsDTimeTZ attempts to retrieve a DTimeTZ from an Expr, returning a DTimeTZ and
// a flag signifying whether the assertion was successful. The function should
// be used instead of direct type assertions wherever a *DTimeTZ wrapped by a
// *DOidWrapper is possible.
func AsDTimeTZ(e Expr) (DTimeTZ, bool) {
switch t := e.(type) {
case *DTimeTZ:
return *t, true
case *DOidWrapper:
return AsDTimeTZ(t.Wrapped)
}
return DTimeTZ{}, false
}

// ParseDTimeTZ parses and returns the *DTime Datum value represented by the
// provided string, or an error if parsing is unsuccessful.
//
Expand Down Expand Up @@ -3069,12 +3125,16 @@ type DInterval struct {
duration.Duration
}

// AsDInterval attempts to retrieve a DInterval from an Expr, panicking if the
// assertion fails.
// AsDInterval attempts to retrieve a DInterval from an Expr, returning a DInterval and
// a flag signifying whether the assertion was successful. The function should
// be used instead of direct type assertions wherever a *DInterval wrapped by a
// *DOidWrapper is possible.
func AsDInterval(e Expr) (*DInterval, bool) {
switch t := e.(type) {
case *DInterval:
return t, true
case *DOidWrapper:
return AsDInterval(t.Wrapped)
}
return nil, false
}
Expand Down Expand Up @@ -5017,6 +5077,20 @@ func NewDEnum(e DEnum) *DEnum {
return &e
}

// AsDEnum attempts to retrieve a DEnum from an Expr, returning a DEnum and
// a flag signifying whether the assertion was successful. The function should
// // be used instead of direct type assertions wherever a *DEnum wrapped by a
// // *DOidWrapper is possible.
func AsDEnum(e Expr) (*DEnum, bool) {
switch t := e.(type) {
case *DEnum:
return t, true
case *DOidWrapper:
return AsDEnum(t.Wrapped)
}
return nil, false
}

// MakeDEnumFromPhysicalRepresentation creates a DEnum of the input type
// and the input physical representation.
func MakeDEnumFromPhysicalRepresentation(typ *types.T, rep []byte) (DEnum, error) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/sem/tree/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,14 @@ func (ctx *FmtCtx) CloseAndGetString() string {
return s
}

// CloseAndGetBytes is the same as CloseAndGetString, except it returns bytes.
// This avoids an allocation by not calling ctx.String().
func (ctx *FmtCtx) CloseAndGetBytes() []byte {
s := ctx.Bytes()
ctx.Close()
return s
}

func (ctx *FmtCtx) alwaysFormatTablePrefix() bool {
return ctx.flags.HasFlags(FmtAlwaysQualifyTableNames) || ctx.tableNameFormatter != nil
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/parquet/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/util/parquet",
visibility = ["//visibility:public"],
deps = [
"//pkg/geo",
"//pkg/geo/geopb",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/catid",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/bitarray",
"//pkg/util/duration",
"//pkg/util/timeofday",
"//pkg/util/uuid",
"@com_github_apache_arrow_go_v11//parquet",
"@com_github_apache_arrow_go_v11//parquet/file",
Expand All @@ -38,10 +44,15 @@ go_test(
args = ["-test.timeout=295s"],
embed = [":parquet"],
deps = [
"//pkg/geo",
"//pkg/sql/randgen",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/bitarray",
"//pkg/util/duration",
"//pkg/util/ipaddr",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/uuid",
"@com_github_apache_arrow_go_v11//parquet/file",
"@com_github_stretchr_testify//require",
Expand Down
164 changes: 163 additions & 1 deletion pkg/util/parquet/decoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@ import (
"time"

"github.com/apache/arrow/go/v11/parquet"
"github.com/cockroachdb/cockroach/pkg/geo"
"github.com/cockroachdb/cockroach/pkg/geo/geopb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/bitarray"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/timeofday"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)

// decoder is used to store typedDecoders of various types in the same
Expand Down Expand Up @@ -72,7 +78,23 @@ func (timestampDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
dtStr := string(v)
d, dependsOnCtx, err := tree.ParseDTimestamp(nil, dtStr, time.Microsecond)
if dependsOnCtx {
return nil, errors.New("TimestampTZ depends on context")
return nil, errors.Newf("decoding timestamp %s depends on context", v)
}
if err != nil {
return nil, err
}
// Converts the timezone from "loc(+0000)" to "UTC", which are equivalent.
d.Time = d.Time.UTC()
return d, nil
}

type timestampTZDecoder struct{}

func (timestampTZDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
dtStr := string(v)
d, dependsOnCtx, err := tree.ParseDTimestampTZ(nil, dtStr, time.Microsecond)
if dependsOnCtx {
return nil, errors.Newf("decoding timestamp %s depends on context", v)
}
if err != nil {
return nil, err
Expand All @@ -92,6 +114,128 @@ func (uUIDDecoder) decode(v parquet.FixedLenByteArray) (tree.Datum, error) {
return tree.NewDUuid(tree.DUuid{UUID: uid}), nil
}

type iNetDecoder struct{}

func (iNetDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return tree.ParseDIPAddrFromINetString(string(v))
}

type jsonDecoder struct{}

func (jsonDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return tree.ParseDJSON(string(v))
}

type bitDecoder struct{}

func (bitDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
ba, err := bitarray.Parse(string(v))
if err != nil {
return nil, err
}
return &tree.DBitArray{BitArray: ba}, err
}

type bytesDecoder struct{}

func (bytesDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return tree.NewDBytes(tree.DBytes(v)), nil
}

type enumDecoder struct{}

func (ed enumDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return &tree.DEnum{
LogicalRep: string(v),
}, nil
}

type dateDecoder struct{}

func (dateDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
d, dependCtx, err := tree.ParseDDate(nil, string(v))
if dependCtx {
return nil, errors.Newf("decoding date %s depends on context", v)
}
return d, err
}

type box2DDecoder struct{}

func (box2DDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
b, err := geo.ParseCartesianBoundingBox(string(v))
if err != nil {
return nil, err
}
return tree.NewDBox2D(b), nil
}

type geographyDecoder struct{}

func (geographyDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
g, err := geo.ParseGeographyFromEWKB(geopb.EWKB(v))
if err != nil {
return nil, err
}
return &tree.DGeography{Geography: g}, nil
}

type geometryDecoder struct{}

func (geometryDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
g, err := geo.ParseGeometryFromEWKB(geopb.EWKB(v))
if err != nil {
return nil, err
}
return &tree.DGeometry{Geometry: g}, nil
}

type intervalDecoder struct{}

func (intervalDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return tree.ParseDInterval(duration.IntervalStyle_ISO_8601, string(v))
}

type timeDecoder struct{}

func (timeDecoder) decode(v int64) (tree.Datum, error) {
return tree.MakeDTime(timeofday.TimeOfDay(v)), nil
}

type timeTZDecoder struct{}

func (timeTZDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
d, dependsOnCtx, err := tree.ParseDTimeTZ(nil, string(v), time.Microsecond)
if dependsOnCtx {
return nil, errors.New("parsed time depends on context")
}
return d, err
}

type float32Decoder struct{}

func (float32Decoder) decode(v float32) (tree.Datum, error) {
return tree.NewDFloat(tree.DFloat(v)), nil
}

type float64Decoder struct{}

func (float64Decoder) decode(v float64) (tree.Datum, error) {
return tree.NewDFloat(tree.DFloat(v)), nil
}

type oidDecoder struct{}

func (oidDecoder) decode(v int32) (tree.Datum, error) {
return tree.NewDOid(oid.Oid(v)), nil
}

type collatedStringDecoder struct{}

func (collatedStringDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return &tree.DCollatedString{Contents: string(v)}, nil
}

// Defeat the linter's unused lint errors.
func init() {
var _, _ = boolDecoder{}.decode(false)
Expand All @@ -100,5 +244,23 @@ func init() {
var _, _ = int64Decoder{}.decode(0)
var _, _ = decimalDecoder{}.decode(parquet.ByteArray{})
var _, _ = timestampDecoder{}.decode(parquet.ByteArray{})
var _, _ = timestampTZDecoder{}.decode(parquet.ByteArray{})
var _, _ = uUIDDecoder{}.decode(parquet.FixedLenByteArray{})
var _, _ = iNetDecoder{}.decode(parquet.ByteArray{})
var _, _ = jsonDecoder{}.decode(parquet.ByteArray{})
var _, _ = bitDecoder{}.decode(parquet.ByteArray{})
var _, _ = bytesDecoder{}.decode(parquet.ByteArray{})
var _, _ = enumDecoder{}.decode(parquet.ByteArray{})
var _, _ = dateDecoder{}.decode(parquet.ByteArray{})
var _, _ = box2DDecoder{}.decode(parquet.ByteArray{})
var _, _ = box2DDecoder{}.decode(parquet.ByteArray{})
var _, _ = geographyDecoder{}.decode(parquet.ByteArray{})
var _, _ = geometryDecoder{}.decode(parquet.ByteArray{})
var _, _ = intervalDecoder{}.decode(parquet.ByteArray{})
var _, _ = timeDecoder{}.decode(0)
var _, _ = timeTZDecoder{}.decode(parquet.ByteArray{})
var _, _ = float64Decoder{}.decode(0.0)
var _, _ = float32Decoder{}.decode(0.0)
var _, _ = oidDecoder{}.decode(0)
var _, _ = collatedStringDecoder{}.decode(parquet.ByteArray{})
}
Loading

0 comments on commit ebd58ea

Please sign in to comment.