Golang github.com-araddon-qlbridge-expr.Context类(方法)实例源码

下面列出了 Golang github.com-araddon-qlbridge-expr.Context 类(方法)的源码实例,可以通过这些实例了解它的用法。

作者:kyled    项目:qlbridg   
// Run performs the mutation configured on this task and publishes a single
// summary message on the output channel.
//
// The operation is chosen in priority order: insert, then upsert (only when
// it carries at least one row), then update. When none of them is set a
// warning is logged and a summary with zero affected rows is still sent.
//
// The output channel is closed when Run returns. If the mutation itself
// fails, that error is returned and no summary message is sent.
func (m *Upsert) Run(ctx *expr.Context) error {
	defer ctx.Recover()
	defer close(m.msgOutCh)

	var (
		rowsAffected int64
		runErr       error
	)

	switch {
	case m.insert != nil:
		rowsAffected, runErr = m.insertRows(ctx, m.insert.Rows)
	case m.upsert != nil && len(m.upsert.Rows) > 0:
		u.Debugf("Upsert.Run():  %v   %#v", len(m.upsert.Rows), m.upsert)
		rowsAffected, runErr = m.insertRows(ctx, m.upsert.Rows)
	case m.update != nil:
		u.Debugf("Update.Run() %s", m.update.String())
		rowsAffected, runErr = m.updateValues(ctx)
	default:
		u.Warnf("unknown mutation op?  %v", m)
	}

	if runErr != nil {
		return runErr
	}

	// Summary row: slot 0 is a constant zero (marked "status?" by the
	// original author), slot 1 is the affected-row count.
	summary := []driver.Value{int64(0), rowsAffected}
	m.msgOutCh <- &datasource.SqlDriverMessage{summary, 1}
	return nil
}

作者:kyled    项目:qlbridg   
// Run iterates the underlying data source and forwards every item to the
// output channel until the iterator is exhausted or a quit signal arrives.
//
// It returns an error when the configured source does not implement
// datasource.Scanner. The output channel is closed when Run returns.
func (m *Source) Run(context *expr.Context) error {
	defer context.Recover()
	defer close(m.msgOutCh)

	scanner, isScanner := m.source.(datasource.Scanner)
	if !isScanner {
		return fmt.Errorf("Does not implement Scanner: %T", m.source)
	}

	iter := scanner.CreateIterator(nil)
	quit := m.SigChan()

	for {
		item := iter.Next()
		if item == nil {
			// A nil item marks the end of the iterator.
			return nil
		}

		// Either hand the item downstream or stop if asked to quit while
		// the send is pending.
		select {
		case <-quit:
			return nil
		case m.msgOutCh <- item:
		}
	}
}

作者:kyled    项目:qlbridg   
// Run blocks until the task's signal channel fires (a value is received or
// the channel is closed) and then returns nil.
//
// The output channel is closed on return; closing output channels is the
// signal for downstream consumers to stop.
//
// The previous implementation wrapped the receive in `for { select { ...
// break } }`. An unlabeled break inside a select only leaves the select, so
// the loop could never terminate and would spin once the signal channel was
// closed. A plain receive gives the intended "wait for quit" behavior.
func (m *TaskStepper) Run(ctx *expr.Context) error {
	defer ctx.Recover()     // Our context can recover panics, save error msg
	defer close(m.msgOutCh) // closing output channels is the signal to stop

	<-m.sigCh
	return nil
}

作者:kyled    项目:qlbridg   
// Run waits for the ResultWriter to be told to stop.
//
// Since we are not paging through messages using the message channel
// (results are pulled with Next() as defined by sql/driver), the input
// channel is never read here; only the error and signal channels are
// watched. An error received on the error channel is logged and returned;
// a quit signal returns nil.
func (m *ResultWriter) Run(ctx *expr.Context) error {
	defer ctx.Recover() // Our context can recover panics, save error msg
	defer func() {
		// Closing the output channel is the signal to stop.
		close(m.msgOutCh)
	}()

	// Both branches return, so nothing follows the select.
	select {
	case <-m.sigCh:
		return nil
	case err := <-m.errCh:
		u.Errorf("got error:  %v", err)
		return err
	}
}

