~gabe/mortar unlisted

1aebc2313b16b79307c5a99583562cdce4c57d88 — Gabe Fierro 1 year, 7 months ago e83282e
pending changes to mortar
3 files changed, 58 insertions(+), 136 deletions(-)

M stages/brick.go
M stages/stage.go
M stages/timeseries.go
M stages/brick.go => stages/brick.go +14 -4
@@ 27,7 27,7 @@ type BrickQueryStage struct {
	ctx      context.Context
	output   chan Context

	db            *hod.Log
	db            *hod.HodDB
	highwatermark int64

	sync.Mutex


@@ 55,7 55,7 @@ func NewBrickQueryStage(cfg *BrickQueryStageConfig) (*BrickQueryStage, error) {
	if err != nil {
		return nil, err
	}
	stage.db, err = hod.NewLog(hodcfg)
	stage.db, err = hod.MakeHodDB(hodcfg)
	if err != nil {
		return nil, err
	}


@@ 161,10 161,13 @@ func (stage *BrickQueryStage) processQualify(ctx Context) error {
	version_response, err := stage.db.Versions(ctx.ctx, version_query)
	if err != nil {
		ctx.addError(err)
		log.Error(err)
		return err
	}
	if version_response.Error != "" {
		ctx.addError(errors.New(version_response.Error))
		err = errors.New(version_response.Error)
		log.Error(err)
		ctx.addError(err)
		return err
	}



@@ 173,9 176,10 @@ func (stage *BrickQueryStage) processQualify(ctx Context) error {
	}

	for _, querystring := range ctx.qualify_request.Required {
		query, err := stage.db.ParseQuery(querystring, stage.highwatermark)
		query, err := stage.db.ParseQuery(querystring, 0)
		if err != nil {
			ctx.addError(err)
			log.Error(err)
			return err
		}



@@ 183,6 187,7 @@ func (stage *BrickQueryStage) processQualify(ctx Context) error {
			query.Graphs = []string{site}
			res, err := stage.db.Select(ctx.ctx, query)
			if err != nil {
				log.Error(err)
				ctx.addError(err)
				//return err
			} else if len(res.Rows) == 0 {


@@ 206,6 211,7 @@ func (stage *BrickQueryStage) processQuery2(ctx Context) error {
	// store view name -> list of indexes to dependent dataFrames
	var viewDataFrames = make(map[string][]int)
	for idx, dataFrame := range ctx.request.DataFrames {
		idx := idx
	tsLoop:
		for _, timeseries := range dataFrame.Timeseries {
			viewDataVars[timeseries.View] = append(viewDataVars[timeseries.View], timeseries.DataVars...)


@@ 221,6 227,9 @@ func (stage *BrickQueryStage) processQuery2(ctx Context) error {
		}
	}

	log.Info("DataVars: ", viewDataVars)
	log.Info("DataFrames: ", viewDataFrames)

	for _, view := range ctx.request.Views {
		query, err := stage.db.ParseQuery(view.Definition, stage.highwatermark)
		if err != nil {


@@ 233,6 242,7 @@ func (stage *BrickQueryStage) processQuery2(ctx Context) error {
		// property is how to relate the points to the timeseries database. However, it also introduces the complexity
		// of dealing with whether or not the variables *do* have associated timeseries or not.
		mapping, _ := rewriteQuery(viewDataVars[view.Name], query)
		log.Warning("rewrote: ", query)
		for _, sitename := range ctx.request.Sites {
			query.Graphs = []string{sitename}
			res, err := stage.db.Select(ctx.ctx, query)

M stages/stage.go => stages/stage.go +13 -0
@@ 24,9 24,19 @@ type Context struct {
	sync.Mutex
}

// isDone reports whether the Done channel of the wrapped ctx.ctx has been
// closed. The check never blocks: if the channel is not yet readable the
// method falls through and reports false.
func (ctx *Context) isDone() bool {
	finished := false
	select {
	case <-ctx.ctx.Done():
		// Done channel is closed (or otherwise readable): the request is over.
		finished = true
	default:
		// Nothing to receive yet; keep the zero value.
	}
	return finished
}

// addError logs err and then appends it to ctx.errors. The whole operation
// runs while holding the Context's embedded mutex, which is released when
// the method returns.
func (ctx *Context) addError(err error) {
	ctx.Lock()
	defer ctx.Unlock()

	log.Error("Context Error", err)

	// Build the extended slice first, then publish it back onto the Context.
	recorded := append(ctx.errors, err)
	ctx.errors = recorded
}



@@ 57,6 67,9 @@ func Showtime(queue chan Context) {
	go func() {
		log.Println("get output")
		for out := range queue {
			if out.isDone() {
				continue
			}
			if out.response == nil {
				if out.done != nil {
					close(out.done)

M stages/timeseries.go => stages/timeseries.go +31 -132
@@ 57,16 57,20 @@ func NewTimeseriesQueryStage(cfg *TimeseriesStageConfig) (*TimeseriesQueryStage,
			for {
				select {
				case ctx := <-input:

					if ctx.isDone() {
						ctx.response = nil
						stage.output <- ctx
						continue
					}

					if len(ctx.request.Sites) > 0 && len(ctx.request.DataFrames) > 0 {
						if err := stage.processQuery2(ctx); err != nil {
						if err := stage.processQuery(ctx); err != nil {
							log.Println(err)
						}
					} else if len(ctx.request.Sites) > 0 && len(ctx.request.Streams) > 0 {
						ctx.addError(errors.New("Need to upgrade to pymortar>=0.3.2"))
						log.Error("Old client")
						//if err := stage.processQuery(ctx); err != nil {
						//	log.Println(err)
						//}
					}
					ctx.response = nil
					stage.output <- ctx


@@ 173,126 177,7 @@ func (stage *TimeseriesQueryStage) processQuery(ctx Context) error {
		return err
	}

	//ctx.request.TimeParams.window
	//qctx, cancel := context.WithTimeout(ctx.ctx, MAX_TIMEOUT)

	// loop over all streams, and then over all UUIDs
	for _, reqstream := range ctx.request.Streams {
		for _, uuStr := range reqstream.Uuids {
			uu := uuid.Parse(uuStr)
			if uu == nil {
				continue
			}
			stream, err := stage.getStream(ctx.ctx, uu)
			if err != nil {
				ctx.addError(err)
				return err
			}

			// handle RAW streams
			if reqstream.Aggregation == mortarpb.AggFunc_AGG_FUNC_RAW {
				// if raw data...
				rawpoints, generations, errchan := stream.RawValues(ctx.ctx, start_time.UnixNano(), end_time.UnixNano(), 0)
				resp := &mortarpb.FetchResponse{}
				var pcount = 0
				resp.Times = getTimeBuffer()
				resp.Values = getValueBuffer()
				for p := range rawpoints {
					resp.Times[pcount] = p.Time
					resp.Values[pcount] = p.Value
					pcount += 1
					if pcount == TS_BATCH_SIZE {
						resp.Variable = reqstream.Name
						resp.Identifier = uuStr
						ctx.response = resp
						stage.output <- ctx
						resp = &mortarpb.FetchResponse{}
						pcount = 0
					}
				}
				if len(resp.Times) > 0 {
					resp.Variable = reqstream.Name
					resp.Identifier = uuStr
					resp.Times = resp.Times[:pcount]
					resp.Values = resp.Values[:pcount]
					ctx.response = resp
					stage.output <- ctx
				}

				<-generations
				if err := <-errchan; err != nil {
					ctx.addError(err)
					return err
				}
			} else {
				windowSize, err := ParseDuration(ctx.request.Time.Window)
				if err != nil {
					ctx.addError(err)
					return err
				}
				windowDepth := math.Log2(float64(windowSize))
				suggestedAccuracy := uint8(math.Max(windowDepth-5, 30))

				statpoints, generations, errchan := stream.Windows(ctx.ctx, start_time.UnixNano(), end_time.UnixNano(), uint64(windowSize.Nanoseconds()), suggestedAccuracy, 0)

				resp := &mortarpb.FetchResponse{}
				var pcount = 0

				resp.Times = getTimeBuffer()
				resp.Values = getValueBuffer()

				for p := range statpoints {
					resp.Times[pcount] = p.Time
					resp.Values[pcount] = valueFromAggFunc(p, reqstream.Aggregation)
					pcount += 1

					if pcount == TS_BATCH_SIZE {
						resp.Variable = reqstream.Name
						resp.Identifier = uuStr
						ctx.response = resp
						stage.output <- ctx
						resp = &mortarpb.FetchResponse{}
						pcount = 0
					}
				}
				if len(resp.Times) > 0 {
					resp.Variable = reqstream.Name
					resp.Identifier = uuStr
					resp.Times = resp.Times[:pcount]
					resp.Values = resp.Values[:pcount]
					ctx.response = resp
					stage.output <- ctx
				}

				<-generations
				if err := <-errchan; err != nil {
					ctx.addError(err)
					return err
				}

			}

		}
	}

	return nil
}

func (stage *TimeseriesQueryStage) processQuery2(ctx Context) error {
	//	defer ctx.finish()
	// parse timestamps for the query
	start_time, err := time.Parse(time.RFC3339, ctx.request.Time.Start)
	if err != nil {
		err = errors.Wrapf(err, "Could not parse Start time (%s)", ctx.request.Time.Start)
		ctx.addError(err)
		return err
	}
	end_time, err := time.Parse(time.RFC3339, ctx.request.Time.End)
	if err != nil {
		err = errors.Wrapf(err, "Could not parse End time (%s)", ctx.request.Time.End)
		ctx.addError(err)
		return err
	}
	log.Debug("Fetch data in [", start_time, " - ", end_time, "]")

	//ctx.request.TimeParams.window
	//qctx, cancel := context.WithTimeout(ctx.ctx, MAX_TIMEOUT)


@@ 321,11 206,18 @@ func (stage *TimeseriesQueryStage) processQuery2(ctx Context) error {
					pcount += 1
					resp.Times = append(resp.Times, p.Time)
					resp.Values = append(resp.Values, p.Value)
					if p.Time > end_time.UnixNano() {
						//TODO: fix this
						continue
						//log.Warning("TIME start ", start_time.UnixNano(), " until ", end_time.UnixNano(), " but got ", p.Time)
					}
					if pcount == TS_BATCH_SIZE {
						resp.DataFrame = dataFrame.Name
						resp.Identifier = uuStr
						ctx.response = resp
						stage.output <- ctx
						if !ctx.isDone() {
							ctx.response = resp
							stage.output <- ctx
						}
						resp = &mortarpb.FetchResponse{}
						pcount = 0
					}


@@ 333,13 225,16 @@ func (stage *TimeseriesQueryStage) processQuery2(ctx Context) error {
				if len(resp.Times) > 0 {
					resp.DataFrame = dataFrame.Name
					resp.Identifier = uuStr
					ctx.response = resp
					stage.output <- ctx
					if !ctx.isDone() {
						ctx.response = resp
						stage.output <- ctx
					}
				}

				<-generations
				if err := <-errchan; err != nil {
					ctx.addError(err)
					log.Error(errors.Wrap(err, "got error in stream rawvalues"))
					return err
				}
			} else {


@@ 363,8 258,10 @@ func (stage *TimeseriesQueryStage) processQuery2(ctx Context) error {
					if pcount == TS_BATCH_SIZE {
						resp.DataFrame = dataFrame.Name
						resp.Identifier = uuStr
						ctx.response = resp
						stage.output <- ctx
						if !ctx.isDone() {
							ctx.response = resp
							stage.output <- ctx
						}
						resp = &mortarpb.FetchResponse{}
						pcount = 0
					}


@@ 372,8 269,10 @@ func (stage *TimeseriesQueryStage) processQuery2(ctx Context) error {
				if len(resp.Times) > 0 {
					resp.DataFrame = dataFrame.Name
					resp.Identifier = uuStr
					ctx.response = resp
					stage.output <- ctx
					if !ctx.isDone() {
						ctx.response = resp
						stage.output <- ctx
					}
				}

				<-generations