Skip to content

Commit

Permalink
support several signalling servers
Browse files Browse the repository at this point in the history
  • Loading branch information
dmonad committed Dec 5, 2019
1 parent 1fac438 commit 8e0d9e6
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 83 deletions.
69 changes: 29 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,58 +1,47 @@
# WebRTC Connector for [Yjs](https://github.com/y-js/yjs)

It propagates document updates directly to all users via WebRTC. While WebRTC is not the most reliable connector, messages are propagated with almost no delay.
It propagates document updates directly to all users via WebRTC.

* Very fast message propagation (not noticeable)
* Very easy to use
* Very little server load (you still have to set up a [signaling server](http://www.html5rocks.com/en/tutorials/webrtc/infrastructure/))
* Not suited for a large amount of collaborators
* WebRTC is not supported in all browsers, and some have troubles communicating with each other
* Fast message propagation
* No setup required, a default signalling server is available
* Very little server load
* Not suited for a large amount of collaborators on a single document (each peer is connected to each other)

We provide you with a free signaling server (it is used by default), but in production you should set up your own signaling server. You could use the [signalmaster](https://github.com/DadaMonad/signalmaster) from &yet, which is very easy to set up.
## Setup

## Use it!
Retrieve this with bower or npm.
##### Install

##### NPM
```
npm install y-webrtc --save
```sh
npm i y-webrtc
```

##### Bower
```
bower install y-webrtc --save
##### Client code

```js
import * as Y from 'yjs'
import { WebrtcProvider } from '../src/y-webrtc.js'

const ydoc = new Y.Doc()
const provider = new WebrtcProvider('prosemirror', ydoc)
const yarray = ydoc.get('prosemirror', Y.XmlFragment)
```

# Start Hacking
This connector is also a nice starting point to build your own connector. The only 75 SLOCs of code are pretty well documented and understandable. If you have any troubles, don't hesitate to ask me for help!
##### Signalling

### Example
The peers find each other by connecting to a signalling server. This package implements a small signalling server in `./bin/server.js`.

```
Y({
db: {
name: 'memory'
},
connector: {
name: 'webrtc', // use the webrtc connector
room: 'Textarea-example-dev'
},
sourceDir: '/bower_components', // location of the y-* modules
share: {
textarea: 'Text' // y.share.textarea is of type Y.Text
}
}).then(function (y) {
// bind the textarea to a shared text element
y.share.textarea.bind(document.getElementById('textfield'))
}
```sh
# start signalling server
PORT=4444 node ./bin/server.js
```

# Set up Signaling server
This webrtc connector is compatible to [this signaling server](https://github.com/DadaMonad/signalmaster) (signalmaster)
Peers using the same signalling server will find each other. You can specify several custom signalling servers like so:

```js
const provider = new WebrtcProvider('prosemirror', ydoc, { signalling: ['wss://y-webrtc-ckynwnzncc.now.sh', 'ws://localhost:4444'] })
```

## License
Yjs is licensed under the [MIT License](./LICENSE).

<[email protected]>


<[email protected]>
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"preversion": "npm run lint && npm run dist"
},
"bin": {
"y-webrtc-server": "./bin/server.js"
"y-webrtc-signalling": "./bin/server.js"
},
"files": [
"dist/*",
Expand Down
105 changes: 64 additions & 41 deletions src/y-webrtc.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ const messageSync = 0
const messageQueryAwareness = 3
const messageAwareness = 1

const peerId = random.uuidv4()

/**
* @type {Map<string, SignallingConn>}
*/
const signallingConns = new Map()
/**
* @type {Map<string, WebrtcConn>}
*/
const webrtcConns = new Map()

/**
* @type {Map<string,WebrtcRoom>}
*/
const webrtcRooms = new Map()

/**
* @param {WebrtcRoom} webrtcRoom
*/
Expand All @@ -32,17 +48,16 @@ const checkIsSynced = webrtcRoom => {
}

/**
* @param {SignalingConn} signaling
* @param {WebrtcConn} peerConn
* @param {Uint8Array} buf
* @return {encoding.Encoder?}
*/
const readPeerMessage = (signaling, peerConn, buf) => {
const readPeerMessage = (peerConn, buf) => {
const decoder = decoding.createDecoder(buf)
const encoder = encoding.createEncoder()
const messageType = decoding.readVarUint(decoder)
const roomName = decoding.readVarString(decoder)
const webrtcRoom = signaling.rooms.get(roomName)
const webrtcRoom = webrtcRooms.get(roomName)
if (webrtcRoom === undefined) {
return null
}
Expand Down Expand Up @@ -101,7 +116,7 @@ const broadcast = (webrtcRoom, encoder) => {

export class WebrtcConn {
/**
* @param {SignalingConn} signalingConn
* @param {SignallingConn} signalingConn
* @param {boolean} initiator
* @param {string} remotePeerId
* @param {Array<string>} announcedTopics
Expand All @@ -119,12 +134,12 @@ export class WebrtcConn {
*/
this.peer = new Peer({ initiator })
this.peer.on('signal', data => {
signalingConn.send({ type: 'publish', topics: announcedTopics, to: remotePeerId, from: signalingConn.peerId, messageType: 'signal', data })
signalingConn.send({ type: 'publish', topics: announcedTopics, to: remotePeerId, from: peerId, messageType: 'signal', data })
})
this.peer.on('connect', () => {
this.connected = true
announcedTopics.forEach(roomName => {
const room = signalingConn.rooms.get(roomName)
const room = webrtcRooms.get(roomName)
if (room) {
// add peer to room
room.peers.add(this)
Expand All @@ -151,8 +166,8 @@ export class WebrtcConn {
this.peer.on('close', () => {
this.connected = false
this.closed = true
signalingConn.conns.delete(this.remotePeerId)
signalingConn.rooms.forEach(room => {
webrtcConns.delete(this.remotePeerId)
webrtcRooms.forEach(room => {
room.peers.delete(this)
checkIsSynced(room)
})
Expand All @@ -163,7 +178,7 @@ export class WebrtcConn {
this.closed = true
})
this.peer.on('data', data => {
const answer = readPeerMessage(signalingConn, this, data)
const answer = readPeerMessage(this, data)
if (answer !== null) {
send(this, answer)
}
Expand All @@ -189,47 +204,47 @@ export class WebrtcRoom {
}
}

export class SignalingConn extends ws.WebsocketClient {
export class SignallingConn extends ws.WebsocketClient {
constructor (url) {
super(url)
this.peerId = random.uuidv4()
/**
* @type {Map<string,WebrtcRoom>}
* @type {Set<WebrtcProvider>}
*/
this.rooms = new Map()
/**
* @type {Map<string,WebrtcConn>}
*/
this.conns = new Map()
this.afterOpen.push(() => ({ type: 'subscribe', topics: Array.from(this.rooms.keys()) }))
this.afterOpen.push(() => ({ type: 'publish', messageType: 'announce', topics: Array.from(this.rooms.keys()), from: this.peerId }))
this.providers = new Set()
this.afterOpen.push(() => ({ type: 'subscribe', topics: Array.from(webrtcRooms.keys()) }))
this.afterOpen.push(() => ({ type: 'publish', messageType: 'announce', topics: Array.from(webrtcRooms.keys()), from: peerId }))
this.on('message', m => {
if (m.from === this.peerId || (m.to !== undefined && m.to !== this.peerId)) {
if (m.from === peerId || (m.to !== undefined && m.to !== peerId)) {
return
}
switch (m.type) {
case 'publish': {
switch (m.messageType) {
case 'announce':
map.setIfUndefined(this.conns, m.from, () => new WebrtcConn(this, true, m.from, m.topics))
map.setIfUndefined(webrtcConns, m.from, () => new WebrtcConn(this, true, m.from, m.topics))
break
case 'signal':
if (m.to === this.peerId) {
map.setIfUndefined(this.conns, m.from, () => new WebrtcConn(this, false, m.from, m.topics)).peer.signal(m.data)
if (m.to === peerId) {
map.setIfUndefined(webrtcConns, m.from, () => new WebrtcConn(this, false, m.from, m.topics)).peer.signal(m.data)
}
break
}
}
}
})
}
/**
* @param {Array<string>} rooms
*/
subscribe (rooms) {
// only subcribe if connection is established, otherwise the conn automatically subscribes to all webrtcRooms
if (this.connected) {
this.send({ type: 'subscribe', topics: rooms })
this.send({ type: 'publish', messageType: 'announce', topics: Array.from(webrtcRooms.keys()), from: peerId })
}
}
}

/**
* @type {Map<string, SignalingConn>}
*/
const conns = new Map()

/**
* @extends Observable<string>
*/
Expand All @@ -238,22 +253,24 @@ export class WebrtcProvider extends Observable {
* @param {string} room
* @param {Y.Doc} doc
* @param {Object} [opts]
* @param {string} [opts.url]
* @param {Array<string>} [opts.signalling]
*/
constructor (room, doc, { url = 'wss://y-webrtc-rgksxuhaol.now.sh' } = {}) {
constructor (room, doc, { signalling = ['wss://y-webrtc-daliwjiawr.now.sh', 'wss://y-webrtc-lcymcvajue.now.sh'] } = {}) {
super()
this.url = url
this.room = room
this.doc = doc
this.conn = map.setIfUndefined(conns, url, () => new SignalingConn(url))
if (this.conn.rooms.has(room)) {
this.signallingConns = []
signalling.forEach(url => {
const signallingConn = map.setIfUndefined(signallingConns, url, () => new SignallingConn(url))
this.signallingConns.push(signallingConn)
signallingConn.providers.add(this)
signallingConn.subscribe([this.room])
})
if (webrtcRooms.has(room)) {
throw error.create('A Yjs Doc connected to that room already exists!')
}
if (this.conn.connected) {
this.conn.send({ type: 'subscribe', topics: [room] })
}
const webrtcRoom = new WebrtcRoom(doc, this, room)
this.conn.rooms.set(room, webrtcRoom)
webrtcRooms.set(room, webrtcRoom)
/**
* @type {awarenessProtocol.Awareness}
*/
Expand Down Expand Up @@ -294,10 +311,16 @@ export class WebrtcProvider extends Observable {
})
}
destroy () {
if (this.conn.connected) {
this.conn.send({ type: 'unsubscribe', topics: [this.room] })
}
this.conn.rooms.delete(this.room)
this.signallingConns.forEach(conn => {
conn.providers.delete(this)
if (conn.providers.size === 0) {
conn.destroy()
signallingConns.delete(this.room)
} else {
conn.send({ type: 'unsubscribe', topics: [this.room] })
}
})
webrtcRooms.delete(this.room)
this.doc.off('update', this._docUpdateHandler)
this.awareness.off('change', this._awarenessUpdateHandler)
super.destroy()
Expand Down

0 comments on commit 8e0d9e6

Please sign in to comment.