作者:kyled    项目:qlbridg   
// Run reads messages from the input channel, computes a join key for each
// one, stores it on the message with SetKeyHashed and forwards the message
// downstream.
//
// The key is built by evaluating every join node of the FROM source against
// the message and joining the string results with a NUL byte. A message for
// which any join node fails to evaluate is logged and dropped.
//
// Run returns nil when the quit signal fires or the input channel is closed,
// and an error when a message is not a *datasource.SqlDriverMessageMap. The
// output channel is closed on return.
//
// The forward to the output channel also watches the quit signal, so the
// task cannot stay blocked on a consumer that has already stopped reading.
func (m *JoinKey) Run(context *expr.Context) error {
	defer context.Recover()
	defer close(m.msgOutCh)

	outCh := m.MessageOut()
	inCh := m.MessageIn()
	sigCh := m.SigChan()
	joinNodes := m.from.JoinNodes()

msgLoop:
	for {
		select {
		case <-sigCh:
			return nil
		case msg, open := <-inCh:
			if !open {
				// Upstream closed its output: normal shutdown.
				return nil
			}

			mt, isMap := msg.(*datasource.SqlDriverMessageMap)
			if !isMap {
				return fmt.Errorf("To use JoinKey must use SqlDriverMessageMap but got %T", msg)
			}

			vals := make([]string, len(joinNodes))
			for i, node := range joinNodes {
				joinVal, ok := vm.Eval(mt, node)
				if !ok {
					u.Errorf("could not evaluate: %T %#v   %v", joinVal, joinVal, msg)
					// Skip this message entirely; move on to the next one.
					continue msgLoop
				}
				vals[i] = joinVal.ToString()
			}
			mt.SetKeyHashed(strings.Join(vals, string(byte(0))))

			// Forward downstream, but give up if asked to quit meanwhile.
			select {
			case <-sigCh:
				return nil
			case outCh <- mt:
			}
		}
	}
}

作者:kyled    项目:qlbridg   
// Run starts every child task in its own goroutine, waits for all of them to
// finish, and returns the error of the lowest-indexed task that failed (nil
// when every task succeeded).
//
// Tasks are started in reverse order so that by the time the source starts
// up, all downstream tasks have already started. Each failure is logged as
// it happens; previously the errors were only logged and Run always
// reported success.
//
// Unlike the other tasks, this one does not close its output channel on
// return.
func (m *TaskSequential) Run(ctx *expr.Context) error {
	defer ctx.Recover() // Our context can recover panics, save error msg

	// Non-blocking peek: report an already-pending error or quit signal
	// before starting the children.
	select {
	case err := <-m.errCh:
		u.Errorf("%v", err)
	case <-m.sigCh:
		u.Warnf("got quit channel?")
	default:
	}

	// One slot per task, so the goroutines never write to the same element
	// and no lock is needed.
	taskErrs := make([]error, len(m.tasks))
	var wg sync.WaitGroup

	// start tasks in reverse order, so that by time
	// source starts up all downstreams have started
	for i := len(m.tasks) - 1; i >= 0; i-- {
		wg.Add(1)
		go func(idx int) {
			// Deferred so the WaitGroup is released on every exit path.
			defer wg.Done()
			task := m.tasks[idx]
			if err := task.Run(ctx); err != nil {
				u.Errorf("%T.Run() errored %v", task, err)
				taskErrs[idx] = err
			}
		}(i)
	}

	wg.Wait() // block until all tasks have finished

	for _, err := range taskErrs {
		if err != nil {
			return err
		}
	}
	return nil
}

作者:kyled    项目:qlbridg   
// Run starts every child task in its own goroutine, waits for all of them to
// finish, and returns the error of the lowest-indexed task that failed (nil
// when every task succeeded).
//
// Tasks are started in reverse order so that by the time the source starts
// up, all downstream tasks have already started. Each failure is logged as
// it happens; previously the errors were only logged and Run always
// reported success.
//
// The output channel is closed on return; closing output channels is the
// signal for downstream consumers to stop.
func (m *TaskParallel) Run(ctx *expr.Context) error {
	defer ctx.Recover() // Our context can recover panics, save error msg
	defer func() {
		close(m.msgOutCh) // closing output channels is the signal to stop
	}()

	// Non-blocking peek: report an already-pending error, or drain a pending
	// quit signal, before starting the children.
	select {
	case err := <-m.errCh:
		u.Errorf("%v", err)
	case <-m.sigCh:
	default:
	}

	// One slot per task, so the goroutines never write to the same element
	// and no lock is needed.
	taskErrs := make([]error, len(m.tasks))
	var wg sync.WaitGroup

	// start tasks in reverse order, so that by time
	// source starts up all downstreams have started
	for i := len(m.tasks) - 1; i >= 0; i-- {
		wg.Add(1)
		go func(idx int) {
			// Deferred so the WaitGroup is released on every exit path.
			defer wg.Done()
			if err := m.tasks[idx].Run(ctx); err != nil {
				u.Errorf("%T.Run() errored %v", m.tasks[idx], err)
				taskErrs[idx] = err
			}
		}(i)
	}

	wg.Wait()

	for _, err := range taskErrs {
		if err != nil {
			return err
		}
	}
	return nil
}


问题


面经


文章

微信
公众号

扫码关注公众号