Skip to content

Commit

Permalink
refactor: use ThreadPool to replace creating thread explicitly (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicheng-ning authored Mar 18, 2023
1 parent 8221f43 commit 0b855d5
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 113 deletions.
159 changes: 76 additions & 83 deletions src/main/java/cn/nzcer/odapi/cron/SyncDataBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.FileWriter;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
Expand All @@ -26,7 +26,11 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -127,6 +131,12 @@ public Set<String> parseRepo(String url, int count) throws IOException {
return repoNames;
}

/**
* 插入所有仓库的指标数据
* @throws IOException
* @throws BrokenBarrierException
* @throws InterruptedException
*/
// 每月 3 日凌晨 1 点启动定时任务更新仓库的所有指标数据
@Scheduled(cron = "0 0 1 3 * ?")
public void insertAllRepoMetrics() throws IOException, BrokenBarrierException, InterruptedException {
Expand All @@ -136,130 +146,110 @@ public void insertAllRepoMetrics() throws IOException, BrokenBarrierException, I
Set<String> allRepo = getAllRepo();
log.info("获取到的 repo 数量为: " + allRepo.size());
List<String> tokens = getTokens();
CyclicBarrier cyclicBarrier = new CyclicBarrier(allRepo.size());
List<String> errorRepos = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(allRepo.size());
int repoCnt = 0;
for (String repo : allRepo) {
String token = tokens.get(repoCnt % tokens.size());
Thread thread = new Thread(() -> {
executor.execute(() -> {
try {
insertOneRepoMetric(repo, token);
} catch (IOException e) {
log.warn(e.getMessage(), e);
} catch (Exception e) {
errorRepos.add(repo + ":" + e.getMessage());
throw new RuntimeException(e);
} finally {
try {
cyclicBarrier.await();
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
countDownLatch.countDown();
}
});
thread.start();
repoCnt++;
}
cyclicBarrier.await();
//int cnt = 0;
//String reg = "(?<=/)\\w+(?=\\.json)";
//for (String repo : allRepo) {
// String[] split = repo.split("/");
// String orgName = split[0];
// String repoName = split[1];
// List<String> urls = repoMetricService.getRepoMetricUrls(orgName,repoName);
// List<RepoMetric> repoMetricList = new ArrayList<>();
// for (String url : urls) {
// JSONObject jo = NetUtil.doGet(url);
// if (jo == null) {
// continue;
// }
// jo.forEach((key, value) -> {
// RepoMetric repoMetric = new RepoMetric();
// repoMetric.setOrgName(orgName);
// repoMetric.setRepoName(repoName);
// Date metricTime = DateUtil.parse(key);
// if (metricTime != null) {
// repoMetric.setTMonth(metricTime);
// } else {
// // 跳过本次循环
// return;
// }
// repoMetric.setType(repoMetricService.getRepoMetricTypeFromUrl(url, reg));
// if (value instanceof Integer) {
// repoMetric.setTValue(BigDecimal.valueOf((Integer)value).doubleValue());
// } else {
// repoMetric.setTValue(((BigDecimal)value).doubleValue());
// }
// //RepoMetric repoMetric = new RepoMetric(orgName,repoName, DateUtil.parse(key), repoMetricService.getRepoMetricTypeFromUrl(url,reg), ((BigDecimal)value).doubleValue());
// repoMetricList.add(repoMetric);
// });
// }
// if (repoMetricList.size() == 0) {
// continue;
// }
// log.info("插入第 " + cnt + " 个 repo 的数据:" + repo);
// repoMetricService.insertBatchRepoMetric(repoMetricList);
// cnt++;
//}
countDownLatch.await();
log.info("定时任务完成:" + new Date());
FileWriter writer = new FileWriter("output.txt");
for (String str : errorRepos) {
writer.write(str + System.lineSeparator());
}
writer.close();
}

volatile int cnt = 1;

// 每日凌晨 5 点启动定时任务更新所有仓库的 star 和 fork 数据
// 自定义线程池
private static ExecutorService executor = new ThreadPoolExecutor(20,
100,
10,
TimeUnit.MICROSECONDS,
new LinkedBlockingQueue<Runnable>());

/**
* 插入所有仓库的 star 和 fork 数据
* @throws InterruptedException
* @throws BrokenBarrierException
*/
@Scheduled(cron = "0 0 5 * * ?")
public void insertAllRepoStarAndFork() throws InterruptedException, BrokenBarrierException {
log.info("Repo Statistic 定时任务启动:" + new Date());
log.info("清空 repo_statistic 表");
repoStatisticService.truncateRepoStatistic();
List<Map<String, String>> repoInfo = repoMetricService.getRepoInfo();
log.info(String.valueOf(repoInfo.size()));
log.info("仓库的数量: " + String.valueOf(repoInfo.size()));
List<String> tokens = getTokens();
CyclicBarrier cyclicBarrier = new CyclicBarrier(repoInfo.size());
CountDownLatch countDownLatch = new CountDownLatch(repoInfo.size());
int totalRepo = 0;
for (Map<String, String> map : repoInfo) {
totalRepo++;
List<RepoStatistic> list = new ArrayList<>();
int finalTotalRepo = totalRepo;
Thread thread = new Thread(() -> {
executor.execute(() -> {
try {
insertOneRepoStarAndFork(map, tokens.get(finalTotalRepo % tokens.size()), list);
} catch (IOException e) {
log.warn(e.getMessage(), e);
e.printStackTrace();
} finally {
try {
cyclicBarrier.await();
} catch (Exception e) {
log.info(e.getMessage(), e);
}
countDownLatch.countDown();
}
});
thread.start();
}
cyclicBarrier.await();
countDownLatch.await();
log.info("所有子线程执行完毕");
}


/**
* 插入单个仓库的 star 和 fork 数据
* @param map
* @param token
* @param list
* @throws IOException
*/
private void insertOneRepoStarAndFork(Map<String, String> map, String token, List<RepoStatistic> list) throws IOException {
String orgName = map.get("orgName");
String repoName = map.get("repoName");
String url = " https://api.github.com/repos/" + orgName + "/" + repoName;
log.info(url);
//log.info(url);
JSONObject jsonObject = NetUtil.doGetWithToken(url, token);
if (jsonObject == null) {
return;
if (jsonObject != null) {
Integer star = jsonObject.getInteger("stargazers_count");
Integer fork = jsonObject.getInteger("forks_count");
if (star == null || fork == null) {
log.info(Thread.currentThread().getName() + " 当前仓库" + orgName + "/" + repoName + "无 star 或 fork");
return;
}
RepoStatistic r1 = new RepoStatistic(orgName, repoName, "stargazers_count", star);
RepoStatistic r2 = new RepoStatistic(orgName, repoName, "forks_count", fork);
list.add(r1);
list.add(r2);
repoStatisticService.insertBatchRepoStatistic(list);
log.info(Thread.currentThread().getName() + " 成功插入数据:" + orgName + "/" + repoName);
list.clear();
}
Integer star = jsonObject.getInteger("stargazers_count");
Integer fork = jsonObject.getInteger("forks_count");
RepoStatistic r1 = new RepoStatistic(orgName, repoName, "stargazers_count", star);
RepoStatistic r2 = new RepoStatistic(orgName, repoName, "forks_count", fork);
list.add(r1);
list.add(r2);
repoStatisticService.insertBatchRepoStatistic(list);
log.info("插入第 " + cnt + " 个 repo 的数据:" + orgName + "/" + repoName);
list.clear();
cnt++;
}


/**
* 插入单个仓库的指标数据
* @param repo
* @param token
* @throws IOException
*/
private void insertOneRepoMetric(String repo, String token) throws IOException {
String reg = "(?<=/)\\w+(?=\\.json)";
String[] split = repo.split("/");
Expand Down Expand Up @@ -297,10 +287,13 @@ private void insertOneRepoMetric(String repo, String token) throws IOException {
return;
}
repoMetricService.insertBatchRepoMetric(repoMetricList);
log.info("插入第 " + cnt + " 个 repo 的数据:" + orgName + "/" + repoName);
cnt++;
log.info(Thread.currentThread().getName() + " 成功插入数据:" + orgName + "/" + repoName);
}

/**
* 获取 token 列表
* @return
*/
public List<String> getTokens() {
List<GitHubToken> tokens = gitHubApiConfig.getTokens();
List<String> collect = tokens.stream().map(GitHubToken::getToken).collect(Collectors.toList());
Expand Down
41 changes: 12 additions & 29 deletions src/main/java/cn/nzcer/odapi/util/NetUtil.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package cn.nzcer.odapi.util;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.jetbrains.annotations.NotNull;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
Expand All @@ -23,7 +17,18 @@
*/
@Slf4j
public class NetUtil {
static OkHttpClient client = new OkHttpClient.Builder().readTimeout(100, TimeUnit.SECONDS).build();
//读取超时为500s
private static final long READ_TIMEOUT = 500;
//写入超时为500s
private static final long WRITE_TIMEOUT = 500;
//连接超时为500s
private static final long CONNECT_TIMEOUT = 500;
static OkHttpClient client = new OkHttpClient.Builder()
.readTimeout(READ_TIMEOUT, TimeUnit.SECONDS)
.writeTimeout(WRITE_TIMEOUT, TimeUnit.SECONDS)
.connectTimeout(CONNECT_TIMEOUT, TimeUnit.SECONDS)
.build();

// 同步请求
public static JSONObject doGet(String url) throws IOException {
Request request = new Request.Builder()
Expand Down Expand Up @@ -57,26 +62,4 @@ public static JSONObject doGetWithToken(String url, String token) throws IOExcep
}
}


// 异步请求
//public static void asyncRequest(String url) {
// Request request = new Request.Builder().url(url).build();
// client.newCall(request).enqueue(new Callback() {
// @Override
// public void onFailure(@NotNull Call call, @NotNull IOException e) {
// e.printStackTrace();
// }
//
// @Override
// public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
// if (!response.isSuccessful()) {
// throw new IOException("Unexpected code " + response);
// } else {
// String res = response.body().string();
// System.out.println(JSONObject.parseObject(res));
// }
// }
// });
//}

}
3 changes: 2 additions & 1 deletion src/test/java/cn/nzcer/odapi/cron/SyncDataBaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class SyncDataBaseTest {
SyncDataBase syncDataBase;

@Test
void insertAllRepoMetrics() throws IOException, BrokenBarrierException, InterruptedException {
void insertAllRepoMetrics() throws BrokenBarrierException, IOException, InterruptedException {
// 更新所有仓库的 OpenDigger 指标数据
syncDataBase.insertAllRepoMetrics();
}

Expand Down

0 comments on commit 0b855d5

Please sign in to comment.