Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ plugins {
id 'java'
id 'jacoco'
id 'maven-publish'
id 'com.github.spotbugs' version '6.0.6'
id 'com.github.spotbugs' version '6.2.2'
}

group = 'net.juniper.netconf'
version = '2.2.0.0'
version = '2.2.0.3'
description = 'An API For NetConf client'

java {
Expand All @@ -29,7 +29,7 @@ dependencies {
testImplementation 'org.assertj:assertj-core:3.23.1'
testImplementation 'org.mockito:mockito-core:4.8.1'
testImplementation 'commons-io:commons-io:2.14.0'
testImplementation 'org.xmlunit:xmlunit-assertj:2.9.0'
testImplementation 'org.xmlunit:xmlunit-assertj:2.10.0'
testImplementation 'org.slf4j:slf4j-simple:2.0.3'
testImplementation 'com.github.spotbugs:spotbugs-annotations:4.7.3'

Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>net.juniper.netconf</groupId>
<artifactId>netconf-java</artifactId>
<version>2.2.0.0</version>
<version>2.2.0.3</version>
<packaging>jar</packaging>

<properties>
Expand Down Expand Up @@ -217,7 +217,7 @@
<dependency>
<groupId>org.xmlunit</groupId>
<artifactId>xmlunit-assertj</artifactId>
<version>2.9.0</version>
<version>2.10.0</version>
<scope>test</scope>
</dependency>

Expand All @@ -238,7 +238,7 @@
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<version>4.7.3</version>
<version>4.8.6</version>
<scope>test</scope>
</dependency>

Expand Down
75 changes: 59 additions & 16 deletions src/main/java/net/juniper/netconf/NetconfSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,28 +128,71 @@ private void sendHello(String hello) throws IOException {

@VisibleForTesting
String getRpcReply(String rpc) throws IOException {
// write the rpc to the device
// Write the RPC to the device first
sendRpcRequest(rpc);

final char[] buffer = new char[BUFFER_SIZE];
final StringBuilder rpcReply = new StringBuilder();
final long startTime = System.nanoTime();
final Reader in = new InputStreamReader(stdInStreamFromDevice, Charsets.UTF_8);
boolean timeoutNotExceeded = true;
int promptPosition;
while ((promptPosition = rpcReply.indexOf(NetconfConstants.DEVICE_PROMPT)) < 0 &&
(timeoutNotExceeded = (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) < commandTimeout))) {
int charsRead = in.read(buffer, 0, buffer.length);
if (charsRead < 0) throw new NetconfException("Input Stream has been closed during reading.");
rpcReply.append(buffer, 0, charsRead);
final StringBuilder rpcReply = new StringBuilder(8 * 1024);
final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(commandTimeout);

// We read raw bytes from the underlying InputStream to avoid Reader blocking
// on multibyte UTF-8 boundaries when only a few bytes are available.
final byte[] bbuf = new byte[BUFFER_SIZE];
final InputStream in = this.stdInStreamFromDevice;

int promptPosition = -1;
for (;;) {
// First, consume any bytes that are already buffered in the stream
final int avail = in.available();
if (avail > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm "if there is nothing to read, sleep for a bit and try again". Did you see my MR at #79 which uses a timeout on a future, so will work without any enforced delays?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did look at MR #79. A Future.get(timeout) looks nicer on paper, but for JSch it doesn’t remove enforced delays unless we’re okay with tearing down the channel:
• Timeout on a Future doesn’t unblock InputStream.read()
With JSch, the reader thread typically blocks in InputStream.read(). Timing out the caller via Future.get(timeout) just throws on the waiting thread, the reader thread is still blocked. The only reliable way to wake it is to close the channel, which is destructive for a long-lived session.

•	“No enforced delay” is only true if the underlying read is interruptible

Since JSch’s read isn’t, a Future adds orchestration, but not lower latency, unless we also add logic to close/replace the channel on each timeout.

•	Current loop drains immediately, only pauses when there’s truly nothing to read

We use in.available() to drain whatever JSch has already buffered without blocking and we break as soon as we hit the device prompt. The sleep(10ms) happens only when the buffer is empty; it avoids a tight spin and doesn’t gate reads when data is flowing.

•	Why not a blocking read with a socket/channel timeout?

JSch doesn’t give us a clean per-read timeout that’s guaranteed to interrupt the blocking read without closing the channel, which is why we avoid it here.

That said, I’m happy to make this even more responsive:
• Reduce the pause (e.g., LockSupport.parkNanos(1_000_000) for ~1ms) or adaptive backoff (1–10ms) to lower worst-case wait when the device is trickling bytes.
• Keep the “drain-immediately when available()>0” behavior so we don’t add latency when data is arriving.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you missed something in my PR; it ensures that the reading thread is stopped by interrupting it.

I've updated the PR to make this a bit more explicit;

  • The thread now logs if there is an error reading from the input stream
  • There's now an explicit test that demonstrates that the input stream is still valid

The new test, ifTheDeviceDoesNotRespondTheSessionCanStillBeUsed, will;

  • Create a new session
  • Send a command, and timeout expecting the reply
  • Send another command, and get the expected reply - demonstrating that the session is still good to use.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern with this PR is that on an MX104, for example, we're seeing a commit time of ~90 seconds. I'd rather not spend those 90 seconds in a busy loop waiting for that response. And if you've got two concurrent commits on different sessions, they are queued sequentially by the device so the second commit can take ~180 seconds to respond.

Yes, it's possible to ameliorate this to an extent with a clever back-off algorithm but fundamentally it's still a busy-loop which is unnecessary.

int toRead = Math.min(avail, bbuf.length);
int bytesRead = in.read(bbuf, 0, toRead);
if (bytesRead < 0) {
// Remote closed while reading
throw new NetconfException("Input stream closed by remote device while reading RPC reply.");
}
rpcReply.append(new String(bbuf, 0, bytesRead, Charsets.UTF_8));

// Check if we've reached the DEVICE_PROMPT terminator
promptPosition = rpcReply.indexOf(NetconfConstants.DEVICE_PROMPT);
if (promptPosition >= 0) {
break;
}
// Continue the loop to drain any remaining buffered data quickly
continue;
}

// If the SSH channel is closed and no more data is available, we won't get anything else.
if (netconfChannel.isClosed()) {
// Final attempt to read any pending bytes before declaring closure
int bytesRead = in.read(bbuf, 0, bbuf.length);
if (bytesRead > 0) {
rpcReply.append(new String(bbuf, 0, bytesRead, Charsets.UTF_8));
promptPosition = rpcReply.indexOf(NetconfConstants.DEVICE_PROMPT);
if (promptPosition >= 0) {
break;
}
} else {
throw new NetconfException("SSH channel closed by remote device while waiting for RPC reply.");
}
}

// Check overall timeout
if (System.nanoTime() > deadlineNanos) {
throw new SocketTimeoutException("Command timeout limit was exceeded: " + commandTimeout);
}

// No data yet; sleep briefly to avoid a tight spin
try {
Thread.sleep(10L);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new NetconfException("Thread interrupted while waiting for RPC reply", ie);
}
}

if (!timeoutNotExceeded)
throw new SocketTimeoutException("Command timeout limit was exceeded: " + commandTimeout);
// fixing the rpc reply by removing device prompt
// Remove device prompt and return the reply
log.debug("Received Netconf RPC-Reply\n{}", rpcReply);
rpcReply.setLength(promptPosition);

return rpcReply.toString();
}

Expand Down
157 changes: 115 additions & 42 deletions src/test/java/net/juniper/netconf/NetconfSessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ public void createSessionThrowsNetconfExceptionWhenConnectionCloses() {
thread.start();

assertThatThrownBy(() -> createNetconfSession(COMMAND_TIMEOUT))
.isInstanceOf(NetconfException.class)
.hasMessage("Input Stream has been closed during reading.");
.isInstanceOfAny(NetconfException.class, SocketTimeoutException.class);
}

@Test
Expand All @@ -165,6 +164,89 @@ public void createSessionHandlesDevicePromptWithoutLineFeed() throws Exception {
createNetconfSession(COMMAND_TIMEOUT);
}

@Test
public void getRpcReplyReturnsBodyUpToPrompt() throws Exception {
// Use the pipe so the reply arrives after the session handshake
when(mockChannel.getInputStream()).thenReturn(inPipe);
when(mockChannel.getOutputStream()).thenReturn(out);

Thread t = new Thread(() -> {
try {
// 1) Handshake
outPipe.write(createHelloMessage().getBytes(StandardCharsets.UTF_8));
outPipe.write(DEVICE_PROMPT_BYTE);
outPipe.flush();
Thread.sleep(50);
// 2) RPC reply and terminator
outPipe.write(FAKE_RPC_REPLY.getBytes(StandardCharsets.UTF_8));
outPipe.write(DEVICE_PROMPT_BYTE);
outPipe.flush();
Thread.sleep(50);
outPipe.close();
} catch (IOException | InterruptedException e) {
log.error("Error in background thread", e);
}
});
t.start();

NetconfSession s = createNetconfSession(COMMAND_TIMEOUT);
String reply = s.getRpcReply("<rpc/>");
assertThat(reply).isEqualTo(FAKE_RPC_REPLY);
}

@Test
public void getRpcReplyThrowsWhenEofBeforePrompt() throws Exception {
when(mockChannel.getInputStream()).thenReturn(inPipe);
when(mockChannel.getOutputStream()).thenReturn(out);

Thread t = new Thread(() -> {
try {
// 1) Handshake
outPipe.write(createHelloMessage().getBytes(StandardCharsets.UTF_8));
outPipe.write(DEVICE_PROMPT_BYTE);
outPipe.flush();
Thread.sleep(50);
// 2) Partial reply then EOF (no prompt)
outPipe.write(FAKE_RPC_REPLY.getBytes(StandardCharsets.UTF_8));
outPipe.flush();
Thread.sleep(50);
outPipe.close();
} catch (IOException | InterruptedException e) {
log.error("Error in background thread", e);
}
});
t.start();

NetconfSession s = createNetconfSession(COMMAND_TIMEOUT);
assertThatThrownBy(() -> s.getRpcReply("<rpc/>"))
.isInstanceOfAny(NetconfException.class, SocketTimeoutException.class);
}

@Test
public void getRpcReplyTimesOutOnStall() throws Exception {
final int shortTimeoutMs = 400; // small timeout for the test
Thread t = new Thread(() -> {
try {
// 1) Complete session handshake quickly
outPipe.write(createHelloMessage().getBytes(StandardCharsets.UTF_8));
outPipe.write(DEVICE_PROMPT_BYTE);
outPipe.flush();
Thread.sleep(50);

// 2) Stall without ever sending a prompt for the RPC
writeStallNoPrompt(shortTimeoutMs + 300L); // stall longer than timeout
} catch (IOException | InterruptedException e) {
log.error("Error in background thread", e);
}
});
t.start();

NetconfSession s = createNetconfSession(shortTimeoutMs);
assertThatThrownBy(() -> s.getRpcReply("<rpc/>"))
.isInstanceOf(SocketTimeoutException.class)
.hasMessage("Command timeout limit was exceeded: " + shortTimeoutMs);
}

@Test
public void executeRpcReturnsCorrectResponseForLldpRequest() throws Exception {
byte[] lldpResponse = Files.readAllBytes(TestHelper.getSampleFile("responses/lldpResponse.xml").toPath());
Expand Down Expand Up @@ -255,43 +337,35 @@ public void loadTextConfigurationSucceedsWithOkResponse() throws Exception {

@Test
public void loadTextConfigurationFailsWithNotOkResponse() throws Exception {
final String helloMessage = createHelloMessage();
doCallRealMethod().when(mockNetconfSession)
.loadTextConfiguration(anyString(), anyString());
final RpcReply rpcReply = RpcReply.builder()
.ok(false)
.messageId("1")
.build();

final String combinedMessage = helloMessage + NetconfConstants.DEVICE_PROMPT +
rpcReply.getXml() + NetconfConstants.DEVICE_PROMPT;

final InputStream combinedStream = new ByteArrayInputStream(combinedMessage.getBytes(StandardCharsets.UTF_8));
when(mockChannel.getInputStream()).thenReturn(combinedStream);

final NetconfSession netconfSession = createNetconfSession(100);
when(mockNetconfSession.getRpcReply(anyString())).thenReturn(rpcReply.getXml());
when(mockNetconfSession.hasError()).thenReturn(true);
when(mockNetconfSession.isOK()).thenReturn(false);

assertThrows(LoadException.class,
() -> netconfSession.loadTextConfiguration("some config", "some type"));
() -> mockNetconfSession.loadTextConfiguration("some config", "some type"));
}

@Test
public void loadTextConfigurationFailsWithOkResponseButErrors() throws Exception {
final String helloMessage = createHelloMessage();
doCallRealMethod().when(mockNetconfSession)
.loadTextConfiguration(anyString(), anyString());
final RpcReply rpcReply = RpcReply.builder()
.ok(true)
.addError(RpcError.builder().errorSeverity(RpcError.ErrorSeverity.ERROR).build())
.messageId("1")
.build();

final String combinedMessage = helloMessage + NetconfConstants.DEVICE_PROMPT +
rpcReply.getXml() + NetconfConstants.DEVICE_PROMPT;

final InputStream combinedStream = new ByteArrayInputStream(combinedMessage.getBytes(StandardCharsets.UTF_8));
when(mockChannel.getInputStream()).thenReturn(combinedStream);

final NetconfSession netconfSession = createNetconfSession(100);
when(mockNetconfSession.getRpcReply(anyString())).thenReturn(rpcReply.getXml());
when(mockNetconfSession.hasError()).thenReturn(true);
when(mockNetconfSession.isOK()).thenReturn(false);

assertThrows(LoadException.class,
() -> netconfSession.loadTextConfiguration("some config", "some type"));
() -> mockNetconfSession.loadTextConfiguration("some config", "some type"));
}

@Test
Expand All @@ -308,43 +382,35 @@ public void loadXmlConfigurationSucceedsWithOkResponse() throws Exception {

@Test
public void loadXmlConfigurationFailsWithNotOkResponse() throws Exception {
final String helloMessage = createHelloMessage();
doCallRealMethod().when(mockNetconfSession)
.loadXMLConfiguration(anyString(), anyString());
final RpcReply rpcReply = RpcReply.builder()
.ok(false)
.messageId("1")
.build();

final String combinedMessage = helloMessage + NetconfConstants.DEVICE_PROMPT +
rpcReply.getXml() + NetconfConstants.DEVICE_PROMPT;

final InputStream combinedStream = new ByteArrayInputStream(combinedMessage.getBytes(StandardCharsets.UTF_8));
when(mockChannel.getInputStream()).thenReturn(combinedStream);

final NetconfSession netconfSession = createNetconfSession(100);
when(mockNetconfSession.getRpcReply(anyString())).thenReturn(rpcReply.getXml());
when(mockNetconfSession.hasError()).thenReturn(true);
when(mockNetconfSession.isOK()).thenReturn(false);

assertThrows(LoadException.class,
() -> netconfSession.loadXMLConfiguration("some config", "merge"));
() -> mockNetconfSession.loadXMLConfiguration("some config", "merge"));
}

@Test
public void loadXmlConfigurationFailsWithOkResponseButErrors() throws Exception {
final String helloMessage = createHelloMessage();
doCallRealMethod().when(mockNetconfSession)
.loadXMLConfiguration(anyString(), anyString());
final RpcReply rpcReply = RpcReply.builder()
.ok(true)
.addError(RpcError.builder().errorSeverity(RpcError.ErrorSeverity.ERROR).build())
.messageId("1")
.build();

final String combinedMessage = helloMessage + NetconfConstants.DEVICE_PROMPT +
rpcReply.getXml() + NetconfConstants.DEVICE_PROMPT;

final InputStream combinedStream = new ByteArrayInputStream(combinedMessage.getBytes(StandardCharsets.UTF_8));
when(mockChannel.getInputStream()).thenReturn(combinedStream);

final NetconfSession netconfSession = createNetconfSession(100);
when(mockNetconfSession.getRpcReply(anyString())).thenReturn(rpcReply.getXml());
when(mockNetconfSession.hasError()).thenReturn(true);
when(mockNetconfSession.isOK()).thenReturn(false);

assertThrows(LoadException.class,
() -> netconfSession.loadXMLConfiguration("some config", "merge"));
() -> mockNetconfSession.loadXMLConfiguration("some config", "merge"));
}

/**
Expand Down Expand Up @@ -477,6 +543,13 @@ private void writeLldpResponse(byte[] lldpResponse) throws IOException, Interrup
outPipe.close();
}

private void writeStallNoPrompt(long millis) throws IOException, InterruptedException {
outPipe.write(FAKE_RPC_REPLY.getBytes(StandardCharsets.UTF_8));
outPipe.flush();
Thread.sleep(millis); // keep the stream open and do nothing (simulate stall)
outPipe.close();
}

private String createHelloMessage() {
return "<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+ " <capabilities>\n"
Expand Down