You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
buildx/util/ioset/ioset.go

275 lines
6.2 KiB
Go

package ioset
import (
"io"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Pipe returns a pair of piped readers and writers collection.
// They are useful for controlling stdio stream using Forwarder function.
func Pipe() (In, Out) {
r1, w1 := io.Pipe()
r2, w2 := io.Pipe()
r3, w3 := io.Pipe()
return In{r1, w2, w3}, Out{w1, r2, r3}
}
type In struct {
Stdin io.ReadCloser
Stdout io.WriteCloser
Stderr io.WriteCloser
}
func (s In) Close() (retErr error) {
if err := s.Stdin.Close(); err != nil {
retErr = err
}
if err := s.Stdout.Close(); err != nil {
retErr = err
}
if err := s.Stderr.Close(); err != nil {
retErr = err
}
return
}
type Out struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
}
func (s Out) Close() (retErr error) {
if err := s.Stdin.Close(); err != nil {
retErr = err
}
if err := s.Stdout.Close(); err != nil {
retErr = err
}
if err := s.Stderr.Close(); err != nil {
retErr = err
}
return
}
// Forwarder forwards IO between readers and writers contained
// in In and Out structs.
// In and Out can be changed during forwarding using SetIn and SetOut methods.
type Forwarder struct {
stdin *SingleForwarder
stdout *SingleForwarder
stderr *SingleForwarder
mu sync.Mutex
// PropagateStdinClose indicates whether EOF from Stdin of Out should be propagated.
// If this is true, EOF from Stdin (reader) of Out closes Stdin (writer) of In.
PropagateStdinClose bool
}
func NewForwarder() *Forwarder {
return &Forwarder{
stdin: NewSingleForwarder(),
stdout: NewSingleForwarder(),
stderr: NewSingleForwarder(),
PropagateStdinClose: true,
}
}
func (f *Forwarder) Close() (retErr error) {
if err := f.stdin.Close(); err != nil {
retErr = err
}
if err := f.stdout.Close(); err != nil {
retErr = err
}
if err := f.stderr.Close(); err != nil {
retErr = err
}
return retErr
}
func (f *Forwarder) SetOut(out *Out) {
f.mu.Lock()
if out == nil {
f.stdin.SetWriter(nil, func() io.WriteCloser { return nil })
f.stdout.SetReader(nil)
f.stderr.SetReader(nil)
} else {
f.stdin.SetWriter(out.Stdin, func() io.WriteCloser {
if f.PropagateStdinClose {
out.Stdin.Close() // propagate EOF
logrus.Debug("forwarder: propagating stdin close")
return nil
}
return out.Stdin
})
f.stdout.SetReader(out.Stdout)
f.stderr.SetReader(out.Stderr)
}
f.mu.Unlock()
}
func (f *Forwarder) SetIn(in *In) {
f.mu.Lock()
if in == nil {
f.stdin.SetReader(nil)
f.stdout.SetWriter(nil, func() io.WriteCloser { return nil })
f.stderr.SetWriter(nil, func() io.WriteCloser { return nil })
} else {
f.stdin.SetReader(in.Stdin)
f.stdout.SetWriter(in.Stdout, func() io.WriteCloser {
return in.Stdout // continue write; TODO: make it configurable if needed
})
f.stderr.SetWriter(in.Stderr, func() io.WriteCloser {
return in.Stderr // continue write; TODO: make it configurable if needed
})
}
f.mu.Unlock()
}
// SingleForwarder forwards IO from a reader to a writer.
// The reader and writer can be changed during forwarding
// using SetReader and SetWriter methods.
type SingleForwarder struct {
curR io.ReadCloser // closed when set another reader
curRMu sync.Mutex
curW io.WriteCloser // closed when set another writer
curWEOFHandler func() io.WriteCloser
curWMu sync.Mutex
updateRCh chan io.ReadCloser
doneCh chan struct{}
closeOnce sync.Once
}
func NewSingleForwarder() *SingleForwarder {
f := &SingleForwarder{
updateRCh: make(chan io.ReadCloser),
doneCh: make(chan struct{}),
}
go f.doForward()
return f
}
func (f *SingleForwarder) doForward() {
var r io.ReadCloser
for {
readerInvalid := false
var readerInvalidMu sync.Mutex
copyReaderToWriter := false
if r != nil {
copyReaderToWriter = true
}
if copyReaderToWriter {
srcR := r
go func() {
buf := make([]byte, 4096)
readerClosed := false
for {
n, readErr := srcR.Read(buf)
if readErr != nil {
srcR.Close()
readerClosed = true
if !errors.Is(readErr, io.EOF) && !errors.Is(readErr, io.ErrClosedPipe) {
logrus.Debugf("single forwarder: reader error: %v", readErr)
return
}
}
f.curWMu.Lock()
w := f.curW
f.curWMu.Unlock()
if w != nil {
if _, err := w.Write(buf[:n]); err != nil && !errors.Is(err, io.ErrClosedPipe) {
logrus.Debugf("single forwarder: writer error: %v", err)
}
}
readerInvalidMu.Lock()
ri := readerInvalid
readerInvalidMu.Unlock()
if ri || readerClosed {
return
}
if readErr != io.EOF {
logrus.Debugf("unknown error: %v\n", readErr)
continue
}
f.curWMu.Lock()
var newW io.WriteCloser
if f.curWEOFHandler != nil {
newW = f.curWEOFHandler()
}
f.curW = newW
f.curWMu.Unlock()
return
}
}()
}
select {
case newR := <-f.updateRCh:
f.curRMu.Lock()
if f.curR != nil {
f.curR.Close()
}
f.curR = newR
r = newR
readerInvalidMu.Lock()
readerInvalid = true
readerInvalidMu.Unlock()
f.curRMu.Unlock()
case <-f.doneCh:
return
}
}
}
// Close closes the both of registered reader and writer and finishes the forwarder.
func (f *SingleForwarder) Close() (retErr error) {
f.closeOnce.Do(func() {
f.curRMu.Lock()
r := f.curR
f.curR = nil
f.curRMu.Unlock()
if r != nil {
if err := r.Close(); err != nil {
retErr = err
}
}
// TODO: Wait until read data fully written to the current writer if needed.
f.curWMu.Lock()
w := f.curW
f.curW = nil
f.curWMu.Unlock()
if w != nil {
if err := w.Close(); err != nil {
retErr = err
}
}
close(f.doneCh)
})
return retErr
}
// SetWriter sets the specified writer as the forward destination.
// If curWEOFHandler isn't nil, this will be called when the current reader returns EOF.
func (f *SingleForwarder) SetWriter(w io.WriteCloser, curWEOFHandler func() io.WriteCloser) {
f.curWMu.Lock()
if f.curW != nil {
// close all stream on the current IO no to mix with the new IO
f.curW.Close()
}
f.curW = w
f.curWEOFHandler = curWEOFHandler
f.curWMu.Unlock()
}
// SetWriter sets the specified reader as the forward source.
func (f *SingleForwarder) SetReader(r io.ReadCloser) {
f.updateRCh <- r
}