Skip to content

Commit

Permalink
stackless: added NewFunc() for wrapping stack-hungry CPU-bound functions
Browse files Browse the repository at this point in the history
  • Loading branch information
valyala committed Feb 5, 2017
1 parent b69eba7 commit 5abb448
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 38 deletions.
4 changes: 2 additions & 2 deletions stackless/doc.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// Package stackless saves stack space for high number of concurrently
// running goroutines, which use writers from compress/* packages.
// Package stackless provides functionality that may save stack space
// for high number of concurrently running goroutines.
package stackless
80 changes: 80 additions & 0 deletions stackless/func.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package stackless

import (
"runtime"
"sync"
)

// NewFunc returns stackless wrapper for the function f.
//
// Unlike f, the returned stackless wrapper doesn't use stack space
// on the goroutine that calls it.
// The wrapper may save a lot of stack space if the following conditions
// are met:
//
// - f doesn't contain blocking calls on network, I/O or channels;
// - f uses a lot of stack space;
// - the wrapper is called from high number of concurrent goroutines.
//
// The stackless wrapper returns false if the call cannot be processed
// at the moment due to high load.
func NewFunc(f func(ctx interface{})) func(ctx interface{}) bool {
if f == nil {
panic("BUG: f cannot be nil")
}
return func(ctx interface{}) bool {
fw := getFuncWork()
fw.f = f
fw.ctx = ctx

select {
case funcWorkCh <- fw:
default:
putFuncWork(fw)
return false
}
<-fw.done
putFuncWork(fw)
return true
}
}

func init() {
n := runtime.GOMAXPROCS(-1)
for i := 0; i < n; i++ {
go funcWorker()
}
}

func funcWorker() {
for fw := range funcWorkCh {
fw.f(fw.ctx)
fw.done <- struct{}{}
}
}

var funcWorkCh = make(chan *funcWork, runtime.GOMAXPROCS(-1)*1024)

func getFuncWork() *funcWork {
v := funcWorkPool.Get()
if v == nil {
v = &funcWork{
done: make(chan struct{}),
}
}
return v.(*funcWork)
}

func putFuncWork(fw *funcWork) {
fw.f = nil
fw.ctx = nil
funcWorkPool.Put(fw)
}

var funcWorkPool sync.Pool

type funcWork struct {
f func(ctx interface{})
ctx interface{}
done chan struct{}
}
86 changes: 86 additions & 0 deletions stackless/func_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package stackless

import (
"fmt"
"sync/atomic"
"testing"
"time"
)

func TestNewFuncSimple(t *testing.T) {
var n uint64
f := NewFunc(func(ctx interface{}) {
atomic.AddUint64(&n, uint64(ctx.(int)))
})

iterations := 2 * cap(funcWorkCh)
for i := 0; i < iterations; i++ {
if !f(2) {
t.Fatalf("f mustn't return false")
}
}
if n != uint64(2*iterations) {
t.Fatalf("Unexpected n: %d. Expecting %d", n, 2*iterations)
}
}

func TestNewFuncMulti(t *testing.T) {
var n1, n2 uint64
f1 := NewFunc(func(ctx interface{}) {
atomic.AddUint64(&n1, uint64(ctx.(int)))
})
f2 := NewFunc(func(ctx interface{}) {
atomic.AddUint64(&n2, uint64(ctx.(int)))
})

iterations := 2 * cap(funcWorkCh)

f1Done := make(chan error, 1)
go func() {
var err error
for i := 0; i < iterations; i++ {
if !f1(3) {
err = fmt.Errorf("f1 mustn't return false")
break
}
}
f1Done <- err
}()

f2Done := make(chan error, 1)
go func() {
var err error
for i := 0; i < iterations; i++ {
if !f2(5) {
err = fmt.Errorf("f2 mustn't return false")
break
}
}
f2Done <- err
}()

select {
case err := <-f1Done:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(time.Second):
t.Fatalf("timeout")
}

select {
case err := <-f2Done:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(time.Second):
t.Fatalf("timeout")
}

if n1 != uint64(3*iterations) {
t.Fatalf("unexpected n1: %d. Expecting %d", n1, 3*iterations)
}
if n2 != uint64(5*iterations) {
t.Fatalf("unexpected n2: %d. Expecting %d", n2, 5*iterations)
}
}
40 changes: 40 additions & 0 deletions stackless/func_timing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package stackless

