Skip to content

Commit

Permalink
feat: bulk streamed processing
Browse files Browse the repository at this point in the history
Add streamed processing of bulk queries, eliminating the limits previously
needed to bound memory usage, as well as the recreation of svrquery.Client
for each processed element, by implementing a reusable BulkClient.

This changes the processing to best effort and can result in entries
without an address being output.

Also:
* Output to a provided stream.
* Eliminate the use of log.Fatal in the library.
* Include element details when parsing fails.
  • Loading branch information
stevenh committed Jan 22, 2022
1 parent 42f5a0b commit 33ed3b7
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 92 deletions.
214 changes: 123 additions & 91 deletions cmd/cli/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@ package main

import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"log"
"io"
"os"
"strings"
"sync"

"github.com/multiplay/go-svrquery/lib/svrquery"
"github.com/multiplay/go-svrquery/lib/svrquery/protocol"
)

const (
	// numWorkers is the number of concurrent workers used to process bulk queries.
	numWorkers = 100

	// maxQueries is the maximum number of queries that can be queried in one bulk request.
	maxQueries = 10000
)

var (
Expand All @@ -28,144 +27,173 @@ var (
// BulkResponseItem contains the information about the query being performed
// against a single server.
type BulkResponseItem struct {
Address string `json:"address"`
Address string `json:"address,omitempty"`
ServerInfo *BulkResponseServerInfoItem `json:"serverInfo,omitempty"`
Error string `json:"error,omitempty"`
}

// encode writes i to w as JSON using encoder e, which must be backed by
// buffer b. The trailing newline added by Encode is stripped before
// writing so callers can manage their own separators; b is always reset
// before returning so it can be reused for the next item.
func (i *BulkResponseItem) encode(w io.Writer, b *bytes.Buffer, e *json.Encoder) error {
	defer b.Reset()

	if err := e.Encode(i); err != nil {
		return fmt.Errorf("encode item %v: %w", i, err)
	}

	data := bytes.TrimRight(b.Bytes(), "\n")
	if _, err := w.Write(data); err != nil {
		return fmt.Errorf("write item: %w", err)
	}

	return nil
}

// BulkResponseServerInfoItem contains the basic server information
// returned for a successfully queried server.
type BulkResponseServerInfoItem struct {
	CurrentPlayers int64  `json:"currentPlayers"` // Number of players currently connected.
	MaxPlayers     int64  `json:"maxPlayers"`     // Maximum player capacity of the server.
	Map            string `json:"map"`            // Current map, set only if the protocol reports one.
}

// BulkResponseItemWork is an item returned by a worker containing the data item
// plus any terminal error it encountered.
type BulkResponseItemWork struct {
	Item *BulkResponseItem // Result for a single server; may be nil if a terminal error occurred.
	Err  error             // Terminal error, if any, encountered while processing the entry.
}
// queryBulk queries a bulk set of servers from a query file writing the JSON results to output.
func queryBulk(file string, output io.Writer) (err error) {
work := make(chan string, numWorkers) // Buffered to ensure we can busy all workers.
results := make(chan BulkResponseItem, numWorkers) // Buffered to improve worker concurrency.

// queryBulk queries a bulk set of servers using a query file.
func queryBulk(file string) error {
// To simplify the workerpool load all the entries we are going to work on
lines := fileLines(file)
// Create a pool of workers to process work.
var wgWorkers sync.WaitGroup
wgWorkers.Add(numWorkers)
for w := 1; w <= numWorkers; w++ {
c, err := svrquery.NewBulkClient()
if err != nil {
close(work) // Ensure that existing workers return.
return fmt.Errorf("bulk client: %w", err)
}

if len(lines) > maxQueries {
return fmt.Errorf("too many servers requested %d (max %d)", len(lines), maxQueries)
go func() {
defer wgWorkers.Done()
worker(work, results, c)
}()
}

// Make a jobs channel and a number of workers to processes
// work off of the channel.
jobChan := make(chan string, len(lines))
resultsChan := make(chan BulkResponseItemWork)
for w := 1; w <= numWorkers; w++ {
go worker(jobChan, resultsChan)
// Create a writer to write the results to output as they become available.
errc := make(chan error)
go func() {
errc <- writer(output, results)
}()

// Queue work onto the channel.
if err = producer(file, work); err != nil {
err = fmt.Errorf("producer: %w", err)
}

// Wait for all workers to complete so that we can safely close results
// that will trigger writer to return once its processed all results.
wgWorkers.Wait()
close(results)

if werr := <-errc; werr != nil {
if err != nil {
return fmt.Errorf("%w, writer: %s", err, werr)
}
return fmt.Errorf("writer: %w", werr)
}

items := make([]BulkResponseItem, 0, len(lines))
return err
}

// Queue work onto the channel
for _, line := range lines {
jobChan <- line
// writer writes results as JSON encoded array to w.
func writer(w io.Writer, results <-chan BulkResponseItem) (err error) {
if _, err = w.Write([]byte{'['}); err != nil {
return fmt.Errorf("write header: %w", err)
}
close(jobChan)

// Receive results from workers.
var err error
for i := 0; i < len(lines); i++ {
v := <-resultsChan
switch {
case errors.Is(v.Err, errNoItem):
// Not fatal, but no response for this entry was created.
continue
case v.Err != nil:
// We had a major issue processing the list
// Do our best to write valid JSON by ensuring we always write
// the closing ]. If a previous encode fails, this could still
// be insufficient.
defer func() {
if _, werr := w.Write([]byte("]\n")); werr != nil {
werr = fmt.Errorf("write trailer: %w", err)
if err == nil {
err = fmt.Errorf("fatal error: %w", v.Err)
continue
err = werr
}
err = fmt.Errorf("additional error: %w", v.Err)
continue
}
// add the item to our list of items.
items = append(items, *v.Item)
}()

var b bytes.Buffer
e := json.NewEncoder(&b)

// Process the first item before looping so separating
// comma can be written easily.
i, ok := <-results
if !ok {
return nil
}

if err != nil {
if err := i.encode(w, &b, e); err != nil {
return err
}

b, err := json.MarshalIndent(items, "", "\t")
if err != nil {
return err
for i := range results {
if _, err := w.Write([]byte(",")); err != nil {
return fmt.Errorf("write set: %w", err)
}

if err := i.encode(w, &b, e); err != nil {
return err
}
}
fmt.Printf("%s\n", b)

return nil
}

func fileLines(file string) []string {
// producer reads lines from file sending them to work.
// work will be closed before return.
func producer(file string, work chan<- string) error {
defer close(work)

f, err := os.Open(file)
if err != nil {
log.Fatal(err)
return err
}
defer f.Close()

result := make([]string, 0, 1000)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
result = append(result, line)
s := bufio.NewScanner(f)
for s.Scan() {
work <- s.Text()
}
return result

return s.Err()
}

// worker is run in a goroutine to provide processing for the items.
func worker(jobChan <-chan string, results chan<- BulkResponseItemWork) {
for entry := range jobChan {
item, err := processBulkEntry(entry)
results <- BulkResponseItemWork{
Item: item,
Err: err,
}
// worker consumes entries from work until the channel is closed, sending
// the processed result for each entry to results using the shared client.
func worker(work <-chan string, results chan<- BulkResponseItem, client *svrquery.BulkClient) {
	for entry := range work {
		results <- processBulkEntry(entry, client)
	}
}

// processBulkEntry processes an entry and returns an item containing the result or error.
func processBulkEntry(entry string) (*BulkResponseItem, error) {
// processBulkEntry decodes and processes an entry returning an item containing the result or error.
func processBulkEntry(entry string, client *svrquery.BulkClient) (item BulkResponseItem) {
querySection, addressSection, err := parseEntry(entry)
if err != nil {
return nil, fmt.Errorf("parse file entry: %w", err)
item.Error = fmt.Sprintf("parse file entry: %s", err)
return item
}

item := &BulkResponseItem{
Address: addressSection,
}
item.Address = addressSection

// If the query contains any options retrieve them and
// If the query contains any options retrieve and use them.
querySection, options, err := parseOptions(querySection)
if err != nil {
// These errors are non fatal, as we know which server it is for
item.Error = err.Error()
return item, nil
}

if !protocol.Supported(querySection) {
item.Error = fmt.Sprintf("unsupported protocol: %s", querySection)
return item, nil
return item
}

client, err := svrquery.NewClient(querySection, addressSection, options...)
if err != nil {
item.Error = fmt.Sprintf("create client: %s", err)
return item, nil
}

resp, err := client.Query()
resp, err := client.Query(querySection, addressSection, options...)
if err != nil {
item.Error = fmt.Sprintf("query client: %s", err)
return item, nil
return item
}

item.ServerInfo = &BulkResponseServerInfoItem{
Expand All @@ -177,23 +205,26 @@ func processBulkEntry(entry string) (*BulkResponseItem, error) {
if currentMap, ok := resp.(protocol.Mapper); ok {
item.ServerInfo.Map = currentMap.Map()
}
return item, nil
return item
}

// parseEntry parses the details from entry returning the query and address sections.
func parseEntry(entry string) (querySection, addressSection string, err error) {
entry = strings.TrimSpace(entry)
if entry == "" {
return "", "", fmt.Errorf("process entry: %w", errNoItem)
return "", "", fmt.Errorf("parse entry %q: %w", entry, errNoItem)
}

sections := strings.Split(entry, " ")
if len(sections) != 2 {
return "", "", fmt.Errorf("%w: wrong number of sections", errEntryInvalid)
return "", "", fmt.Errorf("%w %q: wrong number of sections %d", errEntryInvalid, entry, len(sections))
}

return sections[0], sections[1], nil
}

func parseOptions(querySection string) (baseQuery string, options []svrquery.Option, error error) {
// parseOptions parses querySection returning the baseQuery and query options.
func parseOptions(querySection string) (baseQuery string, options []svrquery.Option, err error) {
options = make([]svrquery.Option, 0)
protocolSections := strings.Split(querySection, ",")
for i := 1; i < len(protocolSections); i++ {
Expand All @@ -202,6 +233,7 @@ func parseOptions(querySection string) (baseQuery string, options []svrquery.Opt
return "", nil, fmt.Errorf("key value pair invalid: %v", keyVal)

}

// Only support key at the moment.
switch strings.ToLower(keyVal[0]) {
case "key":
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func main() {

if *file != "" {
// Use bulk file mode
if err := queryBulk(*file); err != nil {
if err := queryBulk(*file, os.Stdout); err != nil {
l.Fatal(err)
}
return
Expand Down
54 changes: 54 additions & 0 deletions lib/svrquery/bulk_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package svrquery

import (
"net"

"github.com/multiplay/go-svrquery/lib/svrquery/protocol"
)

// BulkClient is a client which can be reused with multiple requests.
// It wraps a single underlying Client whose protocol and address are
// reconfigured on each call to Query.
type BulkClient struct {
	client *Client // Reused for every query made through this BulkClient.
}

// NewBulkClient creates a new client with no protocol or address
// configured; both are supplied on each call to Query, allowing the
// client to be reused across many servers. The given options are
// applied to the underlying client at construction time.
func NewBulkClient(options ...Option) (*BulkClient, error) {
	c := &Client{
		network: DefaultNetwork,
		timeout: DefaultTimeout,
	}

	// Apply caller-supplied options; any failure aborts construction.
	for _, o := range options {
		if err := o(c); err != nil {
			return nil, err
		}
	}

	return &BulkClient{client: c}, nil
}

// Query runs a query against addr with proto and options.
//
// The options are applied to the underlying client before each query.
// Any connection left over from a previous call is closed before a new
// one is dialled, so repeated reuse does not leak UDP sockets.
// NOTE(review): not safe for concurrent use; each goroutine should own
// its own BulkClient.
func (b *BulkClient) Query(proto, addr string, options ...Option) (protocol.Responser, error) {
	f, err := protocol.Get(proto)
	if err != nil {
		return nil, err
	}

	for _, o := range options {
		if err := o(b.client); err != nil {
			return nil, err
		}
	}

	b.client.Queryer = f(b.client)

	if b.client.ua, err = net.ResolveUDPAddr(b.client.network, addr); err != nil {
		return nil, err
	}

	// Close the connection from any previous call before dialling a new
	// one, otherwise each reuse would leak the prior UDP socket.
	if b.client.c != nil {
		b.client.c.Close() // Best effort; the old socket is stale either way.
	}

	if b.client.c, err = net.DialUDP(b.client.network, nil, b.client.ua); err != nil {
		return nil, err
	}

	return b.client.Query()
}

0 comments on commit 33ed3b7

Please sign in to comment.