Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions crates/core/src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ pub trait AccountDecoder<'a> {
/// The input type for the account processor.
///
/// - `T`: The account type, as determined by the decoder.
pub type AccountProcessorInputType<T> = (AccountMetadata, DecodedAccount<T>);
pub type AccountProcessorInputType<T> =
(AccountMetadata, DecodedAccount<T>, solana_account::Account);

/// A processing pipe that decodes and processes Solana account updates.
///
Expand Down Expand Up @@ -207,7 +208,14 @@ impl<T: Send> AccountPipes for AccountPipe<T> {

if let Some(decoded_account) = self.decoder.decode_account(&account_with_metadata.1) {
self.processor
.process((account_with_metadata.0, decoded_account), metrics)
.process(
(
account_with_metadata.0.clone(),
decoded_account,
account_with_metadata.1,
),
metrics.clone(),
)
.await?;
}
Ok(())
Expand Down
12 changes: 2 additions & 10 deletions crates/core/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,7 @@ pub trait InstructionDecoder<'a> {
/// The input type for the instruction processor.
///
/// - `T`: The instruction type
pub type InstructionProcessorInputType<T> = (
InstructionMetadata,
DecodedInstruction<T>,
NestedInstructions,
);
pub type InstructionProcessorInputType<T> = (NestedInstruction, DecodedInstruction<T>);

/// A processing pipeline for instructions, using a decoder and processor.
///
Expand Down Expand Up @@ -178,11 +174,7 @@ impl<T: Send + 'static> InstructionPipes<'_> for InstructionPipe<T> {
{
self.processor
.process(
(
nested_instruction.metadata.clone(),
decoded_instruction,
nested_instruction.inner_instructions.clone(),
),
(nested_instruction.clone(), decoded_instruction),
metrics.clone(),
)
.await?;
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,7 @@ impl PipelineBuilder {
///
/// - `decoder`: An `AccountDecoder` that decodes the account data.
/// - `processor`: A `Processor` that processes the decoded account data.
/// - `processor_with_raw`: A `Processor` that processes decoded account data with the raw account data.
///
/// # Example
///
Expand Down
3 changes: 2 additions & 1 deletion examples/block-crawler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ impl Processor for PumpfunInstructionProcessor {
data: Self::InputType,
_metrics: Arc<MetricsCollection>,
) -> CarbonResult<()> {
let (metadata, pumpfun_instruction, _) = data;
let (nested_instruction, pumpfun_instruction) = data;
let metadata = nested_instruction.metadata;

match pumpfun_instruction.data {
PumpfunInstruction::CreateEvent(create_event) => {
Expand Down
21 changes: 10 additions & 11 deletions examples/jupiter-swap-alerts/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
async_trait::async_trait,
carbon_core::{
error::CarbonResult,
instruction::{DecodedInstruction, InstructionMetadata, NestedInstructions},
instruction::{DecodedInstruction, NestedInstruction},
metrics::MetricsCollection,
processor::Processor,
},
Expand Down Expand Up @@ -76,16 +76,15 @@ pub struct JupiterSwapInstructionProcessor;
#[async_trait]
impl Processor for JupiterSwapInstructionProcessor {
type InputType = (
InstructionMetadata,
NestedInstruction,
DecodedInstruction<JupiterSwapInstruction>,
NestedInstructions,
);
async fn process(
&mut self,
(metadata, instruction, nested_instructions): Self::InputType,
(nested_instruction, instruction): Self::InputType,
_metrics: Arc<MetricsCollection>,
) -> CarbonResult<()> {
let signature = metadata.transaction_metadata.signature;
let signature = nested_instruction.metadata.transaction_metadata.signature;

match instruction.data {
JupiterSwapInstruction::Claim(claim) => {
Expand All @@ -105,7 +104,7 @@ impl Processor for JupiterSwapInstructionProcessor {
}
JupiterSwapInstruction::ExactOutRoute(exact_out_route) => {
assert!(
!nested_instructions.is_empty(),
!nested_instruction.inner_instructions.is_empty(),
"nested instructions empty: {} ",
signature
);
Expand All @@ -115,15 +114,15 @@ impl Processor for JupiterSwapInstructionProcessor {
}
JupiterSwapInstruction::Route(route) => {
assert!(
!nested_instructions.is_empty(),
!nested_instruction.inner_instructions.is_empty(),
"nested instructions empty: {} ",
signature
);
log::info!("route: signature: {signature}, route: {route:?}");
}
JupiterSwapInstruction::RouteWithTokenLedger(route_with_token_ledger) => {
assert!(
!nested_instructions.is_empty(),
!nested_instruction.inner_instructions.is_empty(),
"nested instructions empty: {} ",
signature
);
Expand All @@ -136,15 +135,15 @@ impl Processor for JupiterSwapInstructionProcessor {
shared_accounts_exact_out_route,
) => {
assert!(
!nested_instructions.is_empty(),
!nested_instruction.inner_instructions.is_empty(),
"nested instructions empty: {} ",
signature
);
log::info!("shared_accounts_exact_out_route: signature: {signature}, shared_accounts_exact_out_route: {shared_accounts_exact_out_route:?}");
}
JupiterSwapInstruction::SharedAccountsRoute(shared_accounts_route) => {
assert!(
!nested_instructions.is_empty(),
!nested_instruction.inner_instructions.is_empty(),
"nested instructions empty: {} ",
signature
);
Expand All @@ -154,7 +153,7 @@ impl Processor for JupiterSwapInstructionProcessor {
shared_accounts_route_with_token_ledger,
) => {
assert!(
!nested_instructions.is_empty(),
!nested_instruction.inner_instructions.is_empty(),
"nested instructions empty: {} ",
signature
);
Expand Down
1 change: 1 addition & 0 deletions examples/kamino-alerts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ async-trait = { workspace = true }
dotenv = { workspace = true }
env_logger = { workspace = true }
log = { workspace = true }
solana-account = { workspace = true }
tokio = { workspace = true, features = ["full"] }
yellowstone-grpc-proto = { workspace = true }
15 changes: 9 additions & 6 deletions examples/kamino-alerts/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
carbon_core::{
account::{AccountMetadata, DecodedAccount},
error::CarbonResult,
instruction::{DecodedInstruction, InstructionMetadata, NestedInstructions},
instruction::{DecodedInstruction, NestedInstruction},
metrics::MetricsCollection,
processor::Processor,
},
Expand Down Expand Up @@ -80,17 +80,16 @@ pub struct KaminoLendingInstructionProcessor;
#[async_trait]
impl Processor for KaminoLendingInstructionProcessor {
type InputType = (
InstructionMetadata,
NestedInstruction,
DecodedInstruction<KaminoLendingInstruction>,
NestedInstructions,
);

async fn process(
&mut self,
(metadata, instruction, _nested_instructions): Self::InputType,
(nested_instruction, instruction): Self::InputType,
_metrics: Arc<MetricsCollection>,
) -> CarbonResult<()> {
let signature = metadata.transaction_metadata.signature;
let signature = nested_instruction.metadata.transaction_metadata.signature;

let signature = format!(
"{}...{}",
Expand All @@ -111,7 +110,11 @@ impl Processor for KaminoLendingInstructionProcessor {
pub struct KaminoLendingAccountProcessor;
#[async_trait]
impl Processor for KaminoLendingAccountProcessor {
type InputType = (AccountMetadata, DecodedAccount<KaminoLendingAccount>);
type InputType = (
AccountMetadata,
DecodedAccount<KaminoLendingAccount>,
solana_account::Account,
);

async fn process(
&mut self,
Expand Down
7 changes: 3 additions & 4 deletions examples/meteora-activities/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
async_trait::async_trait,
carbon_core::{
error::CarbonResult,
instruction::{DecodedInstruction, InstructionMetadata, NestedInstructions},
instruction::{DecodedInstruction, NestedInstruction},
metrics::MetricsCollection,
processor::Processor,
},
Expand Down Expand Up @@ -49,17 +49,16 @@ pub struct MeteoraInstructionProcessor;
#[async_trait]
impl Processor for MeteoraInstructionProcessor {
type InputType = (
InstructionMetadata,
NestedInstruction,
DecodedInstruction<MeteoraDlmmInstruction>,
NestedInstructions,
);

async fn process(
&mut self,
data: Self::InputType,
_metrics: Arc<MetricsCollection>,
) -> CarbonResult<()> {
let (_instruction_metadata, decoded_instruction, _nested_instructions) = data;
let (_, decoded_instruction) = data;

// Process all Meteora Events and add each to DB
match decoded_instruction.data {
Expand Down
14 changes: 5 additions & 9 deletions examples/moonshot-alerts/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
carbon_core::{
deserialize::ArrangeAccounts,
error::CarbonResult,
instruction::{DecodedInstruction, InstructionMetadata, NestedInstructions},
instruction::{DecodedInstruction, NestedInstruction},
metrics::MetricsCollection,
processor::Processor,
},
Expand Down Expand Up @@ -56,19 +56,15 @@ pub struct MoonshotInstructionProcessor;

#[async_trait]
impl Processor for MoonshotInstructionProcessor {
type InputType = (
InstructionMetadata,
DecodedInstruction<MoonshotInstruction>,
NestedInstructions,
);
type InputType = (NestedInstruction, DecodedInstruction<MoonshotInstruction>);

async fn process(
&mut self,
(metadata, instruction, _nested_instructions): Self::InputType,
(nested_instruction, instruction): Self::InputType,
_metrics: Arc<MetricsCollection>,
) -> CarbonResult<()> {
let signature = metadata.transaction_metadata.signature;
let accounts = instruction.accounts;
let signature = nested_instruction.metadata.transaction_metadata.signature;
let accounts = nested_instruction.instruction.accounts;

match instruction.data {
MoonshotInstruction::TokenMint(token_mint) => {
Expand Down
12 changes: 4 additions & 8 deletions examples/openbook-v2-alerts/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
async_trait::async_trait,
carbon_core::{
error::CarbonResult,
instruction::{DecodedInstruction, InstructionMetadata, NestedInstructions},
instruction::{DecodedInstruction, NestedInstruction},
metrics::MetricsCollection,
processor::Processor,
},
Expand Down Expand Up @@ -52,18 +52,14 @@ pub struct OpenbookV2InstructionProcessor;

#[async_trait]
impl Processor for OpenbookV2InstructionProcessor {
type InputType = (
InstructionMetadata,
DecodedInstruction<OpenbookV2Instruction>,
NestedInstructions,
);
type InputType = (NestedInstruction, DecodedInstruction<OpenbookV2Instruction>);

async fn process(
&mut self,
(metadata, instruction, _nested_instructions): Self::InputType,
(nested_instruction, instruction): Self::InputType,
_metrics: Arc<MetricsCollection>,
) -> CarbonResult<()> {
let signature = metadata.transaction_metadata.signature;
let signature = nested_instruction.metadata.transaction_metadata.signature;

match instruction.data {
OpenbookV2Instruction::CreateMarket(create_market) => {
Expand Down
1 change: 1 addition & 0 deletions examples/raydium-alerts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ async-trait = { workspace = true }
dotenv = { workspace = true }
env_logger = { workspace = true }
log = { workspace = true }
solana-account = { workspace = true }
tokio = { workspace = true, features = ["full"] }
yellowstone-grpc-proto = { workspace = true }

Expand Down
15 changes: 9 additions & 6 deletions examples/raydium-alerts/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
account::{AccountMetadata, DecodedAccount},
deserialize::ArrangeAccounts,
error::CarbonResult,
instruction::{DecodedInstruction, InstructionMetadata, NestedInstructions},
instruction::{DecodedInstruction, NestedInstruction},
metrics::MetricsCollection,
processor::Processor,
},
Expand Down Expand Up @@ -92,17 +92,16 @@ pub struct RaydiumAmmV4InstructionProcessor;
#[async_trait]
impl Processor for RaydiumAmmV4InstructionProcessor {
type InputType = (
InstructionMetadata,
NestedInstruction,
DecodedInstruction<RaydiumAmmV4Instruction>,
NestedInstructions,
);

async fn process(
&mut self,
(metadata, instruction, _nested_instructions): Self::InputType,
(nested_instruction, instruction): Self::InputType,
_metrics: Arc<MetricsCollection>,
) -> CarbonResult<()> {
let signature = metadata.transaction_metadata.signature;
let signature = nested_instruction.metadata.transaction_metadata.signature;
let accounts = instruction.accounts;

match instruction.data {
Expand Down Expand Up @@ -195,7 +194,11 @@ impl Processor for RaydiumAmmV4InstructionProcessor {
pub struct RaydiumAmmV4AccountProcessor;
#[async_trait]
impl Processor for RaydiumAmmV4AccountProcessor {
type InputType = (AccountMetadata, DecodedAccount<RaydiumAmmV4Account>);
type InputType = (
AccountMetadata,
DecodedAccount<RaydiumAmmV4Account>,
solana_account::Account,
);

async fn process(
&mut self,
Expand Down
Loading
Loading