Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

允许在上传/下载开始之后继续添加新的listener #54

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public void run() {
} catch (IOException e) {
e.printStackTrace();
//当外部发生错误时,使用此方法可以通知所有监听器的 onError 方法
ProgressManager.getInstance().notifyOnErorr(mNewUploadUrl, e);
ProgressManager.getInstance().notifyOnError(mNewUploadUrl, e);
}
}
}).start();
Expand Down Expand Up @@ -317,7 +317,7 @@ public void run() {
} catch (IOException e) {
e.printStackTrace();
//当外部发生错误时,使用此方法可以通知所有监听器的 onError 方法
ProgressManager.getInstance().notifyOnErorr(mNewDownloadUrl, e);
ProgressManager.getInstance().notifyOnError(mNewDownloadUrl, e);
}
}
}).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public void run() {
} catch (IOException e) {
e.printStackTrace();
//当外部发生错误时,使用此方法可以通知所有监听器的 onError 方法
ProgressManager.getInstance().notifyOnErorr(mUploadUrl, e);
ProgressManager.getInstance().notifyOnError(mUploadUrl, e);
}
}
}).start();
Expand Down Expand Up @@ -330,7 +330,7 @@ public void run() {
} catch (IOException e) {
e.printStackTrace();
//当外部发生错误时,使用此方法可以通知所有监听器的 onError 方法
ProgressManager.getInstance().notifyOnErorr(mDownloadUrl, e);
ProgressManager.getInstance().notifyOnError(mDownloadUrl, e);
}
}
}).start();
Expand Down
107 changes: 66 additions & 41 deletions progress/src/main/java/me/jessyan/progressmanager/ProgressManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@

import java.io.IOException;
import java.lang.ref.ReferenceQueue;
import java.util.LinkedList;
import java.util.List;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;

