Skip to content

Commit 22b4010

Browse files
authored
Merge pull request #159 from matrix-org/travis/webhooks
Internal: Refactor how webhooks work
2 parents 0eb934a + e988cd4 commit 22b4010

41 files changed

Lines changed: 278 additions & 192 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

api/api_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func makeApi(t *testing.T) *Api {
3030
pubsub := test.NewMemoryPubsub(t)
3131
assert.NotNil(t, pubsub)
3232

33-
communityManager, err := community.NewManager(cnf, db, pubsub, test.MustMakeAuditQueue(5))
33+
communityManager, err := community.NewManager(cnf, db, pubsub, test.NewMatrixNotifier(t))
3434
assert.NoError(t, err)
3535
assert.NotNil(t, communityManager)
3636

cmd/app/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import (
1515

1616
"github.com/go-co-op/gocron/v2"
1717
"github.com/matrix-org/policyserv/config"
18-
"github.com/matrix-org/policyserv/filter/audit"
1918
"github.com/matrix-org/policyserv/homeserver"
2019
"github.com/matrix-org/policyserv/logging" // import this for side effects if this isn't needed directly anymore
20+
"github.com/matrix-org/policyserv/notifiers"
2121
"github.com/matrix-org/policyserv/pubsub"
2222
"github.com/matrix-org/policyserv/redaction"
2323
"github.com/matrix-org/policyserv/storage"
@@ -52,12 +52,12 @@ func main() {
5252
defer db.Close()
5353
defer pubsubClient.Close()
5454

55-
auditQueue, err := audit.NewQueue(instanceConfig.WebhookPoolSize)
55+
notifier, err := notifiers.NewWebhookMatrixNotifier(db, instanceConfig.WebhookPoolSize, instanceConfig.AllowedWebhookDomains)
5656
if err != nil {
5757
log.Fatal(err)
5858
}
5959

60-
communityManager, err := setupCommunityManager(instanceConfig, db, pubsubClient, auditQueue)
60+
communityManager, err := setupCommunityManager(instanceConfig, db, pubsubClient, notifier)
6161
if err != nil {
6262
log.Fatal(err)
6363
}

cmd/app/setup_community_manager.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package main
33
import (
44
"github.com/matrix-org/policyserv/community"
55
"github.com/matrix-org/policyserv/config"
6-
"github.com/matrix-org/policyserv/filter/audit"
6+
"github.com/matrix-org/policyserv/notifiers"
77
"github.com/matrix-org/policyserv/pubsub"
88
"github.com/matrix-org/policyserv/storage"
99
)
1010

11-
func setupCommunityManager(instanceConfig *config.InstanceConfig, storage storage.PersistentStorage, pubsub pubsub.Client, auditQueue *audit.Queue) (*community.Manager, error) {
12-
return community.NewManager(instanceConfig, storage, pubsub, auditQueue)
11+
func setupCommunityManager(instanceConfig *config.InstanceConfig, storage storage.PersistentStorage, pubsub pubsub.Client, notifier notifiers.MatrixNotifier) (*community.Manager, error) {
12+
return community.NewManager(instanceConfig, storage, pubsub, notifier)
1313
}

community/manager.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import (
1111
"github.com/matrix-org/policyserv/config"
1212
"github.com/matrix-org/policyserv/content"
1313
"github.com/matrix-org/policyserv/filter"
14-
"github.com/matrix-org/policyserv/filter/audit"
1514
"github.com/matrix-org/policyserv/internal"
15+
"github.com/matrix-org/policyserv/notifiers"
1616
"github.com/matrix-org/policyserv/pubsub"
1717
"github.com/matrix-org/policyserv/storage"
1818
)
@@ -23,10 +23,10 @@ type Manager struct {
2323
roomToCommunityCache *cache.Cache[string, string] // room ID -> community ID
2424
instanceConfig *config.InstanceConfig
2525
pubsubClient pubsub.Client
26-
auditQueue *audit.Queue
26+
notifier notifiers.MatrixNotifier
2727
}
2828

29-
func NewManager(instanceConfig *config.InstanceConfig, storage storage.PersistentStorage, pubsubClient pubsub.Client, auditQueue *audit.Queue) (*Manager, error) {
29+
func NewManager(instanceConfig *config.InstanceConfig, storage storage.PersistentStorage, pubsubClient pubsub.Client, notifier notifiers.MatrixNotifier) (*Manager, error) {
3030
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
3131
defer cancel()
3232

@@ -58,7 +58,7 @@ func NewManager(instanceConfig *config.InstanceConfig, storage storage.Persisten
5858
roomToCommunityCache: roomIdCache,
5959
instanceConfig: instanceConfig,
6060
pubsubClient: pubsubClient,
61-
auditQueue: auditQueue,
61+
notifier: notifier,
6262
}, nil
6363
}
6464

@@ -210,7 +210,7 @@ func (m *Manager) GetFilterSetForCommunityId(ctx context.Context, communityId st
210210
MaximumSpamVectorValue: 1.0,
211211
}},
212212
}
213-
filterSet, err := filter.NewSet(setConfig, m.storage, m.pubsubClient, m.auditQueue, scanner)
213+
filterSet, err := filter.NewSet(setConfig, m.storage, m.pubsubClient, m.notifier, scanner)
214214
if err != nil {
215215
return nil, err
216216
}

community/manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func makeManager(t *testing.T) *Manager {
2424
assert.NoError(t, err)
2525
assert.NotNil(t, cnf)
2626

27-
manager, err := NewManager(cnf, db, pubsubClient, test.MustMakeAuditQueue(5))
27+
manager, err := NewManager(cnf, db, pubsubClient, test.NewMatrixNotifier(t))
2828
assert.NoError(t, err)
2929
assert.NotNil(t, manager)
3030

filter/audit.go

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,14 @@ import (
66
"fmt"
77
"html"
88
"log"
9-
"net/url"
109
"slices"
1110
"sync"
12-
"testing"
1311
"time"
1412

15-
"github.com/matrix-org/policyserv/config"
16-
"github.com/matrix-org/policyserv/filter/audit"
13+
"github.com/matrix-org/gomatrixserverlib"
1714
"github.com/matrix-org/policyserv/filter/classification"
1815
"github.com/matrix-org/policyserv/filter/confidence"
19-
"github.com/matrix-org/gomatrixserverlib"
16+
"github.com/matrix-org/policyserv/notifiers"
2017
)
2118

2219
type auditContext struct {
@@ -25,26 +22,26 @@ type auditContext struct {
2522
FinalVectors confidence.Vectors
2623
IncrementalVectors []confidence.Vectors
2724
FilterResponses map[string][]classification.Classification
28-
WebhookUrl string
25+
CommunityId string
2926

30-
lock sync.Mutex // use a lock instead of a sync.Map because sync.Map doesn't support generics (and library support appears lacking in quality)
31-
instanceConfig *config.InstanceConfig
27+
lock sync.Mutex // use a lock instead of a sync.Map because sync.Map doesn't support generics (and library support appears lacking in quality)
28+
notifier notifiers.MatrixNotifier
3229
}
3330

34-
func newAuditContext(instanceConfig *config.InstanceConfig, event gomatrixserverlib.PDU, webhookUrl string) (*auditContext, error) {
31+
func newAuditContext(notifier notifiers.MatrixNotifier, communityId string, event gomatrixserverlib.PDU) (*auditContext, error) {
3532
return &auditContext{
3633
Event: event,
3734
FilterResponses: make(map[string][]classification.Classification),
3835
IncrementalVectors: make([]confidence.Vectors, 0),
39-
WebhookUrl: webhookUrl,
36+
CommunityId: communityId,
4037

4138
// Populated later
4239
IsSpam: false,
4340
FinalVectors: nil,
4441

4542
// Internal
46-
lock: sync.Mutex{},
47-
instanceConfig: instanceConfig,
43+
lock: sync.Mutex{},
44+
notifier: notifier,
4845
}, nil
4946
}
5047

@@ -60,32 +57,18 @@ func (c *auditContext) AppendSetGroupVectors(vectors confidence.Vectors) {
6057
c.IncrementalVectors = append(c.IncrementalVectors, vectors)
6158
}
6259

63-
func (c *auditContext) Publish(workQueue *audit.Queue) error {
60+
func (c *auditContext) Publish() error {
6461
c.lock.Lock()
6562
defer c.lock.Unlock()
6663

6764
// Note: we log the audit context so if the webhook fails (or isn't configured) then we
6865
// have an idea of what happened.
6966
log.Printf("[%s | %s | %s] Audit publish: %#v", c.Event.EventID(), c.Event.RoomID(), c.Event.SenderID(), c)
7067

71-
if c.WebhookUrl == "" || !c.IsSpam {
68+
if !c.IsSpam {
7269
return nil // nothing to publish
7370
}
7471

75-
// Validate URL
76-
whUrl, err := url.Parse(c.WebhookUrl)
77-
if err != nil {
78-
return err
79-
}
80-
if !testing.Testing() {
81-
if whUrl.Scheme != "https" {
82-
return fmt.Errorf("webhook URL must be HTTPS")
83-
}
84-
}
85-
if !slices.Contains(c.instanceConfig.AllowedWebhookDomains, whUrl.Host) {
86-
return fmt.Errorf("webhook URL host not allowed")
87-
}
88-
8972
respsJson, err := json.MarshalIndent(c.FilterResponses, "", " ")
9073
if err != nil {
9174
return err // "should never happen"
@@ -116,5 +99,15 @@ func (c *auditContext) Publish(workQueue *audit.Queue) error {
11699
htmlAudit += "</details>" // close the details block from earlier
117100
}
118101

119-
return workQueue.Submit(c.Event.EventID(), htmlAudit, whUrl.String())
102+
// we don't html2text this because long events can cause hookshot to only show text versions, making all
103+
// of our work to contain the spam to a <details> block useless. We still put some sort of message here
104+
// though so clients which don't support HTML can still see something useful.
105+
textAudit := "This event requires HTML."
106+
107+
msgId, err := c.notifier.Send(c.CommunityId, textAudit, htmlAudit)
108+
if err != nil {
109+
return fmt.Errorf("failed to send audit message: %w", err)
110+
}
111+
log.Printf("[%s | %s | %s] Audit message sent: %s", c.Event.EventID(), c.Event.RoomID(), c.Event.SenderID(), msgId)
112+
return nil
120113
}

filter/audit/queue.go

Lines changed: 0 additions & 76 deletions
This file was deleted.

filter/filter_density_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestDensityFilter(t *testing.T) {
2929
defer memStorage.Close()
3030
ps := test.NewMemoryPubsub(t)
3131
defer ps.Close()
32-
set, err := NewSet(cnf, memStorage, ps, test.MustMakeAuditQueue(5), nil)
32+
set, err := NewSet(cnf, memStorage, ps, test.NewMatrixNotifier(t), nil)
3333
assert.NoError(t, err)
3434
assert.NotNil(t, set)
3535

filter/filter_event_type_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestEventTypeFilter(t *testing.T) {
2727
defer memStorage.Close()
2828
ps := test.NewMemoryPubsub(t)
2929
defer ps.Close()
30-
set, err := NewSet(cnf, memStorage, ps, test.MustMakeAuditQueue(5), nil)
30+
set, err := NewSet(cnf, memStorage, ps, test.NewMatrixNotifier(t), nil)
3131
assert.NoError(t, err)
3232
assert.NotNil(t, set)
3333

filter/filter_frequency_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestFrequencyFilter(t *testing.T) {
3232
defer memStorage.Close()
3333
ps := test.NewMemoryPubsub(t)
3434
defer ps.Close()
35-
set, err := NewSet(cnf, memStorage, ps, test.MustMakeAuditQueue(5), nil)
35+
set, err := NewSet(cnf, memStorage, ps, test.NewMatrixNotifier(t), nil)
3636
assert.NoError(t, err)
3737
assert.NotNil(t, set)
3838

0 commit comments

Comments
 (0)