作者:mathp
项目:bosu
// parseTcollectorValue parses a tcollector-style line into a data point.
func parseTcollectorValue(line string) (*opentsdb.DataPoint, error) {
sp := strings.Fields(line)
if len(sp) < 3 {
return nil, fmt.Errorf("bad line: %s", line)
}
ts, err := strconv.ParseInt(sp[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("bad timestamp: %s", sp[1])
}
val, err := strconv.ParseFloat(sp[2], 64)
if err != nil {
return nil, fmt.Errorf("bad value: %s", sp[2])
}
if !opentsdb.ValidTag(sp[0]) {
return nil, fmt.Errorf("bad metric: %s", sp[0])
}
dp := opentsdb.DataPoint{
Metric: sp[0],
Timestamp: ts,
Value: val,
}
tags := opentsdb.TagSet{}
for _, tag := range sp[3:] {
ts, err := opentsdb.ParseTags(tag)
if err != nil {
return nil, fmt.Errorf("bad tag, metric %s: %v: %v", sp[0], tag, err)
}
tags.Merge(ts)
}
setExternalTags(tags)
dp.Tags = tags
return &dp, nil
}
作者:eswd
项目:bosu
func (pm *putMetric) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
bodyReader io.ReadCloser
err error
)
defer r.Body.Close()
if r.Method != "POST" {
w.WriteHeader(405)
return
}
if r.Header.Get("Content-Encoding") == "gzip" {
if bodyReader, err = gzip.NewReader(r.Body); err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("Unable to decompress: %s\n", err)))
return
}
} else {
bodyReader = r.Body
}
if body, err := ioutil.ReadAll(bodyReader); err != nil {
w.WriteHeader(500)
return
} else {
bodyReader.Close()
var (
dp *opentsdb.DataPoint
mdp opentsdb.MultiDataPoint
)
if err := json.Unmarshal(body, &mdp); err == nil {
} else if err = json.Unmarshal(body, &dp); err == nil {
mdp = opentsdb.MultiDataPoint{dp}
} else {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("Unable to decode OpenTSDB json: %s\n", err)))
return
}
for _, dp := range mdp {
dp.Tags = AddTags.Copy().Merge(dp.Tags)
}
pm.localMetrics <- &mdp
w.WriteHeader(204)
}
}
作者:eswd
项目:bosu
func (d *DenormalizationRule) Translate(dp *opentsdb.DataPoint) error {
tagString := "__"
for i, tagName := range d.TagNames {
val, ok := dp.Tags[tagName]
if !ok {
return fmt.Errorf("tag %s not present in data point for %s.", tagName, dp.Metric)
}
if i > 0 {
tagString += "."
}
tagString += val
}
dp.Metric = tagString + "." + dp.Metric
return nil
}
作者:mathp
项目:bosu
func (c *ProgramCollector) runProgram(dpchan chan<- *opentsdb.DataPoint) (progError error) {
cmd := exec.Command(c.Path)
setupExternalCommand(cmd)
pr, pw := io.Pipe()
s := bufio.NewScanner(pr)
cmd.Stdout = pw
er, ew := io.Pipe()
cmd.Stderr = ew
if err := cmd.Start(); err != nil {
return err
}
go func() {
progError = cmd.Wait()
pw.Close()
ew.Close()
}()
go func() {
es := bufio.NewScanner(er)
for es.Scan() {
line := strings.TrimSpace(es.Text())
slog.Error(line)
}
}()
for s.Scan() {
var errs []error
t := strings.TrimSpace(s.Text())
if len(t) == 0 {
continue
}
if dp, err := parseTcollectorValue(t); err == nil {
dpchan <- dp
continue
} else {
errs = append(errs, fmt.Errorf("tcollector: %v", err))
}
var dp opentsdb.DataPoint
if err := json.Unmarshal([]byte(t), &dp); err != nil {
errs = append(errs, fmt.Errorf("opentsdb.DataPoint: %v", err))
} else if dp.Valid() {
if dp.Tags == nil {
dp.Tags = opentsdb.TagSet{}
}
setExternalTags(dp.Tags)
c.ApplyTags(dp.Tags)
dpchan <- &dp
continue
} else {
errs = append(errs, fmt.Errorf("opentsdb.DataPoint: invalid data"))
}
var m metadata.Metasend
if err := json.Unmarshal([]byte(t), &m); err != nil {
errs = append(errs, fmt.Errorf("metadata.Metasend: %v", err))
} else {
if m.Tags == nil {
m.Tags = opentsdb.TagSet{}
}
setExternalTags(m.Tags)
if m.Value == "" || m.Name == "" || (m.Metric == "" && len(m.Tags) == 0) {
errs = append(errs, fmt.Errorf("metadata.Metasend: invalid data"))
} else {
metadata.AddMeta(m.Metric, m.Tags, m.Name, m.Value, false)
continue
}
}
slog.Errorf("%s: unparseable line: %s", c.Path, t)
for _, e := range errs {
slog.Error(e)
}
}
if err := s.Err(); err != nil {
return err
}
return
}