Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build: allow builds from stdin for multi-node builders #2656

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type CallFunc struct {
type Inputs struct {
ContextPath string
DockerfilePath string
InStream io.Reader
InStream *SyncMultiReader
ContextState *llb.State
DockerfileInline string
NamedContexts map[string]NamedContext
Expand Down
35 changes: 23 additions & 12 deletions build/opt.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package build

import (
"bufio"
"bytes"
"context"
"io"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"syscall"
Expand Down Expand Up @@ -260,7 +261,7 @@ func toSolveOpt(ctx context.Context, node builder.Node, multiDriver bool, opt Op
}

so.Exports = opt.Exports
so.Session = opt.Session
so.Session = slices.Clone(opt.Session)

releaseLoad, err := loadInputs(ctx, nodeDriver, opt.Inputs, pw, &so)
if err != nil {
Expand Down Expand Up @@ -364,7 +365,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog

var (
err error
dockerfileReader io.Reader
dockerfileReader io.ReadCloser
dockerfileDir string
dockerfileName = inp.DockerfilePath
toRemove []string
Expand All @@ -382,23 +383,23 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
return nil, errors.Errorf("invalid argument: can't use stdin for both build context and dockerfile")
}

buf := bufio.NewReader(inp.InStream)
magic, err := buf.Peek(archiveHeaderSize * 2)
rc := inp.InStream.NewReadCloser()
magic, err := inp.InStream.Peek(archiveHeaderSize * 2)
if err != nil && err != io.EOF {
return nil, errors.Wrap(err, "failed to peek context header from STDIN")
}
if !(err == io.EOF && len(magic) == 0) {
if isArchive(magic) {
// stdin is context
up := uploadprovider.New()
target.FrontendAttrs["context"] = up.Add(buf)
target.FrontendAttrs["context"] = up.Add(rc)
target.Session = append(target.Session, up)
} else {
if inp.DockerfilePath != "" {
return nil, errors.Errorf("ambiguous Dockerfile source: both stdin and flag correspond to Dockerfiles")
}
// stdin is dockerfile
dockerfileReader = buf
dockerfileReader = rc
inp.ContextPath, _ = os.MkdirTemp("", "empty-dir")
toRemove = append(toRemove, inp.ContextPath)
if err := setLocalMount("context", inp.ContextPath, target); err != nil {
Expand All @@ -417,7 +418,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
target.SharedKey = sharedKey
switch inp.DockerfilePath {
case "-":
dockerfileReader = inp.InStream
dockerfileReader = inp.InStream.NewReadCloser()
case "":
dockerfileDir = inp.ContextPath
default:
Expand All @@ -426,7 +427,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
}
case IsRemoteURL(inp.ContextPath):
if inp.DockerfilePath == "-" {
dockerfileReader = inp.InStream
dockerfileReader = inp.InStream.NewReadCloser()
} else if filepath.IsAbs(inp.DockerfilePath) {
dockerfileDir = filepath.Dir(inp.DockerfilePath)
dockerfileName = filepath.Base(inp.DockerfilePath)
Expand All @@ -438,11 +439,11 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
}

if inp.DockerfileInline != "" {
dockerfileReader = strings.NewReader(inp.DockerfileInline)
dockerfileReader = io.NopCloser(strings.NewReader(inp.DockerfileInline))
}

if dockerfileReader != nil {
dockerfileDir, err = createTempDockerfile(dockerfileReader)
dockerfileDir, err = createTempDockerfile(dockerfileReader, inp.InStream)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -582,7 +583,7 @@ func setLocalMount(name, dir string, so *client.SolveOpt) error {
return nil
}

func createTempDockerfile(r io.Reader) (string, error) {
func createTempDockerfile(r io.Reader, multiReader *SyncMultiReader) (string, error) {
dir, err := os.MkdirTemp("", "dockerfile")
if err != nil {
return "", err
Expand All @@ -592,6 +593,16 @@ func createTempDockerfile(r io.Reader) (string, error) {
return "", err
}
defer f.Close()

if multiReader != nil {
dt, err := io.ReadAll(r)
if err != nil {
return "", err
}
multiReader.Reset(dt)
r = bytes.NewReader(dt)
}

if _, err := io.Copy(f, r); err != nil {
return "", err
}
Expand Down
164 changes: 164 additions & 0 deletions build/replicatedstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package build

import (
"bufio"
"bytes"
"io"
"sync"
)

type SyncMultiReader struct {
source *bufio.Reader
buffer []byte
static []byte
mu sync.Mutex
cond *sync.Cond
readers []*syncReader
err error
offset int
}

type syncReader struct {
mr *SyncMultiReader
offset int
closed bool
}

func NewSyncMultiReader(source io.Reader) *SyncMultiReader {
mr := &SyncMultiReader{
source: bufio.NewReader(source),
buffer: make([]byte, 0, 32*1024),
}
mr.cond = sync.NewCond(&mr.mu)
return mr
}

func (mr *SyncMultiReader) Peek(n int) ([]byte, error) {
mr.mu.Lock()
defer mr.mu.Unlock()

if mr.static != nil {
return mr.static[min(n, len(mr.static)):], nil
}

return mr.source.Peek(n)
}

func (mr *SyncMultiReader) Reset(dt []byte) {
mr.mu.Lock()
defer mr.mu.Unlock()

mr.static = dt
}

func (mr *SyncMultiReader) NewReadCloser() io.ReadCloser {
mr.mu.Lock()
defer mr.mu.Unlock()

if mr.static != nil {
return io.NopCloser(bytes.NewReader(mr.static))
}

reader := &syncReader{
mr: mr,
}
mr.readers = append(mr.readers, reader)
return reader
}

func (sr *syncReader) Read(p []byte) (int, error) {
sr.mr.mu.Lock()
defer sr.mr.mu.Unlock()

return sr.read(p)
}

func (sr *syncReader) read(p []byte) (int, error) {
end := sr.mr.offset + len(sr.mr.buffer)

loop0:
for {
if sr.closed {
return 0, io.EOF
}

end := sr.mr.offset + len(sr.mr.buffer)

if sr.mr.err != nil && sr.offset == end {
return 0, sr.mr.err
}

start := sr.offset - sr.mr.offset

dt := sr.mr.buffer[start:]

if len(dt) > 0 {
n := copy(p, dt)
sr.offset += n
sr.mr.cond.Broadcast()
return n, nil
}

// check for readers that have not caught up
hasOpen := false
for _, r := range sr.mr.readers {
if !r.closed {
hasOpen = true
} else {
continue
}
if r.offset < end {
sr.mr.cond.Wait()
continue loop0
}
}

if !hasOpen {
return 0, io.EOF
}
break
}

last := sr.mr.offset + len(sr.mr.buffer)
// another reader has already updated the buffer
if last > end || sr.mr.err != nil {
return sr.read(p)
}

sr.mr.offset += len(sr.mr.buffer)

sr.mr.buffer = sr.mr.buffer[:cap(sr.mr.buffer)]
n, err := sr.mr.source.Read(sr.mr.buffer)
if n >= 0 {
sr.mr.buffer = sr.mr.buffer[:n]
} else {
sr.mr.buffer = sr.mr.buffer[:0]
}

sr.mr.cond.Broadcast()

if err != nil {
sr.mr.err = err
return 0, err
}

nn := copy(p, sr.mr.buffer)
sr.offset += nn

return nn, nil
}

func (sr *syncReader) Close() error {
sr.mr.mu.Lock()
defer sr.mr.mu.Unlock()

if sr.closed {
return nil
}

sr.closed = true

sr.mr.cond.Broadcast()

return nil
}
77 changes: 77 additions & 0 deletions build/replicatedstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package build

import (
"bytes"
"crypto/rand"
"io"
mathrand "math/rand"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func generateRandomData(size int) []byte {
data := make([]byte, size)
rand.Read(data)
return data
}
func TestSyncMultiReaderParallel(t *testing.T) {
data := generateRandomData(1024 * 1024)
source := bytes.NewReader(data)
mr := NewSyncMultiReader(source)

var wg sync.WaitGroup
numReaders := 10
bufferSize := 4096 * 4

readers := make([]io.ReadCloser, numReaders)

for i := 0; i < numReaders; i++ {
readers[i] = mr.NewReadCloser()
}

for i := 0; i < numReaders; i++ {
wg.Add(1)
go func(readerId int) {
defer wg.Done()
reader := readers[readerId]
defer reader.Close()

totalRead := 0
buf := make([]byte, bufferSize)
for totalRead < len(data) {
// Simulate random read sizes
readSize := mathrand.Intn(bufferSize) //nolint:gosec
n, err := reader.Read(buf[:readSize])

if n > 0 {
assert.Equal(t, data[totalRead:totalRead+n], buf[:n], "Reader %d mismatch", readerId)
totalRead += n
}

if err == io.EOF {
assert.Equal(t, len(data), totalRead, "Reader %d EOF mismatch", readerId)
return
}

require.NoError(t, err, "Reader %d error", readerId)

if mathrand.Intn(1000) == 0 { //nolint:gosec
t.Logf("Reader %d closing", readerId)
// Simulate random close
return
}

// Simulate random timing between reads
time.Sleep(time.Millisecond * time.Duration(mathrand.Intn(5))) //nolint:gosec
}

assert.Equal(t, len(data), totalRead, "Reader %d total read mismatch", readerId)
}(i)
}

wg.Wait()
}
2 changes: 1 addition & 1 deletion controller/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
Inputs: build.Inputs{
ContextPath: in.ContextPath,
DockerfilePath: in.DockerfileName,
InStream: inStream,
InStream: build.NewSyncMultiReader(inStream),
NamedContexts: contexts,
},
Ref: in.Ref,
Expand Down