Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gian chi 2994 1.18 rc merge #758

Merged
merged 7 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions hrm-domain/hrm-core/case/adminCaseRoutesV0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
*/

import type { Request, Response, NextFunction } from 'express';
import { isErr, mapHTTPError } from '@tech-matters/types';
import { SafeRouter, publicEndpoint } from '../permissions';
import { reindexCases } from './caseReindexService';
import { reindexCasesStream } from './caseReindexService';

const adminContactsRouter = SafeRouter();

Expand All @@ -29,17 +28,13 @@ adminContactsRouter.post(
const { hrmAccountId } = req;
const { dateFrom, dateTo } = req.body;

const result = await reindexCases(hrmAccountId, dateFrom, dateTo);
const resultStream = await reindexCasesStream(hrmAccountId, dateFrom, dateTo);

if (isErr(result)) {
return next(
mapHTTPError(result, {
InvalidParameterError: 400,
}),
);
}

res.json(result.data);
resultStream.on('error', err => {
next(err);
});
res.status(200).setHeader('Content-Type', 'text/plain');
resultStream.pipe(res);
},
);

Expand Down
86 changes: 73 additions & 13 deletions hrm-domain/hrm-core/case/caseDataAccess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
import { CaseSectionRecord } from './caseSection/types';
import { pick } from 'lodash';
import { HrmAccountId } from '@tech-matters/types';
import QueryStream from 'pg-query-stream';

export { PrecalculatedCasePermissionConditions, CaseRecordCommon };

Expand Down Expand Up @@ -219,21 +220,30 @@ const generalizedSearchQueryFunction = <T>(
};
};

const searchParametersToQueryParameters: SearchQueryParamsBuilder<CaseSearchCriteria> = (
accountSid,
user,
searchCriteria,
filters,
limit,
offset,
) => ({
...filters,
accountSid,
firstName: searchCriteria.firstName ? `%${searchCriteria.firstName}%` : null,
lastName: searchCriteria.lastName ? `%${searchCriteria.lastName}%` : null,
phoneNumber: searchCriteria.phoneNumber
? `%${searchCriteria.phoneNumber.replace(/[\D]/gi, '')}%`
: null,
contactNumber: searchCriteria.contactNumber || null,
limit: limit,
offset: offset,
twilioWorkerSid: user.workerSid,
});

export const search = generalizedSearchQueryFunction<CaseSearchCriteria>(
selectCaseSearch,
(accountSid, user, searchCriteria, filters, limit, offset) => ({
...filters,
accountSid,
firstName: searchCriteria.firstName ? `%${searchCriteria.firstName}%` : null,
lastName: searchCriteria.lastName ? `%${searchCriteria.lastName}%` : null,
phoneNumber: searchCriteria.phoneNumber
? `%${searchCriteria.phoneNumber.replace(/[\D]/gi, '')}%`
: null,
contactNumber: searchCriteria.contactNumber || null,
limit: limit,
offset: offset,
twilioWorkerSid: user.workerSid,
}),
searchParametersToQueryParameters,
);

export const searchByProfileId = generalizedSearchQueryFunction<{
Expand Down Expand Up @@ -311,3 +321,53 @@ export const searchByCaseIds = generalizedSearchQueryFunction<{
twilioWorkerSid: user.workerSid,
};
});

export const streamCasesForReindexing = ({
accountSid,
filters,
user,
viewCasePermissions,
viewContactPermissions,
batchSize = 1000,
}: {
accountSid: HrmAccountId;
filters: NonNullable<Pick<CaseListFilters, 'createdAt' | 'updatedAt'>>;
user: TwilioUser;
viewCasePermissions: TKConditionsSets<'case'>;
viewContactPermissions: TKConditionsSets<'contact'>;
batchSize?: number;
}): Promise<NodeJS.ReadableStream> => {
const { sortBy, sortDirection } = getPaginationElements({});
const orderByClause = [{ sortBy, sortDirection }];

const qs = new QueryStream(
pgp.as.format(
selectCaseSearch(
user,
viewCasePermissions,
viewContactPermissions,
filters,
orderByClause,
),
searchParametersToQueryParameters(
accountSid,
user,
{},
filters,
Number.MAX_SAFE_INTEGER, // limit
0, // offset
),
),
[],
{
batchSize,
},
);

// Expose the readable stream to the caller as a promise for further pipelining
return new Promise(resolve => {
db.stream(qs, resultStream => {
resolve(resultStream);
});
});
};
103 changes: 60 additions & 43 deletions hrm-domain/hrm-core/case/caseReindexService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,74 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/

import { HrmAccountId, newErr, newOkFromData } from '@tech-matters/types';
import { CaseService, searchCases } from './caseService';
import { HrmAccountId } from '@tech-matters/types';
import { caseRecordToCase } from './caseService';
import { publishCaseToSearchIndex } from '../jobs/search/publishToSearchIndex';
import { maxPermissions } from '../permissions';
import { AsyncProcessor, SearchFunction, processInBatch } from '../autoPaginate';
import formatISO from 'date-fns/formatISO';
import { CaseRecord, streamCasesForReindexing } from './caseDataAccess';
import { TKConditionsSets } from '../permissions/rulesMap';
import { Transform } from 'stream';

