Skip to content

Commit

Permalink
Fix bug #5: bug, return duplicate tuples
Browse files Browse the repository at this point in the history
  • Loading branch information
Artur Zakirov committed Aug 18, 2016
1 parent 8821cae commit e000001
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 19 deletions.
9 changes: 8 additions & 1 deletion rum.h
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,13 @@ typedef struct
bool recheck;
} RumOrderingItem;

typedef enum
{
RumFastScan,
RumRegularScan,
RumFullScan
} RumScanType;

typedef struct RumScanOpaqueData
{
MemoryContext tempCtx;
Expand All @@ -684,7 +691,7 @@ typedef struct RumScanOpaqueData
RumKey key;
bool firstCall;
bool isVoidRes; /* true if query is unsatisfiable */
bool useFastScan;
RumScanType scanType;
TIDBitmap *tbm;

ScanDirection naturalOrder;
Expand Down
107 changes: 89 additions & 18 deletions rumget.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ static bool scanPage(RumState * rumstate, RumScanEntry entry, RumKey *item,
Page page, bool equalOk);
static void insertScanItem(RumScanOpaque so, bool recheck);
static int scan_entry_cmp(const void *p1, const void *p2, void *arg);
static void entryGetItem(RumState * rumstate, RumScanEntry entry);
static void entryGetItem(RumState * rumstate, RumScanEntry entry, bool *nextEntryList);


/*
Expand Down Expand Up @@ -735,7 +735,7 @@ startScan(IndexScanDesc scan)
RumScanOpaque so = (RumScanOpaque) scan->opaque;
RumState *rumstate = &so->rumstate;
uint32 i;
bool useFastScan = false;
RumScanType scanType = RumRegularScan;

MemoryContextSwitchTo(so->keyCtx);
for (i = 0; i < so->totalentries; i++)
Expand Down Expand Up @@ -783,30 +783,36 @@ startScan(IndexScanDesc scan)
{
RumScanKey key = so->keys[i];

if (so->rumstate.canPreConsistent[key->attnum - 1])
/* Check first key is it used to full-index scan */
if (i == 0 && key->nentries > 0 && key->scanEntry[i]->scanWithAddInfo)
{
useFastScan = true;
scanType = RumFullScan;
break;
}
else if (so->rumstate.canPreConsistent[key->attnum - 1])
{
scanType = RumFastScan;
break;
}
}

if (useFastScan)
if (scanType == RumFastScan)
{
for (i = 0; i < so->totalentries; i++)
{
RumScanEntry entry = so->entries[i];

if (entry->isPartialMatch || entry->forceUseBitmap)
{
useFastScan = false;
scanType = RumRegularScan;
break;
}
}
}

ItemPointerSetInvalid(&so->key.iptr);

