Commit 266c365a authored by Adphi's avatar Adphi

added logger to pinger, fix stats reset

parent bef08bdd
......@@ -38,6 +38,8 @@ type Pinger interface {
// Statistics returns the a map address ping Statistics
Statistics() map[string]Statistics
SetLogger(l logrus.FieldLogger)
// Close closes the connection. It should be call deferred right after the creation of the pinger
Close()
}
......@@ -59,6 +61,8 @@ type _pinger struct {
smu sync.RWMutex
done chan bool
logger logrus.FieldLogger
}
//NewPinger create a new Pinger with given addresses
......@@ -91,6 +95,7 @@ func newPinger(ctx context.Context) (*_pinger, error) {
instance.SetPayloadSize(uint16(defaultOptions.payloadSize))
}
p.pinger = instance
p.logger = logrus.New()
return p, nil
}
......@@ -99,6 +104,10 @@ func (*_pinger) Privileged() bool {
return true
}
func (p *_pinger) SetLogger(l logrus.FieldLogger) {
p.logger = l
}
func (p *_pinger) Addresses() []string {
p.dmu.Lock()
var as []string
......@@ -140,7 +149,7 @@ func (p *_pinger) AddAddress(a string) error {
results: make([]time.Duration, p.opts.statBufferSize),
},
}
p.logger.Debugf("Added address: %s", a)
p.dsts[a] = &dst
return nil
}
......@@ -154,26 +163,33 @@ func (p *_pinger) RemoveAddress(a string) error {
delete(p.dsts, a)
p.smu.Lock()
delete(p.stats, a)
p.logger.Debugf("Removed address %s (%v)", a, p.stats[a])
p.smu.Unlock()
return nil
}
func (p *_pinger) Run() {
if p.IsRunning() {
return
}
p.rmu.Lock()
p.running = true
p.rmu.Unlock()
p.done = make(chan bool)
p.logger.Debug("Starting pinger")
p.ping()
t := time.NewTicker(p.opts.interval)
defer t.Stop()
for {
select {
case <-t.C:
p.logger.Debug("Pinging")
p.ping()
case <-p.ctx.Done():
logrus.Debug(p.ctx.Err())
p.logger.Debug(p.ctx.Err())
p.Stop()
case <-p.done:
logrus.Debug("received stop signal")
p.logger.Debug("received stop signal")
return
}
}
......@@ -181,12 +197,14 @@ func (p *_pinger) Run() {
func (p *_pinger) ping() {
p.dmu.RLock()
logrus.Debugf("destinations' count: %d", len(p.dsts))
p.logger.Debugf("destinations' count: %d", len(p.dsts))
p.logger.Debugf("destinations: %v", p.dsts)
for a := range p.dsts {
go func(d *destination, addr string) {
logrus.Debugf("pinging %s", addr)
p.logger.Debugf("pinging %s", addr)
d.ping(p.pinger, p.opts.timeout)
s := d.compute()
p.logger.Debugf("%s : %v", addr, s)
s.Addr = addr
s.IPAddr = *d.remote
p.smu.Lock()
......@@ -208,6 +226,15 @@ func (p *_pinger) Stop() {
}
close(p.done)
p.running = false
p.smu.Lock()
p.stats = make(map[string]Statistics)
p.smu.Unlock()
p.dmu.Lock()
for k := range p.dsts {
p.dsts[k].history = &history{}
}
p.dmu.Unlock()
p.logger.Debug("Stopped")
}
func (p *_pinger) IsRunning() bool {
......@@ -217,24 +244,27 @@ func (p *_pinger) IsRunning() bool {
}
func (p *_pinger) Statistics() map[string]Statistics {
p.logger.Debug("Collecting stats")
p.smu.RLock()
defer p.smu.RUnlock()
p.dmu.Lock()
defer p.dmu.Unlock()
// Empty Copy
out := make(map[string]Statistics)
// Filter removed addresses
for k := range p.stats {
// Copy to output map
if _, ok := p.dsts[k]; !ok {
p.logger.Debugf("%s has been removed. deleting stats", k)
delete(p.stats, k)
} else {
// Copy to output map
s := p.stats[k]
p.logger.Debugf("Got stats for %s : rtts: %v", s.Addr, s.Rtts)
s.Rtts = make([]time.Duration, 10)
copy(s.Rtts, p.stats[k].Rtts)
out[k] = s
}
}
p.dmu.Unlock()
defer p.smu.RUnlock()
return out
}
......
......@@ -7,6 +7,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestRunPingerNoAddress(t *testing.T) {
......@@ -16,46 +17,38 @@ func TestRunPingerNoAddress(t *testing.T) {
cancel()
}()
p, err := NewPinger(ctx)
assertNoErr(t, err)
require.NoError(t, err)
p.Run()
}
func TestRunPingerWrongAddress(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(time.Second)
cancel()
}()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
p, err := NewPinger(ctx, "some.wrong.address", "localhost")
assertErr(t, err)
assert.Error(t, err)
assert.Nil(t, p)
}
func TestPingerContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(500 * time.Millisecond)
cancel()
}()
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
p, err := newPinger(ctx)
assertNoErr(t, err)
require.NoError(t, err)
go p.Run()
time.Sleep(time.Second)
assert.False(t, p.IsRunning())
}
func TestPingerLocalhost(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(5 * time.Second)
cancel()
}()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ip := "127.0.0.1"
p, err := NewPinger(ctx)
assertNoErr(t, err)
require.NoError(t, err)
go p.Run()
err = p.AddAddress(ip)
assertNoErr(t, err)
require.NoError(t, err)
c := time.NewTicker(time.Second)
count := 0
for {
......@@ -77,17 +70,15 @@ func TestPingerLocalhost(t *testing.T) {
}
func TestPingerTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(5 * time.Second)
cancel()
}()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ip := "255.0.0.255"
p, err := NewPinger(ctx)
assertNoErr(t, err)
require.NoError(t, err)
go p.Run()
err = p.AddAddress(ip)
assertNoErr(t, err)
require.NoError(t, err)
c := time.NewTicker(time.Second)
count := 0
for {
......@@ -108,6 +99,94 @@ func TestPingerTimeout(t *testing.T) {
}
}
func TestPingerReset(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 11*time.Second)
defer cancel()
ip1 := "255.0.0.255"
ip2 := "127.0.0.1"
p, err := NewPinger(ctx, ip1, ip2)
require.NoError(t, err)
l := logrus.New()
l.SetLevel(logrus.DebugLevel)
l.WithField("lib", "Pinger")
p.SetLogger(l)
go p.Run()
to := time.After(5 * time.Second)
c := time.NewTicker(time.Second)
resetting := false
resetDone := false
count := 0
for {
select {
case <-c.C:
count++
if !p.IsRunning() || resetting {
logrus.Debug("Pinger not running or resetting")
continue
}
if !resetting && count >= 5 && !resetDone {
logrus.Debug("Re-adding ips")
err = p.AddAddress(ip1)
require.NoError(t, err)
err = p.AddAddress(ip2)
require.NoError(t, err)
resetDone = true
continue
}
if resetDone {
s := p.Statistics()
logrus.Debug(s)
l, ok := s[ip1]
assert.True(t, ok)
logrus.Debug(l)
assert.Equal(t, 0, l.PacketsRecv)
rtts := filterZeros(l.Rtts)
assert.Equal(t, 0, len(rtts))
l, ok = s[ip2]
assert.True(t, ok)
logrus.Debug(l)
assert.InDelta(t, count-5, l.PacketsRecv, 1)
rtts = filterZeros(l.Rtts)
assert.InDelta(t, l.PacketsRecv, len(rtts), 1)
continue
}
if len(p.Addresses()) < 2 {
continue
}
s := p.Statistics()
logrus.Debug(s)
l, ok := s[ip1]
assert.True(t, ok)
logrus.Debug(l)
assert.Equal(t, 0, l.PacketsRecv)
rtts := filterZeros(l.Rtts)
assert.Equal(t, 0, len(rtts))
l, ok = s[ip2]
assert.True(t, ok)
logrus.Debug(l)
assert.InDelta(t, count, l.PacketsRecv, 1)
rtts = filterZeros(l.Rtts)
assert.InDelta(t, l.PacketsRecv, len(rtts), 1)
case <-to:
logrus.Debug("Resetting Pinger")
resetting = true
p.Stop()
err = p.RemoveAddress(ip1)
require.NoError(t, err)
err = p.RemoveAddress(ip2)
require.NoError(t, err)
go p.Run()
resetting = false
case <-ctx.Done():
return
}
}
}
func TestTwoIPs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
......@@ -117,12 +196,12 @@ func TestTwoIPs(t *testing.T) {
ip1 := "255.0.0.255"
ip2 := "127.0.0.1"
p, err := NewPinger(ctx)
assertNoErr(t, err)
require.NoError(t, err)
go p.Run()
err = p.AddAddress(ip1)
assertNoErr(t, err)
require.NoError(t, err)
err = p.AddAddress(ip2)
assertNoErr(t, err)
require.NoError(t, err)
c := time.NewTicker(time.Second)
count := 0
for {
......@@ -131,6 +210,7 @@ func TestTwoIPs(t *testing.T) {
count++
s := p.Statistics()
assert.NotEmpty(t, s)
logrus.Debug(s)
l, ok := s[ip1]
assert.True(t, ok)
......@@ -138,7 +218,6 @@ func TestTwoIPs(t *testing.T) {
assert.Equal(t, 0, l.PacketsRecv)
rtts := filterZeros(l.Rtts)
assert.Equal(t, 0, len(rtts))
l, ok = s[ip2]
assert.True(t, ok)
logrus.Debug(l)
......@@ -151,45 +230,14 @@ func TestTwoIPs(t *testing.T) {
}
}
func assertNoErr(t *testing.T, err error) {
assert.NoError(t, err)
if err != nil {
t.FailNow()
}
}
func assertErr(t *testing.T, err error) {
assert.Error(t, err)
if err == nil {
t.FailNow()
}
}
func filterZeros(s []time.Duration) []time.Duration {
var out []time.Duration
for _, d := range s {
if d != time.Duration(0) {
out = append(out, d)
}
}
return out
}
func init() {
logrus.SetLevel(logrus.DebugLevel)
}
func TestPinger(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
p, err := NewPinger(ctx, "127.0.0.1", "255.0.0.255", "127.0.0.2")
assert.NoError(t, err)
if err != nil {
t.FailNow()
}
go func() {
time.Sleep(10 * time.Second)
cancel()
}()
go func() {
time.Sleep(5 * time.Second)
......@@ -257,17 +305,13 @@ func TestPinger(t *testing.T) {
}
func TestPingerStatistics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
p, err := NewPinger(ctx, "127.0.0.1")
assert.NoError(t, err)
if err != nil {
t.FailNow()
}
go func() {
time.Sleep(5 * time.Second)
cancel()
}()
tk := time.NewTicker(time.Second)
defer tk.Stop()
count := 0
......@@ -292,15 +336,16 @@ func TestPingerStatistics(t *testing.T) {
}
}
func filterZeros(s []time.Duration) []time.Duration {
var out []time.Duration
for _, d := range s {
if d != time.Duration(0) {
out = append(out, d)
}
}
return out
}
func init() {
logrus.SetLevel(logrus.DebugLevel)
}
//func TestMaxint(t *testing.T) {
// c := math.MaxInt64
// var i int64 = 0
// for ; i < 10; i++ {
// c++
// logrus.Debug(c)
// }
//}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment