Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 65 additions & 32 deletions cmd/CLIENT/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func (c *Client) downloadFile(cidStr string) error {
}

fmt.Printf("Found %d providers. Getting file manifest...\n", len(providers))
relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWKsLZ7VmZTq7qBHj2cv4DczbEoNFLLDaLLk9ADVxDnqS6"
relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWMbTZL5taZH4CK9hCkTLkXaPadBoMR3KJZRFhbYBPrdkK"

var manifest controlMessage
var firstPeer *webRTC.SimpleWebRTCPeer
Expand Down Expand Up @@ -789,34 +789,65 @@ func (c *Client) downloadFile(cidStr string) error {
return nil
}

func (c *Client) downloadChunksFromPeer(peer *webRTC.SimpleWebRTCPeer, state *DownloadState, startPiece, endPiece int) {
for i := startPiece; i < endPiece; i++ {
state.mu.Lock()
if state.PieceStatus[i] {
state.mu.Unlock()
continue
}
state.PieceAssignees[i] = peer.GetSignalingStream().Conn().RemotePeer()
state.mu.Unlock()
// func (c *Client) downloadChunksFromPeer(peer *webRTC.SimpleWebRTCPeer, state *DownloadState, startPiece, endPiece int) {
// for i := startPiece; i < endPiece; i++ {
// state.mu.Lock()
// if state.PieceStatus[i] {
// state.mu.Unlock()
// continue
// }
// state.PieceAssignees[i] = peer.GetSignalingStream().Conn().RemotePeer()
// state.mu.Unlock()

// req := controlMessage{
// Command: "REQUEST_PIECE",
// CID: state.Manifest.CID,
// Index: int64(i),
// }

// state.mu.Lock() //piece timeout
// state.pieceTimers[i] = time.AfterFunc(PieceTimeout, func() {
// log.Printf("Piece %d timed out, re-requesting...", i)
// c.reRequestPiece(state, i)
// })
// state.mu.Unlock()

// if err := peer.SendJSONReliable(req); err != nil {
// log.Printf("Failed to request piece %d from %s: %v", i, peer.GetSignalingStream().Conn().RemotePeer(), err)
// return
// }
// }
// }

req := controlMessage{
Command: "REQUEST_PIECE",
CID: state.Manifest.CID,
Index: int64(i),
}

state.mu.Lock() //piece timeout
state.pieceTimers[i] = time.AfterFunc(PieceTimeout, func() {
log.Printf("Piece %d timed out, re-requesting...", i)
c.reRequestPiece(state, i)
})
state.mu.Unlock()

if err := peer.SendJSONReliable(req); err != nil {
log.Printf("Failed to request piece %d from %s: %v", i, peer.GetSignalingStream().Conn().RemotePeer(), err)
return
}
}
func (c *Client) downloadChunksFromPeer(peer *webRTC.SimpleWebRTCPeer, state *DownloadState, startPiece, endPiece int) {
for i := startPiece; i < endPiece; i++ {
state.mu.Lock()
if state.PieceStatus[i] {
state.mu.Unlock()
continue
}
state.PieceAssignees[i] = peer.GetSignalingStream().Conn().RemotePeer()
state.mu.Unlock()

req := controlMessage{
Command: "REQUEST_PIECE",
CID: state.Manifest.CID,
Index: int64(i),
}

state.mu.Lock()
state.pieceTimers[i] = time.AfterFunc(PieceTimeout, func() {
log.Printf("Piece %d timed out, re-requesting...", i)
c.reRequestPiece(state, i)
})
state.mu.Unlock()

// Use reliable channel for piece requests (control messages)
if err := peer.SendJSONReliable(req); err != nil {
log.Printf("Failed to request piece %d from %s: %v", i, peer.GetSignalingStream().Conn().RemotePeer(), err)
return
}
}
}

func (c *Client) reRequestPiece(state *DownloadState, pieceIndex int) {
Expand Down Expand Up @@ -1214,6 +1245,8 @@ func (c *Client) handlePieceRequest(ctx context.Context, ctrl controlMessage, pe
return
}

channelIndex := peer.GetFileChannelForPiece(int(ctrl.Index))

totalChunks := (len(pieceBuffer) + MaxChunk - 1) / MaxChunk
for i := 0; i < totalChunks; i++ {
start := i * MaxChunk
Expand Down Expand Up @@ -1245,10 +1278,10 @@ func (c *Client) handlePieceRequest(ctx context.Context, ctrl controlMessage, pe
c.unackedChunksMux.Unlock()
time.AfterFunc(RetransmissionTimeout, func() { c.retransmitChunk(peer, chunkMsg) })

if err := peer.SendJSON(chunkMsg); err != nil {
log.Printf("Failed to send chunk %d of piece %d: %v", i, ctrl.Index, err)
return
}
if err := peer.SendOnFileChannel(channelIndex, chunkMsg); err != nil {
log.Printf("Failed to send chunk %d of piece %d on channel %d: %v", i, ctrl.Index, channelIndex, err)
return
}
delay := c.congestionCtrl[peer.GetSignalingStream().Conn().RemotePeer()]
time.Sleep(delay)
}
Expand Down
210 changes: 122 additions & 88 deletions internal/client/webrtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"sync"
"time"
"strings"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -46,7 +47,8 @@ type SimpleWebRTCPeer struct {
ID peer.ID
pc *webrtc.PeerConnection
dc *webrtc.DataChannel
reliableDC *webrtc.DataChannel
reliableDC *webrtc.DataChannel // Control messages
fileChannels []*webrtc.DataChannel
onMessage func(msg webrtc.DataChannelMessage, peer *SimpleWebRTCPeer)
onCloseCallback func(peerID peer.ID)
fileWriter io.WriteCloser
Expand All @@ -72,13 +74,14 @@ func NewSimpleWebRTCPeer(onMessage func(msg webrtc.DataChannelMessage, peer *Sim
pc: pc,
onMessage: onMessage,
onCloseCallback: onClose,
fileChannels: make([]*webrtc.DataChannel, 3),
closeCh: make(chan struct{}),
state: ConnectionStateNew,
reliableDCOpen: make(chan struct{}), //Initialize the new channel
}

//two data channels: "data" and "reliable"
peer.dcOpenWg.Add(2)
//four data channels: "data" and "reliable"
peer.dcOpenWg.Add(4)

peer.setupConnectionHandlers()
return peer, nil
Expand Down Expand Up @@ -109,84 +112,113 @@ func (p *SimpleWebRTCPeer) setupConnectionHandlers() {
})

p.pc.OnDataChannel(func(dc *webrtc.DataChannel) {
log.Printf("New DataChannel %s", dc.Label())
if dc.Label() == "reliable" {
p.reliableDC = dc
} else {
p.dc = dc
}
p.setupDataChannel(dc)
})
log.Printf("New DataChannel %s", dc.Label())

if dc.Label() == "reliable" {
p.reliableDC = dc
} else if strings.HasPrefix(dc.Label(), "file_") {
var idx int
fmt.Sscanf(dc.Label(), "file_%d", &idx)
if idx >= 0 && idx < len(p.fileChannels) {
p.fileChannels[idx] = dc
}
}
p.setupDataChannel(dc)
})
}

func (p *SimpleWebRTCPeer) setupDataChannel(dc *webrtc.DataChannel) {
dc.OnOpen(func() {
log.Printf("Data channel '%s' opened", dc.Label())
p.setConnectionState(ConnectionStateConnected)

//Signal that this data channel is open
p.dcOpenWg.Done()

if dc.Label() == "reliable" {
//Specifically signal that the reliable channel is open
close(p.reliableDCOpen)
}
})

dc.OnClose(func() {
log.Printf("Data channel '%s' closed", dc.Label())
p.setConnectionState(ConnectionStateClosed)
})

dc.OnMessage(func(msg webrtc.DataChannelMessage) {
p.onMessage(msg, p)
})
dc.OnOpen(func() {
log.Printf("Data channel '%s' opened", dc.Label())
p.setConnectionState(ConnectionStateConnected)

// Signal that this data channel is open
p.dcOpenWg.Done()

if dc.Label() == "reliable" {
close(p.reliableDCOpen)
}
})

dc.OnClose(func() {
log.Printf("Data channel '%s' closed", dc.Label())
p.setConnectionState(ConnectionStateClosed)
})

dc.OnMessage(func(msg webrtc.DataChannelMessage) {
p.onMessage(msg, p)
})
}

func (p *SimpleWebRTCPeer) CreateOffer() (string, error) {
// Create the unreliable data channel for file chunks
dc, err := p.pc.CreateDataChannel("data", nil)
if err != nil {
return "", fmt.Errorf("failed to create data channel: %w", err)
}
p.dc = dc
p.setupDataChannel(p.dc)

// Create the reliable data channel for control messages
ordered := true
maxRetransmits := uint16(5)
reliableDC, err := p.pc.CreateDataChannel("reliable", &webrtc.DataChannelInit{
Ordered: &ordered,
MaxRetransmits: &maxRetransmits,
})
if err != nil {
return "", fmt.Errorf("failed to create reliable data channel: %w", err)
}
p.reliableDC = reliableDC
p.setupDataChannel(p.reliableDC)

offer, err := p.pc.CreateOffer(nil)
if err != nil {
return "", err
}

gatherComplete := webrtc.GatheringCompletePromise(p.pc)
func (p *SimpleWebRTCPeer) SendOnFileChannel(channelIndex int, v interface{}) error {
if channelIndex < 0 || channelIndex >= len(p.fileChannels) {
return fmt.Errorf("invalid channel index: %d", channelIndex)
}

dc := p.fileChannels[channelIndex]
if dc == nil || dc.ReadyState() != webrtc.DataChannelStateOpen {
return fmt.Errorf("file channel %d is not open", channelIndex)
}

data, err := json.Marshal(v)
if err != nil {
return err
}
return dc.SendText(string(data))
}

if err := p.pc.SetLocalDescription(offer); err != nil {
return "", err
}
func (p *SimpleWebRTCPeer) GetFileChannelForPiece(pieceIdx int) int{
return pieceIdx % 3
}

select {
case <-gatherComplete:
case <-time.After(maxICEGatheringTimeout):
log.Println("ICE gathering timed out")
}

offerJSON, err := json.Marshal(p.pc.LocalDescription())
if err != nil {
return "", err
}
return string(offerJSON), nil
func (p *SimpleWebRTCPeer) CreateOffer() (string, error) {
// Create the reliable data channel for control messages
ordered := true
maxRetransmits := uint16(5)
reliableDC, err := p.pc.CreateDataChannel("reliable", &webrtc.DataChannelInit{
Ordered: &ordered,
MaxRetransmits: &maxRetransmits,
})
if err != nil {
return "", fmt.Errorf("failed to create reliable data channel: %w", err)
}
p.reliableDC = reliableDC
p.setupDataChannel(p.reliableDC)

//3 data channels for file transfer
for i := 0; i < 3; i++ {
channel := fmt.Sprintf("file_%d", i)
fileChannel, err := p.pc.CreateDataChannel(channel, nil) // Unordered by default
if err != nil {
return "", fmt.Errorf("failed to create file channel %d: %w", i, err)
}
p.fileChannels[i] = fileChannel
p.setupDataChannel(fileChannel)
}

offer, err := p.pc.CreateOffer(nil)
if err != nil {
return "", err
}

gatherComplete := webrtc.GatheringCompletePromise(p.pc)

if err := p.pc.SetLocalDescription(offer); err != nil {
return "", err
}

select {
case <-gatherComplete:
case <-time.After(maxICEGatheringTimeout):
log.Println("ICE gathering timed out")
}

offerJSON, err := json.Marshal(p.pc.LocalDescription())
if err != nil {
return "", err
}
return string(offerJSON), nil
}

func (p *SimpleWebRTCPeer) HandleOffer(offerStr string) (string, error) {
Expand Down Expand Up @@ -375,22 +407,24 @@ func (p *SimpleWebRTCPeer) GetConnectionState() ConnectionState {
}

func (p *SimpleWebRTCPeer) startKeepAlive() {

if p.keepAliveTick != nil {
return
}
p.keepAliveTick = time.NewTicker(keepAliveInterval)
go func() {
for {
select {
case <-p.keepAliveTick.C:
if err := p.SendJSON(map[string]string{"type": "ping"}); err != nil {
log.Printf("Failed to send keepalive: %v", err)
}
case <-p.closeCh:
return
}
}
}()
return
}
p.keepAliveTick = time.NewTicker(keepAliveInterval)
go func() {
for {
select {
case <-p.keepAliveTick.C:
// Use SendJSONReliable instead of SendJSON
if err := p.SendJSONReliable(map[string]string{"type": "ping"}); err != nil {
log.Printf("Failed to send keepalive: %v", err)
}
case <-p.closeCh:
return
}
}
}()
}

func (p *SimpleWebRTCPeer) stopKeepAlive() {
Expand Down
2 changes: 1 addition & 1 deletion internal/p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewHost(
}

// Relay config (static relay on Render)
relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWS7jchAU23xcSYasitheTvTyBpjSx4KuRgj5rv5GBBYoB"
relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWMbTZL5taZH4CK9hCkTLkXaPadBoMR3KJZRFhbYBPrdkK"

relayMaddr, err := ma.NewMultiaddr(relayAddrStr)
if err != nil {
Expand Down