From 34d0e4302cf49a648a4c684045cd2996e05506b0 Mon Sep 17 00:00:00 2001 From: slaplante-raft Date: Sun, 21 Jul 2024 19:06:48 -0400 Subject: [PATCH] S3 Directory Document Loading Component (#2818) * Add new S3Directory component * Add Additional Metadata and Omit Metadata Keys parameters * Update S3Directory.ts add placeholder for prefix --------- Co-authored-by: Scott Laplante Co-authored-by: Henry Heng --- .../S3Directory/S3Directory.ts | 321 ++++++++++++++++++ .../nodes/documentloaders/S3Directory/s3.svg | 5 + 2 files changed, 326 insertions(+) create mode 100644 packages/components/nodes/documentloaders/S3Directory/S3Directory.ts create mode 100644 packages/components/nodes/documentloaders/S3Directory/s3.svg diff --git a/packages/components/nodes/documentloaders/S3Directory/S3Directory.ts b/packages/components/nodes/documentloaders/S3Directory/S3Directory.ts new file mode 100644 index 00000000000..94dde407fd4 --- /dev/null +++ b/packages/components/nodes/documentloaders/S3Directory/S3Directory.ts @@ -0,0 +1,321 @@ +import { omit } from 'lodash' +import { ICommonObject, INode, INodeData, INodeOptionsValue, INodeParams } from '../../../src/Interface' +import { getCredentialData, getCredentialParam } from '../../../src/utils' +import { S3Client, GetObjectCommand, S3ClientConfig, ListObjectsV2Command, ListObjectsV2Output } from '@aws-sdk/client-s3' +import { getRegions, MODEL_TYPE } from '../../../src/modelLoader' +import { Readable } from 'node:stream' +import * as fsDefault from 'node:fs' +import * as path from 'node:path' +import * as os from 'node:os' + +import { DirectoryLoader } from 'langchain/document_loaders/fs/directory' +import { JSONLoader } from 'langchain/document_loaders/fs/json' +import { CSVLoader } from 'langchain/document_loaders/fs/csv' +import { PDFLoader } from 'langchain/document_loaders/fs/pdf' +import { DocxLoader } from 'langchain/document_loaders/fs/docx' +import { TextLoader } from 'langchain/document_loaders/fs/text' +import { 
TextSplitter } from 'langchain/text_splitter'

/** File extensions that are loaded as-is with `TextLoader`. */
const PLAIN_TEXT_EXTENSIONS: readonly string[] = [
    '.txt',
    '.aspx',
    '.asp',
    '.cpp', // C++
    '.c',
    '.cs',
    '.css',
    '.go', // Go
    '.h', // C++ Header files
    '.kt', // Kotlin
    '.java', // Java
    '.js', // JavaScript
    '.less', // Less files
    '.ts', // TypeScript
    '.php', // PHP
    '.proto', // Protocol Buffers
    '.python', // Python
    '.py', // Python
    '.rst', // reStructuredText
    '.ruby', // Ruby
    '.rb', // Ruby
    '.rs', // Rust
    '.scala', // Scala
    '.sc', // Scala
    '.scss', // Sass
    '.sol', // Solidity
    '.sql', // SQL
    '.swift', // Swift
    '.markdown', // Markdown
    '.md', // Markdown
    '.tex', // LaTeX
    '.ltx', // LaTeX
    '.html', // HTML
    '.vb', // Visual Basic
    '.xml' // XML
]

/** Returns a printable message for any caught value, not only `Error` instances. */
function describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err)
}

/**
 * Lists every downloadable object key in the bucket under the given prefix.
 *
 * ListObjectsV2 responses are paginated, so the request is repeated with the
 * continuation token until the service reports that the listing is complete.
 * Keys ending in `/` are zero-byte "folder" placeholders; they are skipped because
 * they cannot be written to disk as files.
 */
async function listObjectKeys(s3Client: S3Client, bucketName: string, prefix?: string): Promise<string[]> {
    const keys: string[] = []
    let continuationToken: string | undefined

    do {
        const page: ListObjectsV2Output = await s3Client.send(
            new ListObjectsV2Command({
                Bucket: bucketName,
                Prefix: prefix,
                ContinuationToken: continuationToken
            })
        )

        for (const item of page.Contents ?? []) {
            if (item.Key && item.ETag && !item.Key.endsWith('/')) {
                keys.push(item.Key)
            }
        }

        continuationToken = page.IsTruncated ? page.NextContinuationToken : undefined
    } while (continuationToken)

    return keys
}

/**
 * Maps an object key to a path inside `rootDir`.
 *
 * @throws Error when the key (for example one containing `..` segments) would
 * resolve to a location outside of `rootDir`.
 */