import me.jessyan.progressmanager.body.ProgressInfo;
Expand Down Expand Up @@ -61,8 +62,8 @@ public final class ProgressManager {
* 为什么这样做? 因为如果直接使用 String mUrl = "url", 这个 {@code url} 字符串会被加入全局字符串常量池, 池中的字符串将不会被回收
* 既然 {@code key} 没被回收, 那 {@link WeakHashMap} 中的值也不会被移除
*/
private final Map<String, List<ProgressListener>> mRequestListeners = new WeakHashMap<>();
private final Map<String, List<ProgressListener>> mResponseListeners = new WeakHashMap<>();
private final Map<String, Set<ProgressListener>> mRequestListeners = new HashMap<>();
private final Map<String, Set<ProgressListener>> mResponseListeners = new HashMap<>();
private final Handler mHandler; //所有监听器在 Handler 中被执行,所以可以保证所有监听器在主线程中被执行
private final Interceptor mInterceptor;
private int mRefreshTime = DEFAULT_REFRESH_TIME; //进度刷新时间(单位ms),避免高频率调用
Expand Down Expand Up @@ -134,11 +135,12 @@ public void setRefreshTime(int refreshTime) {
public void addRequestListener(String url, ProgressListener listener) {
checkNotNull(url, "url cannot be null");
checkNotNull(listener, "listener cannot be null");
List<ProgressListener> progressListeners;
url = new String(url);
Set<ProgressListener> progressListeners;
synchronized (ProgressManager.class) {
progressListeners = mRequestListeners.get(url);
if (progressListeners == null) {
progressListeners = new LinkedList<>();
progressListeners = Collections.newSetFromMap(new WeakHashMap<ProgressListener, Boolean>());
mRequestListeners.put(url, progressListeners);
}
}
Expand All @@ -154,11 +156,12 @@ public void addRequestListener(String url, ProgressListener listener) {
public void addResponseListener(String url, ProgressListener listener) {
checkNotNull(url, "url cannot be null");
checkNotNull(listener, "listener cannot be null");
List<ProgressListener> progressListeners;
url = new String(url);
Set<ProgressListener> progressListeners;
synchronized (ProgressManager.class) {
progressListeners = mResponseListeners.get(url);
if (progressListeners == null) {
progressListeners = new LinkedList<>();
progressListeners = Collections.newSetFromMap(new WeakHashMap<ProgressListener, Boolean>());
mResponseListeners.put(url, progressListeners);
}
}
Expand All @@ -169,13 +172,13 @@ public void addResponseListener(String url, ProgressListener listener) {
/**
* 当在 {@link ProgressRequestBody} 和 {@link ProgressResponseBody} 内部处理二进制流时发生错误
* 会主动调用 {@link ProgressListener#onError(long, Exception)},但是有些错误并不是在它们内部发生的
* 但同样会引起网络请求的失败,所以向外面提供{@link ProgressManager#notifyOnErorr},当外部发生错误时
* 但同样会引起网络请求的失败,所以向外面提供{@link ProgressManager#notifyOnError},当外部发生错误时
* 手动调用此方法,以通知所有的监听器
*
* @param url {@code url} 作为标识符
* @param e 错误
*/
public void notifyOnErorr(String url, Exception e) {
public void notifyOnError(String url, Exception e) {
checkNotNull(url, "url cannot be null");
forEachListenersOnError(mRequestListeners, url, e);
forEachListenersOnError(mResponseListeners, url, e);
Expand Down Expand Up @@ -209,13 +212,21 @@ public Request wrapRequestBody(Request request) {

if (request.body() == null)
return request;
if (mRequestListeners.containsKey(key)) {
List<ProgressListener> listeners = mRequestListeners.get(key);
return request.newBuilder()
.method(request.method(), new ProgressRequestBody(mHandler, request.body(), listeners, mRefreshTime))
.build();

//Double Check
Set<ProgressListener> listeners = mRequestListeners.get(key);
if (listeners == null){
synchronized (ProgressManager.class){
listeners = mRequestListeners.get(key);
if (listeners == null){
listeners = Collections.newSetFromMap(new WeakHashMap<ProgressListener, Boolean>());
mRequestListeners.put(key,listeners);
}
}
}
return request;
return request.newBuilder()
.method(request.method(), new ProgressRequestBody(mHandler, request.body(), listeners, mRefreshTime))
.build();
}

/**
Expand Down Expand Up @@ -263,13 +274,19 @@ public Response wrapResponseBody(Response response) {
if (response.body() == null)
return response;

if (mResponseListeners.containsKey(key)) {
List<ProgressListener> listeners = mResponseListeners.get(key);
return response.newBuilder()
.body(new ProgressResponseBody(mHandler, response.body(), listeners, mRefreshTime))
.build();
Set<ProgressListener> listeners = mResponseListeners.get(key);
if (listeners == null){
synchronized (ProgressManager.class){
listeners = mResponseListeners.get(key);
if (listeners == null){
listeners = Collections.newSetFromMap(new WeakHashMap<ProgressListener, Boolean>());
mResponseListeners.put(key,listeners);
}
}
}
return response;
return response.newBuilder()
.body(new ProgressResponseBody(mHandler, response.body(), listeners, mRefreshTime))
.build();
}

/**
Expand Down Expand Up @@ -334,7 +351,7 @@ public String addDiffResponseListenerOnSameUrl(String originUrl, ProgressListene
* } catch (IOException e) {
* e.printStackTrace();
* //当外部发生错误时,使用此方法可以通知所有监听器的 onError 方法,这里也要使用 newUrl
* ProgressManager.getInstance().notifyOnErorr(newUrl, e);
* ProgressManager.getInstance().notifyOnError(newUrl, e);
* }
* }
* }).start();
Expand Down Expand Up @@ -401,7 +418,7 @@ public String addDiffRequestListenerOnSameUrl(String originUrl, ProgressListener
* } catch (IOException e) {
* e.printStackTrace();
* //当外部发生错误时,使用此方法可以通知所有监听器的 onError 方法,这里也要使用 newUrl
* ProgressManager.getInstance().notifyOnErorr(newUrl, e);
* ProgressManager.getInstance().notifyOnError(newUrl, e);
* }
* }
* }).start();
Expand All @@ -425,23 +442,31 @@ public String addDiffRequestListenerOnSameUrl(String originUrl, String key, Prog
* @param response 原始的 {@link Response}
* @param url {@code url} 地址
*/
private String resolveRedirect(Map<String, List<ProgressListener>> map, Response response, String url) {
private String resolveRedirect(Map<String, Set<ProgressListener>> map, Response response, String url) {
String location = null;
List<ProgressListener> progressListeners = map.get(url); //查看此重定向 url ,是否已经注册过监听器
if (progressListeners != null && progressListeners.size() > 0) {
location = response.header(LOCATION_HEADER);// 重定向地址
if (!TextUtils.isEmpty(location)) {
if (url.contains(IDENTIFICATION_NUMBER) && !location.contains(IDENTIFICATION_NUMBER)) { //如果 url 有标识符,那也将标识符加入用于重定向的 location
location += url.substring(url.indexOf(IDENTIFICATION_NUMBER), url.length());
Set<ProgressListener> progressListeners = map.get(url); //查看此重定向 url ,是否已经注册过监听器
if (progressListeners == null){
synchronized (ProgressManager.class){
progressListeners = map.get(url);
if (progressListeners == null){
progressListeners = Collections.newSetFromMap(new WeakHashMap<ProgressListener, Boolean>());
map.put(url,progressListeners);
}
if (!map.containsKey(location)) {
map.put(location, progressListeners); //将需要重定向地址的监听器,提供给重定向地址,保证重定向后也可以监听进度
} else {
List<ProgressListener> locationListener = map.get(location);
for (ProgressListener listener : progressListeners) {
if (!locationListener.contains(listener)) {
locationListener.add(listener);
}
}
}

location = response.header(LOCATION_HEADER);// 重定向地址
if (!TextUtils.isEmpty(location)) {
if (url.contains(IDENTIFICATION_NUMBER) && !location.contains(IDENTIFICATION_NUMBER)) { //如果 url 有标识符,那也将标识符加入用于重定向的 location
location += url.substring(url.indexOf(IDENTIFICATION_NUMBER), url.length());
}
if (!map.containsKey(location)) {
map.put(location, progressListeners); //将需要重定向地址的监听器,提供给重定向地址,保证重定向后也可以监听进度
} else {
Set<ProgressListener> locationListeners = map.get(location);
for (ProgressListener listener : progressListeners) {
if (!locationListeners.contains(listener)) {
locationListeners.add(listener);
}
}
}
Expand All @@ -450,9 +475,9 @@ private String resolveRedirect(Map<String, List<ProgressListener>> map, Response
}


private void forEachListenersOnError(Map<String, List<ProgressListener>> map, String url, Exception e) {
private void forEachListenersOnError(Map<String, Set<ProgressListener>> map, String url, Exception e) {
if (map.containsKey(url)) {
List<ProgressListener> progressListeners = map.get(url);
Set<ProgressListener> progressListeners = map.get(url);
ProgressListener[] array = progressListeners.toArray(new ProgressListener[progressListeners.size()]);
for (int i = 0; i < array.length; i++) {
array[i].onError(-1, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import android.os.SystemClock;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import me.jessyan.progressmanager.ProgressListener;
import okhttp3.MediaType;
Expand All @@ -44,14 +44,14 @@ public class ProgressRequestBody extends RequestBody {
protected Handler mHandler;
protected int mRefreshTime;
protected final RequestBody mDelegate;
protected final ProgressListener[] mListeners;
protected final Set<ProgressListener> mListeners;
protected final ProgressInfo mProgressInfo;
private BufferedSink mBufferedSink;


public ProgressRequestBody(Handler handler, RequestBody delegate, List<ProgressListener> listeners, int refreshTime) {
public ProgressRequestBody(Handler handler, RequestBody delegate, Set<ProgressListener> listeners, int refreshTime) {
this.mDelegate = delegate;
this.mListeners = listeners.toArray(new ProgressListener[listeners.size()]);
this.mListeners = listeners;
this.mHandler = handler;
this.mRefreshTime = refreshTime;
this.mProgressInfo = new ProgressInfo(System.currentTimeMillis());
Expand Down Expand Up @@ -82,8 +82,8 @@ public void writeTo(BufferedSink sink) throws IOException {
mBufferedSink.flush();
} catch (IOException e) {
e.printStackTrace();
for (int i = 0; i < mListeners.length; i++) {
mListeners[i].onError(mProgressInfo.getId(), e);
for (ProgressListener listener:mListeners) {
listener.onError(mProgressInfo.getId(), e);
}
throw e;
}
Expand All @@ -104,8 +104,8 @@ public void write(Buffer source, long byteCount) throws IOException {
super.write(source, byteCount);
} catch (IOException e) {
e.printStackTrace();
for (int i = 0; i < mListeners.length; i++) {
mListeners[i].onError(mProgressInfo.getId(), e);
for (ProgressListener listener:mListeners) {
listener.onError(mProgressInfo.getId(), e);
}
throw e;
}
Expand All @@ -114,14 +114,13 @@ public void write(Buffer source, long byteCount) throws IOException {
}
totalBytesRead += byteCount;
tempSize += byteCount;
if (mListeners != null) {
if (!mListeners.isEmpty()) {
long curTime = SystemClock.elapsedRealtime();
if (curTime - lastRefreshTime >= mRefreshTime || totalBytesRead == mProgressInfo.getContentLength()) {
final long finalTempSize = tempSize;
final long finalTotalBytesRead = totalBytesRead;
final long finalIntervalTime = curTime - lastRefreshTime;
for (int i = 0; i < mListeners.length; i++) {
final ProgressListener listener = mListeners[i];
for (final ProgressListener listener:mListeners) {
mHandler.post(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import android.os.SystemClock;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import me.jessyan.progressmanager.ProgressListener;
import okhttp3.MediaType;
Expand All @@ -44,13 +44,13 @@ public class ProgressResponseBody extends ResponseBody {
protected Handler mHandler;
protected int mRefreshTime;
protected final ResponseBody mDelegate;
protected final ProgressListener[] mListeners;
protected final Set<ProgressListener> mListeners;
protected final ProgressInfo mProgressInfo;
private BufferedSource mBufferedSource;

public ProgressResponseBody(Handler handler, ResponseBody responseBody, List<ProgressListener> listeners, int refreshTime) {
public ProgressResponseBody(Handler handler, ResponseBody responseBody, Set<ProgressListener> listeners, int refreshTime) {
this.mDelegate = responseBody;
this.mListeners = listeners.toArray(new ProgressListener[listeners.size()]);
this.mListeners = listeners;
this.mHandler = handler;
this.mRefreshTime = refreshTime;
this.mProgressInfo = new ProgressInfo(System.currentTimeMillis());
Expand Down Expand Up @@ -87,8 +87,8 @@ public long read(Buffer sink, long byteCount) throws IOException {
bytesRead = super.read(sink, byteCount);
} catch (IOException e) {
e.printStackTrace();
for (int i = 0; i < mListeners.length; i++) {
mListeners[i].onError(mProgressInfo.getId(), e);
for (ProgressListener listener:mListeners) {
listener.onError(mProgressInfo.getId(), e);
}
throw e;
}
Expand All @@ -98,15 +98,14 @@ public long read(Buffer sink, long byteCount) throws IOException {
// read() returns the number of bytes read, or -1 if this source is exhausted.
totalBytesRead += bytesRead != -1 ? bytesRead : 0;
tempSize += bytesRead != -1 ? bytesRead : 0;
if (mListeners != null) {
if (!mListeners.isEmpty()) {
long curTime = SystemClock.elapsedRealtime();
if (curTime - lastRefreshTime >= mRefreshTime || bytesRead == -1 || totalBytesRead == mProgressInfo.getContentLength()) {
final long finalBytesRead = bytesRead;
final long finalTempSize = tempSize;
final long finalTotalBytesRead = totalBytesRead;
final long finalIntervalTime = curTime - lastRefreshTime;
for (int i = 0; i < mListeners.length; i++) {
final ProgressListener listener = mListeners[i];
for (final ProgressListener listener:mListeners) {
mHandler.post(new Runnable() {
@Override
public void run() {
Expand Down