From f589823ad23a1123e50d9f3aed79707575283e2f Mon Sep 17 00:00:00 2001 From: Ben Grewell Date: Mon, 5 Apr 2021 11:00:18 -0700 Subject: [PATCH 1/3] update comments --- .idea/workspace.xml | 7 +------ controller.go | 6 ------ 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index dbdf511..bf78bbd 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,12 +2,7 @@ - - - - - - + diff --git a/controller.go b/controller.go index 22f67d2..6b262a7 100644 --- a/controller.go +++ b/controller.go @@ -26,12 +26,6 @@ func NewController(port int) (controller *Controller, err error) { // server side it listens for new gRPC connections, when a connection is made by a client the client can tell it to // start a new iperf server instance. It will start a instance on an unused port and return the port number to the // client. This allows the entire iperf setup and session to be performed from the client side. -// -// CLIENT SERVER -// connect to grpc ---> accept grpc connection -// call StartServer() ---> find unused port -// start iperf server on port -// get server port info <--- return port information to client type Controller struct { api.UnimplementedCommandServer Port int From a4409243ab416f6ab9a6261ac84e54796b43283b Mon Sep 17 00:00:00 2001 From: Ben Grewell Date: Tue, 6 Apr 2021 12:55:39 -0700 Subject: [PATCH 2/3] add mac osx support --- .idea/workspace.xml | 4 +- reporter_darwin.go | 96 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 reporter_darwin.go diff --git a/.idea/workspace.xml b/.idea/workspace.xml index bf78bbd..e17c851 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,7 +2,9 @@ + + diff --git a/reporter_darwin.go b/reporter_darwin.go new file mode 100644 index 0000000..d7deb4c --- /dev/null +++ b/reporter_darwin.go @@ -0,0 +1,96 @@ +package iperf + +import ( + "fmt" + "github.com/BGrewell/go-conversions" + "github.com/BGrewell/tail" + "log" + "strconv" + "strings" + "time" +) + +//TODO: This has not been tested on OS X ... my assumption is it is the exact same as linux but if it's not then the +// reporting will be broken + +func (r *Reporter) runLogProcessor() { + var err error + r.tailer, err = tail.TailFile(r.LogFile, tail.Config{ + Follow: true, + ReOpen: true, + Poll: false, // on linux we don't need to poll as the fsnotify works properly + MustExist: true, + }) + if err != nil { + log.Fatalf("failed to tail log file: %v", err) + } + + for { + select { + case line := <-r.tailer.Lines: + if line == nil { + continue + } + if len(line.Text) > 5 { + id := line.Text[1:4] + stream, err := strconv.Atoi(strings.TrimSpace(id)) + if err != nil { + continue + } + fields := strings.Fields(line.Text[5:]) + if len(fields) >= 9 { + if fields[0] == "local" { + continue + } + timeFields := strings.Split(fields[0], "-") + start, err := strconv.ParseFloat(timeFields[0], 32) + if err != nil { + log.Printf("failed to convert start time: %s\n", err) + } + end, err := strconv.ParseFloat(timeFields[1], 32) + transferedStr := fmt.Sprintf("%s%s", fields[2], fields[3]) + transferedBytes, err := conversions.StringBitRateToInt(transferedStr) + if err != nil { + log.Printf("failed to convert units: %s\n", err) + } + transferedBytes = transferedBytes / 8 + rateStr := fmt.Sprintf("%s%s", fields[4], fields[5]) + rate, err := conversions.StringBitRateToInt(rateStr) + if err != nil { + log.Printf("failed to convert units: %s\n", err) + } + retrans, err := strconv.Atoi(fields[6]) + if err != nil { + log.Printf("failed to convert units: %s\n", err) + } + cwndStr := fmt.Sprintf("%s%s", fields[7], fields[8]) + cwnd, err := conversions.StringBitRateToInt(cwndStr) + if err != nil { + log.Printf("failed to convert units: %s\n", err) + } + cwnd = cwnd / 8 + omitted := false + if len(fields) >= 10 && fields[9] == "(omitted)" { + omitted = true + } + report := &StreamIntervalReport{ + Socket: stream, + StartInterval: float32(start), + EndInterval: float32(end), + Seconds: float32(end - start), + Bytes: int(transferedBytes), + BitsPerSecond: float64(rate), + Retransmissions: retrans, + CongestionWindow: int(cwnd), + Omitted: omitted, + } + r.ReportingChannel <- report + } + } + case <-time.After(100 * time.Millisecond): + if !r.running { + return + } + } + } +} From 328913249f87399ed1ce133fec58df85a24aa9b0 Mon Sep 17 00:00:00 2001 From: Ben Grewell Date: Thu, 26 Aug 2021 08:40:07 -0700 Subject: [PATCH 3/3] add return of iperf pid --- .idea/workspace.xml | 20 +++++++------ client.go | 17 ++++++++--- cmd/main.go | 70 +++++++++++++++++++++++---------------------- execute.go | 20 +++++++------ server.go | 17 ++++++++--- 5 files changed, 85 insertions(+), 59 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 0ba6e1b..0605f83 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -5,9 +5,11 @@ - - + + + + - + + - + - + - + - + - + - diff --git a/client.go b/client.go index 58ec83e..c38006c 100644 --- a/client.go +++ b/client.go @@ -489,15 +489,24 @@ func (c *Client) SetModeLive() <-chan *StreamIntervalReport { } func (c *Client) Start() (err error) { + _, err = c.start() + return err +} + +func (c *Client) StartEx() (pid int, err error) { + return c.start() +} + +func (c *Client) start() (pid int, err error) { read := make(chan interface{}) cmd, err := c.commandString() if err != nil { - return err + return -1, err } var exit chan int - c.outputStream, c.errorStream, exit, c.cancel, err = ExecuteAsyncWithCancelReadIndicator(cmd, read) + c.outputStream, c.errorStream, exit, c.cancel, pid, err = ExecuteAsyncWithCancelReadIndicator(cmd, read) if err != nil { - return err + return -1, err } c.Running = true @@ -548,7 +557,7 @@ func (c *Client) Start() (err error) { reporter.Stop() } }() - return nil + return pid, nil } func (c *Client) Stop() { diff --git a/cmd/main.go b/cmd/main.go index c2ea8fb..d924131 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,45 +2,47 @@ package main import ( //"fmt" -//"github.com/BGrewell/go-conversions" + "github.com/BGrewell/go-conversions" //"github.com/BGrewell/go-iperf" //"time" + "fmt" + "github.com/BGrewell/go-iperf" + "time" ) func main() { - //s := iperf.NewServer() - //c := iperf.NewClient("127.0.0.1") - //includeServer := true - //c.IncludeServer = &includeServer - //fmt.Println(s.Id) - //fmt.Println(c.Id) - // - //err := s.Start() - //if err != nil { - // fmt.Println(err) - //} - // - //err = c.Start() - //if err != nil { - // fmt.Println(err) - //} - // - //for c.Running { - // time.Sleep(1 * time.Second) - //} - // - //fmt.Println("stopping server") - //s.Stop() - // - //fmt.Printf("Client exit code: %d\n", *c.ExitCode) - //fmt.Printf("Server exit code: %d\n", *s.ExitCode) - //iperf.Cleanup() - //if c.Report.Error != "" { - // fmt.Println(c.Report.Error) - //} else { - // fmt.Printf("Recv Rate: %s\n", conversions.IntBitRateToString(int64(c.Report.End.SumReceived.BitsPerSecond))) - // fmt.Printf("Send Rate: %s\n", conversions.IntBitRateToString(int64(c.Report.End.SumSent.BitsPerSecond))) - //} + s := iperf.NewServer() + c := iperf.NewClient("127.0.0.1") + c.SetIncludeServer(true) + fmt.Println(s.Id) + fmt.Println(c.Id) + + err := s.Start() + if err != nil { + fmt.Println(err) + } + + err = c.Start() + if err != nil { + fmt.Println(err) + } + + for c.Running { + time.Sleep(1 * time.Second) + } + + fmt.Println("stopping server") + s.Stop() + + fmt.Printf("Client exit code: %d\n", *c.ExitCode()) + fmt.Printf("Server exit code: %d\n", *s.ExitCode) + iperf.Cleanup() + if c.Report().Error != "" { + fmt.Println(c.Report().Error) + } else { + fmt.Printf("Recv Rate: %s\n", conversions.IntBitRateToString(int64(c.Report().End.SumReceived.BitsPerSecond))) + fmt.Printf("Send Rate: %s\n", conversions.IntBitRateToString(int64(c.Report().End.SumSent.BitsPerSecond))) + } } diff --git a/execute.go b/execute.go index f8ae6d3..3bdebb4 100644 --- a/execute.go +++ b/execute.go @@ -42,34 +42,38 @@ func ExecuteAsync(cmd string) (outPipe io.ReadCloser, errPipe io.ReadCloser, exi return outPipe, errPipe, exitCode, nil } -func ExecuteAsyncWithCancel(cmd string) (stdOut io.ReadCloser, stdErr io.ReadCloser, exitCode chan int, cancelToken context.CancelFunc, err error) { +func ExecuteAsyncWithCancel(cmd string) (stdOut io.ReadCloser, stdErr io.ReadCloser, exitCode chan int, cancelToken context.CancelFunc, pid int, err error) { return ExecuteAsyncWithCancelReadIndicator(cmd, nil) } -func ExecuteAsyncWithCancelReadIndicator(cmd string, readIndicator chan interface{}) (stdOut io.ReadCloser, stdErr io.ReadCloser, exitCode chan int, cancelToken context.CancelFunc, err error) { +func ExecuteAsyncWithCancelReadIndicator(cmd string, readIndicator chan interface{}) (stdOut io.ReadCloser, stdErr io.ReadCloser, exitCode chan int, cancelToken context.CancelFunc, pid int, err error) { + return executeAsyncWithCancel(cmd, readIndicator) +} + +func executeAsyncWithCancel(cmd string, readIndicator chan interface{}) (stdOut io.ReadCloser, stdErr io.ReadCloser, exitCode chan int, cancelToken context.CancelFunc, pid int, err error) { exitCode = make(chan int) ctx, cancel := context.WithCancel(context.Background()) cmdParts := strings.Fields(cmd) binary, err := exec.LookPath(cmdParts[0]) if err != nil { defer cancel() - return nil, nil, nil, nil, err + return nil, nil, nil, nil, -1, err } exe := exec.CommandContext(ctx, binary, cmdParts[1:]...) stdOut, err = exe.StdoutPipe() if err != nil { defer cancel() - return nil, nil, nil, nil, err + return nil, nil, nil, nil, -1, err } stdErr, err = exe.StderrPipe() if err != nil { defer cancel() - return nil, nil, nil, nil, err + return nil, nil, nil, nil, -1, err } err = exe.Start() if err != nil { defer cancel() - return nil, nil, nil, nil, err + return nil, nil, nil, nil, -1, err } go func() { // Note: Wait() will close the Stdout/Stderr and in some cases can do it before we read. In order to prevent @@ -87,5 +91,5 @@ func ExecuteAsyncWithCancelReadIndicator(cmd string, readIndicator chan interfac exitCode <- 0 } }() - return stdOut, stdErr, exitCode, cancel, nil -} + return stdOut, stdErr, exitCode, cancel, exe.Process.Pid, nil +} \ No newline at end of file diff --git a/server.go b/server.go index e3a11ff..aa49973 100644 --- a/server.go +++ b/server.go @@ -151,14 +151,23 @@ func (s *Server) SetLogFile(filename string) { } func (s *Server) Start() (err error) { + _, err = s.start() + return err +} + +func (s *Server) StartEx() (pid int, err error) { + return s.start() +} + +func (s *Server) start() (pid int, err error) { cmd, err := s.commandString() if err != nil { - return err + return -1, err } var exit chan int - s.outputStream, s.errorStream, exit, s.cancel, err = ExecuteAsyncWithCancel(cmd) + s.outputStream, s.errorStream, exit, s.cancel, pid, err = ExecuteAsyncWithCancel(cmd) if err != nil { - return err + return -1, err } s.Running = true @@ -176,7 +185,7 @@ func (s *Server) Start() (err error) { s.ExitCode = &exitCode s.Running = false }() - return nil + return pid,nil } func (s *Server) Stop() {