@@ -27,11 +27,13 @@ use databend_common_expression::FromData;
2727use databend_common_meta_app:: principal:: AutoIncrementKey ;
2828use databend_common_meta_app:: schema:: GetAutoIncrementNextValueReq ;
2929use databend_common_meta_app:: schema:: GetSequenceNextValueReq ;
30+ use databend_common_meta_app:: schema:: GetSequenceReply ;
3031use databend_common_meta_app:: schema:: GetSequenceReq ;
3132use databend_common_meta_app:: schema:: SequenceIdent ;
3233use databend_common_pipeline_transforms:: processors:: AsyncTransform ;
3334use databend_common_sql:: binder:: AsyncFunctionDesc ;
3435use databend_common_storages_fuse:: TableContext ;
36+ use databend_common_users:: GrantObjectVisibilityChecker ;
3537use databend_common_users:: Object ;
3638
3739use crate :: pipelines:: processors:: transforms:: transform_dictionary:: DictionaryOperator ;
@@ -135,26 +137,30 @@ impl TransformAsyncFunction {
135137 } else {
136138 // Get or create the sequence counter
137139 let counter = counter_lock. read ( ) . await ;
140+ let fn_range_collect = |start : u64 , end : u64 , step : i64 | {
141+ ( 0 ..end - start)
142+ . map ( |num| start + num * step as u64 )
143+ . collect :: < Vec < _ > > ( )
144+ } ;
145+ // We need to fetch more sequence numbers
146+ let catalog = ctx. get_default_catalog ( ) ?;
138147
139148 // Try to reserve sequence numbers from the counter
140149 if let Some ( ( start, _end) ) = counter. try_reserve ( count) {
150+ let step = fetcher. step ( & ctx, & catalog) . await ?;
141151 // We have enough sequence numbers in the current batch
142- let range = start..start + count;
143- UInt64Type :: from_data ( range. collect :: < Vec < u64 > > ( ) )
152+ UInt64Type :: from_data ( fn_range_collect ( start, start + count, step) )
144153 } else {
145154 // drop the read lock and get the write lock
146155 drop ( counter) ;
147156 let counter = counter_lock. write ( ) . await ;
148157 {
149158 // try reserve again
150159 if let Some ( ( start, _end) ) = counter. try_reserve ( count) {
160+ let step = fetcher. step ( & ctx, & catalog) . await ?;
151161 // We have enough sequence numbers in the current batch
152- let range = start..start + count;
153- UInt64Type :: from_data ( range. collect :: < Vec < u64 > > ( ) )
162+ UInt64Type :: from_data ( fn_range_collect ( start, count, step) )
154163 } else {
155- // We need to fetch more sequence numbers
156- let catalog = ctx. get_default_catalog ( ) ?;
157-
158164 // Get current state of the counter
159165 let current = counter. current . load ( Ordering :: Relaxed ) ;
160166 let max = counter. max . load ( Ordering :: Relaxed ) ;
@@ -163,7 +169,11 @@ impl TransformAsyncFunction {
163169 let remaining = max. saturating_sub ( current) ;
164170 let to_fetch = count. saturating_sub ( remaining) ;
165171
166- let ( start, batch_size) = fetcher. fetch ( & ctx, & catalog, to_fetch) . await ?;
172+ let NextValFetchResult {
173+ start,
174+ batch_size,
175+ step,
176+ } = fetcher. fetch ( & ctx, & catalog, to_fetch) . await ?;
167177
168178 // If we have remaining numbers, use them first
169179 if remaining > 0 {
@@ -175,14 +185,16 @@ impl TransformAsyncFunction {
175185
176186 // Add the remaining numbers
177187 let remaining_to_use = remaining. min ( count) ;
178- numbers. extend (
179- ( current..current + remaining_to_use) . collect :: < Vec < u64 > > ( ) ,
180- ) ;
188+ numbers. extend ( fn_range_collect (
189+ current,
190+ current + remaining_to_use,
191+ step,
192+ ) ) ;
181193
182194 // Add numbers from the new batch if needed
183195 if remaining_to_use < count {
184196 let new_needed = count - remaining_to_use;
185- numbers. extend ( ( start.. start + new_needed) . collect :: < Vec < u64 > > ( ) ) ;
197+ numbers. extend ( fn_range_collect ( start, start + new_needed, step ) ) ;
186198 // Update the counter to reflect that we've used some of the new batch
187199 counter. current . store ( start + new_needed, Ordering :: SeqCst ) ;
188200 }
@@ -192,8 +204,7 @@ impl TransformAsyncFunction {
192204 // No remaining numbers, just use the new batch
193205 counter. update_batch ( start + count, batch_size - count) ;
194206 // Return the sequence numbers needed for this request
195- let range = start..start + count;
196- UInt64Type :: from_data ( range. collect :: < Vec < u64 > > ( ) )
207+ UInt64Type :: from_data ( fn_range_collect ( start, start + count, step) )
197208 }
198209 }
199210 }
@@ -211,7 +222,15 @@ pub trait NextValFetcher {
211222 ctx : & QueryContext ,
212223 catalog : & Arc < dyn Catalog > ,
213224 to_fetch : u64 ,
214- ) -> Result < ( u64 /* start */ , u64 /* batch */ ) > ;
225+ ) -> Result < NextValFetchResult > ;
226+
227+ async fn step ( & self , ctx : & QueryContext , catalog : & Arc < dyn Catalog > ) -> Result < i64 > ;
228+ }
229+
230+ pub struct NextValFetchResult {
231+ start : u64 ,
232+ batch_size : u64 ,
233+ step : i64 ,
215234}
216235
217236pub struct SequenceNextValFetcher {
@@ -224,20 +243,8 @@ impl NextValFetcher for SequenceNextValFetcher {
224243 ctx : & QueryContext ,
225244 catalog : & Arc < dyn Catalog > ,
226245 to_fetch : u64 ,
227- ) -> Result < ( u64 , u64 ) > {
228- let visibility_checker = if ctx
229- . get_settings ( )
230- . get_enable_experimental_sequence_privilege_check ( ) ?
231- {
232- Some ( ctx. get_visibility_checker ( false , Object :: Sequence ) . await ?)
233- } else {
234- None
235- } ;
236-
237- let req = GetSequenceReq {
238- ident : self . sequence_ident . clone ( ) ,
239- } ;
240- let resp = catalog. get_sequence ( req, & visibility_checker) . await ?;
246+ ) -> Result < NextValFetchResult > {
247+ let ( resp, visibility_checker) = self . get_sequence ( ctx, catalog) . await ?;
241248 let step_size = resp. meta . step as u64 ;
242249
243250 // Calculate batch size - take the larger of count or step_size
@@ -252,7 +259,42 @@ impl NextValFetcher for SequenceNextValFetcher {
252259 let resp = catalog
253260 . get_sequence_next_value ( req, & visibility_checker)
254261 . await ?;
255- Ok ( ( resp. start , batch_size) )
262+ Ok ( NextValFetchResult {
263+ start : resp. start ,
264+ batch_size,
265+ step : resp. step ,
266+ } )
267+ }
268+
269+ async fn step ( & self , ctx : & QueryContext , catalog : & Arc < dyn Catalog > ) -> Result < i64 > {
270+ self . get_sequence ( ctx, catalog)
271+ . await
272+ . map ( |( resp, _) | resp. meta . step )
273+ }
274+ }
275+
276+ impl SequenceNextValFetcher {
277+ async fn get_sequence (
278+ & self ,
279+ ctx : & QueryContext ,
280+ catalog : & Arc < dyn Catalog > ,
281+ ) -> Result < ( GetSequenceReply , Option < GrantObjectVisibilityChecker > ) > {
282+ let visibility_checker = if ctx
283+ . get_settings ( )
284+ . get_enable_experimental_sequence_privilege_check ( ) ?
285+ {
286+ Some ( ctx. get_visibility_checker ( false , Object :: Sequence ) . await ?)
287+ } else {
288+ None
289+ } ;
290+
291+ let req = GetSequenceReq {
292+ ident : self . sequence_ident . clone ( ) ,
293+ } ;
294+ catalog
295+ . get_sequence ( req, & visibility_checker)
296+ . await
297+ . map ( |reply| ( reply, visibility_checker) )
256298 }
257299}
258300
@@ -267,24 +309,31 @@ impl NextValFetcher for AutoIncrementNextValFetcher {
267309 ctx : & QueryContext ,
268310 catalog : & Arc < dyn Catalog > ,
269311 to_fetch : u64 ,
270- ) -> Result < ( u64 , u64 ) > {
312+ ) -> Result < NextValFetchResult > {
271313 let step_size = self . expr . step as u64 ;
272314
273315 // Calculate batch size - take the larger of count or step_size
274316 let batch_size = to_fetch. max ( step_size) ;
317+ let step = self . expr . step ;
275318
276319 // Calculate batch size - take the larger of count or step_size
277320 let req = GetAutoIncrementNextValueReq {
278321 tenant : ctx. get_tenant ( ) ,
279322 key : self . key ,
280323 expr : self . expr ,
281- // FIXME: count * step
282- // count: batch_size,
283- count : 1 ,
324+ count : batch_size,
284325 } ;
285326
286327 let resp = catalog. get_autoincrement_next_value ( req) . await ?;
287- Ok ( ( resp. start , batch_size) )
328+ Ok ( NextValFetchResult {
329+ start : resp. start ,
330+ batch_size,
331+ step,
332+ } )
333+ }
334+
335+ async fn step ( & self , _ctx : & QueryContext , _catalog : & Arc < dyn Catalog > ) -> Result < i64 > {
336+ Ok ( self . expr . step )
288337 }
289338}
290339
0 commit comments