Files
the-way-to-go_ZH_CN/eBook/examples/chapter_11/tideland-cgl.googlecode.com/hg/cglmon.go

570 lines
15 KiB
Go

/*
Tideland Common Go Library - Monitoring
Copyright (C) 2009-2011 Frank Mueller / Oldenburg / Germany
Redistribution and use in source and binary forms, with or
modification, are permitted provided that the following conditions are
met:
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
Neither the name of Tideland nor the names of its contributors may be
used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.
*/
package cgl
//--------------------
// IMPORTS
//--------------------
import (
	"fmt"
	"io"
	"log"
	"os"
	"sync"
	"time"
)
//--------------------
// GLOBAL VARIABLES
//--------------------
// monitor is the package-wide system monitor instance. It is nil
// until Monitor creates it.
var monitor *SystemMonitor
//--------------------
// CONSTANTS
//--------------------
// Ruler lines and row formats of the three report tables: execution
// time measurings (etm), stay-set variables (ssi) and dynamic status
// values (dsr). Each row format renders 120 columns plus a newline.
const (
	etmTLine  = "+------------------------------------------+-----------+-----------+-----------+-----------+---------------+-----------+\n"
	etmFormat = "| %-40s | %9d | %9.3f | %9.3f | %9.3f | %13.3f | %9d |\n"
	etmELine  = "+----------------------------------------------------------------------------------------------------------------------+\n"
	ssiTLine  = "+------------------------------------------+-----------+---------------+---------------+---------------+---------------+\n"
	ssiFormat = "| %-40s | %9d | %13d | %13d | %13d | %13d |\n"
	dsrTLine  = "+------------------------------------------+---------------------------------------------------------------------------+\n"
	dsrFormat = "| %-40s | %-73s |\n"
)

// Header and footer rows of the report tables. They are rendered
// with the same column widths as the matching row formats, so the
// labels line up with the data rows instead of depending on
// hand-counted padding inside a string literal.
var (
	etmHeader = fmt.Sprintf("| %-40s | %-9s | %-9s | %-9s | %-9s | %-13s | %-9s |\n",
		"Name", "Count", "Min Time", "Max Time", "Avg Time", "Total Time", "Op/Sec")
	etmFooter = fmt.Sprintf("| %-116s |\n", "All times in milliseconds.")
	ssiHeader = fmt.Sprintf("| %-40s | %-9s | %-13s | %-13s | %-13s | %-13s |\n",
		"Name", "Count", "Act Value", "Min Value", "Max Value", "Avg Value")
	dsrHeader = fmt.Sprintf(dsrFormat, "Name", "Value")
)
// Operation codes of the commands processed by the monitor backend.
// The ...Map commands answer with the collected results on the
// response channel of the command, the ...Do commands send no answer.
const (
cmdMeasuringPointsMap = iota
cmdMeasuringPointsDo
cmdStaySetVariablesMap
cmdStaySetVariablesDo
cmdDynamicStatusRetrieversMap
cmdDynamicStatusRetrieversDo
)
//--------------------
// MONITORING
//--------------------
// command encapsulates the data of one command for the backend.
type command struct {
// opCode is one of the cmd... operation codes.
opCode int
// args carries the function to apply; its concrete function
// type depends on the operation code.
args interface{}
// respChan receives the result of a ...Map command. It is nil
// for the ...Do commands.
respChan chan interface{}
}
// SystemMonitor collects execution time measurings, stay-set
// variables and dynamic status retrievers. The data maps are
// maintained by the backend goroutine, which is fed through
// the channels.
type SystemMonitor struct {
// etmData holds the measuring points by measuring id.
etmData map[string]*MeasuringPoint
// ssiData holds the stay-set variables by variable id.
ssiData map[string]*StaySetVariable
// dsrData holds the registered status retrievers by id.
dsrData map[string]DynamicStatusRetriever
// measuringChan transports finished measurings to the backend.
measuringChan chan *Measuring
// valueChan transports new variable values to the backend.
valueChan chan *value
// retrieverRegistrationChan transports registrations and
// deregistrations of status retrievers to the backend.
retrieverRegistrationChan chan *retrieverRegistration
// commandChan transports the Map and Do commands to the backend.
commandChan chan *command
}
// monitorOnce guards the one-time creation of the system monitor.
var monitorOnce sync.Once

