diff --git a/cmd/CLIENT/main.go b/cmd/CLIENT/main.go index 98dfab6..4648d2b 100644 --- a/cmd/CLIENT/main.go +++ b/cmd/CLIENT/main.go @@ -655,7 +655,7 @@ func (c *Client) downloadFile(cidStr string) error { } fmt.Printf("Found %d providers. Getting file manifest...\n", len(providers)) - relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWKsLZ7VmZTq7qBHj2cv4DczbEoNFLLDaLLk9ADVxDnqS6" + relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWMbTZL5taZH4CK9hCkTLkXaPadBoMR3KJZRFhbYBPrdkK" var manifest controlMessage var firstPeer *webRTC.SimpleWebRTCPeer @@ -789,34 +789,65 @@ func (c *Client) downloadFile(cidStr string) error { return nil } -func (c *Client) downloadChunksFromPeer(peer *webRTC.SimpleWebRTCPeer, state *DownloadState, startPiece, endPiece int) { - for i := startPiece; i < endPiece; i++ { - state.mu.Lock() - if state.PieceStatus[i] { - state.mu.Unlock() - continue - } - state.PieceAssignees[i] = peer.GetSignalingStream().Conn().RemotePeer() - state.mu.Unlock() +// func (c *Client) downloadChunksFromPeer(peer *webRTC.SimpleWebRTCPeer, state *DownloadState, startPiece, endPiece int) { +// for i := startPiece; i < endPiece; i++ { +// state.mu.Lock() +// if state.PieceStatus[i] { +// state.mu.Unlock() +// continue +// } +// state.PieceAssignees[i] = peer.GetSignalingStream().Conn().RemotePeer() +// state.mu.Unlock() + +// req := controlMessage{ +// Command: "REQUEST_PIECE", +// CID: state.Manifest.CID, +// Index: int64(i), +// } + +// state.mu.Lock() //piece timeout +// state.pieceTimers[i] = time.AfterFunc(PieceTimeout, func() { +// log.Printf("Piece %d timed out, re-requesting...", i) +// c.reRequestPiece(state, i) +// }) +// state.mu.Unlock() + +// if err := peer.SendJSONReliable(req); err != nil { +// log.Printf("Failed to request piece %d from %s: %v", i, peer.GetSignalingStream().Conn().RemotePeer(), err) +// return +// } +// } +// } - req := controlMessage{ - Command: "REQUEST_PIECE", - CID: state.Manifest.CID, - Index: int64(i), - } - - state.mu.Lock() //piece timeout - state.pieceTimers[i] = time.AfterFunc(PieceTimeout, func() { - log.Printf("Piece %d timed out, re-requesting...", i) - c.reRequestPiece(state, i) - }) - state.mu.Unlock() - - if err := peer.SendJSONReliable(req); err != nil { - log.Printf("Failed to request piece %d from %s: %v", i, peer.GetSignalingStream().Conn().RemotePeer(), err) - return - } - } +func (c *Client) downloadChunksFromPeer(peer *webRTC.SimpleWebRTCPeer, state *DownloadState, startPiece, endPiece int) { + for i := startPiece; i < endPiece; i++ { + state.mu.Lock() + if state.PieceStatus[i] { + state.mu.Unlock() + continue + } + state.PieceAssignees[i] = peer.GetSignalingStream().Conn().RemotePeer() + state.mu.Unlock() + + req := controlMessage{ + Command: "REQUEST_PIECE", + CID: state.Manifest.CID, + Index: int64(i), + } + + state.mu.Lock() + state.pieceTimers[i] = time.AfterFunc(PieceTimeout, func() { + log.Printf("Piece %d timed out, re-requesting...", i) + c.reRequestPiece(state, i) + }) + state.mu.Unlock() + + // Use reliable channel for piece requests (control messages) + if err := peer.SendJSONReliable(req); err != nil { + log.Printf("Failed to request piece %d from %s: %v", i, peer.GetSignalingStream().Conn().RemotePeer(), err) + return + } + } } func (c *Client) reRequestPiece(state *DownloadState, pieceIndex int) { @@ -1214,6 +1245,8 @@ func (c *Client) handlePieceRequest(ctx context.Context, ctrl controlMessage, pe return } + channelIndex := peer.GetFileChannelForPiece(int(ctrl.Index)) + totalChunks := (len(pieceBuffer) + MaxChunk - 1) / MaxChunk for i := 0; i < totalChunks; i++ { start := i * MaxChunk @@ -1245,10 +1278,10 @@ func (c *Client) handlePieceRequest(ctx context.Context, ctrl controlMessage, pe c.unackedChunksMux.Unlock() time.AfterFunc(RetransmissionTimeout, func() { c.retransmitChunk(peer, chunkMsg) }) - if err := peer.SendJSON(chunkMsg); err != nil { - log.Printf("Failed to send chunk %d of piece %d: %v", i, ctrl.Index, err) - return - } + if err := peer.SendOnFileChannel(channelIndex, chunkMsg); err != nil { + log.Printf("Failed to send chunk %d of piece %d on channel %d: %v", i, ctrl.Index, channelIndex, err) + return + } delay := c.congestionCtrl[peer.GetSignalingStream().Conn().RemotePeer()] time.Sleep(delay) } diff --git a/internal/client/webrtc.go b/internal/client/webrtc.go index fb05e00..cb63280 100644 --- a/internal/client/webrtc.go +++ b/internal/client/webrtc.go @@ -7,6 +7,7 @@ import ( "log" "sync" "time" + "strings" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -46,7 +47,8 @@ type SimpleWebRTCPeer struct { ID peer.ID pc *webrtc.PeerConnection dc *webrtc.DataChannel - reliableDC *webrtc.DataChannel + reliableDC *webrtc.DataChannel // Control messages + fileChannels []*webrtc.DataChannel onMessage func(msg webrtc.DataChannelMessage, peer *SimpleWebRTCPeer) onCloseCallback func(peerID peer.ID) fileWriter io.WriteCloser @@ -72,13 +74,14 @@ func NewSimpleWebRTCPeer(onMessage func(msg webrtc.DataChannelMessage, peer *Sim pc: pc, onMessage: onMessage, onCloseCallback: onClose, + fileChannels: make([]*webrtc.DataChannel, 3), closeCh: make(chan struct{}), state: ConnectionStateNew, reliableDCOpen: make(chan struct{}), //Initialize the new channel } - //two data channels: "data" and "reliable" - peer.dcOpenWg.Add(2) + //four data channels: "data" and "reliable" + peer.dcOpenWg.Add(4) peer.setupConnectionHandlers() return peer, nil @@ -109,84 +112,113 @@ func (p *SimpleWebRTCPeer) setupConnectionHandlers() { }) p.pc.OnDataChannel(func(dc *webrtc.DataChannel) { - log.Printf("New DataChannel %s", dc.Label()) - if dc.Label() == "reliable" { - p.reliableDC = dc - } else { - p.dc = dc - } - p.setupDataChannel(dc) - }) + log.Printf("New DataChannel %s", dc.Label()) + + if dc.Label() == "reliable" { + p.reliableDC = dc + } else if strings.HasPrefix(dc.Label(), "file_") { + var idx int + fmt.Sscanf(dc.Label(), "file_%d", &idx) + if idx >= 0 && idx < len(p.fileChannels) { + p.fileChannels[idx] = dc + } + } + p.setupDataChannel(dc) + }) } func (p *SimpleWebRTCPeer) setupDataChannel(dc *webrtc.DataChannel) { - dc.OnOpen(func() { - log.Printf("Data channel '%s' opened", dc.Label()) - p.setConnectionState(ConnectionStateConnected) - - //Signal that this data channel is open - p.dcOpenWg.Done() - - if dc.Label() == "reliable" { - //Specifically signal that the reliable channel is open - close(p.reliableDCOpen) - } - }) - - dc.OnClose(func() { - log.Printf("Data channel '%s' closed", dc.Label()) - p.setConnectionState(ConnectionStateClosed) - }) - - dc.OnMessage(func(msg webrtc.DataChannelMessage) { - p.onMessage(msg, p) - }) + dc.OnOpen(func() { + log.Printf("Data channel '%s' opened", dc.Label()) + p.setConnectionState(ConnectionStateConnected) + + // Signal that this data channel is open + p.dcOpenWg.Done() + + if dc.Label() == "reliable" { + close(p.reliableDCOpen) + } + }) + + dc.OnClose(func() { + log.Printf("Data channel '%s' closed", dc.Label()) + p.setConnectionState(ConnectionStateClosed) + }) + + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + p.onMessage(msg, p) + }) } -func (p *SimpleWebRTCPeer) CreateOffer() (string, error) { - // Create the unreliable data channel for file chunks - dc, err := p.pc.CreateDataChannel("data", nil) - if err != nil { - return "", fmt.Errorf("failed to create data channel: %w", err) - } - p.dc = dc - p.setupDataChannel(p.dc) - - // Create the reliable data channel for control messages - ordered := true - maxRetransmits := uint16(5) - reliableDC, err := p.pc.CreateDataChannel("reliable", &webrtc.DataChannelInit{ - Ordered: &ordered, - MaxRetransmits: &maxRetransmits, - }) - if err != nil { - return "", fmt.Errorf("failed to create reliable data channel: %w", err) - } - p.reliableDC = reliableDC - p.setupDataChannel(p.reliableDC) - - offer, err := p.pc.CreateOffer(nil) - if err != nil { - return "", err - } - - gatherComplete := webrtc.GatheringCompletePromise(p.pc) +func (p *SimpleWebRTCPeer) SendOnFileChannel(channelIndex int, v interface{}) error { + if channelIndex < 0 || channelIndex >= len(p.fileChannels) { + return fmt.Errorf("invalid channel index: %d", channelIndex) + } + + dc := p.fileChannels[channelIndex] + if dc == nil || dc.ReadyState() != webrtc.DataChannelStateOpen { + return fmt.Errorf("file channel %d is not open", channelIndex) + } + + data, err := json.Marshal(v) + if err != nil { + return err + } + return dc.SendText(string(data)) +} - if err := p.pc.SetLocalDescription(offer); err != nil { - return "", err - } +func (p *SimpleWebRTCPeer) GetFileChannelForPiece(pieceIdx int) int{ + return pieceIdx % 3 +} - select { - case <-gatherComplete: - case <-time.After(maxICEGatheringTimeout): - log.Println("ICE gathering timed out") - } - offerJSON, err := json.Marshal(p.pc.LocalDescription()) - if err != nil { - return "", err - } - return string(offerJSON), nil +func (p *SimpleWebRTCPeer) CreateOffer() (string, error) { + // Create the reliable data channel for control messages + ordered := true + maxRetransmits := uint16(5) + reliableDC, err := p.pc.CreateDataChannel("reliable", &webrtc.DataChannelInit{ + Ordered: &ordered, + MaxRetransmits: &maxRetransmits, + }) + if err != nil { + return "", fmt.Errorf("failed to create reliable data channel: %w", err) + } + p.reliableDC = reliableDC + p.setupDataChannel(p.reliableDC) + + //3 data channels for file transfer + for i := 0; i < 3; i++ { + channel := fmt.Sprintf("file_%d", i) + fileChannel, err := p.pc.CreateDataChannel(channel, nil) // Unordered by default + if err != nil { + return "", fmt.Errorf("failed to create file channel %d: %w", i, err) + } + p.fileChannels[i] = fileChannel + p.setupDataChannel(fileChannel) + } + + offer, err := p.pc.CreateOffer(nil) + if err != nil { + return "", err + } + + gatherComplete := webrtc.GatheringCompletePromise(p.pc) + + if err := p.pc.SetLocalDescription(offer); err != nil { + return "", err + } + + select { + case <-gatherComplete: + case <-time.After(maxICEGatheringTimeout): + log.Println("ICE gathering timed out") + } + + offerJSON, err := json.Marshal(p.pc.LocalDescription()) + if err != nil { + return "", err + } + return string(offerJSON), nil } func (p *SimpleWebRTCPeer) HandleOffer(offerStr string) (string, error) { @@ -375,22 +407,24 @@ func (p *SimpleWebRTCPeer) GetConnectionState() ConnectionState { } func (p *SimpleWebRTCPeer) startKeepAlive() { + if p.keepAliveTick != nil { - return - } - p.keepAliveTick = time.NewTicker(keepAliveInterval) - go func() { - for { - select { - case <-p.keepAliveTick.C: - if err := p.SendJSON(map[string]string{"type": "ping"}); err != nil { - log.Printf("Failed to send keepalive: %v", err) - } - case <-p.closeCh: - return - } - } - }() + return + } + p.keepAliveTick = time.NewTicker(keepAliveInterval) + go func() { + for { + select { + case <-p.keepAliveTick.C: + // Use SendJSONReliable instead of SendJSON + if err := p.SendJSONReliable(map[string]string{"type": "ping"}); err != nil { + log.Printf("Failed to send keepalive: %v", err) + } + case <-p.closeCh: + return + } + } + }() } func (p *SimpleWebRTCPeer) stopKeepAlive() { diff --git a/internal/p2p/host.go b/internal/p2p/host.go index b8397bb..e35746e 100644 --- a/internal/p2p/host.go +++ b/internal/p2p/host.go @@ -58,7 +58,7 @@ func NewHost( } // Relay config (static relay on Render) - relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWS7jchAU23xcSYasitheTvTyBpjSx4KuRgj5rv5GBBYoB" + relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWMbTZL5taZH4CK9hCkTLkXaPadBoMR3KJZRFhbYBPrdkK" relayMaddr, err := ma.NewMultiaddr(relayAddrStr) if err != nil {