if (useFastScan)
if (scanType == RumFastScan)
{
/*
* We are going to use fast scan. Do some preliminaries. Start scan of
Expand All @@ -819,13 +825,13 @@ startScan(IndexScanDesc scan)
for (i = 0; i < so->totalentries; i++)
{
if (!so->sortedEntries[i]->isFinished)
entryGetItem(&so->rumstate, so->sortedEntries[i]);
entryGetItem(&so->rumstate, so->sortedEntries[i], NULL);
}
qsort_arg(so->sortedEntries, so->totalentries, sizeof(RumScanEntry),
scan_entry_cmp, rumstate);
}

so->useFastScan = useFastScan;
so->scanType = scanType;
}

/*
Expand Down Expand Up @@ -952,7 +958,7 @@ entryGetNextItem(RumState * rumstate, RumScanEntry entry)
}
}

static void
static bool
entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
{
Page page;
Expand Down Expand Up @@ -998,7 +1004,7 @@ entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
ItemPointerSetInvalid(&entry->curRumKey.iptr);
entry->isFinished = TRUE;
LockBuffer(entry->stack->buffer, RUM_UNLOCK);
return;
return false;
}

page = BufferGetPage(entry->stack->buffer);
Expand All @@ -1014,7 +1020,7 @@ entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
ItemPointerSetInvalid(&entry->curRumKey.iptr);
entry->isFinished = TRUE;
LockBuffer(entry->stack->buffer, RUM_UNLOCK);
return;
return false;
}

/*
Expand Down Expand Up @@ -1102,6 +1108,8 @@ entryGetNextItemList(RumState * rumstate, RumScanEntry entry)

if (needUnlock)
LockBuffer(entry->stack->buffer, RUM_UNLOCK);

return true;
}

#define rum_rand() (((double) random()) / ((double) MAX_RANDOM_VALUE))
Expand All @@ -1121,10 +1129,13 @@ entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
* current implementation this is guaranteed by the behavior of tidbitmaps.
*/
static void
entryGetItem(RumState * rumstate, RumScanEntry entry)
entryGetItem(RumState * rumstate, RumScanEntry entry, bool *nextEntryList)
{
Assert(!entry->isFinished);

if (nextEntryList)
*nextEntryList = false;

if (entry->matchBitmap)
{
Assert(ScanDirectionIsForward(entry->scanDirection));
Expand Down Expand Up @@ -1186,7 +1197,8 @@ entryGetItem(RumState * rumstate, RumScanEntry entry)
else if (entry->stack)
{
entry->offset++;
entryGetNextItemList(rumstate, entry);
if (entryGetNextItemList(rumstate, entry) && nextEntryList)
*nextEntryList = true;
}
else
{
Expand All @@ -1205,7 +1217,8 @@ entryGetItem(RumState * rumstate, RumScanEntry entry)
if (entry->stack && entry->isFinished)
{
entry->isFinished = FALSE;
entryGetNextItemList(rumstate, entry);
if (entryGetNextItemList(rumstate, entry) && nextEntryList)
*nextEntryList = true;
}
}
}
Expand Down Expand Up @@ -1490,7 +1503,7 @@ scanGetItemRegular(IndexScanDesc scan, RumKey *advancePast,
compareCurRumKeyScanDirection(rumstate, entry,
&myAdvancePast) <= 0))
{
entryGetItem(rumstate, entry);
entryGetItem(rumstate, entry, NULL);

if (!ItemPointerIsValid(&myAdvancePast.iptr))
break;
Expand Down Expand Up @@ -1915,7 +1928,7 @@ entryShift(int i, RumScanOpaque so, bool find)
entryFindItem(rumstate, so->sortedEntries[minIndex],
&so->sortedEntries[i - 1]->curRumKey);
else if (!so->sortedEntries[minIndex]->isFinished)
entryGetItem(rumstate, so->sortedEntries[minIndex]);
entryGetItem(rumstate, so->sortedEntries[minIndex], NULL);

/* Restore order of so->sortedEntries */
while (minIndex > 0 &&
Expand Down Expand Up @@ -2053,6 +2066,62 @@ scanGetItemFast(IndexScanDesc scan, RumKey *advancePast,
return false;
}

/*
* Get next item pointer using full-index scan.
*
* First key is used to full scan, other keys are only used for ranking.
*/
static bool
scanGetItemFull(IndexScanDesc scan, RumKey *advancePast,
RumKey *item, bool *recheck)
{
RumScanOpaque so = (RumScanOpaque) scan->opaque;
RumScanEntry entry;
bool nextEntryList;
uint32 i;

Assert(so->totalentries > 0);
Assert(so->entries[0]->scanWithAddInfo);

/*
* This is first entry of the first key, which is used for full-index
* scan.
*/
entry = so->entries[0];

entryGetItem(&so->rumstate, entry, &nextEntryList);
if (entry->isFinished == TRUE)
return false;

/* Move related order by entries */
if (nextEntryList)
for (i = 1; i < so->totalentries; i++)
{
RumScanEntry orderEntry = so->entries[i];
if (orderEntry->nlist > 0)
{
orderEntry->isFinished = FALSE;
orderEntry->offset = InvalidOffsetNumber;
RumItemSetMin(&orderEntry->curRumKey);
}
}

for (i = 1; i < so->totalentries; i++)
{
RumScanEntry orderEntry = so->entries[i];

while (orderEntry->isFinished == FALSE &&
(!ItemPointerIsValid(&orderEntry->curRumKey.iptr) ||
compareCurRumKeyScanDirection(&so->rumstate, orderEntry,
&entry->curRumKey) < 0))
entryGetItem(&so->rumstate, orderEntry, NULL);
}

*item = entry->curRumKey;
*recheck = false;
return true;
}

/*
* Get next item whether using regular or fast scan.
*/
Expand All @@ -2062,8 +2131,10 @@ scanGetItem(IndexScanDesc scan, RumKey *advancePast,
{
RumScanOpaque so = (RumScanOpaque) scan->opaque;

if (so->useFastScan)
if (so->scanType == RumFastScan)
return scanGetItemFast(scan, advancePast, item, recheck);
else if (so->scanType == RumFullScan)
return scanGetItemFull(scan, advancePast, item, recheck);
else
return scanGetItemRegular(scan, advancePast, item, recheck);
}
Expand Down

0 comments on commit e000001

Please sign in to comment.