
If the number of handlers is large, split Conference.sendOut on a task queue. #1062

Draft: wants to merge 11 commits into base: master
81 changes: 57 additions & 24 deletions src/main/java/org/jitsi/videobridge/Conference.java
@@ -1146,6 +1146,11 @@ public ConferenceShim getShim()
         return shim;
     }
 
+    /** The maximum number of packet handlers we want to execute in one task.
+     * TODO: this number is pulled out of the air; tune it.
+     */
+    public static final int MAX_HANDLERS_PER_TASK = 5;
+
Member:

I suspect this may depend on the number of cores a machine has. Shall we make it configurable?

Member Author:

Possibly; this was just a quick and dirty check to see how much difference this made.

Member:

We already populate the CPU_POOL based on the number of cores, so if anything maybe we'd want to base this on the size of the cpu pool, so we only make this decision in one place.
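
A sketch of that suggestion, for illustration only: derive the batch size from the pool that runs the tasks instead of a hard-coded constant. The ThreadPoolExecutor parameter here stands in for however TaskPools.CPU_POOL is actually declared.

import java.util.concurrent.ThreadPoolExecutor;

final class HandlerBatching
{
    private HandlerBatching() { }

    /** Batch size aiming for roughly one task per pool thread. */
    static int maxHandlersPerTask(ThreadPoolExecutor cpuPool, int numHandlers)
    {
        int workers = Math.max(1, cpuPool.getMaximumPoolSize());
        return Math.max(1, (numHandlers + workers - 1) / workers); // ceiling division
    }
}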

Member Author:

Looking at YourKit, AbstractExecutorService.submit seems to spend a fair amount of its time in LinkedBlockingQueue.offer, which has to acquire a lock, so we may well want more tasks than cores here.

Member:

Hm, we probably just shouldn't be using offer there.

Member Author:

That's inside the Java implementation -- and never mind, I was looking at putting things on the executor queue, not the execution of the queues.

Member:

Oh, I see what you mean now. 👍
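
For reference, a sketch of how a constant like MAX_HANDLERS_PER_TASK could partition handlers into per-task batches. This is hypothetical: the hunk below creates one task per handler, and the names ('handlers', 'tasks', 'distributor', doSendOut) follow that diff.

// Hypothetical batching: one executor task per batch of handlers, so each
// CPU_POOL.submit() (and the contended LinkedBlockingQueue.offer behind it)
// is amortized over several handlers.
for (int i = 0; i < handlers.size(); i += MAX_HANDLERS_PER_TASK)
{
    List<PotentialPacketHandler> batch =
        handlers.subList(i, Math.min(i + MAX_HANDLERS_PER_TASK, handlers.size()));
    tasks.add(() -> batch.forEach(h -> doSendOut(distributor, h)));
}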


 /**
  * Broadcasts the packet to all endpoints and tentacles that want it.
  *
@@ -1154,45 +1159,73 @@ public ConferenceShim getShim()
     private void sendOut(PacketInfo packetInfo)
     {
         String sourceEndpointId = packetInfo.getEndpointId();
-        // We want to avoid calling 'clone' for the last receiver of this packet
-        // since it's unnecessary. To do so, we'll wait before we clone and send
-        // to an interested handler until after we've determined another handler
-        // is also interested in the packet. We'll give the last handler the
-        // original packet (without cloning).
-        PotentialPacketHandler prevHandler = null;
-        for (Endpoint endpoint : endpointsCache)
+
+        List<Runnable> tasks = new ArrayList<>();
+        PacketInfoDistributor distributor = new PacketInfoDistributor(packetInfo, 0);
+
+        for (Endpoint e: endpointsCache)
         {
-            if (endpoint.getID().equals(sourceEndpointId))
+            if (e.getID().equals(sourceEndpointId))
             {
                 continue;
             }
-
-            if (endpoint.wants(packetInfo))
-            {
-                if (prevHandler != null)
-                {
-                    prevHandler.send(packetInfo.clone());
-                }
-                prevHandler = endpoint;
-            }
+            tasks.add(() -> doSendOut(distributor, e));
         }
-        if (tentacle != null && tentacle.wants(packetInfo))
+        if (tentacle != null)
         {
-            if (prevHandler != null)
-            {
-                prevHandler.send(packetInfo.clone());
-            }
-            prevHandler = tentacle;
+            tasks.add(() -> doSendOut(distributor, tentacle));
         }
 
-        if (prevHandler != null)
-        {
-            prevHandler.send(packetInfo);
-        }
-        else
-        {
-            // No one wanted the packet, so the buffer is now free!
-            ByteBufferPool.returnBuffer(packetInfo.getPacket().getBuffer());
-        }
+        distributor.setCount(tasks.size());
+
+        if (tasks.isEmpty()) {
+            return;
+        }
+        else if (tasks.size() == 1) {
+            tasks.get(0).run();
+            return;
+        }
+
+        List<Future<?>> taskFutures = new ArrayList<>();
+
+        for (Runnable task : tasks)
+        {
+            taskFutures.add(TaskPools.CPU_POOL.submit(task));
+        }
+
+        /* This doesn't work - these tasks will be on the same task pool as this
+           code, causing deadlocks. */
+        /*
+        for (Future<?> task: taskFutures)
+        {
+            try
+            {
+                task.get();
+            }
+            catch (InterruptedException | ExecutionException e)
+            {
+                logger.warn("Exception waiting for sendOut", e);
+            }
+        }
+        */
+    }
+
+    /**
+     * Handles a packet for one packet handler.
+     *
+     * @param distributor a distributor for the packet
+     * @param handler the packet handler to send it to
+     */
+    private static void doSendOut(PacketInfoDistributor distributor, PotentialPacketHandler handler)
+    {
+        if (handler.wants(distributor.previewPacketInfo()))
+        {
+            distributor.usePacketInfoReference(handler::send);
+        }
+        else
+        {
+            distributor.releasePacketInfoReference();
+        }
     }

4 changes: 2 additions & 2 deletions src/main/java/org/jitsi/videobridge/Endpoint.java
@@ -386,7 +386,7 @@ public double getRtt()
      * {@inheritDoc}
      */
     @Override
-    public boolean wants(PacketInfo packetInfo)
+    public synchronized boolean wants(PacketInfo packetInfo)
Member:

Why does this need to be synchronized now? We're only parallelizing the send out of a single packet at a time, right?

Member Author:

Since the receive pipeline farms out the parallelized sendOut calls and then returns, without waiting for the executor tasks, there's nothing stopping the next packet's receive pipeline's executor tasks from starting while the previous one's are still running.

Member:

Hm, I think this could result in reordering the packets: I don't think we'd guarantee that the lock on Endpoint A will be taken when processing packet "1" before a thread handling packet "2" for Endpoint A is scheduled.

Member Author:

That's true. It's not the end of the world if it happens; worst case the destination sends a spurious NACK.

Alternately we could synchronize the sender to wait for all its tasks.

Member:

Yeah, I'm thinking something more like the latter as maintaining packet order within our own pipeline would be desirable, I think. Depending on how fancy we want/need to get, we could also look at mailboxes per source endpoint for the packets, and we only process one packet from one endpoint at a time (but could still parallelize across packets for multiple endpoints).

Member Author:

Unfortunately I discovered that synchronizing to wait for the tasks won't work, because they're executed on the same task pool as the rtp receiver, and an earlier task waiting for a later task is a deadlock.

I don't quite follow your description of the fancier methods, but they potentially sound promising if this turns out to be needed.
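
A minimal, self-contained demonstration of that failure mode (illustrative code, not from this PR): a fixed-size pool whose task blocks waiting on a sub-task submitted to the same pool.

import java.util.concurrent.*;

public class SamePoolDeadlockDemo
{
    public static void main(String[] args) throws Exception
    {
        // One thread makes the hazard deterministic; a larger pool only needs
        // enough simultaneous "outer" tasks to starve the "inner" ones.
        ExecutorService pool = Executors.newFixedThreadPool(1);

        Future<?> outer = pool.submit(() -> {
            Future<?> inner = pool.submit(() -> { });
            try
            {
                inner.get(); // blocks forever: no free thread can ever run 'inner'
            }
            catch (InterruptedException | ExecutionException e)
            {
                throw new RuntimeException(e);
            }
        });

        try
        {
            outer.get(2, TimeUnit.SECONDS);
        }
        catch (TimeoutException e)
        {
            System.out.println("Deadlocked, as described above");
        }
        pool.shutdownNow();
    }
}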

Member:

My thought was that we could do something like:

  • Have a Map<String, PacketInfoQueue>, where the key is an endpoint ID in the conference.
  • Receiver thread drops the 'fully processed' incoming packet from its endpoint into the appropriate queue.

Now, this functions the same way our other queues do (we process the packets within a single queue serially), we just have N of them. Basically mirror the input queue we have for each endpoint already. This way we can parallelize the work across endpoints, but not within a single endpoint.
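
A self-contained sketch of that per-endpoint mailbox pattern (generic names, not the actual videobridge/PacketInfoQueue API): tasks submitted under the same key run serially and in order, while different keys proceed in parallel on a shared pool.

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

final class PerKeySerialExecutor<K>
{
    private static final class Mailbox
    {
        final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
        final AtomicBoolean scheduled = new AtomicBoolean(false);
    }

    private final ExecutorService pool;
    private final Map<K, Mailbox> mailboxes = new ConcurrentHashMap<>();

    PerKeySerialExecutor(ExecutorService pool)
    {
        this.pool = pool;
    }

    /** Enqueue a task for this key; tasks for one key never run concurrently. */
    void submit(K key, Runnable task)
    {
        Mailbox mailbox = mailboxes.computeIfAbsent(key, k -> new Mailbox());
        mailbox.tasks.add(task);
        scheduleIfIdle(mailbox);
    }

    private void scheduleIfIdle(Mailbox mailbox)
    {
        // Only one drain loop per mailbox is in flight at a time.
        if (mailbox.scheduled.compareAndSet(false, true))
        {
            pool.execute(() -> drain(mailbox));
        }
    }

    private void drain(Mailbox mailbox)
    {
        Runnable task;
        while ((task = mailbox.tasks.poll()) != null)
        {
            task.run();
        }
        mailbox.scheduled.set(false);
        // Re-check: a task may have arrived between the last poll() and set(false).
        if (!mailbox.tasks.isEmpty())
        {
            scheduleIfIdle(mailbox);
        }
    }
}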

Member Author:

Ah, I see.

The disadvantage (over what I have now) is that the packet cloning optimization is weaker -- right now as long as at least one of the last batch of packet handlers wants the packet, we get the optimization, whereas with this the very last one would need to.

If we do have these queues, I'm not sure there's an architectural difference between them and the RTP sender queues. Is there any way we could just inject these as a leading node (or other process) on the RTP sender rather than having a separate, third set of queues?

Member:

> The disadvantage (over what I have now) is that the packet cloning optimization is weaker -- right now as long as at least one of the last batch of packet handlers wants the packet, we get the optimization, whereas with this the very last one would need to.

Hmm, maybe I'm missing something here. The optimization should be the same, as we're still looking at every potential "sender" for a given ingress packet, so we can know how many. We just don't do anything with the next packet for that same "receiver" until we're done with the previous one.

Member Author:

As long as there's more than one reference outstanding in the PacketInfoDistributor, it has to clone, rather than hand out its reference. It's only on the very last reference that it can optimize by handing out a non-cloned buffer. If the last reference turns out not to want it, we lose the optimization.

If every projection processing runs in parallel, we have to have a reference for every sender, rather than a reference for every batch of senders.

     {
         if (!isTransportConnected())
         {
@@ -441,7 +441,7 @@ else if (packet instanceof RtcpFbPliPacket ||
      * TODO Brian
      */
     @Override
-    public void send(PacketInfo packetInfo)
+    public synchronized void send(PacketInfo packetInfo)
     {
         Packet packet = packetInfo.getPacket();
         if (packet instanceof VideoRtpPacket)
2 changes: 1 addition & 1 deletion src/main/java/org/jitsi/videobridge/octo/OctoTentacle.java
@@ -145,7 +145,7 @@ public void addPayloadType(PayloadType payloadType)
      * {@inheritDoc}
      */
     @Override
-    public void send(PacketInfo packetInfo)
+    public synchronized void send(PacketInfo packetInfo)
     {
         Packet packet = packetInfo.getPacket();
         if (packet != null)
125 changes: 125 additions & 0 deletions src/main/java/org/jitsi/videobridge/util/PacketInfoDistributor.java
@@ -0,0 +1,125 @@
/*
* Copyright @ 2018 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.jitsi.videobridge.util;

import org.jitsi.nlj.*;

import java.util.function.*;

/**
 * Distributes a single {@link PacketInfo} to a known number of consumers,
 * cloning it for all but the last so that the original buffer is handed out
 * (or returned to the pool) exactly once.
 */
public class PacketInfoDistributor
{
    private final PacketInfo packetInfo;
    private int totalReferences;
    private int outstandingReferences;

    private Consumer<PacketInfo> prevConsumer = null;

    public PacketInfoDistributor(PacketInfo pi, int count)
    {
        packetInfo = pi;
        totalReferences = count;
        outstandingReferences = count;
        /* We create a PacketInfoDistributor for each packet, so we don't want
           to instantiate a new logger each time. */
    }

    public synchronized void setCount(int count)
    {
        if (outstandingReferences < totalReferences)
        {
            throw new IllegalStateException("Count set after references have been taken");
        }
        totalReferences = count;
        outstandingReferences = count;
    }

    public synchronized PacketInfo previewPacketInfo()
    {
        if (outstandingReferences <= 0)
        {
            throw new IllegalStateException("Packet previewed after all references taken");
        }
        return packetInfo;
    }

    public void usePacketInfoReference(Consumer<PacketInfo> consumer)
    {
        // We want to avoid calling 'clone' for the last receiver of this packet
        // since it's unnecessary. To do so, we'll wait before we clone and send
        // to an interested handler until after we've determined another handler
        // is also interested in the packet. We'll give the last handler the
        // original packet (without cloning).
        Consumer<PacketInfo> c1 = null, c2 = null;
        PacketInfo p1 = null, p2 = null;
        synchronized (this)
        {
            outstandingReferences--;
            if (outstandingReferences < 0)
            {
                throw new IllegalStateException("Too many references taken");
            }
            if (prevConsumer != null)
            {
                c1 = prevConsumer;
                p1 = packetInfo.clone();
            }
            prevConsumer = consumer;
            if (outstandingReferences == 0)
            {
                c2 = prevConsumer;
                p2 = packetInfo;
            }
        }

        // Invoke the consumers outside the lock, so a slow send can't block
        // other threads taking references.
        if (c1 != null)
        {
            c1.accept(p1);
        }
        if (c2 != null)
        {
            c2.accept(p2);
        }
    }

    public void releasePacketInfoReference()
    {
        Consumer<PacketInfo> c = null;
        PacketInfo p = null;
        synchronized (this)
        {
            outstandingReferences--;
            if (outstandingReferences < 0)
            {
                throw new IllegalStateException("Too many references taken");
            }
            if (outstandingReferences == 0)
            {
                if (prevConsumer != null)
                {
                    c = prevConsumer;
                    p = packetInfo;
                }
                else
                {
                    // No one wanted the packet, so the buffer is now free.
                    ByteBufferPool.returnBuffer(packetInfo.getPacket().getBuffer());
                }
            }
        }
        if (c != null)
        {
            c.accept(p);
        }
    }
}
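
A hypothetical walkthrough (handler names invented) of the clone-avoidance discussed in the review thread: with three references, exactly two clones are made, and the original packet goes to the last interested handler.

PacketInfoDistributor d = new PacketInfoDistributor(packetInfo, 3);

d.usePacketInfoReference(handlerA::send); // nothing sent yet; A is parked as prevConsumer
d.usePacketInfoReference(handlerB::send); // A receives a clone; B is parked
d.usePacketInfoReference(handlerC::send); // B receives a clone; C receives the original

// If the last handler declines instead, the parked handler still receives
// the original, uncloned:
//   d.usePacketInfoReference(handlerA::send); // A parked
//   d.usePacketInfoReference(handlerB::send); // A gets a clone; B parked
//   d.releasePacketInfoReference();           // B receives the original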
}