Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented Feature which enable Connector to leverage Responses #4

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 49 additions & 27 deletions types/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,77 @@ type Invoker struct {
GatewayURL string
}

func (i *Invoker) Invoke(topicMap *TopicMap, topic string, message *[]byte) {
if len(*message) > 0 {
type InvocationResult struct {
StatusCode int
Body *[]byte
Error error
}

matchedFunctions := topicMap.Match(topic)
for _, matchedFunction := range matchedFunctions {
func NewInvocationResult(statusCode int, body *[]byte, error error) *InvocationResult {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks redundant to me.

return &InvocationResult{StatusCode: statusCode, Body: body, Error: error}
}

log.Printf("Invoke function: %s", matchedFunction)
type InvocationResponse struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InvocationResponse vs InvocationResult ?

StatusCode int
Body *[]byte
Headers http.Header
}

gwURL := fmt.Sprintf("%s/function/%s", i.GatewayURL, matchedFunction)
reader := bytes.NewReader(*message)
func NewInvocationResponse(statusCode int, body *[]byte, headers http.Header) *InvocationResponse {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like it adds no value?

return &InvocationResponse{StatusCode: statusCode, Body: body, Headers: headers}
}

body, statusCode, doErr := invokefunction(i.Client, gwURL, reader)
func (i *Invoker) Invoke(topicMap *TopicMap, topic string, message *[]byte) (result map[string]*InvocationResult) {
result = make(map[string]*InvocationResult)

if doErr != nil {
log.Printf("Unable to invoke from %s, error: %s\n", matchedFunction, doErr)
return
}
if message != nil && len(*message) > 0 {

printBody := false
stringOutput := ""
matchedFunctions := topicMap.Match(topic)
for _, matchedFunction := range matchedFunctions {

if body != nil && i.PrintResponse {
stringOutput = string(*body)
printBody = true
}
log.Printf("Invoke function: %s", matchedFunction)
functionURL := fmt.Sprintf("%s/function/%s", i.GatewayURL, matchedFunction)
reader := bytes.NewReader(*message)

if printBody {
log.Printf("Response [%d] from %s %s", statusCode, matchedFunction, stringOutput)
response, err := i.performInvocation(functionURL, reader)

if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we're returning here too soon, i.e. if there's multiple matchedFuntions, others would not be executed if an error is thrown early. There could be an error with the receiving function (bug), others could be fine.

if response != nil {
result[matchedFunction] = NewInvocationResult(response.StatusCode, nil, err)
} else {
result[matchedFunction] = NewInvocationResult(-1, nil, err)
}
} else {
log.Printf("Response [%d] from %s", statusCode, matchedFunction)
result[matchedFunction] = NewInvocationResult(response.StatusCode, response.Body, err)
}

if response != nil && response.Body != nil && i.PrintResponse {
stringOutput := string(*response.Body)
log.Printf("Headers: %s", response.Headers)
log.Printf("Response: [%d] from %s %s", response.StatusCode, matchedFunction, stringOutput)
}
}
}
return result
}

func invokefunction(c *http.Client, gwURL string, reader io.Reader) (*[]byte, int, error) {
func (i *Invoker) performInvocation(functionURL string, bodyReader io.Reader) (*InvocationResponse, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this to the more natural sounding invokefunction, perhaps capitalize to invokeFunction or just invoke.


httpReq, _ := http.NewRequest(http.MethodPost, gwURL, reader)
httpReq, requestErr := http.NewRequest(http.MethodPost, functionURL, bodyReader)

if requestErr != nil {
return nil, requestErr
}

if httpReq.Body != nil {
defer httpReq.Body.Close()
}

var body *[]byte

res, doErr := c.Do(httpReq)
res, doErr := i.Client.Do(httpReq)
if doErr != nil {
return nil, http.StatusServiceUnavailable, doErr
return nil, doErr
}

if res.Body != nil {
Expand All @@ -72,11 +94,11 @@ func invokefunction(c *http.Client, gwURL string, reader io.Reader) (*[]byte, in
bytesOut, readErr := ioutil.ReadAll(res.Body)
if readErr != nil {
log.Printf("Error reading body")
return nil, http.StatusServiceUnavailable, doErr
return NewInvocationResponse(res.StatusCode, nil, res.Header), readErr

}
body = &bytesOut
}

return body, res.StatusCode, doErr
return NewInvocationResponse(res.StatusCode, body, res.Header), nil
}
147 changes: 147 additions & 0 deletions types/invoker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (c) OpenFaaS Project 2018. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package types

import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
)

func TestInvoker_Invoke(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you'd like to back-port your tests to my PR that would be appreciated.

I've implemented the changes with a subscriber model that should make testing easier also.

function := strings.Split(r.URL.Path, "/")[2]

switch function {
case "success":
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(200)
w.Write([]byte("Hello World"))
break
case "headers":
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "text/html")
w.Header().Set("Charset", "utf-8")
w.WriteHeader(200)
w.Write([]byte("<h1>Hello World</h1>"))
break
case "wrong_payload":
w.WriteHeader(400)
w.Write([]byte(""))
break
case "aborts":
if wr, ok := w.(http.Hijacker); ok {
conn, _, err := wr.Hijack()
if err != nil{
fmt.Printf("Recieved %s",err)
}else{
conn.Close()
}

}
break
case "server_error":
w.WriteHeader(500)
w.Write([]byte(""))
break
}
}))

client := srv.Client()
topicMap := NewTopicMap()

sampleFunc := map[string][]string{
"All": []string{"success", "headers", "wrong_payload"},
"Contains_Fail": []string{"success", "server_error", "aborts", "headers"},
"NOP": []string{},
}

topicMap.Sync(&sampleFunc)

t.Run("Should invoke no function when body is nil", func(t *testing.T) {
target := &Invoker{
PrintResponse:false,
Client:client,
GatewayURL: srv.URL,
}

results := target.Invoke(&topicMap, "NOP", nil)

if len(results) != 0 {
t.Errorf("When body is nil it should perform a request")
}
})

t.Run("Should invoke no function when body is empty", func(t *testing.T) {
target := &Invoker{
true,
client,
srv.URL,
}

body := []byte("")
results := target.Invoke(&topicMap, "NOP", &body)

if len(results) != 0 {
t.Errorf("When body is empty it should perform a request")
}
})

t.Run("Should invoke all functions", func(t *testing.T) {
target := &Invoker{
true,
client,
srv.URL,
}

body := []byte("Some Input")
results := target.Invoke(&topicMap, "All", &body)

const ExpectedResults = 3
if len(results) != ExpectedResults {
t.Errorf("Expected %d results recieved %d", ExpectedResults, len(results))
}

for name, result := range results {
if result.Error != nil {
t.Errorf("Received unexpected error %s for %s", result.Error, name)
}

if result.StatusCode != 200 && result.StatusCode != 400 {
t.Errorf("Received unexpected status code %d for %s", result.StatusCode, name)
}
}

})

t.Run("Should invoke all functions even if one request fails", func(t *testing.T) {
target := &Invoker{
true,
client,
srv.URL,
}

body := []byte("Hello World")
results := target.Invoke(&topicMap, "Contains_Fail", &body)

const ExpectedResults = 4
if len(results) != ExpectedResults {
t.Errorf("Expected %d results recieved %d", ExpectedResults, len(results))
}

for name, result := range results {
if name == "aborts" {
if result.Error == nil {
t.Errorf("Expected call for %s to fail", name)
}
} else {
if result.Error != nil {
t.Errorf("Received unexpected error %s for %s", result.Error, name)
}
}
}
})
}