function resolveDownloadPath(rootDir: string, key: string): string {
    const target = path.join(rootDir, key)
    const relative = path.relative(rootDir, target)

    if (!relative || relative === '..' || relative.startsWith(`..${path.sep}`) || path.isAbsolute(relative)) {
        throw new Error(`Refusing to write S3 key outside of the temporary directory: ${key}`)
    }

    return target
}

/**
 * Downloads a single object and writes it to `filePath`, creating parent directories as needed.
 *
 * @throws Error prefixed with the key and bucket name when the download or the write fails.
 */
async function downloadObject(s3Client: S3Client, bucketName: string, key: string, filePath: string): Promise<void> {
    try {
        const response = await s3Client.send(
            new GetObjectCommand({
                Bucket: bucketName,
                Key: key
            })
        )

        const body = response.Body
        if (!(body instanceof Readable)) {
            throw new Error('Response body is not a readable stream.')
        }

        const chunks: Buffer[] = []
        for await (const chunk of body) {
            chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
        }

        // create the directory if it doesnt already exist
        fsDefault.mkdirSync(path.dirname(filePath), { recursive: true })

        // write the file to the directory
        fsDefault.writeFileSync(filePath, Buffer.concat(chunks))
    } catch (e: unknown) {
        throw new Error(`Failed to download file ${key} from S3 bucket ${bucketName}: ${describeError(e)}`)
    }
}

/**
 * Builds a recursive `DirectoryLoader` over `directory` with one loader per supported extension.
 *
 * @param pdfUsage `'perFile'` yields one document per PDF; any other value keeps the
 * PDFLoader default behaviour.
 */
