Merge pull request #247 from xitongsys/dev
Fix wrong trim of FIX_LENGTH_BYTE_ARRAY in statistics
xitongsys authored May 2, 2020
2 parents 19d4d28 + ae16327 commit 0977660
Showing 9 changed files with 44 additions and 55 deletions.
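For context on the fix itself: in Parquet's PLAIN encoding a BYTE_ARRAY value is written with a 4-byte little-endian length prefix, while a FIXED_LEN_BYTE_ARRAY value is written as raw bytes with no prefix. The old statistics code trimmed the first four bytes whenever the converted type was DECIMAL or UTF8, which also cut real data out of FIXED_LEN_BYTE_ARRAY min/max values (for example fixed-length DECIMAL columns); this commit keys the trim on the physical type instead. A minimal sketch of that rule follows, in which the helper name trimStatBytes is purely illustrative and not part of this PR:

package layout

import "github.com/xitongsys/parquet-go/parquet"

// trimStatBytes sketches the statistics rule this commit adopts: a
// PLAIN-encoded BYTE_ARRAY carries a 4-byte length prefix that must be
// dropped before the value is stored as a min/max statistic, while
// FIXED_LEN_BYTE_ARRAY (and every other physical type) has no prefix and
// must be kept whole. Hypothetical helper, for illustration only.
func trimStatBytes(plain []byte, physicalType parquet.Type) []byte {
	if physicalType == parquet.Type_BYTE_ARRAY && len(plain) >= 4 {
		return plain[4:] // skip the little-endian length prefix
	}
	return plain
}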
8 changes: 4 additions & 4 deletions layout/chunk.go
@@ -23,7 +23,7 @@ func PagesToChunk(pages []*Page) *Chunk {

var maxVal interface{} = pages[0].MaxVal
var minVal interface{} = pages[0].MinVal
- pT, cT := pages[0].DataType, pages[0].DataConvertedType
+ pT, cT := pages[0].Schema.Type, pages[0].Schema.ConvertedType

for i := 0; i < ln; i++ {
if pages[i].Header.DataPageHeader != nil {
@@ -41,7 +41,7 @@ func PagesToChunk(pages []*Page) *Chunk {
chunk.Pages = pages
chunk.ChunkHeader = parquet.NewColumnChunk()
metaData := parquet.NewColumnMetaData()
- metaData.Type = *pages[0].DataType
+ metaData.Type = *pages[0].Schema.Type
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_RLE)
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_BIT_PACKED)
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_PLAIN)
@@ -80,7 +80,7 @@ func PagesToDictChunk(pages []*Page) *Chunk {

var maxVal interface{} = pages[1].MaxVal
var minVal interface{} = pages[1].MinVal
- pT, cT := pages[1].DataType, pages[1].DataConvertedType
+ pT, cT := pages[1].Schema.Type, pages[1].Schema.ConvertedType

for i := 0; i < len(pages); i++ {
if pages[i].Header.DataPageHeader != nil {
@@ -100,7 +100,7 @@ func PagesToDictChunk(pages []*Page) *Chunk {
chunk.Pages = pages
chunk.ChunkHeader = parquet.NewColumnChunk()
metaData := parquet.NewColumnMetaData()
- metaData.Type = *pages[1].DataType
+ metaData.Type = *pages[1].Schema.Type
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_RLE)
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_BIT_PACKED)
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_PLAIN)
17 changes: 7 additions & 10 deletions layout/dictpage.go
@@ -35,8 +35,9 @@ func DictRecToDictPage(dictRec *DictRecType, pageSize int32, compressType parque
page.DataTable = new(Table)
page.DataTable.Values = dictRec.DictSlice
dataType := parquet.Type_INT32
- page.DataType = &dataType
- page.DataConvertedType = nil
+ page.Schema = &parquet.SchemaElement{
+ Type: &dataType,
+ }
page.CompressType = compressType

page.DictPageCompress(compressType, dictRec.Type)
@@ -75,9 +76,8 @@ func TableToDictDataPages(dictRec *DictRecType, table *Table, pageSize int32, bi
totalLn := len(table.Values)
res := make([]*Page, 0)
i := 0
- dataType := table.Type

- pT, cT := table.Type, table.ConvertedType
+ pT, cT := table.Schema.Type, table.Schema.ConvertedType

for i < totalLn {
j := i
@@ -118,8 +118,7 @@ func TableToDictDataPages(dictRec *DictRecType, table *Table, pageSize int32, bi
page.DataTable.RepetitionLevels = table.RepetitionLevels[i:j]
page.MaxVal = maxVal
page.MinVal = minVal

- page.DataType = dataType //check !!!
+ page.Schema = table.Schema
page.CompressType = compressType
page.Path = table.Path
page.Info = table.Info
@@ -199,7 +198,6 @@ func (page *Page) DictDataPageCompress(compressType parquet.CompressionCodec, bi
func TableToDictPage(table *Table, pageSize int32, compressType parquet.CompressionCodec) (*Page, int64) {
var totSize int64 = 0
totalLn := len(table.Values)
- pT, cT := table.Type, table.ConvertedType

page := NewDataPage()
page.PageSize = pageSize
@@ -214,13 +212,12 @@ func TableToDictPage(table *Table, pageSize int32, compressType parquet.Compress
page.DataTable.Values = table.Values
page.DataTable.DefinitionLevels = table.DefinitionLevels
page.DataTable.RepetitionLevels = table.RepetitionLevels
- page.DataType = pT
- page.DataConvertedType = cT
+ page.Schema = table.Schema
page.CompressType = compressType
page.Path = table.Path
page.Info = table.Info

- page.DictPageCompress(compressType, *page.DataType)
+ page.DictPageCompress(compressType, *page.Schema.Type)
totSize += int64(len(page.RawData))
return page, totSize
}
51 changes: 25 additions & 26 deletions layout/page.go
@@ -25,10 +25,8 @@ type Page struct {
RawData []byte
//Compress type: gzip/snappy/zstd/none
CompressType parquet.CompressionCodec
- //Parquet type of the values in the page
- DataType *parquet.Type
- //Parquet converted type of values in page
- DataConvertedType *parquet.ConvertedType
+ //Schema
+ Schema *parquet.SchemaElement
//Path in schema(include the root)
Path []string
//Maximum of the values
@@ -73,7 +71,7 @@ func TableToDataPages(table *Table, pageSize int32, compressType parquet.Compres
totalLn := len(table.Values)
res := make([]*Page, 0)
i := 0
- pT, cT := table.Type, table.ConvertedType
+ pT, cT := table.Schema.Type, table.Schema.ConvertedType

for i < totalLn {
j := i + 1
@@ -108,8 +106,7 @@ func TableToDataPages(table *Table, pageSize int32, compressType parquet.Compres
page.DataTable.RepetitionLevels = table.RepetitionLevels[i:j]
page.MaxVal = maxVal
page.MinVal = minVal
- page.DataType = pT
- page.DataConvertedType = cT
+ page.Schema = table.Schema
page.CompressType = compressType
page.Path = table.Path
page.Info = table.Info
@@ -159,7 +156,7 @@ func (page *Page) EncodingValues(valuesBuf []interface{}) []byte {
}
if encodingMethod == parquet.Encoding_RLE {
bitWidth := page.Info.Length
- return encoding.WriteRLEBitPackedHybrid(valuesBuf, bitWidth, *page.DataType)
+ return encoding.WriteRLEBitPackedHybrid(valuesBuf, bitWidth, *page.Schema.Type)

} else if encodingMethod == parquet.Encoding_DELTA_BINARY_PACKED {
return encoding.WriteDelta(valuesBuf)
@@ -171,7 +168,7 @@ func (page *Page) EncodingValues(valuesBuf []interface{}) []byte {
return encoding.WriteDeltaLengthByteArray(valuesBuf)

} else {
- return encoding.WritePlain(valuesBuf, *page.DataType)
+ return encoding.WritePlain(valuesBuf, *page.Schema.Type)
}
return []byte{}
}
@@ -235,17 +232,19 @@ func (page *Page) DataPageCompress(compressType parquet.CompressionCodec) []byte

page.Header.DataPageHeader.Statistics = parquet.NewStatistics()
if page.MaxVal != nil {
- tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.DataType)
- if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) ||
- (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_UTF8) {
+ tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.Schema.Type)
+ if *page.Schema.Type == parquet.Type_BYTE_ARRAY {
+ // if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) ||
+ // (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf = tmpBuf[4:]
}
page.Header.DataPageHeader.Statistics.Max = tmpBuf
}
if page.MinVal != nil {
- tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.DataType)
- if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) ||
- (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_UTF8) {
+ tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.Schema.Type)
+ if *page.Schema.Type == parquet.Type_BYTE_ARRAY {
+ // if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) ||
+ // (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf = tmpBuf[4:]
}
page.Header.DataPageHeader.Statistics.Min = tmpBuf
@@ -325,17 +324,19 @@ func (page *Page) DataPageV2Compress(compressType parquet.CompressionCodec) []by

page.Header.DataPageHeaderV2.Statistics = parquet.NewStatistics()
if page.MaxVal != nil {
- tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.DataType)
- if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) ||
- (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_UTF8) {
+ tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.Schema.Type)
+ if *page.Schema.Type == parquet.Type_BYTE_ARRAY {
+ // if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) ||
+ // (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf = tmpBuf[4:]
}
page.Header.DataPageHeaderV2.Statistics.Max = tmpBuf
}
if page.MinVal != nil {
- tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.DataType)
- if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) ||
- (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_UTF8) {
+ tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.Schema.Type)
+ if *page.Schema.Type == parquet.Type_BYTE_ARRAY {
+ // if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) ||
+ // (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) {
tmpBuf = tmpBuf[4:]
}
page.Header.DataPageHeaderV2.Statistics.Min = tmpBuf
@@ -406,9 +407,7 @@ func ReadPageRawData(thriftReader *thrift.TBufferedTransport, schemaHandler *sch
page.Path = append(page.Path, colMetaData.GetPathInSchema()...)
pathIndex := schemaHandler.MapIndex[common.PathToStr(page.Path)]
schema := schemaHandler.SchemaElements[pathIndex]
- pT, cT := schema.GetType(), schema.GetConvertedType()
- page.DataType = &pT
- page.DataConvertedType = &cT
+ page.Schema = schema
return page, nil
}

@@ -550,7 +549,7 @@ func (self *Page) GetValueFromRawData(schemaHandler *schema.SchemaHandler) error
case parquet.PageType_DICTIONARY_PAGE:
bytesReader := bytes.NewReader(self.RawData)
self.DataTable.Values, err = encoding.ReadPlain(bytesReader,
- *self.DataType,
+ *self.Schema.Type,
uint64(self.Header.DictionaryPageHeader.GetNumValues()),
0)
if err != nil {
@@ -581,7 +580,7 @@ func (self *Page) GetValueFromRawData(schemaHandler *schema.SchemaHandler) error

values, err = ReadDataPageValues(bytesReader,
encodingType,
- *self.DataType,
+ *self.Schema.Type,
ct,
uint64(len(self.DataTable.DefinitionLevels))-numNulls,
uint64(schemaHandler.SchemaElements[schemaHandler.MapIndex[name]].GetTypeLength()))
9 changes: 3 additions & 6 deletions layout/table.go
@@ -10,8 +10,7 @@ func NewTableFromTable(src *Table) *Table {
return nil
}
table := new(Table)
- table.Type = src.Type
- table.ConvertedType = src.ConvertedType
+ table.Schema = src.Schema
table.Path = append(table.Path, src.Path...)
table.MaxDefinitionLevel = 0
table.MaxRepetitionLevel = 0
@@ -29,10 +28,8 @@ func NewEmptyTable() *Table {
type Table struct {
//Repetition type of the values: REQUIRED/OPTIONAL/REPEATED
RepetitionType parquet.FieldRepetitionType
- //Parquet type
- Type *parquet.Type
- //Converted type
- ConvertedType *parquet.ConvertedType
+ //Schema
+ Schema *parquet.SchemaElement
//Path of this column
Path []string
//Maximum of definition levels
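Outside the statistics code, the diff is a supporting refactor: Table and Page drop their separate Type/ConvertedType pointers and hold the column's *parquet.SchemaElement instead, so both values are read from one place. A rough sketch of the access pattern after the change (the helper is illustrative, not code from this PR):

package layout

import "github.com/xitongsys/parquet-go/parquet"

// columnTypes shows how callers recover the physical and converted type
// after the refactor: both now live on the Table's SchemaElement, and
// ConvertedType may be nil when the column has no logical annotation.
func columnTypes(t *Table) (*parquet.Type, *parquet.ConvertedType) {
	return t.Schema.Type, t.Schema.ConvertedType
}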
3 changes: 1 addition & 2 deletions marshal/csv.go
@@ -21,8 +21,7 @@ func MarshalCSV(records []interface{}, bgn int, end int, schemaHandler *schema.S
res[pathStr].MaxDefinitionLevel = 1
res[pathStr].MaxRepetitionLevel = 0
res[pathStr].RepetitionType = parquet.FieldRepetitionType_OPTIONAL
- res[pathStr].Type = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].Type
- res[pathStr].ConvertedType = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].ConvertedType
+ res[pathStr].Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]]
res[pathStr].Info = schemaHandler.Infos[i+1]

for j := bgn; j < end; j++ {
3 changes: 1 addition & 2 deletions marshal/json.go
@@ -42,8 +42,7 @@ func MarshalJSON(ss []interface{}, bgn int, end int, schemaHandler *schema.Schem
res[pathStr].MaxDefinitionLevel, _ = schemaHandler.MaxDefinitionLevel(res[pathStr].Path)
res[pathStr].MaxRepetitionLevel, _ = schemaHandler.MaxRepetitionLevel(res[pathStr].Path)
res[pathStr].RepetitionType = schema.GetRepetitionType()
- res[pathStr].Type = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].Type
- res[pathStr].ConvertedType = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].ConvertedType
+ res[pathStr].Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]]
res[pathStr].Info = schemaHandler.Infos[i]
}
}
3 changes: 1 addition & 2 deletions marshal/marshal.go
@@ -223,8 +223,7 @@ func Marshal(srcInterface []interface{}, bgn int, end int, schemaHandler *schema
res[pathStr].MaxDefinitionLevel, _ = schemaHandler.MaxDefinitionLevel(res[pathStr].Path)
res[pathStr].MaxRepetitionLevel, _ = schemaHandler.MaxRepetitionLevel(res[pathStr].Path)
res[pathStr].RepetitionType = schema.GetRepetitionType()
- res[pathStr].Type = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].Type
- res[pathStr].ConvertedType = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].ConvertedType
+ res[pathStr].Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]]
res[pathStr].Info = schemaHandler.Infos[i]
}
}
3 changes: 1 addition & 2 deletions reader/columnbuffer.go
@@ -112,8 +112,7 @@ func (self *ColumnBufferType) ReadPage() error {
if self.DataTable == nil {
index := self.SchemaHandler.MapIndex[self.PathStr]
self.DataTable = layout.NewEmptyTable()
- self.DataTable.Type = self.SchemaHandler.SchemaElements[index].Type
- self.DataTable.ConvertedType = self.SchemaHandler.SchemaElements[index].ConvertedType
+ self.DataTable.Schema = self.SchemaHandler.SchemaElements[index]
self.DataTable.Path = common.StrToPath(self.PathStr)

}
2 changes: 1 addition & 1 deletion writer/writer.go
@@ -227,7 +227,7 @@ func (self *ParquetWriter) flushObjs() error {
table.Info.Encoding == parquet.Encoding_RLE_DICTIONARY {
lock.Lock()
if _, ok := self.DictRecs[name]; !ok {
- self.DictRecs[name] = layout.NewDictRec(*table.Type)
+ self.DictRecs[name] = layout.NewDictRec(*table.Schema.Type)
}
pagesMapList[index][name], _ = layout.TableToDictDataPages(self.DictRecs[name],
table, int32(self.PageSize), 32, self.CompressionType)
