Skip to content

Commit

Permalink
routing/http: return PeerRecord for FindPeers (ipfs#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias authored Oct 17, 2023
1 parent aa6fa14 commit 0a566c9
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The following emojis are used to highlight certain changes:
- Eliminate `..` elements that begin a rooted path: that is, replace "`/..`" by "`/`" at the beginning of a path.
* 🛠 The signature of `CoreAPI.ResolvePath` in `coreiface` has changed to now return
the remainder segments as a second return value, matching the signature of `resolver.ResolveToLastNode`.
* 🛠 `routing/http/client.FindPeers` now returns `iter.ResultIter[types.PeerRecord]` instead of `iter.ResultIter[types.Record]`. The specification indicates that records for this method will always be Peer Records.

### Removed

Expand Down
54 changes: 38 additions & 16 deletions examples/routing/delegated-routing-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,6 @@ func findProviders(w io.Writer, ctx context.Context, client *client.Client, cidS
return printIter(w, recordsIter)
}

func findPeers(w io.Writer, ctx context.Context, client *client.Client, pidStr string) error {
// Parses the given Peer ID to lookup the information for.
pid, err := peer.Decode(pidStr)
if err != nil {
return err
}

// Ask for information about the peer with the given peer ID.
recordsIter, err := client.FindPeers(ctx, pid)
if err != nil {
return err
}
defer recordsIter.Close()
return printIter(w, recordsIter)
}

func printIter(w io.Writer, iter iter.ResultIter[types.Record]) error {
// The response is streamed. Alternatively, you could use [iter.ReadAll]
// to fetch all the results all at once, instead of iterating as they are
Expand Down Expand Up @@ -118,6 +102,44 @@ func printIter(w io.Writer, iter iter.ResultIter[types.Record]) error {
return nil
}

func findPeers(w io.Writer, ctx context.Context, client *client.Client, pidStr string) error {
// Parses the given Peer ID to lookup the information for.
pid, err := peer.Decode(pidStr)
if err != nil {
return err
}

// Ask for information about the peer with the given peer ID.
recordsIter, err := client.FindPeers(ctx, pid)
if err != nil {
return err
}
defer recordsIter.Close()

// The response is streamed. Alternatively, you could use [iter.ReadAll]
// to fetch all the results all at once, instead of iterating as they are
// streamed.
for recordsIter.Next() {
res := recordsIter.Val()

// Check for error, but do not complain if we exceeded the timeout. We are
// expecting that to happen: we explicitly defined a timeout.
if res.Err != nil {
if !errors.Is(res.Err, context.DeadlineExceeded) {
return res.Err
}

return nil
}

fmt.Fprintln(w, res.Val.ID)
fmt.Fprintln(w, "\tProtocols:", res.Val.Protocols)
fmt.Fprintln(w, "\tAddresses:", res.Val.Addrs)
}

return nil
}

func findIPNS(w io.Writer, ctx context.Context, client *client.Client, nameStr string) error {
// Parses the given name string to get a record for.
name, err := ipns.NameFromString(nameStr)
Expand Down
19 changes: 13 additions & 6 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func (c *measuringIter[T]) Close() error {
return c.Iter.Close()
}

// FindProviders searches for providers that are able to provide the given [cid.Cid].
// In a more generic way, it is also used as a mapping between CIDs and relevant metadata.
func (c *Client) FindProviders(ctx context.Context, key cid.Cid) (providers iter.ResultIter[types.Record], err error) {
// TODO test measurements
m := newMeasurement("FindProviders")
Expand Down Expand Up @@ -332,7 +334,8 @@ func (c *Client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri
return 0, nil
}

func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[types.Record], err error) {
// FindPeers searches for information for the given [peer.ID].
func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error) {
m := newMeasurement("FindPeers")

url := c.baseURL + "/routing/v1/peers/" + peer.ToCid(pid).String()
Expand All @@ -359,7 +362,7 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI
if resp.StatusCode == http.StatusNotFound {
resp.Body.Close()
m.record(ctx)
return iter.FromSlice[iter.Result[types.Record]](nil), nil
return iter.FromSlice[iter.Result[*types.PeerRecord]](nil), nil
}

if resp.StatusCode != http.StatusOK {
Expand Down Expand Up @@ -387,24 +390,27 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI
}
}()

var it iter.ResultIter[types.Record]
var it iter.ResultIter[*types.PeerRecord]
switch mediaType {
case mediaTypeJSON:
parsedResp := &jsontypes.PeersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
var sliceIt iter.Iter[types.Record] = iter.FromSlice(parsedResp.Peers)
var sliceIt iter.Iter[*types.PeerRecord] = iter.FromSlice(parsedResp.Peers)
it = iter.ToResultIter(sliceIt)
case mediaTypeNDJSON:
skipBodyClose = true
it = ndjson.NewRecordsIter(resp.Body)
it = ndjson.NewPeerRecordsIter(resp.Body)
default:
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
return nil, errors.New("unknown content type")
}

return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil
return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil
}

// GetIPNS tries to retrieve the [ipns.Record] for the given [ipns.Name]. The record is
// validated against the given name. If validation fails, an error is returned, but no
// record.
func (c *Client) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
url := c.baseURL + "/routing/v1/ipns/" + name.String()

Expand Down Expand Up @@ -443,6 +449,7 @@ func (c *Client) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, err
return record, nil
}

// PutIPNS attempts at putting the given [ipns.Record] for the given [ipns.Name].
func (c *Client) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error {
url := c.baseURL + "/routing/v1/ipns/" + name.String()

Expand Down
12 changes: 6 additions & 6 deletions routing/http/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func (m *mockContentRouter) ProvideBitswap(ctx context.Context, req *server.Bits
return args.Get(0).(time.Duration), args.Error(1)
}

func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error) {
func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
args := m.Called(ctx, pid, limit)
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
}

func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
Expand Down Expand Up @@ -486,7 +486,7 @@ func TestClient_Provide(t *testing.T) {

func TestClient_FindPeers(t *testing.T) {
peerRecord := makePeerRecord()
peerRecords := []iter.Result[types.Record]{
peerRecords := []iter.Result[*types.PeerRecord]{
{Val: &peerRecord},
}
pid := *peerRecord.ID
Expand All @@ -495,13 +495,13 @@ func TestClient_FindPeers(t *testing.T) {
name string
httpStatusCode int
stopServer bool
routerResult []iter.Result[types.Record]
routerResult []iter.Result[*types.PeerRecord]
routerErr error
clientRequiresStreaming bool
serverStreamingDisabled bool

expErrContains osErrContains
expResult []iter.Result[types.Record]
expResult []iter.Result[*types.PeerRecord]
expStreamingResponse bool
expJSONResponse bool
}{
Expand Down Expand Up @@ -606,7 +606,7 @@ func TestClient_FindPeers(t *testing.T) {
resultIter, err := client.FindPeers(ctx, pid)
c.expErrContains.errContains(t, err)

results := iter.ReadAll[iter.Result[types.Record]](resultIter)
results := iter.ReadAll[iter.Result[*types.PeerRecord]](resultIter)
assert.Equal(t, c.expResult, results)
})
}
Expand Down
31 changes: 9 additions & 22 deletions routing/http/contentrouter/contentrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const ttl = 24 * time.Hour
type Client interface {
FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error)
ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error)
FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[types.Record], err error)
FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error)
GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error)
PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error
}
Expand Down Expand Up @@ -196,28 +196,15 @@ func (c *contentRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInf
logger.Warnw("error iterating provider responses: %s", res.Err)
continue
}
v := res.Val
if v.GetSchema() == types.SchemaPeer {
result, ok := v.(*types.PeerRecord)
if !ok {
logger.Errorw(
"problem casting find providers result",
"Schema", v.GetSchema(),
"Type", reflect.TypeOf(v).String(),
)
continue
}

var addrs []multiaddr.Multiaddr
for _, a := range result.Addrs {
addrs = append(addrs, a.Multiaddr)
}

return peer.AddrInfo{
ID: *result.ID,
Addrs: addrs,
}, nil
var addrs []multiaddr.Multiaddr
for _, a := range res.Val.Addrs {
addrs = append(addrs, a.Multiaddr)
}

return peer.AddrInfo{
ID: *res.Val.ID,
Addrs: addrs,
}, nil
}

