forked from jlelse/GoBlog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathactivityPubSending.go
96 lines (89 loc) · 2.33 KB
/
activityPubSending.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package main
import (
"bytes"
"context"
"encoding/gob"
"fmt"
"io"
"net/http"
"time"
ap "github.com/go-ap/activitypub"
"github.com/go-ap/jsonld"
"go.goblog.app/app/pkgs/bufferpool"
"go.goblog.app/app/pkgs/contenttype"
)
type apRequest struct {
BlogIri, To string
Activity []byte
Try int
}
func (a *goBlog) initAPSendQueue() {
a.listenOnQueue("ap", 30*time.Second, func(qi *queueItem, dequeue func(), reschedule func(time.Duration)) {
var r apRequest
if err := gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&r); err != nil {
a.error("Activitypub queue", "err", err)
dequeue()
return
}
if err := a.apSendSigned(r.BlogIri, r.To, r.Activity); err != nil {
if r.Try++; r.Try < 20 {
// Try it again
buf := bufferpool.Get()
_ = r.encode(buf)
qi.content = buf.Bytes()
reschedule(time.Duration(r.Try) * 10 * time.Minute)
bufferpool.Put(buf)
return
}
a.info("AP request failed for the 20th time", "to", r.To)
_ = a.db.apRemoveInbox(r.To)
}
dequeue()
})
}
func (a *goBlog) apQueueSendSigned(blogIri, to string, activity any) error {
body, err := jsonld.WithContext(jsonld.IRI(ap.ActivityBaseURI), jsonld.IRI(ap.SecurityContextURI)).Marshal(activity)
if err != nil {
return err
}
buf := bufferpool.Get()
defer bufferpool.Put(buf)
if err := (&apRequest{
BlogIri: blogIri,
To: to,
Activity: body,
}).encode(buf); err != nil {
return err
}
return a.enqueue("ap", buf.Bytes(), time.Now())
}
func (r *apRequest) encode(w io.Writer) error {
return gob.NewEncoder(w).Encode(r)
}
func (a *goBlog) apSendSigned(blogIri, to string, activity []byte) error {
// Create request context with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// Create request
r, err := http.NewRequestWithContext(ctx, http.MethodPost, to, bytes.NewReader(activity))
if err != nil {
return err
}
r.Header.Set("Accept-Charset", "utf-8")
r.Header.Set("Accept", contenttype.ASUTF8)
r.Header.Set(contentType, contenttype.ASUTF8)
// Sign request
if err = a.signRequest(r, blogIri); err != nil {
return err
}
// Do request
resp, err := a.httpClient.Do(r)
if err != nil {
return err
}
_ = resp.Body.Close()
if !apRequestIsSuccess(resp.StatusCode) {
return fmt.Errorf("signed request failed with status %d", resp.StatusCode)
}
return nil
}