@@ -55,6 +55,30 @@ pub async fn user(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
55
55
. send ( )
56
56
. await ?;
57
57
} ,
58
+ Main :: ProfileSet ( sig) => {
59
+ let res = ctx. activity ( ProfileSetInput {
60
+ user_id,
61
+ display_name : sig. display_name ,
62
+ account_number : sig. account_number ,
63
+ bio : sig. bio
64
+ } ) . await ?;
65
+
66
+ ctx. msg ( ProfileSetStatus { res : res. clone ( ) } )
67
+ . tag ( "user_id" , user_id)
68
+ . send ( )
69
+ . await ?;
70
+
71
+ if res. is_ok ( ) {
72
+ ctx. activity ( PublishProfileSetAnalyticsInput {
73
+ user_id
74
+ } ) . await ?;
75
+
76
+ ctx. msg ( Update { } )
77
+ . tag ( "user_id" , user_id)
78
+ . send ( )
79
+ . await ?;
80
+ }
81
+ } ,
58
82
Main :: Delete ( _) => {
59
83
return Ok ( Loop :: Break ( ( ) ) ) ;
60
84
} ,
@@ -117,6 +141,121 @@ async fn admin_set(ctx: &ActivityCtx, input: &AdminSetInput) -> GlobalResult<()>
117
141
Ok ( ( ) )
118
142
}
119
143
144
+ // ProfileSet
145
+ #[ derive( Debug , Clone , Serialize , Deserialize , Hash ) ]
146
+ struct ProfileSetInput {
147
+ user_id : Uuid ,
148
+ display_name : Option < String > ,
149
+ account_number : Option < u32 > ,
150
+ bio : Option < String > ,
151
+ }
152
+
153
+ #[ derive( Debug , Clone , Serialize , Deserialize , Hash ) ]
154
+ pub enum ProfileSetError {
155
+ ValidationFailure ,
156
+ MissingParameters ,
157
+ }
158
+
159
+ #[ activity( ProfileSetActivity ) ]
160
+ async fn profile_set ( ctx : & ActivityCtx , input : & ProfileSetInput ) -> GlobalResult < Result < ( ) , ProfileSetError > > {
161
+ let mut query_components = Vec :: new ( ) ;
162
+
163
+ // Check if each component exists
164
+ if input. display_name . is_some ( ) {
165
+ query_components. push ( format ! ( "display_name = ${}" , query_components. len( ) + 2 ) ) ;
166
+ }
167
+ if input. account_number . is_some ( ) {
168
+ query_components. push ( format ! ( "account_number = ${}" , query_components. len( ) + 2 ) ) ;
169
+ }
170
+ if input. bio . is_some ( ) {
171
+ query_components. push ( format ! ( "bio = ${}" , query_components. len( ) + 2 ) ) ;
172
+ }
173
+
174
+ let val = !query_components. is_empty ( ) ;
175
+ if !val {
176
+ return Ok ( Err ( ProfileSetError :: MissingParameters ) ) ;
177
+ }
178
+
179
+ let validation_res = ctx. op ( crate :: ops:: profile_validate:: Input {
180
+ user_id : input. user_id ,
181
+ display_name : input. display_name . clone ( ) ,
182
+ account_number : input. account_number ,
183
+ bio : input. bio . clone ( )
184
+ } )
185
+ . await ?;
186
+
187
+ if !validation_res. errors . is_empty ( ) {
188
+ tracing:: warn!( errors = ?validation_res. errors, "validation errors" ) ;
189
+
190
+ return Ok ( Err ( ProfileSetError :: ValidationFailure ) ) ;
191
+ }
192
+
193
+ ctx. cache ( ) . purge ( "user" , [ input. user_id ] ) . await ?;
194
+
195
+ // Build query
196
+ let built_query = query_components. join ( "," ) ;
197
+ let query_string = format ! (
198
+ "UPDATE db_user.users SET {} WHERE user_id = $1" ,
199
+ built_query
200
+ ) ;
201
+
202
+ // TODO: Convert this to sql_execute! macro
203
+ let query = sqlx:: query ( & query_string) . bind ( input. user_id ) ;
204
+
205
+ // Bind display name
206
+ let query = if let Some ( display_name) = & input. display_name {
207
+ query. bind ( display_name)
208
+ } else {
209
+ query
210
+ } ;
211
+
212
+ // Bind account number
213
+ let query = if let Some ( account_number) = input. account_number {
214
+ query. bind ( account_number as i64 )
215
+ } else {
216
+ query
217
+ } ;
218
+
219
+ // Bind bio
220
+ let query = if let Some ( bio) = & input. bio {
221
+ query. bind ( util:: format:: biography ( bio) )
222
+ } else {
223
+ query
224
+ } ;
225
+
226
+ query. execute ( & ctx. crdb ( ) . await ?) . await ?;
227
+
228
+ Ok ( Ok ( ( ) ) )
229
+ }
230
+
231
+ #[ derive( Debug , Clone , Serialize , Deserialize , Hash ) ]
232
+ struct PublishProfileSetAnalyticsInput {
233
+ user_id : Uuid
234
+ }
235
+
236
+ #[ activity( PublishProfileSetAnalytics ) ]
237
+ async fn publish_profile_set_analytics (
238
+ ctx : & ActivityCtx ,
239
+ input : & PublishProfileSetAnalyticsInput
240
+ ) -> GlobalResult < ( ) > {
241
+ msg ! ( [ ctx] analytics:: msg:: event_create( ) {
242
+ events: vec![
243
+ analytics:: msg:: event_create:: Event {
244
+ event_id: Some ( Uuid :: new_v4( ) . into( ) ) ,
245
+ name: "user.profile_set" . into( ) ,
246
+ properties_json: Some ( serde_json:: to_string( & json!( {
247
+ "user_id" : input. user_id. to_string( )
248
+ } ) ) ?) ,
249
+ ..Default :: default ( )
250
+ } ,
251
+ ] ,
252
+ } )
253
+ . await ?;
254
+
255
+ Ok ( ( ) )
256
+ }
257
+
258
+
120
259
// Creation
121
260
#[ derive( Debug , Clone , Serialize , Deserialize , Hash ) ]
122
261
struct InsertDbInput {
@@ -362,14 +501,30 @@ pub struct Update {}
362
501
#[ message( "user_delete_complete" ) ]
363
502
pub struct DeleteComplete { }
364
503
504
+ #[ message( "user_event" ) ]
505
+ pub struct Event { }
506
+
507
+ #[ message( "user_profile_set_status" ) ]
508
+ pub struct ProfileSetStatus {
509
+ pub res : Result < ( ) , ProfileSetError > ,
510
+ }
511
+
365
512
#[ signal( "user_admin_set" ) ]
366
513
pub struct AdminSet { }
367
514
515
+ #[ signal( "user_profile_set" ) ]
516
+ pub struct ProfileSet {
517
+ pub display_name : Option < String > ,
518
+ pub account_number : Option < u32 > ,
519
+ pub bio : Option < String > ,
520
+ }
521
+
368
522
#[ signal( "user_delete" ) ]
369
523
pub struct Delete { }
370
524
371
525
join_signal ! ( Main {
372
526
AdminSet ,
527
+ ProfileSet ,
373
528
Delete ,
374
529
} ) ;
375
530
0 commit comments