return peer.AddrInfo{}, err
Expand Down
13 changes: 5 additions & 8 deletions routing/http/contentrouter/contentrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) (iter.Resul
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
}

func (m *mockClient) FindPeers(ctx context.Context, pid peer.ID) (iter.ResultIter[types.Record], error) {
func (m *mockClient) FindPeers(ctx context.Context, pid peer.ID) (iter.ResultIter[*types.PeerRecord], error) {
args := m.Called(ctx, pid)
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
}

func (m *mockClient) Ready(ctx context.Context) (bool, error) {
Expand Down Expand Up @@ -183,17 +183,14 @@ func TestFindPeer(t *testing.T) {
crc := NewContentRoutingClient(client)

p1 := peer.ID("peer1")
ais := []types.Record{
&types.UnknownRecord{
Schema: "unknown",
},
&types.PeerRecord{
ais := []*types.PeerRecord{
{
Schema: types.SchemaPeer,
ID: &p1,
Protocols: []string{"transport-bitswap"},
},
}
aisIter := iter.ToResultIter[types.Record](iter.FromSlice(ais))
aisIter := iter.ToResultIter[*types.PeerRecord](iter.FromSlice(ais))

client.On("FindPeers", ctx, p1).Return(aisIter, nil)

Expand Down
10 changes: 5 additions & 5 deletions routing/http/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type ContentRouter interface {

// FindPeers searches for peers who have the provided [peer.ID].
// Limit indicates the maximum amount of results to return; 0 means unbounded.
FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error)
FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error)

// GetIPNS searches for an [ipns.Record] for the given [ipns.Name].
GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error)
Expand Down Expand Up @@ -267,7 +267,7 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) {
}

var (
handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record])
handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[*types.PeerRecord])
recordsLimit int
)