import (
"sync/atomic"
"testing"
)

func BenchmarkFuncOverhead(b *testing.B) {
var n uint64
f := NewFunc(func(ctx interface{}) {
atomic.AddUint64(&n, *(ctx.(*uint64)))
})
b.RunParallel(func(pb *testing.PB) {
x := uint64(1)
for pb.Next() {
if !f(&x) {
b.Fatalf("f mustn't return false")
}
}
})
if n != uint64(b.N) {
b.Fatalf("unexected n: %d. Expecting %d", n, b.N)
}
}

func BenchmarkFuncPure(b *testing.B) {
var n uint64
f := func(x *uint64) {
atomic.AddUint64(&n, *x)
}
b.RunParallel(func(pb *testing.PB) {
x := uint64(1)
for pb.Next() {
f(&x)
}
})
if n != uint64(b.N) {
b.Fatalf("unexected n: %d. Expecting %d", n, b.N)
}
}
64 changes: 28 additions & 36 deletions stackless/writer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package stackless

import (
"errors"
"fmt"
"github.com/valyala/bytebufferpool"
"io"
"runtime"
)

// Writer is an interface stackless writer must conform to.
Expand All @@ -31,7 +31,6 @@ type NewWriterFunc func(w io.Writer) Writer
func NewWriter(dstW io.Writer, newWriter NewWriterFunc) Writer {
w := &writer{
dstW: dstW,
done: make(chan error),
}
w.zw = newWriter(&w.xw)
return w
Expand All @@ -42,8 +41,8 @@ type writer struct {
zw Writer
xw xWriter

done chan error
n int
err error
n int

p []byte
op op
Expand Down Expand Up @@ -81,8 +80,10 @@ func (w *writer) Reset(dstW io.Writer) {

func (w *writer) do(op op) error {
w.op = op
writerCh <- w
err := <-w.done
if !stacklessWriterFunc(w) {
return errHighLoad
}
err := w.err
if err != nil {
return err
}
Expand All @@ -94,6 +95,27 @@ func (w *writer) do(op op) error {
return err
}

var errHighLoad = errors.New("cannot compress data due to high load")

var stacklessWriterFunc = NewFunc(writerFunc)

func writerFunc(ctx interface{}) {
w := ctx.(*writer)
switch w.op {
case opWrite:
w.n, w.err = w.zw.Write(w.p)
case opFlush:
w.err = w.zw.Flush()
case opClose:
w.err = w.zw.Close()
case opReset:
w.zw.Reset(&w.xw)
w.err = nil
default:
panic(fmt.Sprintf("BUG: unexpected op: %d", w.op))
}
}

type xWriter struct {
bb *bytebufferpool.ByteBuffer
}
Expand All @@ -114,33 +136,3 @@ func (w *xWriter) Reset() {
}

var bufferPool bytebufferpool.Pool

func init() {
n := runtime.GOMAXPROCS(-1)
writerCh = make(chan *writer, n)
for i := 0; i < n; i++ {
go worker()
}
}

var writerCh chan *writer

func worker() {
var err error
for w := range writerCh {
switch w.op {
case opWrite:
w.n, err = w.zw.Write(w.p)
case opFlush:
err = w.zw.Flush()
case opClose:
err = w.zw.Close()
case opReset:
w.zw.Reset(&w.xw)
err = nil
default:
panic(fmt.Sprintf("BUG: unexpected op: %d", w.op))
}
w.done <- err
}
}

0 comments on commit 5abb448

Please sign in to comment.