11use anyhow:: Result ;
2- use aws_sdk_s3:: Client as S3Client ;
3- use clap:: Parser ;
2+ use aws_config:: { BehaviorVersion , Region } ;
3+ use aws_credential_types:: Credentials ;
4+ use aws_sdk_s3:: { config:: Builder as S3ConfigBuilder , Client as S3Client } ;
5+ use clap:: { Parser , ValueEnum } ;
46use rdkafka:: consumer:: Consumer ;
57use tips_audit:: {
68 create_kafka_consumer, KafkaMempoolArchiver , KafkaMempoolReader , S3MempoolEventReaderWriter ,
79} ;
810use tracing:: { info, warn} ;
911use tracing_subscriber:: { layer:: SubscriberExt , util:: SubscriberInitExt } ;
1012
13+ #[ derive( Debug , Clone , ValueEnum ) ]
14+ enum S3ConfigType {
15+ Aws ,
16+ Manual ,
17+ }
18+
1119#[ derive( Parser , Debug ) ]
1220#[ command( author, version, about, long_about = None ) ]
1321struct Args {
@@ -25,6 +33,21 @@ struct Args {
2533
2634 #[ arg( long, env = "TIPS_AUDIT_LOG_LEVEL" , default_value = "info" ) ]
2735 log_level : String ,
36+
37+ #[ arg( long, env = "TIPS_AUDIT_S3_CONFIG_TYPE" , default_value = "aws" ) ]
38+ s3_config_type : S3ConfigType ,
39+
40+ #[ arg( long, env = "TIPS_AUDIT_S3_ENDPOINT" ) ]
41+ s3_endpoint : Option < String > ,
42+
43+ #[ arg( long, env = "TIPS_AUDIT_S3_REGION" , default_value = "us-east-1" ) ]
44+ s3_region : String ,
45+
46+ #[ arg( long, env = "TIPS_AUDIT_S3_ACCESS_KEY_ID" ) ]
47+ s3_access_key_id : Option < String > ,
48+
49+ #[ arg( long, env = "TIPS_AUDIT_S3_SECRET_ACCESS_KEY" ) ]
50+ s3_secret_access_key : Option < String > ,
2851}
2952
3053#[ tokio:: main]
@@ -67,15 +90,47 @@ async fn main() -> Result<()> {
6790 let consumer = create_kafka_consumer ( & args. kafka_brokers , & args. kafka_group_id ) ?;
6891 consumer. subscribe ( & [ & args. kafka_topic ] ) ?;
6992
70- let reader = KafkaMempoolReader :: new ( consumer, args. kafka_topic ) ?;
93+ let reader = KafkaMempoolReader :: new ( consumer, args. kafka_topic . clone ( ) ) ?;
7194
72- let config = aws_config :: load_defaults ( aws_config :: BehaviorVersion :: latest ( ) ) . await ;
73- let s3_client = S3Client :: new ( & config ) ;
74- let writer = S3MempoolEventReaderWriter :: new ( s3_client, args . s3_bucket ) ;
95+ let s3_client = create_s3_client ( & args ) . await ? ;
96+ let s3_bucket = args . s3_bucket . clone ( ) ;
97+ let writer = S3MempoolEventReaderWriter :: new ( s3_client, s3_bucket) ;
7598
7699 let mut archiver = KafkaMempoolArchiver :: new ( reader, writer) ;
77100
78101 info ! ( "Audit archiver initialized, starting main loop" ) ;
79102
80103 archiver. run ( ) . await
81104}
105+
106+ async fn create_s3_client ( args : & Args ) -> Result < S3Client > {
107+ match args. s3_config_type {
108+ S3ConfigType :: Manual => {
109+ let region = args. s3_region . clone ( ) ;
110+ let mut config_builder =
111+ aws_config:: defaults ( BehaviorVersion :: latest ( ) ) . region ( Region :: new ( region) ) ;
112+
113+ if let Some ( endpoint) = & args. s3_endpoint {
114+ config_builder = config_builder. endpoint_url ( endpoint) ;
115+ }
116+
117+ if let ( Some ( access_key) , Some ( secret_key) ) =
118+ ( & args. s3_access_key_id , & args. s3_secret_access_key )
119+ {
120+ let credentials = Credentials :: new ( access_key, secret_key, None , None , "manual" ) ;
121+ config_builder = config_builder. credentials_provider ( credentials) ;
122+ }
123+
124+ let config = config_builder. load ( ) . await ;
125+ let s3_config_builder = S3ConfigBuilder :: from ( & config) . force_path_style ( true ) ;
126+
127+ info ! ( message = "manually configuring s3 client" ) ;
128+ Ok ( S3Client :: from_conf ( s3_config_builder. build ( ) ) )
129+ }
130+ S3ConfigType :: Aws => {
131+ info ! ( message = "using aws s3 client" ) ;
132+ let config = aws_config:: load_defaults ( BehaviorVersion :: latest ( ) ) . await ;
133+ Ok ( S3Client :: new ( & config) )
134+ }
135+ }
136+ }
0 commit comments