@@ -12,14 +12,14 @@ use universalpubsub::{NextOutput, Subscriber};
1212
1313use  crate :: { 
1414	error:: { WorkflowError ,  WorkflowResult } , 
15- 	message:: { Message ,  NatsMessage ,   NatsMessageWrapper } , 
15+ 	message:: { Message ,  PubsubMessage ,   PubsubMessageWrapper } , 
1616	utils:: { self ,  tags:: AsTags } , 
1717} ; 
1818
1919#[ derive( Clone ) ]  
2020pub  struct  MessageCtx  { 
21- 	/// The connection used to communicate with NATS . 
22- nats :  UpsPool , 
21+ 	/// The connection used to communicate with pubsub . 
22+ pubsub :  UpsPool , 
2323
2424	ray_id :  Id , 
2525
@@ -35,7 +35,7 @@ impl MessageCtx {
3535		ray_id :  Id , 
3636	)  -> WorkflowResult < Self >  { 
3737		Ok ( MessageCtx  { 
38- 			nats :  pools. ups ( ) . map_err ( WorkflowError :: PoolsGeneric ) ?, 
38+ 			pubsub :  pools. ups ( ) . map_err ( WorkflowError :: PoolsGeneric ) ?, 
3939			ray_id, 
4040			config :  config. clone ( ) , 
4141		} ) 
@@ -44,7 +44,7 @@ impl MessageCtx {
4444
4545// MARK: Publishing messages 
4646impl  MessageCtx  { 
47- 	/// Publishes a message to NATS  and to a durable message stream if a topic is 
47+ 	/// Publishes a message to pubsub  and to a durable message stream if a topic is 
4848/// set. 
4949/// 
5050/// Use `subscribe` to consume these messages ephemerally and `tail` to read 
@@ -94,7 +94,7 @@ impl MessageCtx {
9494	where 
9595		M :  Message , 
9696	{ 
97- 		let  nats_subject  = M :: nats_subject ( ) ; 
97+ 		let  subject  = M :: subject ( ) ; 
9898		let  duration_since_epoch = std:: time:: SystemTime :: now ( ) 
9999			. duration_since ( std:: time:: UNIX_EPOCH ) 
100100			. unwrap_or_else ( |err| unreachable ! ( "time is broken: {}" ,  err) ) ; 
@@ -109,7 +109,7 @@ impl MessageCtx {
109109
110110		// Serialize message 
111111		let  req_id = Id :: new_v1 ( self . config . dc_label ( ) ) ; 
112- 		let  message = NatsMessageWrapper  { 
112+ 		let  message = PubsubMessageWrapper  { 
113113			req_id, 
114114			ray_id :  self . ray_id , 
115115			tags :  tags. as_tags ( ) ?, 
@@ -119,7 +119,7 @@ impl MessageCtx {
119119		let  message_buf = serde_json:: to_vec ( & message) . map_err ( WorkflowError :: SerializeMessage ) ?; 
120120
121121		tracing:: debug!( 
122- 			%nats_subject , 
122+ 			%subject , 
123123			body_bytes = ?body_buf_len, 
124124			message_bytes = ?message_buf. len( ) , 
125125			"publish message" 
@@ -128,15 +128,15 @@ impl MessageCtx {
128128		// It's important to write to the stream as fast as possible in order to 
129129		// ensure messages are handled quickly. 
130130		let  message_buf = Arc :: new ( message_buf) ; 
131- 		self . message_publish_nats :: < M > ( & nats_subject ,  message_buf) 
131+ 		self . message_publish_pubsub :: < M > ( & subject ,  message_buf) 
132132			. await ; 
133133
134134		Ok ( ( ) ) 
135135	} 
136136
137- 	/// Publishes the message to NATS . 
137+ 	/// Publishes the message to pubsub . 
138138#[ tracing:: instrument( level = "debug" ,  skip_all) ]  
139- 	async  fn  message_publish_nats < M > ( & self ,  nats_subject :  & str ,  message_buf :  Arc < Vec < u8 > > ) 
139+ 	async  fn  message_publish_pubsub < M > ( & self ,  subject :  & str ,  message_buf :  Arc < Vec < u8 > > ) 
140140	where 
141141		M :  Message , 
142142	{ 
@@ -146,19 +146,19 @@ impl MessageCtx {
146146			// Ignore for infinite backoff 
147147			backoff. tick ( ) . await ; 
148148
149- 			let  nats_subject  = nats_subject . to_owned ( ) ; 
149+ 			let  subject  = subject . to_owned ( ) ; 
150150
151151			tracing:: trace!( 
152- 				%nats_subject , 
152+ 				%subject , 
153153				message_len = message_buf. len( ) , 
154- 				"publishing message to nats " 
154+ 				"publishing message to pubsub " 
155155			) ; 
156- 			if  let  Err ( err)  = self . nats . publish ( & nats_subject ,  & ( * message_buf) ) . await  { 
156+ 			if  let  Err ( err)  = self . pubsub . publish ( & subject ,  & ( * message_buf) ) . await  { 
157157				tracing:: warn!( ?err,  "publish message failed, trying again" ) ; 
158158				continue ; 
159159			} 
160160
161- 			tracing:: debug!( "publish nats  message succeeded" ) ; 
161+ 			tracing:: debug!( "publish pubsub  message succeeded" ) ; 
162162			break ; 
163163		} 
164164	} 
@@ -172,21 +172,21 @@ impl MessageCtx {
172172
173173// MARK: Subscriptions 
174174impl  MessageCtx  { 
175- 	/// Listens for gasoline messages globally on NATS . 
175+ 	/// Listens for gasoline messages globally on pubsub . 
176176#[ tracing:: instrument( skip_all,  fields( message = M :: NAME ) ) ]  
177177	pub  async  fn  subscribe < M > ( & self ,  tags :  impl  AsTags )  -> WorkflowResult < SubscriptionHandle < M > > 
178178	where 
179179		M :  Message , 
180180	{ 
181181		self . subscribe_opt :: < M > ( SubscribeOpts  { 
182182			tags :  tags. as_tags ( ) ?, 
183- 			flush_nats :  true , 
183+ 			flush :  true , 
184184		} ) 
185185		. in_current_span ( ) 
186186		. await 
187187	} 
188188
189- 	/// Listens for gasoline messages globally on NATS . 
189+ 	/// Listens for gasoline messages globally on pubsub . 
190190#[ tracing:: instrument( skip_all,  fields( message = M :: NAME ) ) ]  
191191	pub  async  fn  subscribe_opt < M > ( 
192192		& self , 
@@ -195,32 +195,32 @@ impl MessageCtx {
195195	where 
196196		M :  Message , 
197197	{ 
198- 		let  nats_subject  = M :: nats_subject ( ) ; 
198+ 		let  subject  = M :: subject ( ) ; 
199199
200200		// Create subscription and flush immediately. 
201- 		tracing:: debug!( %nats_subject ,  tags = ?opts. tags,  "creating subscription" ) ; 
201+ 		tracing:: debug!( %subject ,  tags = ?opts. tags,  "creating subscription" ) ; 
202202		let  subscription = self 
203- 			. nats 
204- 			. subscribe ( & nats_subject ) 
203+ 			. pubsub 
204+ 			. subscribe ( & subject ) 
205205			. await 
206206			. map_err ( |x| WorkflowError :: CreateSubscription ( x. into ( ) ) ) ?; 
207- 		if  opts. flush_nats  { 
208- 			self . nats 
207+ 		if  opts. flush  { 
208+ 			self . pubsub 
209209				. flush ( ) 
210210				. await 
211- 				. map_err ( |x| WorkflowError :: FlushNats ( x. into ( ) ) ) ?; 
211+ 				. map_err ( |x| WorkflowError :: FlushPubsub ( x. into ( ) ) ) ?; 
212212		} 
213213
214214		// Return handle 
215- 		let  subscription = SubscriptionHandle :: new ( nats_subject ,  subscription,  opts. tags . clone ( ) ) ; 
215+ 		let  subscription = SubscriptionHandle :: new ( subject ,  subscription,  opts. tags . clone ( ) ) ; 
216216		Ok ( subscription) 
217217	} 
218218} 
219219
220220#[ derive( Debug ) ]  
221221pub  struct  SubscribeOpts  { 
222222	pub  tags :  serde_json:: Value , 
223- 	pub  flush_nats :  bool , 
223+ 	pub  flush :  bool , 
224224} 
225225
226226/// Used to receive messages from other contexts. 
@@ -291,15 +291,15 @@ where
291291/// 
292292/// This future can be safely dropped. 
293293#[ tracing:: instrument( name="message_next" ,  skip_all,  fields( message = M :: NAME ) ) ]  
294- 	pub  async  fn  next ( & mut  self )  -> WorkflowResult < NatsMessage < M > >  { 
294+ 	pub  async  fn  next ( & mut  self )  -> WorkflowResult < PubsubMessage < M > >  { 
295295		tracing:: debug!( "waiting for message" ) ; 
296296
297297		loop  { 
298298			// Poll the subscription. 
299299			// 
300300			// Use blocking threads instead of `try_next`, since I'm not sure 
301301			// try_next works as intended. 
302- 			let  nats_message  = match  self . subscription . next ( ) . await  { 
302+ 			let  message  = match  self . subscription . next ( ) . await  { 
303303				Ok ( NextOutput :: Message ( msg) )  => msg, 
304304				Ok ( NextOutput :: Unsubscribed )  => { 
305305					tracing:: debug!( "unsubscribed" ) ; 
@@ -311,11 +311,11 @@ where
311311				} 
312312			} ; 
313313
314- 			let  message_wrapper = NatsMessage :: < M > :: deserialize_wrapper ( & nats_message . payload ) ?; 
314+ 			let  message_wrapper = PubsubMessage :: < M > :: deserialize_wrapper ( & message . payload ) ?; 
315315
316316			// Check if the subscription tags match a subset of the message tags 
317317			if  utils:: is_value_subset ( & self . tags ,  & message_wrapper. tags )  { 
318- 				let  message = NatsMessage :: < M > :: deserialize_from_wrapper ( message_wrapper) ?; 
318+ 				let  message = PubsubMessage :: < M > :: deserialize_from_wrapper ( message_wrapper) ?; 
319319				tracing:: debug!( ?message,  "received message" ) ; 
320320
321321				return  Ok ( message) ; 
@@ -326,7 +326,7 @@ where
326326	} 
327327
328328	/// Converts the subscription in to a stream. 
329- pub  fn  into_stream ( self )  -> impl  futures_util:: Stream < Item  = WorkflowResult < NatsMessage < M > > >  { 
329+ pub  fn  into_stream ( self )  -> impl  futures_util:: Stream < Item  = WorkflowResult < PubsubMessage < M > > >  { 
330330		futures_util:: stream:: try_unfold ( self ,  |mut  sub| { 
331331			async  move  { 
332332				let  message = sub. next ( ) . await ?; 
0 commit comments