fix issues with reporter closing
This commit is contained in:
2
.idea/workspace.xml
generated
2
.idea/workspace.xml
generated
@@ -3,7 +3,9 @@
|
||||
<component name="ChangeListManager">
|
||||
<list default="true" id="fc2840de-29dc-4fca-8e0e-a283562f60ca" name="Default Changelist" comment="">
|
||||
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/reporter.go" beforeDir="false" afterPath="$PROJECT_DIR$/reporter.go" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/reporter_linux.go" beforeDir="false" afterPath="$PROJECT_DIR$/reporter_linux.go" afterDir="false" />
|
||||
<change beforePath="$PROJECT_DIR$/tests/client/client.go" beforeDir="false" afterPath="$PROJECT_DIR$/tests/client/client.go" afterDir="false" />
|
||||
</list>
|
||||
<option name="SHOW_DIALOG" value="false" />
|
||||
<option name="HIGHLIGHT_CONFLICTS" value="true" />
|
||||
|
||||
@@ -1,9 +1,15 @@
|
||||
package iperf
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/BGrewell/tail"
|
||||
)
|
||||
|
||||
type Reporter struct {
|
||||
ReportingChannel chan *StreamIntervalReport
|
||||
LogFile string
|
||||
running bool
|
||||
tailer *tail.Tail
|
||||
}
|
||||
|
||||
func (r *Reporter) Start() {
|
||||
@@ -13,7 +19,10 @@ func (r *Reporter) Start() {
|
||||
|
||||
func (r *Reporter) Stop() {
|
||||
r.running = false
|
||||
r.tailer.Stop()
|
||||
r.tailer.Cleanup()
|
||||
close(r.ReportingChannel)
|
||||
fmt.Println("reporter stopped")
|
||||
}
|
||||
|
||||
// runLogProcessor is OS specific because of differences in iperf on Windows and Linux
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"log"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
/*
|
||||
@@ -44,9 +45,10 @@ Connecting to host 10.254.100.100, port 5201
|
||||
iperf Done.
|
||||
|
||||
*/
|
||||
|
||||
//TODO: NEED TO UPDATE WINDOWS IMPLEMENTATION ALSO!!!!!
|
||||
func (r *Reporter) runLogProcessor() {
|
||||
tailer, err := tail.TailFile(r.LogFile, tail.Config{
|
||||
var err error
|
||||
r.tailer, err = tail.TailFile(r.LogFile, tail.Config{
|
||||
Follow: true,
|
||||
ReOpen: true,
|
||||
Poll: true, // on linux we don't need to poll as the fsnotify works properly
|
||||
@@ -55,69 +57,73 @@ func (r *Reporter) runLogProcessor() {
|
||||
if err != nil {
|
||||
log.Fatalf("failed to tail log file: %v", err)
|
||||
}
|
||||
for line := range tailer.Lines {
|
||||
// TODO: For now this only cares about individual streams it ignores the sum lines
|
||||
if len(line.Text) > 5 {
|
||||
id := line.Text[1:4]
|
||||
stream, err := strconv.Atoi(strings.TrimSpace(id))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
fields := strings.Fields(line.Text[5:])
|
||||
if len(fields) >= 9 {
|
||||
if fields[0] == "local" {
|
||||
|
||||
for {
|
||||
select {
|
||||
case line := <- r.tailer.Lines:
|
||||
if line == nil {
|
||||
continue
|
||||
}
|
||||
timeFields := strings.Split(fields[0], "-")
|
||||
start, err := strconv.ParseFloat(timeFields[0], 32)
|
||||
if err != nil {
|
||||
log.Printf("failed to convert start time: %s\n", err)
|
||||
if len(line.Text) > 5 {
|
||||
id := line.Text[1:4]
|
||||
stream, err := strconv.Atoi(strings.TrimSpace(id))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
fields := strings.Fields(line.Text[5:])
|
||||
if len(fields) >= 9 {
|
||||
if fields[0] == "local" {
|
||||
continue
|
||||
}
|
||||
timeFields := strings.Split(fields[0], "-")
|
||||
start, err := strconv.ParseFloat(timeFields[0], 32)
|
||||
if err != nil {
|
||||
log.Printf("failed to convert start time: %s\n", err)
|
||||
}
|
||||
end, err := strconv.ParseFloat(timeFields[1], 32)
|
||||
transferedStr := fmt.Sprintf("%s%s", fields[2], fields[3])
|
||||
transferedBytes, err := conversions.StringBitRateToInt(transferedStr)
|
||||
if err != nil {
|
||||
log.Printf("failed to convert units: %s\n", err)
|
||||
}
|
||||
transferedBytes = transferedBytes / 8
|
||||
rateStr := fmt.Sprintf("%s%s", fields[4], fields[5])
|
||||
rate, err := conversions.StringBitRateToInt(rateStr)
|
||||
if err != nil {
|
||||
log.Printf("failed to convert units: %s\n", err)
|
||||
}
|
||||
retrans, err := strconv.Atoi(fields[6])
|
||||
if err != nil {
|
||||
log.Printf("failed to convert units: %s\n", err)
|
||||
}
|
||||
cwndStr := fmt.Sprintf("%s%s", fields[7], fields[8])
|
||||
cwnd, err := conversions.StringBitRateToInt(cwndStr)
|
||||
if err != nil {
|
||||
log.Printf("failed to convert units: %s\n", err)
|
||||
}
|
||||
cwnd = cwnd / 8
|
||||
omitted := false
|
||||
if len(fields) >= 10 && fields[9] == "(omitted)" {
|
||||
omitted = true
|
||||
}
|
||||
report := &StreamIntervalReport{
|
||||
Socket: stream,
|
||||
StartInterval: float32(start),
|
||||
EndInterval: float32(end),
|
||||
Seconds: float32(end - start),
|
||||
Bytes: int(transferedBytes),
|
||||
BitsPerSecond: float64(rate),
|
||||
Retransmissions: retrans,
|
||||
CongestionWindow: int(cwnd),
|
||||
Omitted: omitted,
|
||||
}
|
||||
r.ReportingChannel <- report
|
||||
}
|
||||
}
|
||||
end, err := strconv.ParseFloat(timeFields[1], 32)
|
||||
transferedStr := fmt.Sprintf("%s%s", fields[2], fields[3])
|
||||
transferedBytes, err := conversions.StringBitRateToInt(transferedStr)
|
||||
if err != nil {
|
||||
log.Printf("failed to convert units: %s\n", err)
|
||||
case <- time.After(1 * time.Second):
|
||||
if !r.running {
|
||||
return
|
||||
}
|
||||
transferedBytes = transferedBytes / 8
|
||||
rateStr := fmt.Sprintf("%s%s", fields[4], fields[5])
|
||||
rate, err := conversions.StringBitRateToInt(rateStr)
|
||||
if err != nil {
|
||||
log.Printf("failed to convert units: %s\n", err)
|
||||
}
|
||||
retrans, err := strconv.Atoi(fields[6])
|
||||
if err != nil {
|
||||
log.Printf("failed to convert units: %s\n", err)
|
||||
}
|
||||
cwndStr := fmt.Sprintf("%s%s", fields[7], fields[8])
|
||||
cwnd, err := conversions.StringBitRateToInt(cwndStr)
|
||||
if err != nil {
|
||||
log.Printf("failed to convert units: %s\n", err)
|
||||
}
|
||||
cwnd = cwnd / 8
|
||||
omitted := false
|
||||
if len(fields) >= 10 && fields[9] == "(omitted)" {
|
||||
omitted = true
|
||||
}
|
||||
report := &StreamIntervalReport{
|
||||
Socket: stream,
|
||||
StartInterval: float32(start),
|
||||
EndInterval: float32(end),
|
||||
Seconds: float32(end - start),
|
||||
Bytes: int(transferedBytes),
|
||||
BitsPerSecond: float64(rate),
|
||||
Retransmissions: retrans,
|
||||
CongestionWindow: int(cwnd),
|
||||
Omitted: omitted,
|
||||
}
|
||||
r.ReportingChannel <- report
|
||||
}
|
||||
}
|
||||
|
||||
if !r.running {
|
||||
fmt.Println("reporter is finished. exiting")
|
||||
close(r.ReportingChannel)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,8 +10,8 @@ func main() {
|
||||
|
||||
includeServer := true
|
||||
proto := "tcp"
|
||||
runTime := 30
|
||||
omitSec := 10
|
||||
runTime := 10
|
||||
omitSec := 0
|
||||
length := "65500"
|
||||
|
||||
c := iperf.NewClient("10.254.100.100")
|
||||
@@ -22,7 +22,6 @@ func main() {
|
||||
c.SetLength(length)
|
||||
c.SetJSON(false)
|
||||
c.SetIncludeServer(false)
|
||||
c.SetTimeSec(20)
|
||||
c.SetStreams(2)
|
||||
reports := c.SetModeLive()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user