// Monitor returns the system monitor if it exists.
// Otherwise it creates it first and starts its backend goroutine.
// The creation is guarded by a sync.Once, so concurrent first calls
// from several goroutines cannot create more than one monitor.
func Monitor() *SystemMonitor {
	monitorOnce.Do(func() {
		if monitor != nil {
			// Already set, keep the existing instance.
			return
		}
		// Create system monitor.
		monitor = &SystemMonitor{
			etmData:                   make(map[string]*MeasuringPoint),
			ssiData:                   make(map[string]*StaySetVariable),
			dsrData:                   make(map[string]DynamicStatusRetriever),
			measuringChan:             make(chan *Measuring, 1000),
			valueChan:                 make(chan *value, 1000),
			retrieverRegistrationChan: make(chan *retrieverRegistration, 10),
			commandChan:               make(chan *command),
		}
		go monitor.backend()
	})
	return monitor
}
// BeginMeasuring starts a new measuring with a given id.
// All measurings with the same id will be aggregated.
// The start time is taken in nanoseconds; the returned
// measuring has to be finished with EndMeasuring.
func (sm *SystemMonitor) BeginMeasuring(id string) *Measuring {
	return &Measuring{
		systemMonitor: sm,
		id:            id,
		startTime:     time.Now().UnixNano(),
	}
}
// Measure runs the function f and records its execution
// time as a measuring with the given id.
func (sm *SystemMonitor) Measure(id string, f func()) {
	measuring := sm.BeginMeasuring(id)
	f()
	measuring.EndMeasuring()
}
// MeasuringPointsMap lets the backend call f for each measuring
// point and returns the non-nil results of f as a slice. The call
// blocks until the backend has answered.
func (sm *SystemMonitor) MeasuringPointsMap(f func(*MeasuringPoint) interface{}) []interface{} {
	answer := make(chan interface{})
	sm.commandChan <- &command{
		opCode:   cmdMeasuringPointsMap,
		args:     f,
		respChan: answer,
	}
	return (<-answer).([]interface{})
}
// MeasuringPointsDo lets the backend call f for each
// measuring point. No response is awaited.
func (sm *SystemMonitor) MeasuringPointsDo(f func(*MeasuringPoint)) {
	sm.commandChan <- &command{opCode: cmdMeasuringPointsDo, args: f}
}
// MeasuringPointsWrite prints the measuring points for which
// the passed function returns true as a table to the passed
// writer. All times are printed in milliseconds.
func (sm *SystemMonitor) MeasuringPointsWrite(w io.Writer, ff func(*MeasuringPoint) bool) {
	// The measuring points store nanoseconds.
	toMillis := func(ns int64) float64 { return float64(ns) / 1000000.0 }
	// render creates the table row of an accepted measuring point,
	// nil drops the point from the result.
	render := func(mp *MeasuringPoint) interface{} {
		if !ff(mp) {
			return nil
		}
		// An average time of 0 ns would be an integer division by
		// zero. render is executed by the backend before it sends
		// the response, so that panic would leave this call waiting
		// for an answer forever. Report 0 operations per second then.
		var ops int64
		if mp.AvgTime > 0 {
			ops = 1e9 / mp.AvgTime
		}
		return fmt.Sprintf(etmFormat, mp.Id, mp.Count, toMillis(mp.MinTime),
			toMillis(mp.MaxTime), toMillis(mp.AvgTime), toMillis(mp.TtlTime), ops)
	}
	fmt.Fprint(w, etmTLine)
	fmt.Fprint(w, etmHeader)
	fmt.Fprint(w, etmTLine)
	for _, row := range sm.MeasuringPointsMap(render) {
		fmt.Fprint(w, row)
	}
	fmt.Fprint(w, etmTLine)
	fmt.Fprint(w, etmFooter)
	fmt.Fprint(w, etmELine)
}
// MeasuringPointsPrintAll writes the table of all
// measuring points to STDOUT.
func (sm *SystemMonitor) MeasuringPointsPrintAll() {
	acceptAll := func(*MeasuringPoint) bool { return true }
	sm.MeasuringPointsWrite(os.Stdout, acceptAll)
}
// SetValue passes the new value v of the stay-set variable
// id to the backend.
func (sm *SystemMonitor) SetValue(id string, v int64) {
	sm.valueChan <- &value{id: id, value: v}
}
// StaySetVariablesMap lets the backend call f for each stay-set
// variable and returns the non-nil results of f as a slice. The
// call blocks until the backend has answered.
func (sm *SystemMonitor) StaySetVariablesMap(f func(*StaySetVariable) interface{}) []interface{} {
	answer := make(chan interface{})
	sm.commandChan <- &command{
		opCode:   cmdStaySetVariablesMap,
		args:     f,
		respChan: answer,
	}
	return (<-answer).([]interface{})
}
// StaySetVariablesDo lets the backend call f for each
// stay-set variable. No response is awaited.
func (sm *SystemMonitor) StaySetVariablesDo(f func(*StaySetVariable)) {
	sm.commandChan <- &command{opCode: cmdStaySetVariablesDo, args: f}
}
// StaySetVariablesWrite prints the stay-set variables for which
// the passed function returns true as a table to the passed writer.
func (sm *SystemMonitor) StaySetVariablesWrite(w io.Writer, ff func(*StaySetVariable) bool) {
	// render creates the table row of an accepted variable,
	// nil drops the variable from the result.
	render := func(v *StaySetVariable) interface{} {
		if !ff(v) {
			return nil
		}
		return fmt.Sprintf(ssiFormat, v.Id, v.Count, v.ActValue, v.MinValue, v.MaxValue, v.AvgValue)
	}
	fmt.Fprint(w, ssiTLine)
	fmt.Fprint(w, ssiHeader)
	fmt.Fprint(w, ssiTLine)
	for _, row := range sm.StaySetVariablesMap(render) {
		fmt.Fprint(w, row)
	}
	fmt.Fprint(w, ssiTLine)
}
// StaySetVariablesPrintAll writes the table of all
// stay-set variables to STDOUT.
func (sm *SystemMonitor) StaySetVariablesPrintAll() {
	acceptAll := func(*StaySetVariable) bool { return true }
	sm.StaySetVariablesWrite(os.Stdout, acceptAll)
}
// Register passes the dynamic status retriever function rf
// to the backend for registration under the given id.
func (sm *SystemMonitor) Register(id string, rf DynamicStatusRetriever) {
	sm.retrieverRegistrationChan <- &retrieverRegistration{id: id, dsr: rf}
}
// Unregister removes the dynamic status retriever function
// with the given id. It sends a registration without a
// retriever function to the backend.
func (sm *SystemMonitor) Unregister(id string) {
	sm.retrieverRegistrationChan <- &retrieverRegistration{id: id}
}
// DynamicStatusValuesMap lets the backend call f with the id and
// the current value of each status retriever and returns the
// non-nil results of f as a slice. The call blocks until the
// backend has answered.
func (sm *SystemMonitor) DynamicStatusValuesMap(f func(string, string) interface{}) []interface{} {
	answer := make(chan interface{})
	sm.commandChan <- &command{
		opCode:   cmdDynamicStatusRetrieversMap,
		args:     f,
		respChan: answer,
	}
	return (<-answer).([]interface{})
}
// DynamicStatusValuesDo lets the backend call f with the id
// and the current value of each status retriever. No response
// is awaited.
func (sm *SystemMonitor) DynamicStatusValuesDo(f func(string, string)) {
	sm.commandChan <- &command{opCode: cmdDynamicStatusRetrieversDo, args: f}
}
// DynamicStatusValuesWrite prints the status values for which
// the passed function returns true as a table to the passed writer.
func (sm *SystemMonitor) DynamicStatusValuesWrite(w io.Writer, ff func(string, string) bool) {
	// render creates the table row of an accepted status value,
	// nil drops the value from the result.
	render := func(name, status string) interface{} {
		if !ff(name, status) {
			return nil
		}
		return fmt.Sprintf(dsrFormat, name, status)
	}
	fmt.Fprint(w, dsrTLine)
	fmt.Fprint(w, dsrHeader)
	fmt.Fprint(w, dsrTLine)
	for _, row := range sm.DynamicStatusValuesMap(render) {
		fmt.Fprint(w, row)
	}
	fmt.Fprint(w, dsrTLine)
}
// DynamicStatusValuesPrintAll writes the table of all
// status values to STDOUT.
func (sm *SystemMonitor) DynamicStatusValuesPrintAll() {
	acceptAll := func(string, string) bool { return true }
	sm.DynamicStatusValuesWrite(os.Stdout, acceptAll)
}
// Supervisor returns the supervisor of the system monitor,
// which is the one delivered by GlobalSupervisor.
func (sm *SystemMonitor) Supervisor() *Supervisor {
	global := GlobalSupervisor()
	return global
}
// Recover logs the error err and restarts the backend of the
// system monitor in a new goroutine. The passed recoverable
// is not evaluated.
func (sm *SystemMonitor) Recover(recoverable Recoverable, err interface{}) {
	const format = "[cgl] recovering system monitor backend after error '%v'!"
	log.Printf(format, err)
	go sm.backend()
}
// backend is the endless loop of the system monitor goroutine.
// It receives measurings, values, retriever registrations and
// commands from the channels of the monitor and applies them
// to the data maps one after another.
func (sm *SystemMonitor) backend() {
	// Pass the result of recover (nil on a regular exit) together
	// with the monitor to HelpIfNeeded.
	defer func() {
		HelpIfNeeded(sm, recover())
	}()
	for {
		select {
		case m := <-sm.measuringChan:
			// Received a new measuring.
			if mp, ok := sm.etmData[m.id]; ok {
				// Measuring point found.
				mp.update(m)
			} else {
				// New measuring point.
				sm.etmData[m.id] = newMeasuringPoint(m)
			}
		case v := <-sm.valueChan:
			// Received a new value.
			if ssv, ok := sm.ssiData[v.id]; ok {
				// Variable found.
				ssv.update(v)
			} else {
				// New stay-set variable.
				sm.ssiData[v.id] = newStaySetVariable(v)
			}
		case registration := <-sm.retrieverRegistrationChan:
			// Received a retriever registration.
			if registration.dsr != nil {
				// Register a new retriever.
				sm.dsrData[registration.id] = registration.dsr
			} else {
				// Deregister a retriever. Deleting an unknown
				// id is a no-op.
				delete(sm.dsrData, registration.id)
			}
		case cmd := <-sm.commandChan:
			// Received a command to process.
			sm.processCommand(cmd)
		}
	}
}
// processCommand executes one command inside the backend. The
// function stored in the command is asserted to the type that
// belongs to the operation code and applied to every entry of
// the matching data map. The ...Map commands collect the non-nil
// results and send them on the response channel of the command,
// the ...Do commands send nothing back. Unknown operation codes
// are ignored.
func (sm *SystemMonitor) processCommand(cmd *command) {
	switch cmd.opCode {
	case cmdMeasuringPointsMap:
		// Collect the mapped measuring points.
		var results []interface{}
		mapper := cmd.args.(func(*MeasuringPoint) interface{})
		for _, point := range sm.etmData {
			if result := mapper(point); result != nil {
				results = append(results, result)
			}
		}
		cmd.respChan <- results
	case cmdMeasuringPointsDo:
		// Visit the measuring points.
		visitor := cmd.args.(func(*MeasuringPoint))
		for _, point := range sm.etmData {
			visitor(point)
		}
	case cmdStaySetVariablesMap:
		// Collect the mapped stay-set variables.
		var results []interface{}
		mapper := cmd.args.(func(*StaySetVariable) interface{})
		for _, variable := range sm.ssiData {
			if result := mapper(variable); result != nil {
				results = append(results, result)
			}
		}
		cmd.respChan <- results
	case cmdStaySetVariablesDo:
		// Visit the stay-set variables.
		visitor := cmd.args.(func(*StaySetVariable))
		for _, variable := range sm.ssiData {
			visitor(variable)
		}
	case cmdDynamicStatusRetrieversMap:
		// Collect the mapped current values of the
		// dynamic status retriever functions.
		var results []interface{}
		mapper := cmd.args.(func(string, string) interface{})
		for name, retrieve := range sm.dsrData {
			if result := mapper(name, retrieve()); result != nil {
				results = append(results, result)
			}
		}
		cmd.respChan <- results
	case cmdDynamicStatusRetrieversDo:
		// Visit the current values of the dynamic
		// status retriever functions.
		visitor := cmd.args.(func(string, string))
		for name, retrieve := range sm.dsrData {
			visitor(name, retrieve())
		}
	}
}
//--------------------
// ADDITIONAL MEASURING TYPES
//--------------------
// Measuring contains one measuring: the monitor it is reported
// to, the id of its measuring point and the start and end time
// in nanoseconds.
type Measuring struct {
systemMonitor *SystemMonitor
id string
startTime int64
// endTime stays 0 until EndMeasuring sets it.
endTime int64
}
// EndMeasuring ends a measuring and passes it to the backend
// of its system monitor. It returns the measured duration
// in nanoseconds.
func (m *Measuring) EndMeasuring() int64 {
	m.endTime = time.Now().UnixNano()
	m.systemMonitor.measuringChan <- m
	return m.endTime - m.startTime
}
// MeasuringPoint contains the cumulated measuring
// data of one measuring point. The times are the differences
// between end and start time of the measurings.
type MeasuringPoint struct {
Id string
// Count is the number of aggregated measurings.
Count int64
MinTime int64
MaxTime int64
// TtlTime is the sum of all measured times.
TtlTime int64
// AvgTime is TtlTime divided by Count (integer division).
AvgTime int64
}
// newMeasuringPoint creates the measuring point for the first
// measuring of an id. The duration of that measuring is the
// minimum, maximum, total and average time at once.
func newMeasuringPoint(m *Measuring) *MeasuringPoint {
	elapsed := m.endTime - m.startTime
	return &MeasuringPoint{
		Id:      m.id,
		Count:   1,
		MinTime: elapsed,
		MaxTime: elapsed,
		TtlTime: elapsed,
		AvgTime: elapsed,
	}
}
// update adds the duration of a further measuring to the
// measuring point: counter and total time grow, minimum and
// maximum are adjusted and the average is recalculated.
func (mp *MeasuringPoint) update(m *Measuring) {
	elapsed := m.endTime - m.startTime
	mp.Count++
	mp.TtlTime += elapsed
	mp.AvgTime = mp.TtlTime / mp.Count
	if elapsed < mp.MinTime {
		mp.MinTime = elapsed
	}
	if elapsed > mp.MaxTime {
		mp.MaxTime = elapsed
	}
}
// value is a new value for the stay-set variable
// with the given id.
type value struct {
	id    string
	value int64
}

