Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use ThreadPool to replace creating thread explicitly #24

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 76 additions & 83 deletions src/main/java/cn/nzcer/odapi/cron/SyncDataBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.FileWriter;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
Expand All @@ -26,7 +26,11 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -127,6 +131,12 @@ public Set<String> parseRepo(String url, int count) throws IOException {
return repoNames;
}

/**
* 插入所有仓库的指标数据
* @throws IOException
* @throws BrokenBarrierException
* @throws InterruptedException
*/
// 每月 3 日凌晨 1 点启动定时任务更新仓库的所有指标数据
@Scheduled(cron = "0 0 1 3 * ?")
public void insertAllRepoMetrics() throws IOException, BrokenBarrierException, InterruptedException {
Expand All @@ -136,130 +146,110 @@ public void insertAllRepoMetrics() throws IOException, BrokenBarrierException, I
Set<String> allRepo = getAllRepo();
log.info("获取到的 repo 数量为: " + allRepo.size());
List<String> tokens = getTokens();
CyclicBarrier cyclicBarrier = new CyclicBarrier(allRepo.size());
List<String> errorRepos = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(allRepo.size());
int repoCnt = 0;
for (String repo : allRepo) {
String token = tokens.get(repoCnt % tokens.size());
Thread thread = new Thread(() -> {
executor.execute(() -> {
try {
insertOneRepoMetric(repo, token);
} catch (IOException e) {
log.warn(e.getMessage(), e);
} catch (Exception e) {
errorRepos.add(repo + ":" + e.getMessage());
throw new RuntimeException(e);
} finally {
try {
cyclicBarrier.await();
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
countDownLatch.countDown();
}
});
thread.start();
repoCnt++;
}
cyclicBarrier.await();
//int cnt = 0;
//String reg = "(?<=/)\\w+(?=\\.json)";
//for (String repo : allRepo) {
// String[] split = repo.split("/");
// String orgName = split[0];
// String repoName = split[1];
// List<String> urls = repoMetricService.getRepoMetricUrls(orgName,repoName);
// List<RepoMetric> repoMetricList = new ArrayList<>();
// for (String url : urls) {
// JSONObject jo = NetUtil.doGet(url);
// if (jo == null) {
// continue;
// }
// jo.forEach((key, value) -> {
// RepoMetric repoMetric = new RepoMetric();
// repoMetric.setOrgName(orgName);
// repoMetric.setRepoName(repoName);
// Date metricTime = DateUtil.parse(key);
// if (metricTime != null) {
// repoMetric.setTMonth(metricTime);
// } else {
// // 跳过本次循环
// return;
// }
// repoMetric.setType(repoMetricService.getRepoMetricTypeFromUrl(url, reg));
// if (value instanceof Integer) {
// repoMetric.setTValue(BigDecimal.valueOf((Integer)value).doubleValue());
// } else {
// repoMetric.setTValue(((BigDecimal)value).doubleValue());
// }
// //RepoMetric repoMetric = new RepoMetric(orgName,repoName, DateUtil.parse(key), repoMetricService.getRepoMetricTypeFromUrl(url,reg), ((BigDecimal)value).doubleValue());
// repoMetricList.add(repoMetric);
// });
// }
// if (repoMetricList.size() == 0) {
// continue;
// }
// log.info("插入第 " + cnt + " 个 repo 的数据:" + repo);
// repoMetricService.insertBatchRepoMetric(repoMetricList);
// cnt++;
//}
countDownLatch.await();
log.info("定时任务完成:" + new Date());
FileWriter writer = new FileWriter("output.txt");
for (String str : errorRepos) {
writer.write(str + System.lineSeparator());
}
writer.close();
}

volatile int cnt = 1;

// 每日凌晨 5 点启动定时任务更新所有仓库的 star 和 fork 数据
// 自定义线程池
private static ExecutorService executor = new ThreadPoolExecutor(20,
100,
10,
TimeUnit.MICROSECONDS,
new LinkedBlockingQueue<Runnable>());

/**
* 插入所有仓库的 star 和 fork 数据
* @throws InterruptedException
* @throws BrokenBarrierException
*/
@Scheduled(cron = "0 0 5 * * ?")
public void insertAllRepoStarAndFork() throws InterruptedException, BrokenBarrierException {
log.info("Repo Statistic 定时任务启动:" + new Date());
log.info("清空 repo_statistic 表");
repoStatisticService.truncateRepoStatistic();
List<Map<String, String>> repoInfo = repoMetricService.getRepoInfo();
log.info(String.valueOf(repoInfo.size()));
log.info("仓库的数量: " + String.valueOf(repoInfo.size()));
List<String> tokens = getTokens();
CyclicBarrier cyclicBarrier = new CyclicBarrier(repoInfo.size());
CountDownLatch countDownLatch = new CountDownLatch(repoInfo.size());
int totalRepo = 0;
for (Map<String, String> map : repoInfo) {
totalRepo++;
List<RepoStatistic> list = new ArrayList<>();
int finalTotalRepo = totalRepo;
Thread thread = new Thread(() -> {
executor.execute(() -> {
try {
insertOneRepoStarAndFork(map, tokens.get(finalTotalRepo % tokens.size()), list);
} catch (IOException e) {
log.warn(e.getMessage(), e);
e.printStackTrace();
} finally {
try {
cyclicBarrier.await();
} catch (Exception e) {
log.info(e.getMessage(), e);
}
countDownLatch.countDown();
}
});
thread.start();
}
cyclicBarrier.await();
countDownLatch.await();
log.info("所有子线程执行完毕");
}


/**
* 插入单个仓库的 star 和 fork 数据
* @param map
* @param token
* @param list
* @throws IOException
*/
private void insertOneRepoStarAndFork(Map<String, String> map, String token, List<RepoStatistic> list) throws IOException {
String orgName = map.get("orgName");
String repoName = map.get("repoName");
String url = " https://api.github.com/repos/" + orgName + "/" + repoName;
log.info(url);
//log.info(url);
JSONObject jsonObject = NetUtil.doGetWithToken(url, token);
if (jsonObject == null) {
return;
if (jsonObject != null) {
Integer star = jsonObject.getInteger("stargazers_count");
Integer fork = jsonObject.getInteger("forks_count");
if (star == null || fork == null) {
log.info(Thread.currentThread().getName() + " 当前仓库" + orgName + "/" + repoName + "无 star 或 fork");
return;
}
RepoStatistic r1 = new RepoStatistic(orgName, repoName, "stargazers_count", star);
RepoStatistic r2 = new RepoStatistic(orgName, repoName, "forks_count", fork);
list.add(r1);
list.add(r2);
repoStatisticService.insertBatchRepoStatistic(list);
log.info(Thread.currentThread().getName() + " 成功插入数据:" + orgName + "/" + repoName);
list.clear();
}
Integer star = jsonObject.getInteger("stargazers_count");
Integer fork = jsonObject.getInteger("forks_count");
RepoStatistic r1 = new RepoStatistic(orgName, repoName, "stargazers_count", star);
RepoStatistic r2 = new RepoStatistic(orgName, repoName, "forks_count", fork);
list.add(r1);
list.add(r2);
repoStatisticService.insertBatchRepoStatistic(list);
log.info("插入第 " + cnt + " 个 repo 的数据:" + orgName + "/" + repoName);
list.clear();
cnt++;
}


/**
* 插入单个仓库的指标数据
* @param repo
* @param token
* @throws IOException
*/
private void insertOneRepoMetric(String repo, String token) throws IOException {
String reg = "(?<=/)\\w+(?=\\.json)";
String[] split = repo.split("/");
Expand Down Expand Up @@ -297,10 +287,13 @@ private void insertOneRepoMetric(String repo, String token) throws IOException {
return;
}
repoMetricService.insertBatchRepoMetric(repoMetricList);
log.info("插入第 " + cnt + " 个 repo 的数据:" + orgName + "/" + repoName);
cnt++;
log.info(Thread.currentThread().getName() + " 成功插入数据:" + orgName + "/" + repoName);
}

/**
* 获取 token 列表
* @return
*/
public List<String> getTokens() {
List<GitHubToken> tokens = gitHubApiConfig.getTokens();
List<String> collect = tokens.stream().map(GitHubToken::getToken).collect(Collectors.toList());
Expand Down
41 changes: 12 additions & 29 deletions src/main/java/cn/nzcer/odapi/util/NetUtil.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package cn.nzcer.odapi.util;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.jetbrains.annotations.NotNull;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
Expand All @@ -23,7 +17,18 @@
*/
@Slf4j
public class NetUtil {
static OkHttpClient client = new OkHttpClient.Builder().readTimeout(100, TimeUnit.SECONDS).build();
//读取超时为500s
private static final long READ_TIMEOUT = 500;
//写入超时为500s
private static final long WRITE_TIMEOUT = 500;
//连接超时为500s
private static final long CONNECT_TIMEOUT = 500;
static OkHttpClient client = new OkHttpClient.Builder()
.readTimeout(READ_TIMEOUT, TimeUnit.SECONDS)
.writeTimeout(WRITE_TIMEOUT, TimeUnit.SECONDS)
.connectTimeout(CONNECT_TIMEOUT, TimeUnit.SECONDS)
.build();

// 同步请求
public static JSONObject doGet(String url) throws IOException {
Request request = new Request.Builder()
Expand Down Expand Up @@ -57,26 +62,4 @@ public static JSONObject doGetWithToken(String url, String token) throws IOExcep
}
}


// 异步请求
//public static void asyncRequest(String url) {
// Request request = new Request.Builder().url(url).build();
// client.newCall(request).enqueue(new Callback() {
// @Override
// public void onFailure(@NotNull Call call, @NotNull IOException e) {
// e.printStackTrace();
// }
//
// @Override
// public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
// if (!response.isSuccessful()) {
// throw new IOException("Unexpected code " + response);
// } else {
// String res = response.body().string();
// System.out.println(JSONObject.parseObject(res));
// }
// }
// });
//}

}
3 changes: 2 additions & 1 deletion src/test/java/cn/nzcer/odapi/cron/SyncDataBaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class SyncDataBaseTest {
SyncDataBase syncDataBase;

@Test
void insertAllRepoMetrics() throws IOException, BrokenBarrierException, InterruptedException {
void insertAllRepoMetrics() throws BrokenBarrierException, IOException, InterruptedException {
// 更新所有仓库的 OpenDigger 指标数据
syncDataBase.insertAllRepoMetrics();
}

Expand Down