Skip to content

Commit 1a83d89

Browse files
authored
fix(core): pipeline doesn’t stop automatically after all registered datasources complete (#317)
* feat: add rpc-block-crawler example * fix: complete pipeline run after all datasources finish * fix(datasources): drop sender channel after complete crawling in rpc block crawler * fix: sort Cargo.toml
1 parent c833334 commit 1a83d89

File tree

17 files changed

+405
-269
lines changed

17 files changed

+405
-269
lines changed

Cargo.lock

Lines changed: 105 additions & 112 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -10,34 +10,36 @@ license = "MIT"
1010
repository = "https://github.com/sevenlabs-hq/carbon"
1111

1212
[workspace.dependencies]
13-
# main
14-
carbon-cli = { path = "crates/cli", version = "0.8.1" }
15-
carbon-core = { path = "crates/core", version = "0.8.1" }
16-
carbon-gql-server = { path = "crates/gql-server", version = "0.8.1" }
17-
carbon-macros = { path = "crates/macros", version = "0.8.1" }
18-
carbon-postgres-client = { path = "crates/postgres-client", version = "0.8.1" }
19-
carbon-proc-macros = { path = "crates/proc-macros", version = "0.8.1" }
20-
carbon-test-utils = { path = "crates/test-utils", version = "0.8.1" }
2113

22-
# datasources
23-
carbon-helius-atlas-ws-datasource = { path = "datasources/helius-atlas-ws-datasource", version = "0.8.1" }
24-
carbon-jito-shredstream-grpc-datasource = { path = "datasources/jito-shredstream-grpc-datasource", version = "0.8.1" }
25-
carbon-rpc-block-crawler-datasource = { path = "datasources/rpc-block-crawler-datasource", version = "0.8.1" }
26-
carbon-rpc-block-subscribe-datasource = { path = "datasources/rpc-block-subscribe-datasource", version = "0.8.1" }
27-
carbon-rpc-program-subscribe-datasource = { path = "datasources/rpc-program-subscribe-datasource", version = "0.8.1" }
28-
carbon-rpc-transaction-crawler-datasource = { path = "datasources/rpc-transaction-crawler-datasource", version = "0.8.1" }
29-
carbon-yellowstone-grpc-datasource = { path = "datasources/yellowstone-grpc-datasource", version = "0.8.1" }
30-
31-
# metrics
32-
carbon-log-metrics = { path = "metrics/log-metrics", version = "0.8.1" }
33-
carbon-prometheus-metrics = { path = "metrics/prometheus-metrics", version = "0.8.1" }
14+
# other
15+
anyhow = "1.0.96"
16+
askama = "0.12.1"
17+
async-stream = "0.3.6"
18+
async-trait = { version = "0.1.86" }
19+
axum = "0.8.4"
20+
base64 = "0.22.1"
21+
bincode = "1.3.3"
22+
borsh = { version = "1.5.1" }
23+
borsh-derive-internal = "0.10.3"
24+
bs58 = { version = "0.5.1", default-features = false }
3425

3526
# decoders
3627
carbon-address-lookup-table-decoder = { path = "decoders/address-lookup-table-decoder", version = "0.8.1" }
3728
carbon-associated-token-account-decoder = { path = "decoders/associated-token-account-decoder", version = "0.8.1" }
3829
carbon-boop-decoder = { path = "decoders/boop-decoder", version = "0.8.1" }
30+
# main
31+
carbon-cli = { path = "crates/cli", version = "0.8.1" }
32+
carbon-core = { path = "crates/core", version = "0.8.1" }
3933
carbon-drift-v2-decoder = { path = "decoders/drift-v2-decoder", version = "0.8.1" }
4034
carbon-fluxbeam-decoder = { path = "decoders/fluxbeam-decoder", version = "0.8.1" }
35+
carbon-gql-server = { path = "crates/gql-server", version = "0.8.1" }
36+
37+
# datasources
38+
carbon-helius-atlas-ws-datasource = { path = "datasources/helius-atlas-ws-datasource", version = "0.8.1" }
39+
40+
# misc
41+
carbon-jito-protos = { path = "misc/jito-protos", version = "0.2.4" }
42+
carbon-jito-shredstream-grpc-datasource = { path = "datasources/jito-shredstream-grpc-datasource", version = "0.8.1" }
4143
carbon-jupiter-dca-decoder = { path = "decoders/jupiter-dca-decoder", version = "0.8.1" }
4244
carbon-jupiter-limit-order-2-decoder = { path = "decoders/jupiter-limit-order-2-decoder", version = "0.8.1" }
4345
carbon-jupiter-limit-order-decoder = { path = "decoders/jupiter-limit-order-decoder", version = "0.8.1" }
@@ -47,6 +49,10 @@ carbon-kamino-farms-decoder = { path = "decoders/kamino-farms-decoder", version
4749
carbon-kamino-lending-decoder = { path = "decoders/kamino-lending-decoder", version = "0.8.1" }
4850
carbon-kamino-vault-decoder = { path = "decoders/kamino-vault-decoder", version = "0.8.1" }
4951
carbon-lifinity-amm-v2-decoder = { path = "decoders/lifinity-amm-v2-decoder", version = "0.8.1" }
52+
53+
# metrics
54+
carbon-log-metrics = { path = "metrics/log-metrics", version = "0.8.1" }
55+
carbon-macros = { path = "crates/macros", version = "0.8.1" }
5056
carbon-marginfi-v2-decoder = { path = "decoders/marginfi-v2-decoder", version = "0.8.1" }
5157
carbon-marinade-finance-decoder = { path = "decoders/marinade-finance-decoder", version = "0.8.1" }
5258
carbon-memo-program-decoder = { path = "decoders/memo-program-decoder", version = "0.8.1" }
@@ -60,61 +66,33 @@ carbon-okx-dex-decoder = { path = "decoders/okx-dex-decoder", version = "0.8.1"
6066
carbon-openbook-v2-decoder = { path = "decoders/openbook-v2-decoder", version = "0.8.1" }
6167
carbon-orca-whirlpool-decoder = { path = "decoders/orca-whirlpool-decoder", version = "0.8.1" }
6268
carbon-phoenix-v1-decoder = { path = "decoders/phoenix-v1-decoder", version = "0.8.1" }
69+
carbon-postgres-client = { path = "crates/postgres-client", version = "0.8.1" }
70+
carbon-proc-macros = { path = "crates/proc-macros", version = "0.8.1" }
71+
carbon-prometheus-metrics = { path = "metrics/prometheus-metrics", version = "0.8.1" }
6372
carbon-pump-swap-decoder = { path = "decoders/pump-swap-decoder", version = "0.8.1" }
6473
carbon-pumpfun-decoder = { path = "decoders/pumpfun-decoder", version = "0.8.1" }
6574
carbon-raydium-amm-v4-decoder = { path = "decoders/raydium-amm-v4-decoder", version = "0.8.1" }
6675
carbon-raydium-clmm-decoder = { path = "decoders/raydium-clmm-decoder", version = "0.8.1" }
6776
carbon-raydium-cpmm-decoder = { path = "decoders/raydium-cpmm-decoder", version = "0.8.1" }
6877
carbon-raydium-launchpad-decoder = { path = "decoders/raydium-launchpad-decoder", version = "0.8.1" }
6978
carbon-raydium-liquidity-locking-decoder = { path = "decoders/carbon-raydium-liquidity-locking-decoder", version = "0.8.1" }
79+
carbon-rpc-block-crawler-datasource = { path = "datasources/rpc-block-crawler-datasource", version = "0.8.1" }
80+
carbon-rpc-block-subscribe-datasource = { path = "datasources/rpc-block-subscribe-datasource", version = "0.8.1" }
81+
carbon-rpc-program-subscribe-datasource = { path = "datasources/rpc-program-subscribe-datasource", version = "0.8.1" }
82+
carbon-rpc-transaction-crawler-datasource = { path = "datasources/rpc-transaction-crawler-datasource", version = "0.8.1" }
7083
carbon-sharky-decoder = { path = "decoders/sharky-decoder", version = "0.8.1" }
7184
carbon-solayer-restaking-program-decoder = { path = "decoders/solayer-restaking-program-decoder", version = "0.8.1" }
7285
carbon-stabble-stable-swap-decoder = { path = "decoders/carbon-stabble-stable-swap-decoder", version = "0.8.1" }
7386
carbon-stabble-weighted-swap-decoder = { path = "decoders/carbon-stabble-weighted-swap-decoder", version = "0.8.1" }
7487
carbon-stake-program-decoder = { path = "decoders/carbon-stake-program-decoder", version = "0.8.1" }
7588
carbon-system-program-decoder = { path = "decoders/system-program-decoder", version = "0.8.1" }
89+
carbon-test-utils = { path = "crates/test-utils", version = "0.8.1" }
7690
carbon-token-2022-decoder = { path = "decoders/token-2022-decoder", version = "0.8.1" }
7791
carbon-token-program-decoder = { path = "decoders/token-program-decoder", version = "0.8.1" }
7892
carbon-virtual-curve-decoder = { path = "decoders/virtual-curve-decoder", version = "0.8.1" }
7993
carbon-virtuals-decoder = { path = "decoders/virtuals-decoder", version = "0.8.1" }
94+
carbon-yellowstone-grpc-datasource = { path = "datasources/yellowstone-grpc-datasource", version = "0.8.1" }
8095
carbon-zeta-decoder = { path = "decoders/zeta-decoder", version = "0.8.1" }
81-
82-
# misc
83-
carbon-jito-protos = { path = "misc/jito-protos", version = "0.2.4" }
84-
85-
# solana
86-
solana-account = "2.2"
87-
solana-account-decoder = "2.2"
88-
solana-account-decoder-client-types = "2.2"
89-
solana-client = "2.2"
90-
solana-clock = "2.2"
91-
solana-commitment-config = "2.2"
92-
solana-entry = "2.2"
93-
solana-hash = "2.2"
94-
solana-instruction = { version = "2.2", default-features = false }
95-
solana-message = "2.2"
96-
solana-native-token = "2.2"
97-
solana-program = "2.2"
98-
solana-program-pack = "2.2"
99-
solana-pubkey = { version = "2.2", features = ["serde"] }
100-
solana-signature = "2.2"
101-
solana-transaction = "2.2"
102-
solana-transaction-context = "2.2"
103-
solana-transaction-status = "2.2"
104-
spl-memo = "5.0.0"
105-
spl-token = "6.0.0"
106-
107-
# other
108-
anyhow = "1.0.96"
109-
askama = "0.12.1"
110-
async-stream = "0.3.6"
111-
async-trait = { version = "0.1.86" }
112-
axum = "0.8.4"
113-
base64 = "0.22.1"
114-
bincode = "1.3.3"
115-
borsh = { version = "1.5.1" }
116-
borsh-derive-internal = "0.10.3"
117-
bs58 = { version = "0.5.1", default-features = false }
11896
chrono = { version = "0.4.40", features = ["serde"] }
11997
clap = { version = "4.5.30", features = ["derive"] }
12098
console = "0.15.8"
@@ -147,6 +125,28 @@ serde = { version = "1.0.208", features = ["derive"] }
147125
serde-big-array = "0.5.1"
148126
serde_json = "1.0.138"
149127
sha2 = "0.10.8"
128+
129+
# solana
130+
solana-account = "2.2"
131+
solana-account-decoder = "2.2"
132+
solana-account-decoder-client-types = "2.2"
133+
solana-client = "2.2"
134+
solana-clock = "2.2"
135+
solana-commitment-config = "2.2"
136+
solana-entry = "2.2"
137+
solana-hash = "2.2"
138+
solana-instruction = { version = "2.2", default-features = false }
139+
solana-message = "2.2"
140+
solana-native-token = "2.2"
141+
solana-program = "2.2"
142+
solana-program-pack = "2.2"
143+
solana-pubkey = { version = "2.2", features = ["serde"] }
144+
solana-signature = "2.2"
145+
solana-transaction = "2.2"
146+
solana-transaction-context = "2.2"
147+
solana-transaction-status = "2.2"
148+
spl-memo = "5.0.0"
149+
spl-token = "6.0.0"
150150
sqlx = { version = "0.8.5", features = [
151151
"macros",
152152
"runtime-tokio-rustls",

crates/core/src/datasource.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ use {
102102
pub trait Datasource: Send + Sync {
103103
async fn consume(
104104
&self,
105-
sender: &tokio::sync::mpsc::Sender<Update>,
105+
sender: tokio::sync::mpsc::Sender<Update>,
106106
cancellation_token: CancellationToken,
107107
metrics: Arc<MetricsCollection>,
108108
) -> CarbonResult<()>;

crates/core/src/pipeline.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ impl Pipeline {
354354
tokio::spawn(async move {
355355
if let Err(e) = datasource_clone
356356
.consume(
357-
&sender_clone,
357+
sender_clone,
358358
datasource_cancellation_token_clone,
359359
metrics_collection,
360360
)
@@ -365,6 +365,8 @@ impl Pipeline {
365365
});
366366
}
367367

368+
drop(update_sender);
369+
368370
let mut interval = tokio::time::interval(time::Duration::from_secs(
369371
self.metrics_flush_interval.unwrap_or(5),
370372
));

datasources/helius-atlas-ws-datasource/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl HeliusWebsocket {
9494
impl Datasource for HeliusWebsocket {
9595
async fn consume(
9696
&self,
97-
sender: &Sender<Update>,
97+
sender: Sender<Update>,
9898
cancellation_token: CancellationToken,
9999
metrics: Arc<MetricsCollection>,
100100
) -> CarbonResult<()> {

datasources/jito-shredstream-grpc-datasource/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,10 @@ impl JitoShredstreamGrpcClient {
3434
impl Datasource for JitoShredstreamGrpcClient {
3535
async fn consume(
3636
&self,
37-
sender: &Sender<Update>,
37+
sender: Sender<Update>,
3838
cancellation_token: CancellationToken,
3939
metrics: Arc<MetricsCollection>,
4040
) -> CarbonResult<()> {
41-
let sender = sender.clone();
4241
let endpoint = self.0.clone();
4342

4443
let mut client = ShredstreamProxyClient::connect(endpoint)

0 commit comments

Comments
 (0)