Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
johakoch authored and Marcel Ludwig committed Jan 5, 2023
1 parent 5aca545 commit 4034511
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 4 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
# Couper Changelog

## [Unreleased](https://github.com/avenga/couper/compare/v1.11.0...master)
## [Unreleased](https://github.com/avenga/couper/compare/v1.11.1...master)

Unreleased changes are available as `avenga/couper:edge` container.

---

## [1.11.1](https://github.com/avenga/couper/releases/tag/v1.11.1)
*
* **Fixed**
* [Endpoint sequences](https://docs.couper.io/configuration/block/endpoint#endpoint-sequence) not being terminated by errors (e.g. `unexpected_status`) (regression; since v1.11.0) ([#648](https://github.com/avenga/couper/pull/648))


## [1.11.0](https://github.com/avenga/couper/releases/tag/v1.11.0)

* **Added**
Expand Down
14 changes: 11 additions & 3 deletions handler/producer/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,15 @@ func pipe(req *http.Request, rt []Roundtrip, kind string, additionalSync *sync.M
}
additionalSync.Store(k, []chan *Result{})

toBreak := false
switch kind {
case "parallel": // execute each sequence branch in parallel
go produceAndPipeResults(ctx, req, rch, srt, additionalSync)
case "sequence": // one by one
produceAndPipeResults(ctx, req, rch, srt, additionalSync)
toBreak = produceAndPipeResults(ctx, req, rch, srt, additionalSync)
}
if toBreak {
break
}
}

Expand Down Expand Up @@ -145,7 +149,7 @@ func pipeResults(target, src chan *Result) {
}
}

func produceAndPipeResults(ctx context.Context, req *http.Request, results chan *Result, rt Roundtrip, additionalSync *sync.Map) {
func produceAndPipeResults(ctx context.Context, req *http.Request, results chan *Result, rt Roundtrip, additionalSync *sync.Map) bool {
outreq := req.WithContext(ctx)
defer close(results)
rs := rt.Produce(outreq, additionalSync)
Expand All @@ -167,11 +171,15 @@ func produceAndPipeResults(ctx context.Context, req *http.Request, results chan
for _, ach := range additionalChs {
ach <- e
}
return
return true
case results <- r:
for _, ach := range additionalChs {
ach <- r
}
if r.Err != nil {
return true
}
}
}
return false
}
67 changes: 67 additions & 0 deletions server/http_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,73 @@ func TestEndpointErrorHandler(t *testing.T) {
}
}

func TestEndpointSequenceBreak(t *testing.T) {
client := test.NewHTTPClient()
helper := test.New(t)

shutdown, hook := newCouper(filepath.Join(testdataPath, "14_couper.hcl"), helper)
defer shutdown()
defer func() {
if !t.Failed() {
return
}
for _, e := range hook.AllEntries() {
t.Logf("%#v", e.Data)
}
}()

type testcase struct {
name string
path string
expectedErrorType string
expBERNames []string
}

for _, tc := range []testcase{
{"sequence break unexpected_status", "/sequence-break-unexpected_status", "unexpected_status", []string{"resolve"}},
{"sequence break backend_timeout", "/sequence-break-backend_timeout", "backend_timeout", []string{"resolve"}},
{"break only one sequence", "/break-only-one-sequence", "unexpected_status", []string{"resolve2", "resolve1", "refl"}},
} {
t.Run(tc.name, func(st *testing.T) {
hook.Reset()
h := test.New(st)

req, err := http.NewRequest(http.MethodGet, "http://domain.local:8080"+tc.path, nil)
h.Must(err)

res, err := client.Do(req)
h.Must(err)

if res.StatusCode != http.StatusBadGateway {
st.Fatalf("want: %d, got: %d", http.StatusBadGateway, res.StatusCode)
}

time.Sleep(time.Millisecond * 200)

berNames := make(map[string]struct{})
for _, e := range hook.AllEntries() {
if e.Data["type"] == "couper_backend" {
request := e.Data["request"].(logging.Fields)
berNames[fmt.Sprintf("%s", request["name"])] = struct{}{}
} else if e.Data["type"] == "couper_access" {
if e.Data["error_type"] != tc.expectedErrorType {
st.Errorf("want: %q, got: %q", tc.expectedErrorType, e.Data["error_type"])
}
}
}
if len(berNames) != len(tc.expBERNames) {
st.Errorf("number of BE request names want: %d, got: %d", len(tc.expBERNames), len(berNames))
} else {
for _, n := range tc.expBERNames {
if _, ok := berNames[n]; !ok {
st.Errorf("missing BE request %q", n)
}
}
}
})
}
}

func TestEndpointACBufferOptions(t *testing.T) {
client := test.NewHTTPClient()
helper := test.New(t)
Expand Down
66 changes: 66 additions & 0 deletions server/testdata/endpoints/14_couper.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,72 @@ server { # error_handler
}
}

endpoint "/sequence-break-unexpected_status" {
request "resolve" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/anything"

expected_status = [418] # break
}

proxy {
url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect"
set_request_headers = {
x = backend_responses.resolve.headers.content-type
}
expected_status = [200]
}
}

endpoint "/sequence-break-backend_timeout" {
request "resolve" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/anything"
backend {
timeout = "1ns" # break
}
}

proxy {
url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect"
set_request_headers = {
x = backend_responses.resolve.headers.content-type
}
expected_status = [200]
}
}

endpoint "/break-only-one-sequence" {
request "resolve1" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/anything"

expected_status = [418] # break
}

proxy {
url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect"
set_request_headers = {
x = backend_responses.resolve1.headers.content-type
}
expected_status = [200]
}

request "resolve2" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/anything"
expected_status = [200]
}

proxy "refl" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect"
set_request_headers = {
x = backend_responses.resolve2.headers.content-type
}
expected_status = [200]
}

response {
status = 200
}
}

api {
endpoint "/1.1" {
request "r1" {
Expand Down

0 comments on commit 4034511

Please sign in to comment.