Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Chunker): JSON parsing performance #7171

Merged
merged 16 commits into from
Dec 21, 2020
28 changes: 15 additions & 13 deletions chunker/json_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,16 @@ func getNextBlank() string {
func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred string) (
mapResponse, error) {
var mr mapResponse

// move all facets from global map to smaller mf map
mf := make(map[string]interface{})
for k, v := range m {
if strings.Contains(k, x.FacetDelimeter) {
mf[k] = v
delete(m, k)
}
}

// Check field in map.
if uidVal, ok := m["uid"]; ok {
var uid uint64
Expand Down Expand Up @@ -413,7 +423,6 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred
// Delete operations with a non-nil value must have a uid specified.
return mr, errors.Errorf("UID must be present and non-zero while deleting edges.")
}

mr.uid = getNextBlank()
}

Expand All @@ -422,7 +431,7 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred
// v can be nil if user didn't set a value and if omitEmpty was not supplied as JSON
// option.
// We also skip facets here because we parse them with the corresponding predicate.
if pred == "uid" || strings.Index(pred, x.FacetDelimeter) > 0 {
if pred == "uid" {
continue
}

Expand Down Expand Up @@ -451,11 +460,8 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred
}

prefix := pred + x.FacetDelimeter
// TODO - Maybe do an initial pass and build facets for all predicates. Then we don't have
// to call parseFacets every time.
// Only call parseBasicFacets when value type for the predicate is not list.
if _, ok := v.([]interface{}); !ok {
fts, err := parseScalarFacets(m, prefix)
fts, err := parseScalarFacets(mf, prefix)
if err != nil {
return mr, err
}
Expand Down Expand Up @@ -502,7 +508,7 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred
buf.PushPredHint(pred, pb.Metadata_LIST)
// TODO(Ashish): We need to call this only in case of scalarlist, for other lists
// this can be avoided.
facetsMapSlice, err := parseMapFacets(m, prefix)
facetsMapSlice, err := parseMapFacets(mf, prefix)
if err != nil {
return mr, err
}
Expand Down Expand Up @@ -575,8 +581,9 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred
}
}

fts, err := parseScalarFacets(m, parentPred+x.FacetDelimeter)
fts, err := parseScalarFacets(mf, parentPred+x.FacetDelimeter)
mr.fcts = fts

return mr, err
}

