Skip to content

Commit 2696915

Browse files
feat(index): indexing repo in shard when more than 20 (#4366)
* feat(index): indexing repo in shard when more than 20 * [autofix.ci] apply automated fixes * [autofix.ci] apply automated fixes (attempt 2/3) * chore: add env for enabling index repo in shard # Conflicts: # ee/tabby-webserver/src/service/background_job/mod.rs * [autofix.ci] apply automated fixes * fix(job): utilize to_std err to check whether the time is past (#4370) * [autofix.ci] apply automated fixes * fix(test): use cloudflare workers-ai llms.txt for test --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent 49ae1e4 commit 2696915

File tree

5 files changed

+73
-19
lines changed

5 files changed

+73
-19
lines changed

crates/tabby-crawler/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ mod tests {
225225
#[tokio::test]
226226
#[traced_test]
227227
async fn test_crawler_llms_success_developers_cloudflare_with_url() {
228-
let base_url = "https://developers.cloudflare.com";
228+
let base_url = "https://developers.cloudflare.com/workers-ai";
229229
let result = crawler_llms(base_url).await;
230230
assert!(result.is_ok(), "Expected success from {base_url}");
231231
let docs = result.unwrap();

ee/tabby-webserver/src/service/background_job/git.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use tabby_inference::Embedding;
99
use tabby_schema::{job::JobService, repository::GitRepositoryService};
1010
use tracing::debug;
1111

12-
use super::{helper::Job, index_commits, BackgroundJobEvent};
12+
use super::{
13+
calculate_current_shard, helper::Job, index_commits, should_process_repository,
14+
BackgroundJobEvent,
15+
};
1316

1417
#[derive(Debug, Serialize, Deserialize, Clone)]
1518
pub struct SchedulerGitJob {
@@ -50,7 +53,7 @@ impl SchedulerGitJob {
5053
}
5154

5255
pub async fn cron(
53-
_now: DateTime<Utc>,
56+
now: DateTime<Utc>,
5457
git_repository: Arc<dyn GitRepositoryService>,
5558
job: Arc<dyn JobService>,
5659
) -> tabby_schema::Result<()> {
@@ -76,10 +79,19 @@ impl SchedulerGitJob {
7679
.chain(config_repositories.into_iter())
7780
.collect();
7881

79-
for repository in repositories {
82+
let number_of_repo = repositories.len();
83+
let current_shard = calculate_current_shard(number_of_repo, now.timestamp());
84+
85+
for (i, repository) in repositories.iter().enumerate() {
86+
if !should_process_repository(i, current_shard, number_of_repo) {
87+
continue;
88+
}
89+
8090
debug!("Scheduling git repository sync for {:?}", repository);
8191
let _ = job
82-
.trigger(BackgroundJobEvent::SchedulerGitRepository(repository).to_command())
92+
.trigger(
93+
BackgroundJobEvent::SchedulerGitRepository(repository.clone()).to_command(),
94+
)
8395
.await;
8496
}
8597
Ok(())

ee/tabby-webserver/src/service/background_job/helper/cron.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use chrono::{DateTime, TimeZone, Utc};
22
pub use cron::Schedule;
33
use futures::Stream;
4-
use tracing::warn;
54

65
/// Represents a stream from a cron schedule with a timezone
76
#[derive(Clone, Debug)]
@@ -35,14 +34,10 @@ where
3534
match next {
3635
Some(next) => {
3736
let to_sleep = next.clone() - timezone.from_utc_datetime(&Utc::now().naive_utc());
38-
if to_sleep.num_seconds() < 0 {
39-
// If the next time is in the past, skip it and get the next one.
40-
continue
41-
}
4237
let to_sleep = match to_sleep.to_std() {
4338
Ok(to_sleep) => to_sleep,
44-
Err(err) => {
45-
warn!("Failed to convert to std duration: {}", err);
39+
Err(_) => {
40+
// If the next time is in the past or conversion fails, skip it and get the next one.
4641
continue;
4742
}
4843
};

ee/tabby-webserver/src/service/background_job/mod.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ mod license_check;
1111
mod third_party_integration;
1212
mod web_crawler;
1313

14-
use std::{str::FromStr, sync::Arc};
14+
use std::{env, str::FromStr, sync::Arc};
1515

1616
use cron::Schedule;
1717
use daily::DailyJob;
@@ -47,6 +47,41 @@ pub use web_crawler::WebCrawlerJob;
4747

4848
use self::third_party_integration::SyncIntegrationJob;
4949

50+
// Sharding configuration constants
51+
pub const REPOSITORIES_PER_SHARD: usize = 7;
52+
pub const SHARDING_THRESHOLD: usize = 20;
53+
54+
/// Calculate the current shard for repository processing
55+
/// Returns Some(shard) if sharding should be used, None otherwise
56+
fn calculate_current_shard(number_of_repo: usize, timestamp_seconds: i64) -> Option<usize> {
57+
// Only run on TABBY_INDEX_REPO_IN_SHARD is not empty and number_of_repo > SHARDING_THRESHOLD
58+
// otherwise return None
59+
if !(env::var("TABBY_INDEX_REPO_IN_SHARD").is_ok_and(|v| !v.is_empty())
60+
&& number_of_repo > SHARDING_THRESHOLD)
61+
{
62+
return None;
63+
}
64+
65+
// `number_of_repo + REPOSITORIES_PER_SHARD - 1` because we should ceil number_of_repo
66+
let number_of_shard = number_of_repo.div_ceil(REPOSITORIES_PER_SHARD);
67+
let timestamp = timestamp_seconds as usize;
68+
Some((timestamp / 3600) % number_of_shard)
69+
}
70+
71+
/// Check if a repository should be processed based on sharding
72+
fn should_process_repository(
73+
repo_index: usize,
74+
current_shard: Option<usize>,
75+
number_of_repo: usize,
76+
) -> bool {
77+
let Some(current_shard) = current_shard else {
78+
return true; // No sharding, process all repositories
79+
};
80+
81+
let number_of_shard = number_of_repo.div_ceil(REPOSITORIES_PER_SHARD); // Math.ceil
82+
repo_index % number_of_shard == current_shard
83+
}
84+
5085
#[derive(Debug, Serialize, Deserialize, Clone)]
5186
pub enum BackgroundJobEvent {
5287
SchedulerGitRepository(CodeRepository),

ee/tabby-webserver/src/service/background_job/third_party_integration.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ use tabby_schema::{
1717
};
1818
use tracing::debug;
1919

20-
use super::{helper::Job, index_commits, BackgroundJobEvent};
20+
use super::{
21+
calculate_current_shard, helper::Job, index_commits, should_process_repository,
22+
BackgroundJobEvent,
23+
};
2124

2225
pub mod error;
2326
mod issues;
@@ -242,17 +245,26 @@ impl SchedulerGithubGitlabJob {
242245
}
243246

244247
pub async fn cron(
245-
_now: DateTime<Utc>,
248+
now: DateTime<Utc>,
246249
repository: Arc<dyn ThirdPartyRepositoryService>,
247250
job: Arc<dyn JobService>,
248251
) -> tabby_schema::Result<()> {
249-
for repository in repository
252+
let repositories = repository
250253
.list_repositories_with_filter(None, None, Some(true), None, None, None, None)
251-
.await?
252-
{
254+
.await?;
255+
256+
let number_of_repo = repositories.len();
257+
let current_shard = calculate_current_shard(number_of_repo, now.timestamp());
258+
259+
for (i, repository) in repositories.iter().enumerate() {
260+
if !should_process_repository(i, current_shard, number_of_repo) {
261+
continue;
262+
}
263+
253264
let _ = job
254265
.trigger(
255-
BackgroundJobEvent::SchedulerGithubGitlabRepository(repository.id).to_command(),
266+
BackgroundJobEvent::SchedulerGithubGitlabRepository(repository.id.clone())
267+
.to_command(),
256268
)
257269
.await;
258270
}

0 commit comments

Comments
 (0)