function createDirectoryLoader(directory: string, pdfUsage: unknown): DirectoryLoader {
    const loaders: ConstructorParameters<typeof DirectoryLoader>[1] = {
        '.json': (filePath) => new JSONLoader(filePath),
        '.csv': (filePath) => new CSVLoader(filePath),
        '.docx': (filePath) => new DocxLoader(filePath),
        '.pdf': (filePath) =>
            pdfUsage === 'perFile'
                ? // @ts-ignore
                  new PDFLoader(filePath, { splitPages: false, pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') })
                : // @ts-ignore
                  new PDFLoader(filePath, { pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') })
    }

    for (const extension of PLAIN_TEXT_EXTENSIONS) {
        loaders[extension] = (filePath) => new TextLoader(filePath)
    }

    return new DirectoryLoader(directory, loaders, true)
}

/**
 * Document loader node that downloads every object under an S3 bucket/prefix into a
 * temporary directory, loads the files with a `DirectoryLoader` and returns the
 * resulting documents.
 */
class S3_DocumentLoaders implements INode {
    label: string
    name: string
    version: number
    description: string
    type: string
    icon: string
    category: string
    baseClasses: string[]
    credential: INodeParams
    inputs?: INodeParams[]

    constructor() {
        this.label = 'S3 Directory'
        this.name = 's3Directory'
        this.version = 3.0
        this.type = 'Document'
        this.icon = 's3.svg'
        this.category = 'Document Loaders'
        this.description = 'Load Data from S3 Buckets'
        this.baseClasses = [this.type]
        this.credential = {
            label: 'Credential',
            name: 'credential',
            type: 'credential',
            credentialNames: ['awsApi'],
            optional: true
        }
        this.inputs = [
            {
                label: 'Text Splitter',
                name: 'textSplitter',
                type: 'TextSplitter',
                optional: true
            },
            {
                label: 'Bucket',
                name: 'bucketName',
                type: 'string'
            },
            {
                label: 'Region',
                name: 'region',
                type: 'asyncOptions',
                loadMethod: 'listRegions',
                default: 'us-east-1'
            },
            {
                label: 'Server URL',
                name: 'serverUrl',
                description:
                    'The fully qualified endpoint of the webservice. This is only for using a custom endpoint (for example, when using a local version of S3).',
                type: 'string',
                optional: true
            },
            {
                label: 'Prefix',
                name: 'prefix',
                type: 'string',
                description: 'Limits the response to keys that begin with the specified prefix',
                placeholder: 'TestFolder/Something',
                optional: true
            },
            {
                label: 'Pdf Usage',
                name: 'pdfUsage',
                type: 'options',
                options: [
                    {
                        label: 'One document per page',
                        name: 'perPage'
                    },
                    {
                        label: 'One document per file',
                        name: 'perFile'
                    }
                ],
                default: 'perPage',
                optional: true,
                additionalParams: true
            },
            {
                label: 'Additional Metadata',
                name: 'metadata',
                type: 'json',
                description: 'Additional metadata to be added to the extracted documents',
                optional: true,
                additionalParams: true
            },
            {
                label: 'Omit Metadata Keys',
                name: 'omitMetadataKeys',
                type: 'string',
                rows: 4,
                description:
                    'Each document loader comes with a default set of metadata keys that are extracted from the document. You can use this field to omit some of the default metadata keys. The value should be a list of keys, seperated by comma. Use * to omit all metadata keys execept the ones you specify in the Additional Metadata field',
                placeholder: 'key1, key2, key3.nestedKey1',
                optional: true,
                additionalParams: true
            }
        ]
    }

    loadMethods = {
        /** Supplies the options for the `Region` input. */
        async listRegions(): Promise<INodeOptionsValue[]> {
            return await getRegions(MODEL_TYPE.CHAT, 'awsChatBedrock')
        }
    }

    /**
     * Downloads the configured bucket/prefix and returns the loaded documents.
     *
     * @param nodeData node inputs (`bucketName`, `prefix`, `region`, `serverUrl`, `pdfUsage`,
     * `metadata`, `omitMetadataKeys`, optional `textSplitter`) and the optional credential id.
     * @param _ unused.
     * @param options passed through to `getCredentialData`.
     * @returns the documents, split with `textSplitter` when one is supplied, with metadata
     * merged and filtered according to the `metadata` / `omitMetadataKeys` inputs.
     * @throws Error prefixed with `Failed to load data from bucket <name>` on any listing,
     * download, parsing or loading failure.
     */
    async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
        const textSplitter = nodeData.inputs?.textSplitter as TextSplitter
        const bucketName = nodeData.inputs?.bucketName as string
        const prefix = nodeData.inputs?.prefix as string
        const region = nodeData.inputs?.region as string
        const serverUrl = nodeData.inputs?.serverUrl as string
        const pdfUsage = nodeData.inputs?.pdfUsage
        const metadata = nodeData.inputs?.metadata
        const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string

        // '*' means "drop every loader-provided key"; surrounding whitespace is tolerated
        const omitAllMetadata = typeof _omitMetadataKeys === 'string' && _omitMetadataKeys.trim() === '*'
        const omitMetadataKeys: string[] = _omitMetadataKeys
            ? _omitMetadataKeys
                  .split(',')
                  .map((key) => key.trim())
                  .filter((key) => key.length > 0)
            : []

        let credentials: S3ClientConfig['credentials'] | undefined

        if (nodeData.credential) {
            const credentialData = await getCredentialData(nodeData.credential, options)
            const accessKeyId = getCredentialParam('awsKey', credentialData, nodeData)
            const secretAccessKey = getCredentialParam('awsSecret', credentialData, nodeData)

            if (accessKeyId && secretAccessKey) {
                credentials = {
                    accessKeyId,
                    secretAccessKey
                }
            }
        }

        // a custom endpoint is addressed path-style
        const s3Config: S3ClientConfig = {
            region,
            credentials,
            ...(serverUrl ? { endpoint: serverUrl, forcePathStyle: true } : {})
        }

        const tempDir = fsDefault.mkdtempSync(path.join(os.tmpdir(), 's3fileloader-'))

        try {
            const s3Client = new S3Client(s3Config)

            const keys = await listObjectKeys(s3Client, bucketName, prefix || undefined)

            await Promise.all(keys.map((key) => downloadObject(s3Client, bucketName, key, resolveDownloadPath(tempDir, key))))

            const loader = createDirectoryLoader(tempDir, pdfUsage)
            const docs = textSplitter ? await loader.loadAndSplit(textSplitter) : await loader.load()

            const additionalMetadata = metadata ? (typeof metadata === 'object' ? metadata : JSON.parse(metadata)) : {}

            return docs.map((doc) => ({
                ...doc,
                metadata: omitAllMetadata
                    ? { ...additionalMetadata }
                    : omit(
                          {
                              ...doc.metadata,
                              ...additionalMetadata
                          },
                          omitMetadataKeys
                      )
            }))
        } catch (e: unknown) {
            throw new Error(`Failed to load data from bucket ${bucketName}: ${describeError(e)}`)
        } finally {
            // remove the temp directory on success and on failure alike
            fsDefault.rmSync(tempDir, { recursive: true, force: true })
        }
    }
}

module.exports = { nodeClass: S3_DocumentLoaders }
diff --git a/packages/components/nodes/documentloaders/S3Directory/s3.svg b/packages/components/nodes/documentloaders/S3Directory/s3.svg new file mode 100644 index 00000000000..33948d2f97a --- /dev/null +++ b/packages/components/nodes/documentloaders/S3Directory/s3.svg @@ -0,0 +1,5 @@ + + + + +