Expand All @@ -597,7 +604,6 @@ func (buf *NQuadBuffer) ParseJSON(b []byte, op int) error {
var list []interface{}
if err := dec.Decode(&ms); err != nil {
// Couldn't parse as map, let's try to parse it as a list.

buffer.Reset() // The previous contents are used. Reset here.
// Rewrite b into buffer, so it can be consumed.
if _, err := buffer.Write(b); err != nil {
Expand All @@ -607,11 +613,9 @@ func (buf *NQuadBuffer) ParseJSON(b []byte, op int) error {
return err
}
}

if len(list) == 0 && len(ms) == 0 {
return nil
}

if len(list) > 0 {
for _, obj := range list {
if _, ok := obj.(map[string]interface{}); !ok {
Expand All @@ -625,7 +629,6 @@ func (buf *NQuadBuffer) ParseJSON(b []byte, op int) error {
}
return nil
}

mr, err := buf.mapToNquads(ms, op, "")
buf.checkForDeletion(mr, ms, op)
return err
Expand All @@ -639,7 +642,6 @@ func ParseJSON(b []byte, op int) ([]*api.NQuad, *pb.Metadata, error) {
if err != nil {
return nil, nil, err
}

buf.Flush()
nqs := <-buf.Ch()
metadata := buf.Metadata()
Expand Down
142 changes: 141 additions & 1 deletion chunker/json_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,6 @@ func TestNquadsFromJsonFacets1(t *testing.T) {

for _, n := range nq {
glog.Infof("%v", n)

}

checkFacets(t, nq, "mobile", []*api.Facet{
Expand Down Expand Up @@ -815,3 +814,144 @@ func TestSetNquadNilValue(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(nq))
}

// BenchmarkNoFacets measures JSON-to-NQuad parsing throughput for a wide,
// facet-free document: a one-element list whose object nests two levels of
// "contains" children with many scalar string predicates.
//
// The payload is built once and the same byte slice is handed to Parse on every
// iteration. Any parse error aborts the benchmark so that a broken payload or
// parser cannot be silently timed as if it were the success path.
func BenchmarkNoFacets(b *testing.B) {
	payload := []byte(`[
{
"uid":123,
"flguid":123,
"is_validate":"xxxxxxxxxx",
"createDatetime":"xxxxxxxxxx",
"contains":{
"createDatetime":"xxxxxxxxxx",
"final_individ":"xxxxxxxxxx",
"cm_bad_debt":"xxxxxxxxxx",
"cm_bill_address1":"xxxxxxxxxx",
"cm_bill_address2":"xxxxxxxxxx",
"cm_bill_city":"xxxxxxxxxx",
"cm_bill_state":"xxxxxxxxxx",
"cm_zip":"xxxxxxxxxx",
"zip5":"xxxxxxxxxx",
"cm_customer_id":"xxxxxxxxxx",
"final_gaid":"xxxxxxxxxx",
"final_hholdid":"xxxxxxxxxx",
"final_firstname":"xxxxxxxxxx",
"final_middlename":"xxxxxxxxxx",
"final_surname":"xxxxxxxxxx",
"final_gender":"xxxxxxxxxx",
"final_ace_prim_addr":"xxxxxxxxxx",
"final_ace_sec_addr":"xxxxxxxxxx",
"final_ace_urb":"xxxxxxxxxx",
"final_ace_city_llidx":"xxxxxxxxxx",
"final_ace_state":"xxxxxxxxxx",
"final_ace_postal_code":"xxxxxxxxxx",
"final_ace_zip4":"xxxxxxxxxx",
"final_ace_dpbc":"xxxxxxxxxx",
"final_ace_checkdigit":"xxxxxxxxxx",
"final_ace_iso_code":"xxxxxxxxxx",
"final_ace_cart":"xxxxxxxxxx",
"final_ace_lot":"xxxxxxxxxx",
"final_ace_lot_order":"xxxxxxxxxx",
"final_ace_rec_type":"xxxxxxxxxx",
"final_ace_remainder":"xxxxxxxxxx",
"final_ace_dpv_cmra":"xxxxxxxxxx",
"final_ace_dpv_ftnote":"xxxxxxxxxx",
"final_ace_dpv_status":"xxxxxxxxxx",
"final_ace_foreigncode":"xxxxxxxxxx",
"final_ace_match_5":"xxxxxxxxxx",
"final_ace_match_9":"xxxxxxxxxx",
"final_ace_match_un":"xxxxxxxxxx",
"final_ace_zip_move":"xxxxxxxxxx",
"final_ace_ziptype":"xxxxxxxxxx",
"final_ace_congress":"xxxxxxxxxx",
"final_ace_county":"xxxxxxxxxx",
"final_ace_countyname":"xxxxxxxxxx",
"final_ace_factype":"xxxxxxxxxx",
"final_ace_fipscode":"xxxxxxxxxx",
"final_ace_error_code":"xxxxxxxxxx",
"final_ace_stat_code":"xxxxxxxxxx",
"final_ace_geo_match":"xxxxxxxxxx",
"final_ace_geo_lat":"xxxxxxxxxx",
"final_ace_geo_lng":"xxxxxxxxxx",
"final_ace_ageo_pla":"xxxxxxxxxx",
"final_ace_geo_blk":"xxxxxxxxxx",
"final_ace_ageo_mcd":"xxxxxxxxxx",
"final_ace_cgeo_cbsa":"xxxxxxxxxx",
"final_ace_cgeo_msa":"xxxxxxxxxx",
"final_ace_ap_lacscode":"xxxxxxxxxx",
"final_dsf_businessflag":"xxxxxxxxxx",
"final_dsf_dropflag":"xxxxxxxxxx",
"final_dsf_throwbackflag":"xxxxxxxxxx",
"final_dsf_seasonalflag":"xxxxxxxxxx",
"final_dsf_vacantflag":"xxxxxxxxxx",
"final_dsf_deliverytype":"xxxxxxxxxx",
"final_dsf_dt_curbflag":"xxxxxxxxxx",
"final_dsf_dt_ndcbuflag":"xxxxxxxxxx",
"final_dsf_dt_centralflag":"xxxxxxxxxx",
"final_dsf_dt_doorslotflag":"xxxxxxxxxx",
"final_dsf_dropcount":"xxxxxxxxxx",
"final_dsf_nostatflag":"xxxxxxxxxx",
"final_dsf_educationalflag":"xxxxxxxxxx",
"final_dsf_rectyp":"xxxxxxxxxx",
"final_mailability_score":"xxxxxxxxxx",
"final_occupancy_score":"xxxxxxxxxx",
"final_multi_type":"xxxxxxxxxx",
"final_deceased_flag":"xxxxxxxxxx",
"final_dnm_flag":"xxxxxxxxxx",
"final_dnc_flag":"xxxxxxxxxx",
"final_dnf_flag":"xxxxxxxxxx",
"final_prison_flag":"xxxxxxxxxx",
"final_nursing_home_flag":"xxxxxxxxxx",
"final_date_of_birth":"xxxxxxxxxx",
"final_date_of_death":"xxxxxxxxxx",
"vip_number":"xxxxxxxxxx",
"vip_store_no":"xxxxxxxxxx",
"vip_division":"xxxxxxxxxx",
"vip_phone_number":"xxxxxxxxxx",
"vip_email_address":"xxxxxxxxxx",
"vip_first_name":"xxxxxxxxxx",
"vip_last_name":"xxxxxxxxxx",
"vip_gender":"xxxxxxxxxx",
"vip_status":"xxxxxxxxxx",
"vip_membership_date":"xxxxxxxxxx",
"vip_expiration_date":"xxxxxxxxxx",
"cm_date_addr_chng":"xxxxxxxxxx",
"cm_date_entered":"xxxxxxxxxx",
"cm_name":"xxxxxxxxxx",
"cm_opt_on_acct":"xxxxxxxxxx",
"cm_origin":"xxxxxxxxxx",
"cm_orig_acq_source":"xxxxxxxxxx",
"cm_phone_number":"xxxxxxxxxx",
"cm_phone_number2":"xxxxxxxxxx",
"cm_problem_cust":"xxxxxxxxxx",
"cm_rm_list":"xxxxxxxxxx",
"cm_rm_rented_list":"xxxxxxxxxx",
"cm_tax_code":"xxxxxxxxxx",
"email_address":"xxxxxxxxxx",
"esp_email_id":"xxxxxxxxxx",
"esp_sub_date":"xxxxxxxxxx",
"esp_unsub_date":"xxxxxxxxxx",
"cm_user_def_1":"xxxxxxxxxx",
"cm_user_def_7":"xxxxxxxxxx",
"do_not_phone":"xxxxxxxxxx",
"company_num":"xxxxxxxxxx",
"customer_id":"xxxxxxxxxx",
"load_date":"xxxxxxxxxx",
"activity_date":"xxxxxxxxxx",
"email_address_hashed":"xxxxxxxxxx",
"event_id":"",
"contains":{
"uid": 123,
"flguid": 123,
"is_validate":"xxxxxxxxxx",
"createDatetime":"xxxxxxxxxx"
}
}
}]`)

	// SetBytes is given the number of N-Quads per parse (125, per the original
	// author's count) rather than a byte count, so the MB/s column reported by
	// the benchmark reads as millions of N-Quads per second.
	b.SetBytes(125)

	// Start timing at the loop so only Parse is measured.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// NOTE(review): assumes Parse returns (nquads, error), as its other
		// uses in this test file suggest — confirm against the helper.
		if _, err := Parse(payload, SetNquads); err != nil {
			b.Fatal(err)
		}
	}
}