Skip to content

Commit

Permalink
Start implementing IO#timeout
Browse files Browse the repository at this point in the history
This only implements it for a couple forms of read and for
Socket#connect. Other places still need to be wired up.
  • Loading branch information
headius committed Feb 27, 2025
1 parent 3a7f3a6 commit c0e5008
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 11 deletions.
26 changes: 18 additions & 8 deletions core/src/main/java/org/jruby/RubyIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -4127,14 +4127,7 @@ private static long prepareTimeout(ThreadContext context, IRubyObject[] argv) {
timeout = context.nil;
}

if (timeout.isNil()) {
tv = -1;
}
else {
tv = (long)(RubyTime.convertTimeInterval(context, timeout) * 1000);
if (tv < 0) throw argumentError(context, "time interval must be positive");
}
return tv;
return RubyThread.prepareTimeout(context, timeout);
}

// MRI: rb_io_advise
Expand Down Expand Up @@ -5186,6 +5179,23 @@ private static IRubyObject doWait(ThreadContext context, RubyIO io, OpenFile fpt
return context.nil;
}

@JRubyMethod(name = "timeout")
public IRubyObject timeout(ThreadContext context) {
return getOpenFile().timeout();
}

@JRubyMethod(name = "timeout=")
public IRubyObject setTimeout(ThreadContext context, IRubyObject timeout) {
// validate timeout
if (timeout.isTrue()) {
RubyTime.convertTimeInterval(context, timeout);
}

getOpenFile().timeout(timeout);

return timeout;
}

@JRubyMethod(name = {"path", "to_path"})
public IRubyObject path(ThreadContext context) {
final String path = getPath();
Expand Down
31 changes: 31 additions & 0 deletions core/src/main/java/org/jruby/RubyThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -2235,6 +2235,37 @@ public boolean select(Channel channel, RubyIO io, int ops, long timeout) {
return select(channel, io == null ? null : io.getOpenFile(), ops, timeout);
}

public boolean selectFor(ThreadContext context, Channel channel, OpenFile fptr, int ops) {
long timeout = prepareTimeout(context, fptr.timeout());
boolean result = select(channel, fptr, ops, timeout);

if (timeout >= 0 && result == false) {
throw RubyIO.RubyIOTimeoutError.newIOTimeoutError(context.runtime, "IO operation timed out").toThrowable();
}

return result;
}

public boolean selectFor(ThreadContext context, Channel channel, RubyIO io, int ops) {
if (io == null) {
return context.getThread().select(channel, io, ops);
}

return selectFor(context, channel, io.getOpenFile(), ops);
}

public static long prepareTimeout(ThreadContext context, IRubyObject timeout) {
long tv;
if (timeout.isNil()) {
tv = -1;
}
else {
tv = (long)(RubyTime.convertTimeInterval(context, timeout) * 1000);
if (tv < 0) throw argumentError(context, "time interval must be positive");
}
return tv;
}

/**
* Perform an interruptible select operation on the given channel and fptr,
* waiting for the requested operations or the given timeout.
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/org/jruby/ext/socket/RubySocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyModule;
import org.jruby.RubyThread;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.api.Access;
Expand Down Expand Up @@ -539,7 +540,11 @@ private boolean tryConnect(ThreadContext context, Channel channel, SocketAddress

if (!blocking || result) return result;

while (!context.getThread().select(channel, this, SelectionKey.OP_CONNECT)) {
while (true) {
boolean selected = context.getThread().selectFor(context, channel, this, SelectionKey.OP_CONNECT);

if (selected) break;

context.blockingThreadPoll();
}
}
Expand Down
19 changes: 17 additions & 2 deletions core/src/main/java/org/jruby/util/io/OpenFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public OpenFile(RubyIO io, IRubyObject nil) {
writeconvPreEcopts = nil;
encs.ecopts = nil;
posix = new PosixShim(runtime);
timeout = nil;
fiberScheduler = Options.FIBER_SCHEDULER.load();
}

Expand Down Expand Up @@ -170,6 +171,8 @@ public static class Buffer {

private final boolean fiberScheduler;

private IRubyObject timeout;

public void clearStdio() {
stdio_file = null;
}
Expand Down Expand Up @@ -1621,7 +1624,11 @@ private static void selectForRead(ThreadContext context, OpenFile fptr, ChannelF
&& !fptr.nonblock) {

// keep selecting for read until ready, polling each time we wake up
while (!context.getThread().select(fd.chSelect, fptr, SelectionKey.OP_READ)) {
while (true) {
boolean result = context.getThread().selectFor(context, fd.chSelect, fptr, SelectionKey.OP_READ);

if (result) break;

context.pollThreadEvents();
}
}
Expand Down Expand Up @@ -1660,7 +1667,7 @@ boolean waitReadable(ThreadContext context, ChannelFD fd) {
if (fd.chSelect != null) {
unlock();
try {
return context.getThread().select(fd.chSelect, this, SelectionKey.OP_READ);
return context.getThread().selectFor(context, fd.chSelect, this, SelectionKey.OP_READ);
} finally {
lock();
}
Expand Down Expand Up @@ -3042,6 +3049,14 @@ public boolean lockedByMe() {
return lock.isHeldByCurrentThread();
}

public IRubyObject timeout() {
return timeout;
}

public void timeout(IRubyObject timeout) {
this.timeout = timeout;
}

@Deprecated
public long binwrite(ThreadContext context, byte[] ptrBytes, int ptr, int len, boolean nosync) {
return binwriteInt(context, ptrBytes, ptr, len, nosync);
Expand Down

0 comments on commit c0e5008

Please sign in to comment.