作者:jareks
项目:bosu
func timeTSDBRequest(e *State, T miniprofiler.Timer, req *opentsdb.Request) (s opentsdb.ResponseSet, err error) {
e.tsdbQueries = append(e.tsdbQueries, *req)
if e.autods > 0 {
for _, q := range req.Queries {
if q.Downsample == "" {
if err := req.AutoDownsample(e.autods); err != nil {
return nil, err
}
}
}
}
b, _ := json.MarshalIndent(req, "", " ")
tries := 1
for {
T.StepCustomTiming("tsdb", "query", string(b), func() {
getFn := func() (interface{}, error) {
return e.tsdbContext.Query(req)
}
var val interface{}
val, err = e.cache.Get(string(b), getFn)
s = val.(opentsdb.ResponseSet).Copy()
})
if err == nil || tries == tsdbMaxTries {
break
}
slog.Errorf("Error on tsdb query %d: %s", tries, err.Error())
tries++
}
return
}
作者:jareks
项目:bosu
func Query(e *State, T miniprofiler.Timer, query, sduration, eduration string) (r *Results, err error) {
r = new(Results)
q, err := opentsdb.ParseQuery(query, e.tsdbContext.Version())
if q == nil && err != nil {
return
}
if !e.tsdbContext.Version().FilterSupport() {
if err = e.Search.Expand(q); err != nil {
return
}
}
sd, err := opentsdb.ParseDuration(sduration)
if err != nil {
return
}
req := opentsdb.Request{
Queries: []*opentsdb.Query{q},
Start: fmt.Sprintf("%s-ago", sd),
}
if eduration != "" {
var ed opentsdb.Duration
ed, err = opentsdb.ParseDuration(eduration)
if err != nil {
return
}
req.End = fmt.Sprintf("%s-ago", ed)
}
var s opentsdb.ResponseSet
if err = req.SetTime(e.now); err != nil {
return
}
s, err = timeTSDBRequest(e, T, &req)
if err != nil {
return
}
for _, res := range s {
if e.squelched(res.Tags) {
continue
}
values := make(Series)
for k, v := range res.DPS {
i, err := strconv.ParseInt(k, 10, 64)
if err != nil {
return nil, err
}
values[time.Unix(i, 0).UTC()] = float64(v)
}
r.Results = append(r.Results, &Result{
Value: values,
Group: res.Tags,
})
}
return
}
作者:eswd
项目:bosu
func queryForAggregateTags(query *opentsdb.Query) (opentsdb.TagSet, error) {
req := opentsdb.Request{}
req.Queries = []*opentsdb.Query{query}
req.Start = "1h-ago"
resp, err := req.Query(*tsdbHost)
if err != nil {
return nil, err
}
if len(resp) < 1 {
return nil, fmt.Errorf("No points in last hour to learn aggregate tags")
}
tagset := make(opentsdb.TagSet)
for _, t := range resp[0].AggregateTags {
tagset[t] = "*"
}
return tagset, nil
}
作者:pd
项目:bosu
func timeTSDBRequest(e *State, T miniprofiler.Timer, req *opentsdb.Request) (s opentsdb.ResponseSet, err error) {
e.tsdbQueries = append(e.tsdbQueries, *req)
if e.autods > 0 {
if err := req.AutoDownsample(e.autods); err != nil {
return nil, err
}
}
b, _ := json.MarshalIndent(req, "", " ")
T.StepCustomTiming("tsdb", "query", string(b), func() {
getFn := func() (interface{}, error) {
return e.tsdbContext.Query(req)
}
var val interface{}
val, err = e.cache.Get(string(b), getFn)
s = val.(opentsdb.ResponseSet).Copy()
})
return
}
作者:jareks
项目:bosu
func bandTSDB(e *State, T miniprofiler.Timer, query, duration, period string, num float64, rfunc func(*Results, *opentsdb.Response, time.Duration) error) (r *Results, err error) {
r = new(Results)
r.IgnoreOtherUnjoined = true
r.IgnoreUnjoined = true
T.Step("band", func(T miniprofiler.Timer) {
var d, p opentsdb.Duration
d, err = opentsdb.ParseDuration(duration)
if err != nil {
return
}
p, err = opentsdb.ParseDuration(period)
if err != nil {
return
}
if num < 1 || num > 100 {
err = fmt.Errorf("num out of bounds")
}
var q *opentsdb.Query
q, err = opentsdb.ParseQuery(query, e.tsdbContext.Version())
if err != nil {
return
}
if !e.tsdbContext.Version().FilterSupport() {
if err = e.Search.Expand(q); err != nil {
return
}
}
req := opentsdb.Request{
Queries: []*opentsdb.Query{q},
}
now := e.now
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
if err = req.SetTime(e.now); err != nil {
return
}
for i := 0; i < int(num); i++ {
now = now.Add(time.Duration(-p))
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
var s opentsdb.ResponseSet
s, err = timeTSDBRequest(e, T, &req)
if err != nil {
return
}
for _, res := range s {
if e.squelched(res.Tags) {
continue
}
//offset := e.now.Sub(now.Add(time.Duration(p-d)))
offset := e.now.Sub(now)
if err = rfunc(r, res, offset); err != nil {
return
}
}
}
})
return
}
作者:jareks
项目:bosu
func Over(e *State, T miniprofiler.Timer, query, duration, period string, num float64) (r *Results, err error) {
r = new(Results)
r.IgnoreOtherUnjoined = true
r.IgnoreUnjoined = true
T.Step("band", func(T miniprofiler.Timer) {
var d, p opentsdb.Duration
d, err = opentsdb.ParseDuration(duration)
if err != nil {
return
}
p, err = opentsdb.ParseDuration(period)
if err != nil {
return
}
if num < 1 || num > 100 {
err = fmt.Errorf("num out of bounds")
}
var q *opentsdb.Query
q, err = opentsdb.ParseQuery(query, e.tsdbContext.Version())
if err != nil {
return
}
if !e.tsdbContext.Version().FilterSupport() {
if err = e.Search.Expand(q); err != nil {
return
}
}
req := opentsdb.Request{
Queries: []*opentsdb.Query{q},
}
now := e.now
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
for i := 0; i < int(num); i++ {
var s opentsdb.ResponseSet
s, err = timeTSDBRequest(e, T, &req)
if err != nil {
return
}
offset := e.now.Sub(now)
for _, res := range s {
if e.squelched(res.Tags) {
continue
}
values := make(Series)
a := &Result{Group: res.Tags.Merge(opentsdb.TagSet{"shift": offset.String()})}
for k, v := range res.DPS {
i, err := strconv.ParseInt(k, 10, 64)
if err != nil {
return
}
values[time.Unix(i, 0).Add(offset).UTC()] = float64(v)
}
a.Value = values
r.Results = append(r.Results, a)
}
now = now.Add(time.Duration(-p))
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
}
})
return
}
作者:Skyscanne
项目:bosu
func bandRangeTSDB(e *State, T miniprofiler.Timer, query, rangeStart, rangeEnd, period string, num float64, rfunc func(*Results, *opentsdb.Response) error) (r *Results, err error) {
r = new(Results)
r.IgnoreOtherUnjoined = true
r.IgnoreUnjoined = true
T.Step("bandRange", func(T miniprofiler.Timer) {
var from, to, p opentsdb.Duration
from, err = opentsdb.ParseDuration(rangeStart)
if err != nil {
return
}
to, err = opentsdb.ParseDuration(rangeEnd)
if err != nil {
return
}
p, err = opentsdb.ParseDuration(period)
if err != nil {
return
}
if num < 1 || num > 100 {
err = fmt.Errorf("num out of bounds")
}
var q *opentsdb.Query
q, err = opentsdb.ParseQuery(query, e.tsdbContext.Version())
if err != nil {
return
}
if err = e.Search.Expand(q); err != nil {
return
}
req := opentsdb.Request{
Queries: []*opentsdb.Query{q},
}
now := e.now
req.End = now.Unix()
st := now.Add(time.Duration(from)).Unix()
req.Start = st
if err = req.SetTime(e.now); err != nil {
return
}
for i := 0; i < int(num); i++ {
now = now.Add(time.Duration(-p))
end := now.Add(time.Duration(-to)).Unix()
req.End = &end
st := now.Add(time.Duration(-from)).Unix()
req.Start = &st
var s opentsdb.ResponseSet
s, err = timeTSDBRequest(e, T, &req)
if err != nil {
return
}
for _, res := range s {
if e.squelched(res.Tags) {
continue
}
if err = rfunc(r, res); err != nil {
return
}
}
}
})
return
}
作者:reduxd
项目:grafan
func Band(e *State, T miniprofiler.Timer, query, duration, period string, num float64) (r *Results, err error) {
r = new(Results)
r.IgnoreOtherUnjoined = true
r.IgnoreUnjoined = true
T.Step("band", func(T miniprofiler.Timer) {
var d, p opentsdb.Duration
d, err = opentsdb.ParseDuration(duration)
if err != nil {
return
}
p, err = opentsdb.ParseDuration(period)
if err != nil {
return
}
if num < 1 || num > 100 {
err = fmt.Errorf("expr: Band: num out of bounds")
}
q, err := opentsdb.ParseQuery(query)
if q == nil && err != nil {
return
}
if err = e.Search.Expand(q); err != nil {
return
}
req := opentsdb.Request{
Queries: []*opentsdb.Query{q},
}
now := e.now
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
if err = req.SetTime(e.now); err != nil {
return
}
for i := 0; i < int(num); i++ {
now = now.Add(time.Duration(-p))
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
var s opentsdb.ResponseSet
s, err = timeTSDBRequest(e, T, &req)
if err != nil {
return
}
for _, res := range s {
if e.squelched(res.Tags) {
continue
}
newarr := true
for _, a := range r.Results {
if !a.Group.Equal(res.Tags) {
continue
}
newarr = false
values := a.Value.(Series)
for k, v := range res.DPS {
i, e := strconv.ParseInt(k, 10, 64)
if e != nil {
err = e
return
}
values[time.Unix(i, 0).UTC()] = float64(v)
}
}
if newarr {
values := make(Series)
a := &Result{Group: res.Tags}
for k, v := range res.DPS {
i, e := strconv.ParseInt(k, 10, 64)
if e != nil {
err = e
return
}
values[time.Unix(i, 0).UTC()] = float64(v)
}
a.Value = values
r.Results = append(r.Results, a)
}
}
}
})
return
}
作者:eswd
项目:bosu
func main() {
flag.Parse()
if *tsdbHost == "" {
flag.PrintDefaults()
log.Fatal("host must be supplied")
}
putUrl := (&url.URL{Scheme: "http", Host: *tsdbHost, Path: "api/put"}).String()
if *ruleFlag == "" {
flag.PrintDefaults()
log.Fatal("rule must be supplied")
}
rules, err := denormalize.ParseDenormalizationRules(*ruleFlag)
if err != nil {
log.Fatal(err)
}
if len(rules) > 1 {
log.Fatal("Please specify only one rule")
}
var rule *denormalize.DenormalizationRule
var metric string
for k, v := range rules {
metric = k
rule = v
}
query := &opentsdb.Query{Metric: metric, Aggregator: "avg"}
query.Tags, err = queryForAggregateTags(query)
if err != nil {
log.Fatal(err)
}
startDate, err := opentsdb.ParseTime(*start)
if err != nil {
log.Fatal(err)
}
endDate := time.Now().UTC()
if *end != "" {
endDate, err = opentsdb.ParseTime(*end)
if err != nil {
log.Fatal(err)
}
}
backfill := func(batchStart, batchEnd time.Time) (err error) {
startTimeString := batchStart.Format(opentsdb.TSDBTimeFormat)
endTimeString := batchEnd.Format(opentsdb.TSDBTimeFormat)
defer func() {
if err != nil {
log.Fatalf("Error on batch %s - %s. %v \n", startTimeString, endTimeString, err)
}
}()
req := opentsdb.Request{Start: startTimeString, End: endTimeString, Queries: []*opentsdb.Query{query}}
resp, err := req.Query(*tsdbHost)
if err != nil {
return err
}
dps := []*opentsdb.DataPoint{}
for _, r := range resp {
for t, p := range r.DPS {
timeStamp, err := strconv.ParseInt(t, 10, 64)
if err != nil {
return err
}
dp := &opentsdb.DataPoint{
Timestamp: timeStamp,
Metric: r.Metric,
Tags: r.Tags,
Value: p,
}
err = rule.Translate(dp)
if err != nil {
return err
}
dps = append(dps, dp)
}
}
fmt.Printf("%s - %s: %d dps\n", startTimeString, endTimeString, len(dps))
total := 0
for len(dps) > 0 {
count := len(dps)
if len(dps) > *batchSize {
count = *batchSize
}
putResp, err := collect.SendDataPoints(dps[:count], putUrl)
if err != nil {
return err
}
defer putResp.Body.Close()
if putResp.StatusCode != 204 {
return fmt.Errorf("Non 204 status code from opentsdb: %d", putResp.StatusCode)
}
dps = dps[count:]
total += count
}
fmt.Printf("Relayed %d data points.\n", total)
return nil
}
//.........这里部分代码省略.........