Remote AI Worker (Livepool version) #3106

Closed

Conversation

@kyriediculous kyriediculous commented Jul 27, 2024

What does this pull request do? Explain your changes. (required)
This PR adds a Remote AI worker to the go-livepeer repo, integrated into the transcoder. Its control flow and networking layer are akin to those of the remote O+T setup.

It allows an Orchestrator to run in standalone mode and dispatch AI jobs to connected standalone transcoders into which the AI worker is integrated. Remote AI workers register with one or more orchestrators over gRPC and receive a one-directional stream in return, on which AI job notifications arrive. The transcoder runs an AI worker and runner to perform the tasks. Once a job is completed, the remote AI worker POSTs the results back to the orchestrator.

Remote Orchestrator+AI Worker Specification

Abstract

This document outlines the specification for integrating a remote AI worker setup with an Orchestrator, akin to the existing remote Orchestrator + Transcoder configuration. The goal is to enable dynamic orchestrator capabilities while maintaining flexibility for other potential use cases, such as public pools.

Architecture

The architecture mirrors that of the remote transcoder setup for an orchestrator. Instead of hosting AI workers locally, the orchestrator will function as a message broker and session manager, delegating tasks to remote workers.

Design Goals

  • Minimize changes/additions to the codebase
  • Adhere to go-livepeer conventions and design principles
  • Ensure maximum flexibility for various use cases

O<>W Wire Protocol

The communication between the orchestrator (O) and remote AI workers, integrated in the transcoder node (W), will use gRPC to register workers and notify them of new AI jobs over an open stream. The orchestrator will also run an HTTP server to receive results from remote workers. This approach aligns with the existing remote transcoding setup, facilitating ease of understanding and maintenance. Using the same networking stack eliminates the need for additional servers or microservices.

A major benefit of this design is that the remote AI worker doesn't have to run a server; it acts purely as a gRPC and HTTP client. This means the host doesn't need to worry about port forwarding, which is particularly important for public pools.

gRPC

RegisterAIWorker
The existing Transcoder service will be extended with a new RPC method, RegisterAIWorker, which lets remote AI workers register, specify their capabilities, and maintain an open connection over which AI tasks are received.

service Transcoder {
    ...
  // Called by the transcoder to register a `RemoteAIWorker` with an orchestrator.
  // The orchestrator notifies the registered `RemoteAIWorker` of AI jobs as they come in.
  rpc RegisterAIWorker(RegisterRequest) returns (stream NotifyAIJob);
}
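
For illustration, here is a rough sketch of the worker-side registration and receive loop, assuming the `net` package generated from the proto above. The RegisterRequest fields mirror the existing remote-transcoder registration and may differ in the actual implementation; TLS setup, reconnection, and standard-library imports are omitted for brevity.

func registerAndListen(ctx context.Context, orchAddr, orchSecret string, caps *net.Capabilities, handle func(*net.NotifyAIJob)) error {
	conn, err := grpc.Dial(orchAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	defer conn.Close()

	// Register with the orchestrator; the returned stream stays open and
	// delivers AI job notifications until either side disconnects.
	stream, err := net.NewTranscoderClient(conn).RegisterAIWorker(ctx, &net.RegisterRequest{
		Secret:       orchSecret,
		Capabilities: caps,
	})
	if err != nil {
		return err
	}
	for {
		job, err := stream.Recv() // *net.NotifyAIJob
		if err != nil {
			return err // stream closed; the caller should re-register
		}
		go handle(job) // run the job and POST the result back (see below)
	}
}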

NotifyAIJob
Remote AI workers will receive AI tasks through the NotifyAIJob stream. Upon completion, results will be sent back to the orchestrator via HTTP.

enum AIRequestType {
  TextToImage = 0;
  ImageToText = 1;
  ImageToImage = 2;
  Upscale = 3;
  AudioToText = 4;
}
// Sent by the orchestrator to the remote AI worker
message NotifyAIJob {
  AIRequestType type = 1;
  int64 taskID = 2;
  bytes data = 3; // json
}
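
To illustrate the `data` payload convention, the orchestrator could JSON-encode the request and tag it with the pipeline type roughly as follows. The parameter struct is a hypothetical stand-in for the pipeline-specific request type, and the generated identifiers (`net.AIRequestType_TextToImage`, `NotifyAIJob` field names) assume standard protoc-gen-go naming.

type textToImageParams struct {
	ModelID string `json:"model_id"`
	Prompt  string `json:"prompt"`
	Width   int    `json:"width"`
	Height  int    `json:"height"`
}

func newTextToImageJob(taskID int64, params textToImageParams) (*net.NotifyAIJob, error) {
	data, err := json.Marshal(params)
	if err != nil {
		return nil, err
	}
	return &net.NotifyAIJob{
		Type:   net.AIRequestType_TextToImage,
		TaskID: taskID,
		Data:   data, // decoded by the worker based on Type
	}, nil
}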

HTTP

/aiResults
A new route on the orchestrator's HTTP server to receive AI task results from remote AI workers.

The route expects the following headers:

  • Authorization: "Livepeer-Transcoder-1.0"
  • Credentials: orchestrator's transcoder secret

The POST request body should be JSON that unmarshals into a RemoteAIWorkerResult.

type RemoteAIWorkerResult struct {
	JobType net.AIRequestType
	TaskID  int64
	Data    []byte
	Err     string
}
  • JobType: used to identify the pipeline and the type to unmarshal Data into
  • TaskID: the task ID used to look up the channel on which to return the data
  • Data: the actual result of the task as a raw byte array
  • Err: any error that might have occurred (the convention is to return the error message as a string, not the wrapped interface error type)
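
A minimal sketch of the orchestrator-side handler for this route follows; it assumes the RemoteAIWorkerManager exposes an aiResults method that forwards results to the task channel (see Task Management below). Names are illustrative and imports are omitted.

func aiResultsHandler(secret string, m *RemoteAIWorkerManager) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Authenticate the remote worker using the transcoder secret headers.
		if r.Header.Get("Authorization") != "Livepeer-Transcoder-1.0" ||
			r.Header.Get("Credentials") != secret {
			http.Error(w, "invalid credentials", http.StatusUnauthorized)
			return
		}
		var res RemoteAIWorkerResult
		if err := json.NewDecoder(r.Body).Decode(&res); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// Forward the result to the channel registered for res.TaskID.
		m.aiResults(res)
	}
}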

Orchestrator

RemoteAIWorkerManager

The RemoteAIWorkerManager is a new type that implements the AI interface. It is responsible for managing connections with remote workers, tracking ongoing tasks, and routing tasks to the appropriate remote workers.

type RemoteAIWorkerManager struct {
	// workers mapped by Pipeline(Capability) + ModelID
	remoteWorkers map[Capability]map[string][]*RemoteAIWorker
	liveWorkers   map[net.Transcoder_RegisterAIWorkerServer]*RemoteAIWorker
	workersMutex  sync.Mutex

	// tasks
	taskChans map[int64]RemoteAIResultChan
	taskMutex sync.RWMutex
	taskCount int64
}
Connection Management

Remote workers establish a client->server connection by sending a RegisterAIWorker request over gRPC. Active workers are kept in the liveWorkers mapping and removed upon connection loss.
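
A sketch of how a registered worker could be tracked for the lifetime of its stream (the Manage method is illustrative; extracting the worker's address from the gRPC peer is omitted):

func (m *RemoteAIWorkerManager) Manage(stream net.Transcoder_RegisterAIWorkerServer, caps *Capabilities) {
	worker := &RemoteAIWorker{
		manager:      m,
		stream:       stream,
		capabilities: caps,
		eof:          make(chan struct{}),
	}
	m.workersMutex.Lock()
	m.liveWorkers[stream] = worker
	m.workersMutex.Unlock()

	<-worker.eof // closed when the stream errors or the worker disconnects

	m.workersMutex.Lock()
	delete(m.liveWorkers, stream)
	m.workersMutex.Unlock()
}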

Capability Management

On startup, the orchestrator specifies the pipelines and models it wants its remote AI workers to support, along with a price if the setup is on-chain.

Capability management works similarly to capacity tracking for remote transcoding. When a remote AI worker registers, the capacity for each of its supported AI capabilities and constraints is incremented by 1 on the orchestrator.

An orchestrator advertises an AI capability if its capacity > 0.
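
An illustrative check over the remoteWorkers map (the actual implementation increments and decrements the orchestrator's capability constraints on register/disconnect):

// hasCapacity reports whether at least one registered remote worker
// supports the requested pipeline and model, i.e. capacity > 0.
func (m *RemoteAIWorkerManager) hasCapacity(pipeline Capability, modelID string) bool {
	m.workersMutex.Lock()
	defer m.workersMutex.Unlock()
	return len(m.remoteWorkers[pipeline][modelID]) > 0
}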

Task Management

For each AI task, a channel is created, identified by an incrementing TaskID (similar to the taskIDs and channels used for remote transcoding). When the orchestrator receives AI task results via HTTP, the TaskID is used to look up the corresponding channel and forward the result back to the original caller that made the request.
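
A sketch of this bookkeeping, using the fields of the RemoteAIWorkerManager struct above (the RemoteAIResultChan definition and method names are assumptions):

type RemoteAIResultChan chan RemoteAIWorkerResult

// addTaskChan assigns the next TaskID and registers a channel on which the
// HTTP handler delivers the RemoteAIWorkerResult.
func (m *RemoteAIWorkerManager) addTaskChan() (int64, RemoteAIResultChan) {
	m.taskMutex.Lock()
	defer m.taskMutex.Unlock()
	taskID := m.taskCount
	m.taskCount++
	ch := make(RemoteAIResultChan, 1)
	m.taskChans[taskID] = ch
	return taskID, ch
}

// aiResults forwards a result received over HTTP to the channel registered
// for its TaskID, if it still exists.
func (m *RemoteAIWorkerManager) aiResults(res RemoteAIWorkerResult) {
	m.taskMutex.RLock()
	ch, ok := m.taskChans[res.TaskID]
	m.taskMutex.RUnlock()
	if ok {
		ch <- res
	}
}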

Selection

Selection is performed on a round-robin basis, maintaining a queue. Workers are filtered based on the requested pipeline and model. The first worker in the filtered array is selected for the job. If it fails or is at capacity, it is moved to the back of the queue, and the task is retried with the next worker.

More elaborate selection strategies that incorporate some form of scoring could be implemented later, though these don't currently exist for the normal remote transcoding setup either; in a private setup, poorly performing transcoders can simply be taken out of rotation.
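
A sketch of the round-robin behaviour over the remoteWorkers queue (method names are illustrative):

// selectWorker returns the first worker in the queue for the requested
// pipeline and model.
func (m *RemoteAIWorkerManager) selectWorker(pipeline Capability, modelID string) (*RemoteAIWorker, error) {
	m.workersMutex.Lock()
	defer m.workersMutex.Unlock()
	workers := m.remoteWorkers[pipeline][modelID]
	if len(workers) == 0 {
		return nil, errors.New("no remote AI workers for requested pipeline and model")
	}
	return workers[0], nil
}

// rotateWorker moves a failed or at-capacity worker to the back of the queue
// so the retry picks the next one.
func (m *RemoteAIWorkerManager) rotateWorker(pipeline Capability, modelID string) {
	m.workersMutex.Lock()
	defer m.workersMutex.Unlock()
	workers := m.remoteWorkers[pipeline][modelID]
	if len(workers) < 2 {
		return
	}
	m.remoteWorkers[pipeline][modelID] = append(workers[1:], workers[0])
}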

Retries

When a remote AI worker fails a job, we rotate to the next worker in the remoteWorkers array for the given pipeline and model. This process is repeated until the context is cancelled, all workers have been tried, or a successful result has been returned to the Orchestrator.

RemoteAIWorker

RemoteAIWorker is a type used to track remote worker state within the RemoteAIWorkerManager. It mainly contains the remote worker's host address and capabilities.

type RemoteAIWorker struct {
	manager      *RemoteAIWorkerManager
	stream       net.Transcoder_RegisterAIWorkerServer
	addr         string
	capabilities *Capabilities
	eof          chan struct{}
}

Remote AI Worker (transcoder)

The remote AI worker will be responsible for spinning up containers to perform AI tasks through the AI worker and runner.

It connects to the orchestrator via a gRPC client, maintaining an open stream for job requests. The worker translates NotifyAIJob messages into a format suitable for the AI worker pipeline and sends results back to the orchestrator as RemoteAIWorkerResult over HTTP.
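
A sketch of the worker-side result upload: the completed job is marshaled into a RemoteAIWorkerResult and POSTed to the orchestrator's /aiResults route with the headers described above. The function name is illustrative and imports are omitted.

func sendAIResult(ctx context.Context, orchURL, orchSecret string, res RemoteAIWorkerResult) error {
	body, err := json.Marshal(res)
	if err != nil {
		return err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, orchURL+"/aiResults", bytes.NewReader(body))
	if err != nil {
		return err
	}
	// Authenticate with the shared transcoder secret, as for remote transcoders.
	req.Header.Set("Authorization", "Livepeer-Transcoder-1.0")
	req.Header.Set("Credentials", orchSecret)
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("orchestrator returned status %d", resp.StatusCode)
	}
	return nil
}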

Local setup

Orchestrator

./livepeer -orchestrator -orchSecret abcd -serviceAddr 0.0.0.0:8935 -v 6 -aiModels ~/.lpData/aiModels.json

Remote Transcoder / AI node

./livepeer -aiWorker -transcoder -orchAddr 0.0.0.0:8935 -orchSecret abcd -aiModels ~/.lpData/aiModels2.json -aiModelsDir ~/.lpData -nvidia 1

Gateway

./livepeer -gateway -orchAddr 0.0.0.0:8935 -httpAddr 0.0.0.0:8936 --httpIngest -v 6

Example request

curl -X POST "http://192.168.68.99:8936/text-to-image" \
    -H "Content-Type: application/json" \
    -d '{
        "model_id":"ByteDance/SDXL-Lightning",
        "prompt":"Cool dog at a rave party",
        "width": 1024,
        "height": 1024
    }'
curl -X POST "http://192.168.68.99:8936/upscale" \
    -H "Content-Type: multipart/form-data" \
    -d '{
    "model_id": "stabilityai/stable-diffusion-x4-upscaler",
    }'

How did you test each of these updates (required)

  • Unit tests
  • Integration tests
  • Manual off-chain end-to-end testing
  • Manual on-chain end-to-end testing

Does this pull request close any open issues?

Checklist:

@github-actions github-actions bot added the AI Issues and PR related to the AI-video branch. label Jul 27, 2024
@kyriediculous kyriediculous changed the title from Merge check to Remote AI Worker (Livepool version) merge check Jul 27, 2024
@kyriediculous kyriediculous changed the title from Remote AI Worker (Livepool version) merge check to [DRAFT] Remote AI Worker (Livepool version) merge check Jul 28, 2024
kyriediculous and others added 26 commits July 30, 2024 01:57
@kyriediculous kyriediculous marked this pull request as draft July 30, 2024 18:59
@kyriediculous kyriediculous marked this pull request as ready for review July 30, 2024 21:46
@kyriediculous kyriediculous changed the title from [DRAFT] Remote AI Worker (Livepool version) merge check to Remote AI Worker (Livepool version) Jul 30, 2024
@rickstaa rickstaa force-pushed the ai-video-rebase branch 2 times, most recently from 4d54872 to 8e654d7 on August 2, 2024 10:09
@rickstaa rickstaa deleted the branch livepeer:ai-video August 7, 2024 20:53
@rickstaa rickstaa closed this Aug 7, 2024
@rickstaa rickstaa reopened this Aug 7, 2024
@rickstaa rickstaa deleted the branch livepeer:ai-video August 10, 2024 06:53
@rickstaa rickstaa closed this Aug 10, 2024
@rickstaa rickstaa reopened this Aug 10, 2024
@rickstaa rickstaa changed the base branch from ai-video-rebase to ai-video August 10, 2024 15:28
@eliteprox eliteprox mentioned this pull request Aug 12, 2024
leszko commented Oct 21, 2024

Closing this one since an alternative Remote Worker implementation was merged: #3168

@leszko leszko closed this Oct 21, 2024