Skip to content

Commit

Permalink
Feature/Code Interpreter (#3183)
Browse files Browse the repository at this point in the history
* Base changes for ServerSide Events (instead of socket.io)

* lint fixes

* adding of interface and separate methods for streaming events

* lint

* first draft, handles both internal and external prediction end points.

* lint fixes

* additional internal end point for streaming and associated changes

* return streamresponse as true to build agent flow

* 1) JSON formatting for internal events
2) other fixes

* 1) convert internal event to metadata to maintain consistency with external response

* fix action and metadata streaming

* fix for error when agent flow is aborted

* prevent subflows from streaming and other code cleanup

* prevent streaming from enclosed tools

* add fix for preventing chaintool streaming

* update lock file

* add open when hidden to sse

* Streaming errors

* Streaming errors

* add fix for showing error message

* add code interpreter

* add artifacts to view message dialog

* Update pnpm-lock.yaml

---------

Co-authored-by: Vinod Paidimarry <[email protected]>
  • Loading branch information
HenryHengZJ and vinodkiran authored Sep 17, 2024
1 parent 26444ac commit b02f279
Show file tree
Hide file tree
Showing 21 changed files with 729 additions and 333 deletions.
33 changes: 17 additions & 16 deletions packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class OpenAIAssistant_Agents implements INode {

const usedTools: IUsedTool[] = []
const fileAnnotations = []
const artifacts = []

const assistant = await appDataSource.getRepository(databaseEntities['Assistant']).findOneBy({
id: selectedAssistantId
Expand Down Expand Up @@ -439,21 +440,23 @@ class OpenAIAssistant_Agents implements INode {
const fileId = chunk.image_file.file_id
const fileObj = await openai.files.retrieve(fileId)

const buffer = await downloadImg(openai, fileId, `${fileObj.filename}.png`, options.chatflowid, options.chatId)
const base64String = Buffer.from(buffer).toString('base64')

// TODO: Use a file path and retrieve image on the fly. Storing as base64 to localStorage and database will easily hit limits
const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>`
text += imgHTML
const filePath = await downloadImg(
openai,
fileId,
`${fileObj.filename}.png`,
options.chatflowid,
options.chatId
)
artifacts.push({ type: 'png', data: filePath })

if (!isStreamingStarted) {
isStreamingStarted = true
if (sseStreamer) {
sseStreamer.streamStartEvent(chatId, imgHTML)
sseStreamer.streamStartEvent(chatId, ' ')
}
}
if (sseStreamer) {
sseStreamer.streamTokenEvent(chatId, imgHTML)
sseStreamer.streamArtifactsEvent(chatId, artifacts)
}
}
}
Expand Down Expand Up @@ -565,6 +568,7 @@ class OpenAIAssistant_Agents implements INode {
return {
text,
usedTools,
artifacts,
fileAnnotations,
assistant: { assistantId: openAIAssistantId, threadId, runId: runThreadId, messages: messageData }
}
Expand Down Expand Up @@ -769,12 +773,8 @@ class OpenAIAssistant_Agents implements INode {
const fileId = content.image_file.file_id
const fileObj = await openai.files.retrieve(fileId)

const buffer = await downloadImg(openai, fileId, `${fileObj.filename}.png`, options.chatflowid, options.chatId)
const base64String = Buffer.from(buffer).toString('base64')

// TODO: Use a file path and retrieve image on the fly. Storing as base64 to localStorage and database will easily hit limits
const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>`
returnVal += imgHTML
const filePath = await downloadImg(openai, fileId, `${fileObj.filename}.png`, options.chatflowid, options.chatId)
artifacts.push({ type: 'png', data: filePath })
}
}

Expand All @@ -787,6 +787,7 @@ class OpenAIAssistant_Agents implements INode {
return {
text: returnVal,
usedTools,
artifacts,
fileAnnotations,
assistant: { assistantId: openAIAssistantId, threadId, runId: runThreadId, messages: messageData }
}
Expand All @@ -807,9 +808,9 @@ const downloadImg = async (openai: OpenAI, fileId: string, fileName: string, ...
const image_data_buffer = Buffer.from(image_data)
const mime = 'image/png'

await addSingleFileToStorage(mime, image_data_buffer, fileName, ...paths)
const res = await addSingleFileToStorage(mime, image_data_buffer, fileName, ...paths)

return image_data_buffer
return res
}

const downloadFile = async (openAIApiKey: string, fileObj: any, fileName: string, ...paths: string[]) => {
Expand Down
15 changes: 14 additions & 1 deletion packages/components/nodes/agents/ToolAgent/ToolAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class ToolAgent_Agents implements INode {
let res: ChainValues = {}
let sourceDocuments: ICommonObject[] = []
let usedTools: IUsedTool[] = []
let artifacts = []

if (shouldStreamResponse) {
const handler = new CustomChainHandler(sseStreamer, chatId)
Expand All @@ -150,6 +151,12 @@ class ToolAgent_Agents implements INode {
}
usedTools = res.usedTools
}
if (res.artifacts) {
if (sseStreamer) {
sseStreamer.streamArtifactsEvent(chatId, flatten(res.artifacts))
}
artifacts = res.artifacts
}
// If the tool is set to returnDirect, stream the output to the client
if (res.usedTools && res.usedTools.length) {
let inputTools = nodeData.inputs?.tools
Expand All @@ -169,6 +176,9 @@ class ToolAgent_Agents implements INode {
if (res.usedTools) {
usedTools = res.usedTools
}
if (res.artifacts) {
artifacts = res.artifacts
}
}

let output = res?.output
Expand Down Expand Up @@ -203,14 +213,17 @@ class ToolAgent_Agents implements INode {

let finalRes = output

if (sourceDocuments.length || usedTools.length) {
if (sourceDocuments.length || usedTools.length || artifacts.length) {
const finalRes: ICommonObject = { text: output }
if (sourceDocuments.length) {
finalRes.sourceDocuments = flatten(sourceDocuments)
}
if (usedTools.length) {
finalRes.usedTools = usedTools
}
if (artifacts.length) {
finalRes.artifacts = artifacts
}
return finalRes
}

Expand Down
39 changes: 34 additions & 5 deletions packages/components/nodes/sequentialagents/Agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
IDocument,
IStateWithMessages
} from '../../../src/Interface'
import { ToolCallingAgentOutputParser, AgentExecutor, SOURCE_DOCUMENTS_PREFIX } from '../../../src/agents'
import { ToolCallingAgentOutputParser, AgentExecutor, SOURCE_DOCUMENTS_PREFIX, ARTIFACTS_PREFIX } from '../../../src/agents'
import { getInputVariables, getVars, handleEscapeCharacters, prepareSandboxVars } from '../../../src/utils'
import {
customGet,
Expand All @@ -35,7 +35,6 @@ import {
} from '../commonUtils'
import { END, StateGraph } from '@langchain/langgraph'
import { StructuredTool } from '@langchain/core/tools'
import { DynamicStructuredTool } from '../../tools/CustomTool/core'

const defaultApprovalPrompt = `You are about to execute tool: {tools}. Ask if user want to proceed`
const examplePrompt = 'You are a research assistant who can search for up-to-date info using search engine.'
Expand Down Expand Up @@ -739,14 +738,22 @@ async function agentNode(

// If the last message is a tool message and is an interrupted message, format output into standard agent output
if (lastMessage._getType() === 'tool' && lastMessage.additional_kwargs?.nodeId === nodeData.id) {
let formattedAgentResult: { output?: string; usedTools?: IUsedTool[]; sourceDocuments?: IDocument[] } = {}
let formattedAgentResult: {
output?: string
usedTools?: IUsedTool[]
sourceDocuments?: IDocument[]
artifacts?: ICommonObject[]
} = {}
formattedAgentResult.output = result.content
if (lastMessage.additional_kwargs?.usedTools) {
formattedAgentResult.usedTools = lastMessage.additional_kwargs.usedTools as IUsedTool[]
}
if (lastMessage.additional_kwargs?.sourceDocuments) {
formattedAgentResult.sourceDocuments = lastMessage.additional_kwargs.sourceDocuments as IDocument[]
}
if (lastMessage.additional_kwargs?.artifacts) {
formattedAgentResult.artifacts = lastMessage.additional_kwargs.artifacts as ICommonObject[]
}
result = formattedAgentResult
} else {
result.name = name
Expand All @@ -765,12 +772,18 @@ async function agentNode(
if (result.sourceDocuments) {
additional_kwargs.sourceDocuments = result.sourceDocuments
}
if (result.artifacts) {
additional_kwargs.artifacts = result.artifacts
}
if (result.output) {
result.content = result.output
delete result.output
}

const outputContent = typeof result === 'string' ? result : result.content || result.output
let outputContent = typeof result === 'string' ? result : result.content || result.output

// remove invalid markdown image pattern: ![<some-string>](<some-string>)
outputContent = typeof outputContent === 'string' ? outputContent.replace(/!\[.*?\]\(.*?\)/g, '') : outputContent

if (nodeData.inputs?.updateStateMemoryUI || nodeData.inputs?.updateStateMemoryCode) {
let formattedOutput = {
Expand Down Expand Up @@ -931,6 +944,9 @@ class ToolNode<T extends BaseMessage[] | MessagesState> extends RunnableCallable
// Extract all properties except messages for IStateWithMessages
const { messages: _, ...inputWithoutMessages } = Array.isArray(input) ? { messages: input } : input
const ChannelsWithoutMessages = {
chatId: this.options.chatId,
sessionId: this.options.sessionId,
input: this.inputQuery,
state: inputWithoutMessages
}

Expand All @@ -940,12 +956,14 @@ class ToolNode<T extends BaseMessage[] | MessagesState> extends RunnableCallable
if (tool === undefined) {
throw new Error(`Tool ${call.name} not found.`)
}
if (tool && tool instanceof DynamicStructuredTool) {
if (tool && (tool as any).setFlowObject) {
// @ts-ignore
tool.setFlowObject(ChannelsWithoutMessages)
}
let output = await tool.invoke(call.args, config)
let sourceDocuments: Document[] = []
let artifacts = []

if (output?.includes(SOURCE_DOCUMENTS_PREFIX)) {
const outputArray = output.split(SOURCE_DOCUMENTS_PREFIX)
output = outputArray[0]
Expand All @@ -956,12 +974,23 @@ class ToolNode<T extends BaseMessage[] | MessagesState> extends RunnableCallable
console.error('Error parsing source documents from tool')
}
}
if (output?.includes(ARTIFACTS_PREFIX)) {
const outputArray = output.split(ARTIFACTS_PREFIX)
output = outputArray[0]
try {
artifacts = JSON.parse(outputArray[1])
} catch (e) {
console.error('Error parsing artifacts from tool')
}
}

return new ToolMessage({
name: tool.name,
content: typeof output === 'string' ? output : JSON.stringify(output),
tool_call_id: call.id!,
additional_kwargs: {
sourceDocuments,
artifacts,
args: call.args,
usedTools: [
{
Expand Down
23 changes: 19 additions & 4 deletions packages/components/nodes/sequentialagents/ToolNode/ToolNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ import {
import { AIMessage, AIMessageChunk, BaseMessage, ToolMessage } from '@langchain/core/messages'
import { StructuredTool } from '@langchain/core/tools'
import { RunnableConfig } from '@langchain/core/runnables'
import { SOURCE_DOCUMENTS_PREFIX } from '../../../src/agents'
import { ARTIFACTS_PREFIX, SOURCE_DOCUMENTS_PREFIX } from '../../../src/agents'
import { Document } from '@langchain/core/documents'
import { DataSource } from 'typeorm'
import { MessagesState, RunnableCallable, customGet, getVM } from '../commonUtils'
import { getVars, prepareSandboxVars } from '../../../src/utils'
import { ChatPromptTemplate } from '@langchain/core/prompts'
import { DynamicStructuredTool } from '../../tools/CustomTool/core'

const defaultApprovalPrompt = `You are about to execute tool: {tools}. Ask if user want to proceed`

Expand Down Expand Up @@ -408,6 +407,9 @@ class ToolNode<T extends IStateWithMessages | BaseMessage[] | MessagesState> ext
// Extract all properties except messages for IStateWithMessages
const { messages: _, ...inputWithoutMessages } = Array.isArray(input) ? { messages: input } : input
const ChannelsWithoutMessages = {
chatId: this.options.chatId,
sessionId: this.options.sessionId,
input: this.inputQuery,
state: inputWithoutMessages
}

Expand All @@ -417,12 +419,13 @@ class ToolNode<T extends IStateWithMessages | BaseMessage[] | MessagesState> ext
if (tool === undefined) {
throw new Error(`Tool ${call.name} not found.`)
}
if (tool && tool instanceof DynamicStructuredTool) {
if (tool && (tool as any).setFlowObject) {
// @ts-ignore
tool.setFlowObject(ChannelsWithoutMessages)
}
let output = await tool.invoke(call.args, config)
let sourceDocuments: Document[] = []
let artifacts = []
if (output?.includes(SOURCE_DOCUMENTS_PREFIX)) {
const outputArray = output.split(SOURCE_DOCUMENTS_PREFIX)
output = outputArray[0]
Expand All @@ -433,12 +436,23 @@ class ToolNode<T extends IStateWithMessages | BaseMessage[] | MessagesState> ext
console.error('Error parsing source documents from tool')
}
}
if (output?.includes(ARTIFACTS_PREFIX)) {
const outputArray = output.split(ARTIFACTS_PREFIX)
output = outputArray[0]
try {
artifacts = JSON.parse(outputArray[1])
} catch (e) {
console.error('Error parsing artifacts from tool')
}
}

return new ToolMessage({
name: tool.name,
content: typeof output === 'string' ? output : JSON.stringify(output),
tool_call_id: call.id!,
additional_kwargs: {
sourceDocuments,
artifacts,
args: call.args,
usedTools: [
{
Expand Down Expand Up @@ -489,7 +503,8 @@ const getReturnOutput = async (
tool: output.name,
toolInput: output.additional_kwargs.args,
toolOutput: output.content,
sourceDocuments: output.additional_kwargs.sourceDocuments
sourceDocuments: output.additional_kwargs.sourceDocuments,
artifacts: output.additional_kwargs.artifacts
} as IUsedTool
})

Expand Down
Loading

0 comments on commit b02f279

Please sign in to comment.