A Go worker (goroutine) pool with an unlimited task queue
EVA
Package EVA implements a fixed-size goroutine pool with an unlimited task queue. It manages and recycles a large number of goroutines and lets developers limit how many goroutines a concurrent program creates. For example, when processing CPU-heavy jobs from HTTP requests, you can create a pool whose size matches your CPU count.
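To make the idea of a fixed number of workers fed by an unbounded queue concrete, here is a minimal sketch that uses only the standard library. It is not EVA's implementation: queuePool, submit, shutdown and sumSquares are hypothetical names chosen for illustration, and the queue is assumed to be a mutex-guarded slice.

```go
package main

import (
	"fmt"
	"sync"
)

// queuePool is a hypothetical illustration, not EVA's implementation.
// A fixed set of workers drains a slice-backed queue that can grow freely.
type queuePool struct {
	mu     sync.Mutex
	cond   *sync.Cond
	queue  []func() // grows without bound, so submit never blocks
	closed bool
	wg     sync.WaitGroup
}

func newQueuePool(workers int) *queuePool {
	p := &queuePool{}
	p.cond = sync.NewCond(&p.mu)
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go p.worker()
	}
	return p
}

func (p *queuePool) worker() {
	defer p.wg.Done()
	for {
		p.mu.Lock()
		for len(p.queue) == 0 && !p.closed {
			p.cond.Wait()
		}
		if len(p.queue) == 0 { // closed and fully drained
			p.mu.Unlock()
			return
		}
		task := p.queue[0]
		p.queue = p.queue[1:]
		p.mu.Unlock()
		task()
	}
}

// submit enqueues a task and returns immediately.
func (p *queuePool) submit(task func()) {
	p.mu.Lock()
	p.queue = append(p.queue, task)
	p.mu.Unlock()
	p.cond.Signal()
}

// shutdown lets the workers drain the queue, then waits for them to exit.
func (p *queuePool) shutdown() {
	p.mu.Lock()
	p.closed = true
	p.mu.Unlock()
	p.cond.Broadcast()
	p.wg.Wait()
}

// sumSquares runs n tasks on a pool of the given size and adds up i*i.
func sumSquares(workers, n int) int {
	p := newQueuePool(workers)
	var mu sync.Mutex
	total := 0
	for i := 1; i <= n; i++ {
		i := i
		p.submit(func() {
			mu.Lock()
			total += i * i
			mu.Unlock()
		})
	}
	p.shutdown()
	return total
}

func main() {
	fmt.Println(sumSquares(4, 100)) // 338350
}
```

Only four goroutines ever run tasks here, however many tasks are queued; the cost of never blocking the submitter is that the queue's memory use is unbounded.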
Features:
- Friendly interfaces: submitting tasks, getting the number of running goroutines, dynamically readjusting the capacity of the pool, closing the pool.
- Automatically managing and recycling a massive number of goroutines.
- Ability to provide your own implementation of the Task or Callable interface.
- Unlimited task queue.
- Recovers from panics in eva.CustomTask.
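The panic-recovery feature can be pictured with a small standard-library sketch. It only illustrates the general defer/recover pattern; outcome and runRecovered are hypothetical names, and how eva.CustomTask actually records a panic is not shown in this document.

```go
package main

import "fmt"

// outcome is a hypothetical container for what a task produced.
type outcome struct {
	Result interface{}
	Err    error
	Panic  interface{} // non-nil only if the task panicked
}

// runRecovered calls fn and converts a panic into a value instead of
// letting it crash the calling goroutine.
func runRecovered(fn func() (interface{}, error)) (out outcome) {
	defer func() {
		if r := recover(); r != nil {
			out.Panic = r
		}
	}()
	out.Result, out.Err = fn()
	return out
}

func main() {
	ok := runRecovered(func() (interface{}, error) { return 42, nil })
	bad := runRecovered(func() (interface{}, error) { panic("boom") })
	fmt.Println(ok.Result, ok.Err, ok.Panic)    // 42 <nil> <nil>
	fmt.Println(bad.Result, bad.Err, bad.Panic) // <nil> <nil> boom
}
```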
Installation
go get github.com/pshopper/eva
Or, using dep:
dep ensure -add github.com/pshopper/eva
Usage
Async
Simple
config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
p := eva.NewPool(config) // config is already a pointer
t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
	// some work here...
	return result, err
}, args)
p.Submit(t)
p.Wait()
fmt.Printf("Task result=%v; error=%v; panic=%v\n", t.Get(), t.Error(), t.Panic())
p.Close()
With completion
config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
p := eva.NewPool(config) // config is already a pointer
completion := make(chan eva.Task, 3)
for i := 0; i < 100; i++ {
	t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
		// some work here...
		return result, err
	}, args)
	p.SubmitWithCompletion(completion, t)
}
// Receive each finished task from the completion channel.
for i := 0; i < 100; i++ {
	t := <-completion
	fmt.Printf("Task result=%v; error=%v; panic=%v\n", t.Get(), t.Error(), t.Panic())
}
p.Close()
With context
config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
p := eva.NewPool(config) // config is already a pointer
// Named ctx so the variable does not shadow the context package.
ctx, cancel := context.WithCancel(context.Background()) // WithDeadline or WithTimeout
for i := 0; i < 100; i++ {
	t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
		// some work here...
		return result, err
	}, args)
	p.SubmitWithContext(ctx, t)
}
// some work here...
cancel()
p.Close()
With custom
config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
p := eva.NewPool(config) // config is already a pointer
// Named ctx so the variable does not shadow the context package.
ctx, cancel := context.WithCancel(context.Background()) // WithDeadline or WithTimeout
completion := make(chan eva.Task, 3)
for i := 0; i < 100; i++ {
	t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
		// some work here...
		return result, err
	}, args)
	p.SubmitCustom(ctx, completion, t)
}
// Take the first three finished tasks, then cancel the rest.
for i := 0; i < 3; i++ {
	t := <-completion
	fmt.Printf("Task result=%v; error=%v; panic=%v\n", t.Get(), t.Error(), t.Panic())
}
cancel()
p.Close()
Immediate
config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
p := eva.NewPool(config) // config is already a pointer
t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
	// some work here...
	return result, err
}, args)
err := p.SubmitImmediate(t)
if err == nil {
	fmt.Printf("Task result=%v; error=%v; panic=%v\n", t.Get(), t.Error(), t.Panic())
}
p.Close()
Sync
config := &eva.Config{Size: 10, UnstoppableWorkers: 5}
p := eva.NewPool(config) // config is already a pointer
t := eva.NewCustomTask(func(args ...interface{}) (interface{}, error) {
	// some work here...
	return result, err
}, args)
p.Submit(t)
// Get, Error and Panic block until the task is done, like CompletableFuture.
fmt.Printf("Task result=%v; error=%v; panic=%v\n", t.Get(), t.Error(), t.Panic())
p.Close()
Resize
You can change the pool size:
pool.SetSize(1000)
pool.SetSize(1000000)
Or you can change the number of unstoppable workers:
pool.SetUnstoppableWorkers(1000)
pool.SetUnstoppableWorkers(1000000)
This is safe to do from any goroutine, even while other goroutines are still processing tasks.
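One common way to make a size change safe from any goroutine is to keep the limit and the running count behind a single mutex. The sketch below shows that pattern with the standard library only. It is an assumption about how such a guarantee can be achieved, not a description of EVA's internals, and limiter, acquire, release, setSize and peakConcurrency are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// limiter is a hypothetical resizable concurrency limit.
type limiter struct {
	mu      sync.Mutex
	cond    *sync.Cond
	size    int // maximum number of concurrent holders
	running int // current number of holders
}

func newLimiter(size int) *limiter {
	l := &limiter{size: size}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// acquire blocks until a slot is free under the current size.
func (l *limiter) acquire() {
	l.mu.Lock()
	for l.running >= l.size {
		l.cond.Wait()
	}
	l.running++
	l.mu.Unlock()
}

// release frees a slot and wakes one waiter.
func (l *limiter) release() {
	l.mu.Lock()
	l.running--
	l.mu.Unlock()
	l.cond.Signal()
}

// setSize may be called from any goroutine; waiters re-check the new limit.
func (l *limiter) setSize(n int) {
	l.mu.Lock()
	l.size = n
	l.mu.Unlock()
	l.cond.Broadcast()
}

// peakConcurrency runs the given number of short tasks through the limiter
// and reports the highest number that were active at the same time.
func peakConcurrency(l *limiter, tasks int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	active, peak := 0, 0
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.acquire()
			defer l.release()
			mu.Lock()
			active++
			if active > peak {
				peak = active
			}
			mu.Unlock()
			time.Sleep(time.Millisecond) // stand-in for real work
			mu.Lock()
			active--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	l := newLimiter(2)
	fmt.Println("peak with size 2:", peakConcurrency(l, 20))
	l.setSize(5)
	fmt.Println("peak with size 5:", peakConcurrency(l, 20))
}
```

The reported peak never exceeds the configured size, but it can be lower, depending on scheduling.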
Interfaces
You can implement the Task interface and submit your implementation to the pool:
// Task is the interface for tasks run by the goroutine pool.
type Task interface {
	// Cancel cancels the task.
	Cancel()
	// Error returns the error from the task's Run.
	// Blocks until the task is done.
	Error() error
	// Get returns the result of the task's Run.
	// Blocks until the task is done.
	Get() interface{}
	// IsCancelled returns true if this task has been cancelled.
	IsCancelled() bool
	// IsDone returns true if this task is done.
	IsDone() bool
	// Panic returns the panic value from the task's Run, if any.
	// Blocks until the task is done.
	Panic() interface{}
	// Run performs the task.
	Run()
}
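As a sketch of what a custom implementation might look like, the example below copies the interface listed above so that it compiles on its own, then implements it with a done channel. funcTask, newFuncTask and errCancelled are hypothetical names. The semantics chosen here (Run executes at most once, and a task cancelled before Run skips its function) are assumptions rather than documented EVA behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Task repeats the interface from the text so the example is self-contained.
type Task interface {
	Cancel()
	Error() error
	Get() interface{}
	IsCancelled() bool
	IsDone() bool
	Panic() interface{}
	Run()
}

var errCancelled = errors.New("task cancelled")

// funcTask is a hypothetical Task backed by a plain function.
type funcTask struct {
	fn        func() (interface{}, error)
	once      sync.Once
	done      chan struct{} // closed when Run has finished
	mu        sync.Mutex
	cancelled bool
	result    interface{}
	err       error
	panicVal  interface{}
}

func newFuncTask(fn func() (interface{}, error)) *funcTask {
	return &funcTask{fn: fn, done: make(chan struct{})}
}

// Cancel marks the task as cancelled; a Run that starts afterwards skips fn.
func (t *funcTask) Cancel() {
	t.mu.Lock()
	t.cancelled = true
	t.mu.Unlock()
}

func (t *funcTask) IsCancelled() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.cancelled
}

func (t *funcTask) IsDone() bool {
	select {
	case <-t.done:
		return true
	default:
		return false
	}
}

// Run executes fn at most once, records the result, error or panic,
// and then unblocks Get, Error and Panic by closing done.
func (t *funcTask) Run() {
	t.once.Do(func() {
		defer close(t.done)
		if t.IsCancelled() {
			t.err = errCancelled
			return
		}
		defer func() {
			if r := recover(); r != nil {
				t.panicVal = r
			}
		}()
		t.result, t.err = t.fn()
	})
}

func (t *funcTask) Get() interface{}   { <-t.done; return t.result }
func (t *funcTask) Error() error       { <-t.done; return t.err }
func (t *funcTask) Panic() interface{} { <-t.done; return t.panicVal }

func main() {
	var t Task = newFuncTask(func() (interface{}, error) { return "done", nil })
	go t.Run() // in real use a pool worker would call Run
	fmt.Println(t.Get(), t.Error(), t.Panic()) // done <nil> <nil>
}
```

Note that in this sketch the blocking getters only return after Run has been called, so a task that is never run would block its readers indefinitely.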
Ordering
Tasks submitted to an EVA pool are not guaranteed to be processed in submission order, because they are distributed among concurrent workers and run concurrently.
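If results need to line up with the order of submission, one general approach is to tag each piece of work with its index and store the result in that slot. The sketch below shows this with plain goroutines from the standard library; squaresInOrder is a hypothetical function and does not use EVA.

```go
package main

import (
	"fmt"
	"sync"
)

// squaresInOrder computes v*v for every input concurrently. The goroutines
// may finish in any order, but each writes to the slot matching its input
// index, so the returned slice follows the input order.
func squaresInOrder(inputs []int) []int {
	results := make([]int, len(inputs))
	var wg sync.WaitGroup
	for i, v := range inputs {
		wg.Add(1)
		go func(i, v int) {
			defer wg.Done()
			results[i] = v * v // distinct slot per goroutine, so no lock is needed
		}(i, v)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(squaresInOrder([]int{1, 2, 3, 4})) // [1 4 9 16]
}
```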