Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wrong trim of FIX_LENGTH_BYTE_ARRAY in statistics #247

Merged
merged 2 commits into from
May 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions layout/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func PagesToChunk(pages []*Page) *Chunk {

var maxVal interface{} = pages[0].MaxVal
var minVal interface{} = pages[0].MinVal
pT, cT := pages[0].DataType, pages[0].DataConvertedType
pT, cT := pages[0].Schema.Type, pages[0].Schema.ConvertedType

for i := 0; i < ln; i++ {
if pages[i].Header.DataPageHeader != nil {
Expand All @@ -41,7 +41,7 @@ func PagesToChunk(pages []*Page) *Chunk {
chunk.Pages = pages
chunk.ChunkHeader = parquet.NewColumnChunk()
metaData := parquet.NewColumnMetaData()
metaData.Type = *pages[0].DataType
metaData.Type = *pages[0].Schema.Type
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_RLE)
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_BIT_PACKED)
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_PLAIN)
Expand Down Expand Up @@ -80,7 +80,7 @@ func PagesToDictChunk(pages []*Page) *Chunk {

var maxVal interface{} = pages[1].MaxVal
var minVal interface{} = pages[1].MinVal
pT, cT := pages[1].DataType, pages[1].DataConvertedType
pT, cT := pages[1].Schema.Type, pages[1].Schema.ConvertedType

for i := 0; i < len(pages); i++ {
if pages[i].Header.DataPageHeader != nil {
Expand All @@ -100,7 +100,7 @@ func PagesToDictChunk(pages []*Page) *Chunk {
chunk.Pages = pages
chunk.ChunkHeader = parquet.NewColumnChunk()
metaData := parquet.NewColumnMetaData()
metaData.Type = *pages[1].DataType
metaData.Type = *pages[1].Schema.Type
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_RLE)
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_BIT_PACKED)
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_PLAIN)
Expand Down
17 changes: 7 additions & 10 deletions layout/dictpage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ func DictRecToDictPage(dictRec *DictRecType, pageSize int32, compressType parque
page.DataTable = new(Table)
page.DataTable.Values = dictRec.DictSlice
dataType := parquet.Type_INT32
page.DataType = &dataType
page.DataConvertedType = nil
page.Schema = &parquet.SchemaElement{
Type: &dataType,
}
page.CompressType = compressType

page.DictPageCompress(compressType, dictRec.Type)
Expand Down Expand Up @@ -75,9 +76,8 @@ func TableToDictDataPages(dictRec *DictRecType, table *Table, pageSize int32, bi
totalLn := len(table.Values)
res := make([]*Page, 0)
i := 0
dataType := table.Type

pT, cT := table.Type, table.ConvertedType
pT, cT := table.Schema.Type, table.Schema.ConvertedType

for i < totalLn {
j := i
Expand Down Expand Up @@ -118,8 +118,7 @@ func TableToDictDataPages(dictRec *DictRecType, table *Table, pageSize int32, bi
page.DataTable.RepetitionLevels = table.RepetitionLevels[i:j]
page.MaxVal = maxVal
page.MinVal = minVal

page.DataType = dataType //check !!!
page.Schema = table.Schema
page.CompressType = compressType
page.Path = table.Path
page.Info = table.Info
Expand Down Expand Up @@ -199,7 +198,6 @@ func (page *Page) DictDataPageCompress(compressType parquet.CompressionCodec, bi
func TableToDictPage(table *Table, pageSize int32, compressType parquet.CompressionCodec) (*Page, int64) {
var totSize int64 = 0
totalLn := len(table.Values)
pT, cT := table.Type, table.ConvertedType

page := NewDataPage()
page.PageSize = pageSize
Expand All @@ -214,13 +212,12 @@ func TableToDictPage(table *Table, pageSize int32, compressType parquet.Compress
page.DataTable.Values = table.Values
page.DataTable.DefinitionLevels = table.DefinitionLevels
page.DataTable.RepetitionLevels = table.RepetitionLevels
page.DataType = pT
page.DataConvertedType = cT
page.Schema = table.Schema
page.CompressType = compressType
page.Path = table.Path
page.Info = table.Info

page.DictPageCompress(compressType, *page.DataType)
page.DictPageCompress(compressType, *page.Schema.Type)
totSize += int64(len(page.RawData))
return page, totSize
}
51 changes: 25 additions & 26 deletions layout/page.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ type Page struct {
RawData []byte
//Compress type: gzip/snappy/zstd/none
CompressType parquet.CompressionCodec
//Parquet type of the values in the page
DataType *parquet.Type
//Parquet converted type of values in page
DataConvertedType *parquet.ConvertedType
//Schema
Schema *parquet.SchemaElement
//Path in schema(include the root)
Path []string
//Maximum of the values
Expand Down Expand Up @@ -73,7 +71,7 @@ func TableToDataPages(table *Table, pageSize int32, compressType parquet.Compres
totalLn := len(table.Values)
res := make([]*Page, 0)
i := 0
pT, cT := table.Type, table.ConvertedType
pT, cT := table.Schema.Type, table.Schema.ConvertedType

for i < totalLn {
j := i + 1
Expand Down Expand Up @@ -108,8 +106,7 @@ func TableToDataPages(table *Table, pageSize int32, compressType parquet.Compres
page.DataTable.RepetitionLevels = table.RepetitionLevels[i:j]
page.MaxVal = maxVal
page.MinVal = minVal
page.DataType = pT
page.DataConvertedType = cT
page.Schema = table.Schema
page.CompressType = compressType
page.Path = table.Path
page.Info = table.Info
Expand Down Expand Up @@ -159,7 +156,7 @@ func (page *Page) EncodingValues(valuesBuf []interface{}) []byte {
}
if encodingMethod == parquet.Encoding_RLE {
bitWidth := page.Info.Length
return encoding.WriteRLEBitPackedHybrid(valuesBuf, bitWidth, *page.DataType)
return encoding.WriteRLEBitPackedHybrid(valuesBuf, bitWidth, *page.Schema.Type)

} else if encodingMethod == parquet.Encoding_DELTA_BINARY_PACKED {
return encoding.WriteDelta(valuesBuf)
Expand All @@ -171,7 +168,7 @@ func (page *Page) EncodingValues(valuesBuf []interface{}) []byte {
return encoding.WriteDeltaLengthByteArray(valuesBuf)

} else {
return encoding.WritePlain(valuesBuf, *page.DataType)
return encoding.WritePlain(valuesBuf, *page.Schema.Type)
}
return []byte{}
}
Expand Down Expand Up @@ -235,17 +232,19 @@ func (page *Page) DataPageCompress(compressType parquet.CompressionCodec) []byte

page.Header.DataPageHeader.Statistics = parquet.NewStatistics()
if page.MaxVal != nil {
tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.DataType)
if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) ||
(page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.Schema.Type)
if *page.Schema.Type == parquet.Type_BYTE_ARRAY {
// if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) ||
// (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf = tmpBuf[4:]
}
page.Header.DataPageHeader.Statistics.Max = tmpBuf
}
if page.MinVal != nil {
tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.DataType)
if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) ||
(page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.Schema.Type)
if *page.Schema.Type == parquet.Type_BYTE_ARRAY {
// if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) ||
// (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf = tmpBuf[4:]
}
page.Header.DataPageHeader.Statistics.Min = tmpBuf
Expand Down Expand Up @@ -325,17 +324,19 @@ func (page *Page) DataPageV2Compress(compressType parquet.CompressionCodec) []by

page.Header.DataPageHeaderV2.Statistics = parquet.NewStatistics()
if page.MaxVal != nil {
tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.DataType)
if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) ||
(page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.Schema.Type)
if *page.Schema.Type == parquet.Type_BYTE_ARRAY {
// if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) ||
// (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf = tmpBuf[4:]
}
page.Header.DataPageHeaderV2.Statistics.Max = tmpBuf
}
if page.MinVal != nil {
tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.DataType)
if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) ||
(page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.Schema.Type)
if *page.Schema.Type == parquet.Type_BYTE_ARRAY {
// if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) ||
// (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf = tmpBuf[4:]
}
page.Header.DataPageHeaderV2.Statistics.Min = tmpBuf
Expand Down Expand Up @@ -406,9 +407,7 @@ func ReadPageRawData(thriftReader *thrift.TBufferedTransport, schemaHandler *sch
page.Path = append(page.Path, colMetaData.GetPathInSchema()...)
pathIndex := schemaHandler.MapIndex[common.PathToStr(page.Path)]
schema := schemaHandler.SchemaElements[pathIndex]
pT, cT := schema.GetType(), schema.GetConvertedType()
page.DataType = &pT
page.DataConvertedType = &cT
page.Schema = schema
return page, nil
}

Expand Down Expand Up @@ -550,7 +549,7 @@ func (self *Page) GetValueFromRawData(schemaHandler *schema.SchemaHandler) error
case parquet.PageType_DICTIONARY_PAGE:
bytesReader := bytes.NewReader(self.RawData)
self.DataTable.Values, err = encoding.ReadPlain(bytesReader,
*self.DataType,
*self.Schema.Type,
uint64(self.Header.DictionaryPageHeader.GetNumValues()),
0)
if err != nil {
Expand Down Expand Up @@ -581,7 +580,7 @@ func (self *Page) GetValueFromRawData(schemaHandler *schema.SchemaHandler) error

values, err = ReadDataPageValues(bytesReader,
encodingType,
*self.DataType,
*self.Schema.Type,
ct,
uint64(len(self.DataTable.DefinitionLevels))-numNulls,
uint64(schemaHandler.SchemaElements[schemaHandler.MapIndex[name]].GetTypeLength()))
Expand Down
9 changes: 3 additions & 6 deletions layout/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ func NewTableFromTable(src *Table) *Table {
return nil
}
table := new(Table)
table.Type = src.Type
table.ConvertedType = src.ConvertedType
table.Schema = src.Schema
table.Path = append(table.Path, src.Path...)
table.MaxDefinitionLevel = 0
table.MaxRepetitionLevel = 0
Expand All @@ -29,10 +28,8 @@ func NewEmptyTable() *Table {
type Table struct {
//Repetition type of the values: REQUIRED/OPTIONAL/REPEATED
RepetitionType parquet.FieldRepetitionType
//Parquet type
Type *parquet.Type
//Converted type
ConvertedType *parquet.ConvertedType
//Schema
Schema *parquet.SchemaElement
//Path of this column
Path []string
//Maximum of definition levels
Expand Down
3 changes: 1 addition & 2 deletions marshal/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ func MarshalCSV(records []interface{}, bgn int, end int, schemaHandler *schema.S
res[pathStr].MaxDefinitionLevel = 1
res[pathStr].MaxRepetitionLevel = 0
res[pathStr].RepetitionType = parquet.FieldRepetitionType_OPTIONAL
res[pathStr].Type = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].Type
res[pathStr].ConvertedType = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].ConvertedType
res[pathStr].Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]]
res[pathStr].Info = schemaHandler.Infos[i+1]

for j := bgn; j < end; j++ {
Expand Down
3 changes: 1 addition & 2 deletions marshal/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ func MarshalJSON(ss []interface{}, bgn int, end int, schemaHandler *schema.Schem
res[pathStr].MaxDefinitionLevel, _ = schemaHandler.MaxDefinitionLevel(res[pathStr].Path)
res[pathStr].MaxRepetitionLevel, _ = schemaHandler.MaxRepetitionLevel(res[pathStr].Path)
res[pathStr].RepetitionType = schema.GetRepetitionType()
res[pathStr].Type = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].Type
res[pathStr].ConvertedType = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].ConvertedType
res[pathStr].Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]]
res[pathStr].Info = schemaHandler.Infos[i]
}
}
Expand Down
3 changes: 1 addition & 2 deletions marshal/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ func Marshal(srcInterface []interface{}, bgn int, end int, schemaHandler *schema
res[pathStr].MaxDefinitionLevel, _ = schemaHandler.MaxDefinitionLevel(res[pathStr].Path)
res[pathStr].MaxRepetitionLevel, _ = schemaHandler.MaxRepetitionLevel(res[pathStr].Path)
res[pathStr].RepetitionType = schema.GetRepetitionType()
res[pathStr].Type = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].Type
res[pathStr].ConvertedType = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].ConvertedType
res[pathStr].Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]]
res[pathStr].Info = schemaHandler.Infos[i]
}
}
Expand Down
3 changes: 1 addition & 2 deletions reader/columnbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ func (self *ColumnBufferType) ReadPage() error {
if self.DataTable == nil {
index := self.SchemaHandler.MapIndex[self.PathStr]
self.DataTable = layout.NewEmptyTable()
self.DataTable.Type = self.SchemaHandler.SchemaElements[index].Type
self.DataTable.ConvertedType = self.SchemaHandler.SchemaElements[index].ConvertedType
self.DataTable.Schema = self.SchemaHandler.SchemaElements[index]
self.DataTable.Path = common.StrToPath(self.PathStr)

}
Expand Down
2 changes: 1 addition & 1 deletion writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (self *ParquetWriter) flushObjs() error {
table.Info.Encoding == parquet.Encoding_RLE_DICTIONARY {
lock.Lock()
if _, ok := self.DictRecs[name]; !ok {
self.DictRecs[name] = layout.NewDictRec(*table.Type)
self.DictRecs[name] = layout.NewDictRec(*table.Schema.Type)
}
pagesMapList[index][name], _ = layout.TableToDictDataPages(self.DictRecs[name],
table, int32(self.PageSize), 32, self.CompressionType)
Expand Down