// StaySetVariable contains the cumulated values
// for one stay-set variable.
type StaySetVariable struct {
	Id string
	// Count is the number of values set so far.
	Count int64
	// ActValue is the value set last.
	ActValue int64
	MinValue int64
	MaxValue int64
	// AvgValue is the sum of all values divided by Count
	// (integer division).
	AvgValue int64
	// total is the sum of all values set so far.
	total int64
}

// newStaySetVariable creates a stay-set variable out of its
// first value. The value also seeds the running total, so that
// it is part of every average calculated by later updates.
func newStaySetVariable(v *value) *StaySetVariable {
	return &StaySetVariable{
		Id:       v.id,
		Count:    1,
		ActValue: v.value,
		MinValue: v.value,
		MaxValue: v.value,
		AvgValue: v.value,
		total:    v.value,
	}
}

// update sets a further value of the stay-set variable and
// adjusts counter, total, minimum, maximum and average.
func (ssv *StaySetVariable) update(v *value) {
	ssv.Count++
	ssv.ActValue = v.value
	ssv.total += v.value
	if ssv.MinValue > ssv.ActValue {
		ssv.MinValue = ssv.ActValue
	}
	if ssv.MaxValue < ssv.ActValue {
		ssv.MaxValue = ssv.ActValue
	}
	ssv.AvgValue = ssv.total / ssv.Count
}
// DynamicStatusRetriever is called by the backend of the
// system monitor when status values are requested and
// returns a current status as string.
type DynamicStatusRetriever func() string
// retrieverRegistration is the registration of a retriever
// function under an id. A nil retriever function requests
// the deregistration of the id.
type retrieverRegistration struct {
id string
dsr DynamicStatusRetriever
}
/*
EOF
*/