Expand Down Expand Up @@ -347,7 +347,7 @@ func (s *server) provide(w http.ResponseWriter, httpReq *http.Request) {
writeJSONResult(w, "Provide", resp)
}

func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[types.Record]) {
func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) {
defer peersIter.Close()

peers, err := iter.ReadAllResults(peersIter)
Expand All @@ -361,7 +361,7 @@ func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[
})
}

func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[types.Record]) {
func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) {
writeResultsIterNDJSON(w, peersIter)
}

Expand Down Expand Up @@ -491,7 +491,7 @@ func logErr(method, msg string, err error) {
logger.Infow(msg, "Method", method, "Error", err)
}

func writeResultsIterNDJSON(w http.ResponseWriter, resultIter iter.ResultIter[types.Record]) {
func writeResultsIterNDJSON[T any](w http.ResponseWriter, resultIter iter.ResultIter[T]) {
defer resultIter.Close()

w.Header().Set("Content-Type", mediaTypeNDJSON)
Expand Down
8 changes: 4 additions & 4 deletions routing/http/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestPeers(t *testing.T) {
t.Parallel()

_, pid := makePeerID(t)
results := iter.FromSlice([]iter.Result[types.Record]{
results := iter.FromSlice([]iter.Result[*types.PeerRecord]{
{Val: &types.PeerRecord{
Schema: types.SchemaPeer,
ID: &pid,
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestPeers(t *testing.T) {
t.Parallel()

_, pid := makePeerID(t)
results := iter.FromSlice([]iter.Result[types.Record]{
results := iter.FromSlice([]iter.Result[*types.PeerRecord]{
{Val: &types.PeerRecord{
Schema: types.SchemaPeer,
ID: &pid,
Expand Down Expand Up @@ -374,9 +374,9 @@ func (m *mockContentRouter) ProvideBitswap(ctx context.Context, req *BitswapWrit
return args.Get(0).(time.Duration), args.Error(1)
}

func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error) {
func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
args := m.Called(ctx, pid, limit)
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
}

func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
Expand Down
2 changes: 1 addition & 1 deletion routing/http/types/json/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type ProvidersResponse struct {

// PeersResponse is the result of a GET Peers request.
type PeersResponse struct {
Peers RecordsArray
Peers []*types.PeerRecord
}

// RecordsArray is an array of [types.Record]
Expand Down
27 changes: 27 additions & 0 deletions routing/http/types/ndjson/records.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,30 @@ func NewRecordsIter(r io.Reader) iter.Iter[iter.Result[types.Record]] {

return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn)
}

// NewPeerRecordsIter returns an iterator that reads [types.PeerRecord] from the given [io.Reader].
// Records with a different schema are safely ignored. If you want to read all records, use
// [NewRecordsIter] instead.
func NewPeerRecordsIter(r io.Reader) iter.Iter[iter.Result[*types.PeerRecord]] {
jsonIter := iter.FromReaderJSON[types.UnknownRecord](r)
mapFn := func(upr iter.Result[types.UnknownRecord]) iter.Result[*types.PeerRecord] {
var result iter.Result[*types.PeerRecord]
if upr.Err != nil {
result.Err = upr.Err
return result
}
switch upr.Val.Schema {
case types.SchemaPeer:
var prov types.PeerRecord
err := json.Unmarshal(upr.Val.Bytes, &prov)
if err != nil {
result.Err = err
return result
}
result.Val = &prov
}
return result
}

return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn)
}

0 comments on commit 0a566c9

Please sign in to comment.