Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setThreadFactory support #251

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/main/java/com/neovisionaries/ws/client/ConnectThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package com.neovisionaries.ws.client;


import java.util.concurrent.ThreadFactory;

class ConnectThread extends WebSocketThread
{
public ConnectThread(WebSocket ws)
public ConnectThread(ThreadFactory factory, WebSocket ws)
{
super("ConnectThread", ws, ThreadType.CONNECT_THREAD);
super(factory, "ConnectThread", ws, ThreadType.CONNECT_THREAD);
}


Expand Down
6 changes: 4 additions & 2 deletions src/main/java/com/neovisionaries/ws/client/FinishThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package com.neovisionaries.ws.client;


import java.util.concurrent.ThreadFactory;

class FinishThread extends WebSocketThread
{
public FinishThread(WebSocket ws)
public FinishThread(ThreadFactory factory, WebSocket ws)
{
super("FinishThread", ws, ThreadType.FINISH_THREAD);
super(factory, "FinishThread", ws, ThreadType.FINISH_THREAD);
}


Expand Down
12 changes: 7 additions & 5 deletions src/main/java/com/neovisionaries/ws/client/ReadingThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ThreadFactory;

import com.neovisionaries.ws.client.StateManager.CloseInitiator;


Expand All @@ -48,9 +50,9 @@ class ReadingThread extends WebSocketThread
private boolean mNotWaitForCloseFrame;


public ReadingThread(WebSocket websocket)
public ReadingThread(ThreadFactory factory, WebSocket websocket)
{
super("ReadingThread", websocket, ThreadType.READING_THREAD);
super(factory, "ReadingThread", websocket, ThreadType.READING_THREAD);

mPMCE = websocket.getPerMessageCompressionExtension();
}
Expand Down Expand Up @@ -137,7 +139,7 @@ void requestStop(long closeDelay)
// interrupt() here may not work. interrupt() in Java is different
// from signal-based interruption in C which unblocks a read() system
// call. Anyway, let's mark this thread as interrupted.
interrupt();
getThread().interrupt();

// To surely unblock a read() call, Socket.close() needs to be called.
// Or, shutdownInterrupt() may work, but it is not explicitly stated
Expand Down Expand Up @@ -361,7 +363,7 @@ private WebSocketFrame readFrame()
}
catch (IOException e)
{
if (mStopRequested && isInterrupted())
if (mStopRequested && getThread().isInterrupted())
{
// Socket.close() interrupted a blocking socket read operation.
// This thread has been interrupted intentionally.
Expand Down Expand Up @@ -1125,7 +1127,7 @@ private void waitForCloseFrame()
break;
}

if (isInterrupted())
if (getThread().isInterrupted())
{
break;
}
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/com/neovisionaries/ws/client/WebSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.*;

import com.neovisionaries.ws.client.StateManager.CloseInitiator;


Expand Down Expand Up @@ -1104,6 +1102,7 @@ public class WebSocket
{
private static final long DEFAULT_CLOSE_DELAY = 10 * 1000L;
private final WebSocketFactory mWebSocketFactory;
private final ThreadFactory mThreadFactory;
private final SocketConnector mSocketConnector;
private final StateManager mStateManager;
private HandshakeBuilder mHandshakeBuilder;
Expand Down Expand Up @@ -1139,6 +1138,7 @@ public class WebSocket
String host, String path, SocketConnector connector)
{
mWebSocketFactory = factory;
mThreadFactory = factory.getThreadFactory();
mSocketConnector = connector;
mStateManager = new StateManager();
mHandshakeBuilder = new HandshakeBuilder(secure, userInfo, host, path);
Expand Down Expand Up @@ -2451,14 +2451,14 @@ public Callable<WebSocket> connectable()
*/
public WebSocket connectAsynchronously()
{
Thread thread = new ConnectThread(this);
WebSocketThread thread = new ConnectThread(mThreadFactory, this);

// Get the reference (just in case)
ListenerManager lm = mListenerManager;

if (lm != null)
{
lm.callOnThreadCreated(ThreadType.CONNECT_THREAD, thread);
lm.callOnThreadCreated(ThreadType.CONNECT_THREAD, thread.getThread());
}

thread.start();
Expand Down Expand Up @@ -3454,8 +3454,8 @@ private Map<String, List<String>> readHandshake(WebSocketInputStream input, Stri
*/
private void startThreads()
{
ReadingThread readingThread = new ReadingThread(this);
WritingThread writingThread = new WritingThread(this);
ReadingThread readingThread = new ReadingThread(mThreadFactory, this);
WritingThread writingThread = new WritingThread(mThreadFactory, this);

synchronized (mThreadsLock)
{
Expand Down Expand Up @@ -3765,7 +3765,7 @@ void finish()
*/
private void finishAsynchronously()
{
WebSocketThread thread = new FinishThread(this);
WebSocketThread thread = new FinishThread(mThreadFactory, this);

// Execute onThreadCreated() of the listeners.
thread.callOnThreadCreated();
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/com/neovisionaries/ws/client/WebSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.concurrent.ThreadFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
Expand All @@ -39,6 +40,7 @@ public class WebSocketFactory
private int mDualStackFallbackDelay = 250;
private boolean mVerifyHostname = true;
private String[] mServerNames;
private ThreadFactory mThreadFactory;


/**
Expand Down Expand Up @@ -84,6 +86,39 @@ public WebSocketFactory(WebSocketFactory other)
}
}

/**
* Get the thread factory that has been set by {@link
* #setThreadFactory(ThreadFactory)}.
*
* @return
* The thread factory.
*/
public ThreadFactory getThreadFactory()
{
return this.mThreadFactory;
}


/**
* Set a thread factory.
* The thread name is overwritten after creation.
*
* <p>
* Use {@link WebSocketListener#onThreadCreated(WebSocket, ThreadType, Thread)} for further configuration.
* </p>
*
* @param factory
* A thread factory.
*
* @return
* {@code this} instance.
*/
public WebSocketFactory setThreadFactory(ThreadFactory factory)
{
this.mThreadFactory = factory;
return this;
}


/**
* Get the socket factory that has been set by {@link
Expand Down
30 changes: 22 additions & 8 deletions src/main/java/com/neovisionaries/ws/client/WebSocketThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,52 @@
package com.neovisionaries.ws.client;


abstract class WebSocketThread extends Thread
import java.util.concurrent.ThreadFactory;

abstract class WebSocketThread implements Runnable
{
protected final WebSocket mWebSocket;
private final ThreadType mThreadType;
private final Thread mThread;


WebSocketThread(String name, WebSocket ws, ThreadType type)
WebSocketThread(ThreadFactory factory, String name, WebSocket ws, ThreadType type)
{
super(name);

mWebSocket = ws;
mThreadType = type;
mThread = factory == null ? new Thread(this) : factory.newThread(this);
mThread.setName(name);
}


public Thread getThread()
{
return mThread;
}


public void start()
{
mThread.start();
}


@Override
public void run()
{
ListenerManager lm = mWebSocket.getListenerManager();

if (lm != null)
{
// Execute onThreadStarted() of the listeners.
lm.callOnThreadStarted(mThreadType, this);
lm.callOnThreadStarted(mThreadType, getThread());
}

runMain();

if (lm != null)
{
// Execute onThreadStopping() of the listeners.
lm.callOnThreadStopping(mThreadType, this);
lm.callOnThreadStopping(mThreadType, getThread());
}
}

Expand All @@ -58,7 +72,7 @@ public void callOnThreadCreated()

if (lm != null)
{
lm.callOnThreadCreated(mThreadType, this);
lm.callOnThreadCreated(mThreadType, getThread());
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/java/com/neovisionaries/ws/client/WritingThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static com.neovisionaries.ws.client.WebSocketState.CLOSING;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.ThreadFactory;

import com.neovisionaries.ws.client.StateManager.CloseInitiator;


Expand All @@ -38,9 +40,9 @@ class WritingThread extends WebSocketThread
private boolean mStopped;


public WritingThread(WebSocket websocket)
public WritingThread(ThreadFactory factory, WebSocket websocket)
{
super("WritingThread", websocket, ThreadType.WRITING_THREAD);
super(factory, "WritingThread", websocket, ThreadType.WRITING_THREAD);

mFrames = new LinkedList<WebSocketFrame>();
mPMCE = websocket.getPerMessageCompressionExtension();
Expand Down