export const reindexCases = async (
// TODO: move this to service initialization or constant package?
const highWaterMark = 1000;

export const reindexCasesStream = async (
accountSid: HrmAccountId,
dateFrom: string,
dateTo: string,
) => {
try {
const filters = {
createdAt: {
from: formatISO(new Date(dateFrom)),
to: formatISO(new Date(dateTo)),
},
};

const searchFunction: SearchFunction<CaseService> = async limitAndOffset => {
const res = await searchCases(
accountSid,
{
limit: limitAndOffset.limit.toString(),
offset: limitAndOffset.offset.toString(),
},
{},
{ filters },
maxPermissions,
);
return { records: res.cases as CaseService[], count: res.count };
};
): Promise<Transform> => {
const filters = {
createdAt: {
from: formatISO(new Date(dateFrom)),
to: formatISO(new Date(dateTo)),
},
updatedAt: {
from: formatISO(new Date(dateFrom)),
to: formatISO(new Date(dateTo)),
},
};

const asyncProcessor: AsyncProcessor<CaseService, void> = async casesResult => {
const promises = casesResult.records.map(caseObj => {
return publishCaseToSearchIndex({
accountSid,
case: caseObj,
operation: 'index',
});
});
console.debug('Querying DB for cases to index', filters);
const casesStream: NodeJS.ReadableStream = await streamCasesForReindexing({
accountSid,
filters,
user: maxPermissions.user,
viewCasePermissions: maxPermissions.permissions.viewCase as TKConditionsSets<'case'>,
viewContactPermissions: maxPermissions.permissions
.viewContact as TKConditionsSets<'contact'>,
batchSize: highWaterMark,
});

await Promise.all(promises);
};
console.debug('Piping cases to queue for reindexing', filters);
return casesStream.pipe(
new Transform({
objectMode: true,
highWaterMark,
async transform(caseRecord: CaseRecord, _, callback) {
const caseObj = caseRecordToCase(caseRecord);
try {
const { MessageId } = await publishCaseToSearchIndex({
accountSid,
case: caseObj,
operation: 'index',
});

await processInBatch(searchFunction, asyncProcessor);

return newOkFromData('Successfully indexed contacts');
} catch (error) {
console.error('Error reindexing contacts', error);
return newErr({ error, message: 'Error reindexing contacts' });
}
this.push(
`${new Date().toISOString()},${accountSid},contad id: ${
caseObj.id
} Success, MessageId ${MessageId}
\n`,
);
} catch (err) {
this.push(
`${new Date().toISOString()},${accountSid},contad id: ${caseObj.id} Error: ${
err.message?.replace('"', '""') || String(err)
}\n`,
);
}
callback();
},
}),
);
};
17 changes: 2 additions & 15 deletions hrm-domain/hrm-core/case/caseService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import {
generateCaseSearchFilters,
} from './caseSearchIndex';
import { ContactListCondition } from '../contact/contactSearchIndex';
import { maxPermissions } from '../permissions';

export { WELL_KNOWN_CASE_SECTION_NAMES, CaseService, CaseInfoSection };

Expand Down Expand Up @@ -186,7 +187,7 @@ const caseToCaseRecord = (
return caseWithoutContacts;
};

const caseRecordToCase = (record: CaseRecord): CaseService => {
export const caseRecordToCase = (record: CaseRecord): CaseService => {
// Remove legacy case sections
const info = {
...record.info,
Expand Down Expand Up @@ -284,20 +285,6 @@ const mapEssentialData =
};
};

// TODO: use the factored out version once that's merged
const maxPermissions: {
user: TwilioUser;
can: () => boolean;
} = {
can: () => true,
user: {
accountSid: 'ACxxx',
workerSid: 'WKxxx',
roles: ['supervisor'],
isSupervisor: true,
},
};

const doCaseInSearchIndexOP =
(operation: IndexMessage['operation']) =>
async ({
Expand Down
2 changes: 1 addition & 1 deletion hrm-domain/hrm-core/case/sql/caseSearchSql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ export type SearchQueryBuilder = (
viewCasePermissions: TKConditionsSets<'case'>,
viewContactPermissions: TKConditionsSets<'contact'>,
filters: CaseListFilters,
orderByClauses?: OrderByClauseItem[],
orderByClauses: OrderByClauseItem[],
onlyEssentialData?: boolean,
) => string;

Expand Down
20 changes: 7 additions & 13 deletions hrm-domain/hrm-core/contact/adminContactRoutesV0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
*/

import type { Request, Response, NextFunction } from 'express';
import { isErr, mapHTTPError } from '@tech-matters/types';
import { SafeRouter, publicEndpoint } from '../permissions';
import { reindexContacts } from './contactsReindexService';
import { reindexContactsStream } from './contactsReindexService';

const adminContactsRouter = SafeRouter();

Expand All @@ -29,17 +28,12 @@ adminContactsRouter.post(
const { hrmAccountId } = req;
const { dateFrom, dateTo } = req.body;

const result = await reindexContacts(hrmAccountId, dateFrom, dateTo);

if (isErr(result)) {
return next(
mapHTTPError(result, {
InvalidParameterError: 400,
}),
);
}

res.json(result.data);
const resultStream = await reindexContactsStream(hrmAccountId, dateFrom, dateTo);
resultStream.on('error', err => {
next(err);
});
res.status(200).setHeader('Content-Type', 'text/plain');
resultStream.pipe(res);
},
);

Expand Down
Loading