A worker (goroutine) pool for Go with an unlimited task queue

EVA

License: MIT

Package EVA implements a fixed-size goroutine pool with an unlimited task queue. It manages and recycles a large number of goroutines and lets developers limit how many goroutines a concurrent program creates. For example, when processing CPU-heavy jobs from HTTP requests, you can create a pool whose size matches your CPU count.

Features:

  • Friendly interfaces: submitting tasks, getting the number of running goroutines, dynamically readjusting the capacity of the pool, closing the pool.
  • Automatically managing and recycling a massive number of goroutines.
  • Support for custom implementations of the Task or Callable interface.
  • Unlimited task queue.
  • Panic recovery in eva.CustomTask.
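
To make the "fixed pool, unlimited queue" combination concrete, here is a minimal standard-library sketch of the idea. It is not EVA's implementation: `QueuePool`, `NewQueuePool`, and the methods below are hypothetical names chosen for illustration, and the queue is simply a slice guarded by a mutex and a condition variable.

```go
package main

import (
	"fmt"
	"sync"
)

// QueuePool is a hypothetical fixed-size worker pool with an unbounded
// task queue. It only illustrates the concept; it is not EVA's code.
type QueuePool struct {
	mu     sync.Mutex
	cond   *sync.Cond
	queue  []func() // grows as needed, so Submit never waits for space
	closed bool
	wg     sync.WaitGroup
}

// NewQueuePool starts exactly `workers` goroutines.
func NewQueuePool(workers int) *QueuePool {
	p := &QueuePool{}
	p.cond = sync.NewCond(&p.mu)
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go p.worker()
	}
	return p
}

// Submit appends the task to the queue and wakes one idle worker.
func (p *QueuePool) Submit(task func()) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		panic("submit on closed pool")
	}
	p.queue = append(p.queue, task)
	p.cond.Signal()
}

// worker pops tasks until the pool is closed and the queue is empty.
func (p *QueuePool) worker() {
	defer p.wg.Done()
	for {
		p.mu.Lock()
		for len(p.queue) == 0 && !p.closed {
			p.cond.Wait()
		}
		if len(p.queue) == 0 { // closed and fully drained
			p.mu.Unlock()
			return
		}
		task := p.queue[0]
		p.queue = p.queue[1:]
		p.mu.Unlock()
		task() // run outside the lock so workers execute in parallel
	}
}

// Close stops accepting tasks, lets the workers drain the queue, and
// waits for them to exit.
func (p *QueuePool) Close() {
	p.mu.Lock()
	p.closed = true
	p.mu.Unlock()
	p.cond.Broadcast()
	p.wg.Wait()
}

func main() {
	p := NewQueuePool(4)
	var mu sync.Mutex
	sum := 0
	for i := 1; i <= 1000; i++ {
		n := i
		p.Submit(func() {
			mu.Lock()
			sum += n
			mu.Unlock()
		})
	}
	p.Close()
	fmt.Println(sum) // prints 500500
}
```

The trade-off of an unbounded queue is that submitters are never slowed down, so memory use grows if tasks arrive faster than the workers can finish them.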

Installation

    go get github.com/pshopper/eva

Or, using dep:

    dep ensure -add github.com/pshopper/eva

Usage

Async

Simple

    config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
    p := eva.NewPool(config) // config is already a *eva.Config

    t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
        // some work here...


        return result, err
    }, args)

    p.Submit(t)

    p.Wait()

    fmt.Printf("Task result=%v; error=%v; panic=%v\n", t.Get(), t.Error(), t.Panic())

    p.Close()

With completion

    config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
    p := eva.NewPool(config)

    completion := make(chan eva.Task, 3)

    for i := 0; i < 100; i++ {
        t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
            // some work here...


            return result, err
        }, args)

        p.SubmitWithCompletion(completion, t)
    }


    for i := 0; i < 100; i++ {
        t := <-completion
        fmt.Printf("Task result=%v; error=%v; panic=%v\n", t.Get(), t.Error(), t.Panic())
    }


    p.Close()

With context

    config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
    p := eva.NewPool(config)

    // Named ctx so the variable does not shadow the context package.
    ctx, cancel := context.WithCancel(context.Background()) // or WithDeadline / WithTimeout

    for i := 0; i < 100; i++ {
        t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
            // some work here...

            return result, err
        }, args)

        p.SubmitWithContext(ctx, t)
    }

    // some work here...

    cancel()

    p.Close()
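
How EVA reacts to a cancelled context is not spelled out here. As a rough standard-library illustration of the general pattern, the sketch below stops picking up queued tasks once the context is cancelled. The helper names `drain` and `demoCancel` are hypothetical and are not part of EVA.

```go
package main

import (
	"context"
	"fmt"
)

// drain runs queued tasks until the queue is closed or ctx is cancelled,
// and reports how many tasks it actually ran.
func drain(ctx context.Context, tasks <-chan func()) int {
	ran := 0
	for {
		// Check cancellation first: when both channels are ready, a
		// single select would pick one of them at random.
		if ctx.Err() != nil {
			return ran
		}
		select {
		case <-ctx.Done():
			return ran
		case task, ok := <-tasks:
			if !ok {
				return ran
			}
			task()
			ran++
		}
	}
}

// demoCancel queues five tasks; the third one cancels the context, so
// the remaining two are never started.
func demoCancel() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	tasks := make(chan func(), 5)
	for i := 1; i <= 5; i++ {
		n := i
		tasks <- func() {
			if n == 3 {
				cancel()
			}
		}
	}
	close(tasks)
	return drain(ctx, tasks)
}

func main() {
	fmt.Println(demoCancel()) // prints 3
}
```

Note that cancellation in this sketch only prevents pending tasks from starting; a task that is already running has to observe the context itself if it should stop early.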

With custom

    config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
    p := eva.NewPool(config)

    // Named ctx so the variable does not shadow the context package.
    ctx, cancel := context.WithCancel(context.Background()) // or WithDeadline / WithTimeout
    completion := make(chan eva.Task, 3)

    for i := 0; i < 100; i++ {
        t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
            // some work here...

            return result, err
        }, args)

        p.SubmitCustom(ctx, completion, t)
    }

    for i := 0; i < 3; i++ {
        t := <-completion
        fmt.Printf("Task result=%v; error=%v; panic=%v\n", t.Get(), t.Error(), t.Panic())
    }

    cancel()

    p.Close()

Immediate

    config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
    p := eva.NewPool(config)

    t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
        // some work here...


        return result, err
    }, args)

    err := p.SubmitImmediate(t)

    if err == nil {
        fmt.Printf("Task result=%v; error=%v; panic=%v\n", t.Get(), t.Error(), t.Panic())
    }

    p.Close()

Sync

    config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
    p := eva.NewPool(config)

    t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
        // some work here...


        return result, err
    }, args)

    p.Submit(t)

    // Get, Error and Panic block until the task is done, like Java's CompletableFuture.
    fmt.Printf("Task result=%v; error=%v; panic=%v\n", t.Get(), t.Error(), t.Panic())

    p.Close()

Resize

You can change the pool size:

    pool.SetSize(1000)
    pool.SetSize(1000000)

Or you can change the number of unstoppable workers:

    pool.SetUnstoppableWorkers(1000)
    pool.SetUnstoppableWorkers(1000000)

This is safe to perform from any goroutine even if others are still processing.

Interfaces

You can implement the Task interface and submit your own task type to the pool:

    // Task is the interface for work submitted to the goroutine pool.
    type Task interface {
        // Cancel cancels the task.
        Cancel()

        // Error returns the error from Run.
        // Blocks until the task is done.
        Error() error

        // Get returns the result of Run.
        // Blocks until the task is done.
        Get() interface{}

        // IsCancelled reports whether the task has been cancelled.
        IsCancelled() bool

        // IsDone reports whether the task has finished.
        IsDone() bool

        // Panic returns the value recovered from a panic in Run, if any.
        // Blocks until the task is done.
        Panic() interface{}

        // Run performs the task.
        Run()
    }
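
As an illustration, one possible way to satisfy this interface is sketched below, using a `done` channel for the blocking getters and `sync.Once` so that either `Run` or `Cancel` completes the task, but never both. `FuncTask`, `NewFuncTask`, and `errCancelled` are hypothetical names, and the cancellation semantics are an assumption: `Cancel` only has an effect before `Run` starts.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Task mirrors the interface from the README.
type Task interface {
	Cancel()
	Error() error
	Get() interface{}
	IsCancelled() bool
	IsDone() bool
	Panic() interface{}
	Run()
}

// errCancelled is a hypothetical sentinel error for cancelled tasks.
var errCancelled = errors.New("task cancelled")

// FuncTask is a hypothetical Task implementation wrapping a function.
type FuncTask struct {
	fn        func() (interface{}, error)
	once      sync.Once     // ensures the task completes exactly once
	done      chan struct{} // closed when the task is finished or cancelled
	cancelled bool
	result    interface{}
	err       error
	panicVal  interface{}
}

var _ Task = (*FuncTask)(nil) // compile-time interface check

func NewFuncTask(fn func() (interface{}, error)) *FuncTask {
	return &FuncTask{fn: fn, done: make(chan struct{})}
}

// Run executes fn once, recording its result, error, or recovered panic.
func (t *FuncTask) Run() {
	t.once.Do(func() {
		defer close(t.done) // runs last, after the panic value is stored
		defer func() {
			if r := recover(); r != nil {
				t.panicVal = r
			}
		}()
		t.result, t.err = t.fn()
	})
}

// Cancel marks the task as cancelled if Run has not started yet.
// If Run is in progress, Cancel waits for it to finish and does nothing.
func (t *FuncTask) Cancel() {
	t.once.Do(func() {
		t.cancelled = true
		t.err = errCancelled
		close(t.done)
	})
}

func (t *FuncTask) Get() interface{}   { <-t.done; return t.result }
func (t *FuncTask) Error() error       { <-t.done; return t.err }
func (t *FuncTask) Panic() interface{} { <-t.done; return t.panicVal }

// IsDone reports completion without blocking.
func (t *FuncTask) IsDone() bool {
	select {
	case <-t.done:
		return true
	default:
		return false
	}
}

// IsCancelled reads the flag only after done is closed, which keeps the
// read free of data races.
func (t *FuncTask) IsCancelled() bool {
	return t.IsDone() && t.cancelled
}

func main() {
	var t Task = NewFuncTask(func() (interface{}, error) { return 6 * 7, nil })
	go t.Run() // a pool worker would normally call Run
	fmt.Println(t.Get(), t.Error(), t.Panic()) // prints 42 <nil> <nil>
}
```

Closing the channel after all fields are written means any goroutine that returns from `<-t.done` is guaranteed to see the final result, error, and panic value.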

Ordering

Tasks submitted to an EVA pool are not guaranteed to be processed in submission order: they are distributed among concurrent workers and therefore run concurrently.
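
When results must line up with their inputs, a common workaround is to carry each input's index along with the job and write the result into that slot. The sketch below shows the pattern with plain goroutines rather than EVA; `squareAll` is a hypothetical helper and assumes `workers` is at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll squares every input on `workers` goroutines and returns the
// results in input order, regardless of which job finishes first.
func squareAll(inputs []int, workers int) []int {
	type job struct{ index, value int }

	jobs := make(chan job)
	out := make([]int, len(inputs))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				// Each index is written by exactly one goroutine,
				// so no extra locking is needed.
				out[j.index] = j.value * j.value
			}
		}()
	}

	for i, v := range inputs {
		jobs <- job{index: i, value: v}
	}
	close(jobs)
	wg.Wait()
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4, 5}, 3)) // prints [1 4 9 16 25]
}
```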
