From fc0c5c654bf326d4f50773c983157e9eed7c4891 Mon Sep 17 00:00:00 2001 From: Simon Date: Sun, 20 Jan 2019 17:22:02 +0100 Subject: [PATCH 1/3] Implemented Feature which enable Connector to leverage Responses Signed-off-by: Simon Pelczer --- types/invoker.go | 48 ++++++++------ types/invoker_test.go | 146 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 175 insertions(+), 19 deletions(-) create mode 100644 types/invoker_test.go diff --git a/types/invoker.go b/types/invoker.go index 1170972..e26167f 100644 --- a/types/invoker.go +++ b/types/invoker.go @@ -15,8 +15,16 @@ type Invoker struct { GatewayURL string } -func (i *Invoker) Invoke(topicMap *TopicMap, topic string, message *[]byte) { - if len(*message) > 0 { +type InvocationResult struct { + StatusCode int + Body *[]byte + Error error +} + +func (i *Invoker) Invoke(topicMap *TopicMap, topic string, message *[]byte) (result map[string]*InvocationResult) { + result = make(map[string]*InvocationResult) + + if message != nil && len(*message) > 0 { matchedFunctions := topicMap.Match(topic) for _, matchedFunction := range matchedFunctions { @@ -26,32 +34,34 @@ func (i *Invoker) Invoke(topicMap *TopicMap, topic string, message *[]byte) { gwURL := fmt.Sprintf("%s/function/%s", i.GatewayURL, matchedFunction) reader := bytes.NewReader(*message) - body, statusCode, doErr := invokefunction(i.Client, gwURL, reader) + err, statusCode, headers, body := performInvocation(i.Client, gwURL, reader) - if doErr != nil { - log.Printf("Unable to invoke from %s, error: %s\n", matchedFunction, doErr) + if err != nil { + result[matchedFunction] = &InvocationResult{ + StatusCode: statusCode, + Body: nil, + Error: err, + } return } - printBody := false - stringOutput := "" - if body != nil && i.PrintResponse { - stringOutput = string(*body) - printBody = true + stringOutput := string(*body) + log.Printf("Headers: %s", headers) + log.Printf("Response: [%d] from %s %s", statusCode, matchedFunction, stringOutput) } - if printBody { - log.Printf("Response [%d] from %s %s", statusCode, matchedFunction, stringOutput) - - } else { - log.Printf("Response [%d] from %s", statusCode, matchedFunction) + result[matchedFunction] = &InvocationResult{ + StatusCode: statusCode, + Body: body, + Error: nil, } } } + return result } -func invokefunction(c *http.Client, gwURL string, reader io.Reader) (*[]byte, int, error) { +func performInvocation(c *http.Client, gwURL string, reader io.Reader) (err error, statusCode int, responseHeaders http.Header, responseBody *[]byte) { httpReq, _ := http.NewRequest(http.MethodPost, gwURL, reader) @@ -63,7 +73,7 @@ func invokefunction(c *http.Client, gwURL string, reader io.Reader) (*[]byte, in res, doErr := c.Do(httpReq) if doErr != nil { - return nil, http.StatusServiceUnavailable, doErr + return doErr, http.StatusServiceUnavailable, nil, nil } if res.Body != nil { @@ -72,11 +82,11 @@ func invokefunction(c *http.Client, gwURL string, reader io.Reader) (*[]byte, in bytesOut, readErr := ioutil.ReadAll(res.Body) if readErr != nil { log.Printf("Error reading body") - return nil, http.StatusServiceUnavailable, doErr + return doErr, http.StatusServiceUnavailable, nil, nil } body = &bytesOut } - return body, res.StatusCode, doErr + return nil, res.StatusCode, res.Header, body } diff --git a/types/invoker_test.go b/types/invoker_test.go new file mode 100644 index 0000000..37adcc4 --- /dev/null +++ b/types/invoker_test.go @@ -0,0 +1,146 @@ +// Copyright (c) OpenFaaS Project 2018. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +package types + +import ( + "bytes" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestInvoker_Invoke(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + function := strings.Split(r.URL.Path, "/")[2] + + switch function { + case "200": + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(200) + w.Write([]byte("Hello World")) + case "401": + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(401) + w.Write([]byte("")) + case "500": + } + w.WriteHeader(500) + })) + client := srv.Client() + topicMap := NewTopicMap() + sampleFunc := map[string][]string{ + "Success": []string{"200"}, + "UnAuthorized": []string{"401"}, + "Internal Error": []string{"500"}, + } + topicMap.Sync(&sampleFunc) + + t.Run("Empty Body", func(t *testing.T) { + target := &Invoker{ + true, + client, + srv.URL, + } + + body := []byte("") + results := target.Invoke(&topicMap, "Success", &body) + + if len(results) != 0 { + t.Errorf("Expected 0 results recieved %d", len(results)) + } + }) + + t.Run("Successful Response", func(t *testing.T) { + target := &Invoker{ + true, + client, + srv.URL, + } + + body := []byte("Body") + results := target.Invoke(&topicMap, "Success", &body) + + if len(results) != 1 { + t.Errorf("Expected 1 results recieved %d", len(results)) + } + + result := results["200"] + + if result.Error != nil { + t.Errorf("Recieved unexpected error %s", result.Error) + } + + if result.StatusCode != 200 { + t.Errorf("Expected Statuscode 200 recieved %d", result.StatusCode) + } + + expected := []byte("Hello World") + if !bytes.Equal(expected, *result.Body) { + t.Errorf("Expected %s as body recieved %s", expected, *result.Body) + } + }) + + t.Run("Unauthorized Response", func(t *testing.T) { + target := &Invoker{ + true, + client, + srv.URL, + } + + body := []byte("Body") + results := target.Invoke(&topicMap, "UnAuthorized", &body) + + if len(results) != 1 { + t.Errorf("Expected 1 results recieved %d", len(results)) + } + + result := results["401"] + + if result.Error != nil { + t.Errorf("Recieved unexpected error %s", result.Error) + } + + if result.StatusCode != 401 { + t.Errorf("Expected Statuscode 200 recieved %d", result.StatusCode) + } + + expected := []byte("") + if !bytes.Equal(expected, *result.Body) { + t.Errorf("Expected %s as body recieved %s", expected, *result.Body) + } + }) + + t.Run("Server Error Response", func(t *testing.T) { + target := &Invoker{ + true, + client, + srv.URL, + } + + body := []byte("Body") + results := target.Invoke(&topicMap, "Internal Error", &body) + + if len(results) != 1 { + t.Errorf("Expected 1 results recieved %d", len(results)) + } + + result := results["500"] + + if result.Error != nil { + t.Errorf("Recieved unexpected error %s", result.Error) + } + + if result.StatusCode != 500 { + t.Errorf("Expected Statuscode 200 recieved %d", result.StatusCode) + } + + expected := []byte("") + if !bytes.Equal(expected, *result.Body) { + t.Errorf("Expected %s as body recieved %s", expected, *result.Body) + } + }) +} From df64b7dcdf7c3d52bf5ee7936093e06b0ce5295d Mon Sep 17 00:00:00 2001 From: Simon Date: Thu, 24 Jan 2019 23:03:13 +0100 Subject: [PATCH 2/3] Adjusted Code based on received feedback * Using a Struct for return value for performInvocation * No longer aborting requests on first error * performInvocation is now method of Invoker * Handled NewRequest error, altough this only happens when miss configured Signed-off-by: Simon Pelczer --- types/invoker.go | 60 +++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/types/invoker.go b/types/invoker.go index e26167f..5bcd233 100644 --- a/types/invoker.go +++ b/types/invoker.go @@ -21,6 +21,20 @@ type InvocationResult struct { Error error } +func NewInvocationResult(statusCode int, body *[]byte, error error) *InvocationResult { + return &InvocationResult{StatusCode: statusCode, Body: body, Error: error} +} + +type InvocationResponse struct { + StatusCode int + Body *[]byte + Headers http.Header +} + +func NewInvocationResponse(statusCode int, body *[]byte, headers http.Header) *InvocationResponse { + return &InvocationResponse{StatusCode: statusCode, Body: body, Headers: headers} +} + func (i *Invoker) Invoke(topicMap *TopicMap, topic string, message *[]byte) (result map[string]*InvocationResult) { result = make(map[string]*InvocationResult) @@ -30,40 +44,38 @@ func (i *Invoker) Invoke(topicMap *TopicMap, topic string, message *[]byte) (res for _, matchedFunction := range matchedFunctions { log.Printf("Invoke function: %s", matchedFunction) - - gwURL := fmt.Sprintf("%s/function/%s", i.GatewayURL, matchedFunction) + functionURL := fmt.Sprintf("%s/function/%s", i.GatewayURL, matchedFunction) reader := bytes.NewReader(*message) - err, statusCode, headers, body := performInvocation(i.Client, gwURL, reader) + response, err := i.performInvocation(functionURL, reader) if err != nil { - result[matchedFunction] = &InvocationResult{ - StatusCode: statusCode, - Body: nil, - Error: err, + if response != nil { + result[matchedFunction] = NewInvocationResult(response.StatusCode, nil, err) + } else { + result[matchedFunction] = NewInvocationResult(-1, nil, err) } - return - } - - if body != nil && i.PrintResponse { - stringOutput := string(*body) - log.Printf("Headers: %s", headers) - log.Printf("Response: [%d] from %s %s", statusCode, matchedFunction, stringOutput) + } else { + result[matchedFunction] = NewInvocationResult(response.StatusCode, response.Body, err) } - result[matchedFunction] = &InvocationResult{ - StatusCode: statusCode, - Body: body, - Error: nil, + if response != nil && response.Body != nil && i.PrintResponse { + stringOutput := string(*response.Body) + log.Printf("Headers: %s", response.Headers) + log.Printf("Response: [%d] from %s %s", response.StatusCode, matchedFunction, stringOutput) } } } return result } -func performInvocation(c *http.Client, gwURL string, reader io.Reader) (err error, statusCode int, responseHeaders http.Header, responseBody *[]byte) { +func (i *Invoker) performInvocation(functionURL string, bodyReader io.Reader) (*InvocationResponse, error) { + + httpReq, requestErr := http.NewRequest(http.MethodPost, functionURL, bodyReader) - httpReq, _ := http.NewRequest(http.MethodPost, gwURL, reader) + if requestErr != nil { + return nil, requestErr + } if httpReq.Body != nil { defer httpReq.Body.Close() @@ -71,9 +83,9 @@ func performInvocation(c *http.Client, gwURL string, reader io.Reader) (err erro var body *[]byte - res, doErr := c.Do(httpReq) + res, doErr := i.Client.Do(httpReq) if doErr != nil { - return doErr, http.StatusServiceUnavailable, nil, nil + return nil, doErr } if res.Body != nil { @@ -82,11 +94,11 @@ func performInvocation(c *http.Client, gwURL string, reader io.Reader) (err erro bytesOut, readErr := ioutil.ReadAll(res.Body) if readErr != nil { log.Printf("Error reading body") - return doErr, http.StatusServiceUnavailable, nil, nil + return NewInvocationResponse(res.StatusCode, nil, res.Header), readErr } body = &bytesOut } - return nil, res.StatusCode, res.Header, body + return NewInvocationResponse(res.StatusCode, body, res.Header), nil } From d8420ed06f397110af310a5c08ef7604da1c663c Mon Sep 17 00:00:00 2001 From: Simon Date: Thu, 24 Jan 2019 23:05:15 +0100 Subject: [PATCH 3/3] Adjusted Tests * They now check that an failing function wont abort invocations * They check that no invocation is performed for invalid messages * They check error is only set when an error occurred Signed-off-by: Simon Pelczer --- types/invoker_test.go | 145 +++++++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 72 deletions(-) diff --git a/types/invoker_test.go b/types/invoker_test.go index 37adcc4..8862852 100644 --- a/types/invoker_test.go +++ b/types/invoker_test.go @@ -4,7 +4,7 @@ package types import ( - "bytes" + "fmt" "net/http" "net/http/httptest" "strings" @@ -16,131 +16,132 @@ func TestInvoker_Invoke(t *testing.T) { function := strings.Split(r.URL.Path, "/")[2] switch function { - case "200": - w.Header().Set("Access-Control-Allow-Origin", "*") + case "success": w.Header().Set("Content-Type", "text/plain") w.WriteHeader(200) w.Write([]byte("Hello World")) - case "401": + break + case "headers": w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Content-Type", "text/plain") - w.WriteHeader(401) + w.Header().Set("Content-Type", "text/html") + w.Header().Set("Charset", "utf-8") + w.WriteHeader(200) + w.Write([]byte("

Hello World

")) + break + case "wrong_payload": + w.WriteHeader(400) + w.Write([]byte("")) + break + case "aborts": + if wr, ok := w.(http.Hijacker); ok { + conn, _, err := wr.Hijack() + if err != nil{ + fmt.Printf("Recieved %s",err) + }else{ + conn.Close() + } + + } + break + case "server_error": + w.WriteHeader(500) w.Write([]byte("")) - case "500": + break } - w.WriteHeader(500) })) + client := srv.Client() topicMap := NewTopicMap() + sampleFunc := map[string][]string{ - "Success": []string{"200"}, - "UnAuthorized": []string{"401"}, - "Internal Error": []string{"500"}, + "All": []string{"success", "headers", "wrong_payload"}, + "Contains_Fail": []string{"success", "server_error", "aborts", "headers"}, + "NOP": []string{}, } + topicMap.Sync(&sampleFunc) - t.Run("Empty Body", func(t *testing.T) { + t.Run("Should invoke no function when body is nil", func(t *testing.T) { target := &Invoker{ - true, - client, - srv.URL, + PrintResponse:false, + Client:client, + GatewayURL: srv.URL, } - body := []byte("") - results := target.Invoke(&topicMap, "Success", &body) + results := target.Invoke(&topicMap, "NOP", nil) if len(results) != 0 { - t.Errorf("Expected 0 results recieved %d", len(results)) + t.Errorf("When body is nil it should perform a request") } }) - t.Run("Successful Response", func(t *testing.T) { + t.Run("Should invoke no function when body is empty", func(t *testing.T) { target := &Invoker{ true, client, srv.URL, } - body := []byte("Body") - results := target.Invoke(&topicMap, "Success", &body) - - if len(results) != 1 { - t.Errorf("Expected 1 results recieved %d", len(results)) - } - - result := results["200"] - - if result.Error != nil { - t.Errorf("Recieved unexpected error %s", result.Error) - } - - if result.StatusCode != 200 { - t.Errorf("Expected Statuscode 200 recieved %d", result.StatusCode) - } + body := []byte("") + results := target.Invoke(&topicMap, "NOP", &body) - expected := []byte("Hello World") - if !bytes.Equal(expected, *result.Body) { - t.Errorf("Expected %s as body recieved %s", expected, *result.Body) + if len(results) != 0 { + t.Errorf("When body is empty it should perform a request") } }) - t.Run("Unauthorized Response", func(t *testing.T) { + t.Run("Should invoke all functions", func(t *testing.T) { target := &Invoker{ true, client, srv.URL, } - body := []byte("Body") - results := target.Invoke(&topicMap, "UnAuthorized", &body) + body := []byte("Some Input") + results := target.Invoke(&topicMap, "All", &body) - if len(results) != 1 { - t.Errorf("Expected 1 results recieved %d", len(results)) + const ExpectedResults = 3 + if len(results) != ExpectedResults { + t.Errorf("Expected %d results recieved %d", ExpectedResults, len(results)) } - result := results["401"] - - if result.Error != nil { - t.Errorf("Recieved unexpected error %s", result.Error) - } + for name, result := range results { + if result.Error != nil { + t.Errorf("Received unexpected error %s for %s", result.Error, name) + } - if result.StatusCode != 401 { - t.Errorf("Expected Statuscode 200 recieved %d", result.StatusCode) + if result.StatusCode != 200 && result.StatusCode != 400 { + t.Errorf("Received unexpected status code %d for %s", result.StatusCode, name) + } } - expected := []byte("") - if !bytes.Equal(expected, *result.Body) { - t.Errorf("Expected %s as body recieved %s", expected, *result.Body) - } }) - t.Run("Server Error Response", func(t *testing.T) { + t.Run("Should invoke all functions even if one request fails", func(t *testing.T) { target := &Invoker{ true, client, srv.URL, } - body := []byte("Body") - results := target.Invoke(&topicMap, "Internal Error", &body) - - if len(results) != 1 { - t.Errorf("Expected 1 results recieved %d", len(results)) - } - - result := results["500"] - - if result.Error != nil { - t.Errorf("Recieved unexpected error %s", result.Error) - } + body := []byte("Hello World") + results := target.Invoke(&topicMap, "Contains_Fail", &body) - if result.StatusCode != 500 { - t.Errorf("Expected Statuscode 200 recieved %d", result.StatusCode) + const ExpectedResults = 4 + if len(results) != ExpectedResults { + t.Errorf("Expected %d results recieved %d", ExpectedResults, len(results)) } - expected := []byte("") - if !bytes.Equal(expected, *result.Body) { - t.Errorf("Expected %s as body recieved %s", expected, *result.Body) + for name, result := range results { + if name == "aborts" { + if result.Error == nil { + t.Errorf("Expected call for %s to fail", name) + } + } else { + if result.Error != nil { + t.Errorf("Received unexpected error %s for %s", result.Error, name) + } + } } }) }