|
|
|
package sshforward
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"io"
|
|
|
|
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Stream is the minimal message-oriented transport used by Copy: one method
// to send a message and one method to receive a message.
type Stream interface {
	// SendMsg sends msg on the stream and reports any failure.
	SendMsg(msg interface{}) error
	// RecvMsg receives the next message into msg. Copy treats an io.EOF
	// result as the peer having finished sending.
	RecvMsg(msg interface{}) error
}
|
|
|
|
|
|
|
|
func Copy(ctx context.Context, conn io.ReadWriteCloser, stream Stream, closeStream func() error) error {
|
|
|
|
defer conn.Close()
|
|
|
|
g, ctx := errgroup.WithContext(ctx)
|
|
|
|
|
|
|
|
g.Go(func() (retErr error) {
|
|
|
|
p := &BytesMessage{}
|
|
|
|
for {
|
|
|
|
if err := stream.RecvMsg(p); err != nil {
|
|
|
|
if err == io.EOF {
|
|
|
|
// indicates client performed CloseSend, but they may still be
|
|
|
|
// reading data
|
|
|
|
if closeWriter, ok := conn.(interface {
|
|
|
|
CloseWrite() error
|
|
|
|
}); ok {
|
|
|
|
closeWriter.CloseWrite()
|
|
|
|
} else {
|
|
|
|
conn.Close()
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
conn.Close()
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
conn.Close()
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
if _, err := conn.Write(p.Data); err != nil {
|
|
|
|
conn.Close()
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
p.Data = p.Data[:0]
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
g.Go(func() (retErr error) {
|
|
|
|
for {
|
|
|
|
buf := make([]byte, 32*1024)
|
|
|
|
n, err := conn.Read(buf)
|
|
|
|
switch {
|
|
|
|
case err == io.EOF:
|
|
|
|
if closeStream != nil {
|
|
|
|
closeStream()
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
case err != nil:
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
p := &BytesMessage{Data: buf[:n]}
|
|
|
|
if err := stream.SendMsg(p); err != nil {
|
|
|
|
return errors.WithStack(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
return g.Wait()
|
|
|
|
}
|