11use arrow:: legacy:: time_zone:: Tz ;
22use arrow:: temporal_conversions:: * ;
3- use chrono:: NaiveDateTime ;
43#[ cfg( feature = "timezones" ) ]
5- use chrono:: TimeZone ;
4+ use chrono:: { Datelike , LocalResult , NaiveDateTime , TimeZone } ;
5+ #[ cfg( not( feature = "timezones" ) ) ]
6+ use chrono:: NaiveDateTime ;
67use now:: DateTimeNow ;
78use polars_core:: prelude:: * ;
89
910use crate :: prelude:: * ;
11+ #[ cfg( feature = "timezones" ) ]
12+ use crate :: utils:: unlocalize_datetime;
13+
/// Searches forward from a local wall-clock time that does not exist in `tz`
/// until a probe lands on a time that does exist.
///
/// The probes are `local_ndt + k * 30 minutes` for `k = 1..=48`. The naive UTC
/// datetime of the first probe that `tz` can map is returned; when that probe
/// is ambiguous, the earliest of its two candidates is used. `None` means no
/// probe within the window could be mapped.
///
/// Note that the result is the *shifted input*, not the exact instant at which
/// the gap ends: an input 15 minutes into a one-hour gap resolves to 15 minutes
/// past the end of the gap.
#[cfg(feature = "timezones")]
fn probe_past_dst_gap(local_ndt: NaiveDateTime, tz: &Tz) -> Option<NaiveDateTime> {
    // 30-minute steps keep both 30- and 60-minute offset changes on the grid.
    const PROBE_STEP_MINUTES: i64 = 30;
    // 48 steps = 24 hours, intended to also cover gaps longer than two hours
    // (some zones have skipped a full calendar day).
    const MAX_PROBE_STEPS: i64 = 48;

    (1..=MAX_PROBE_STEPS).find_map(|step| {
        let probe = local_ndt + chrono::Duration::minutes(step * PROBE_STEP_MINUTES);
        match tz.from_local_datetime(&probe) {
            LocalResult::Single(dt) => Some(dt.naive_utc()),
            LocalResult::Ambiguous(earliest, _) => Some(earliest.naive_utc()),
            LocalResult::None => None,
        }
    })
}

/// Resolves a non-existent local time in `tz` to a nanosecond timestamp by
/// shifting it forward in 30-minute steps until it exists (see
/// `probe_past_dst_gap`).
#[cfg(feature = "timezones")]
fn resolve_nonexistent_ns(local_ndt: NaiveDateTime, tz: &Tz) -> i64 {
    match probe_past_dst_gap(local_ndt, tz) {
        Some(utc) => datetime_to_timestamp_ns(utc),
        // Last resort, not expected to be reached.
        // NOTE(review): this feeds the *local* wall-clock value to a converter
        // that every other path calls with a UTC value, so the result is off by
        // the zone's UTC offset.
        None => datetime_to_timestamp_ns(local_ndt) + 3_600_000_000_000,
    }
}

/// Resolves a non-existent local time in `tz` to a microsecond timestamp by
/// shifting it forward in 30-minute steps until it exists (see
/// `probe_past_dst_gap`).
#[cfg(feature = "timezones")]
fn resolve_nonexistent_us(local_ndt: NaiveDateTime, tz: &Tz) -> i64 {
    match probe_past_dst_gap(local_ndt, tz) {
        Some(utc) => datetime_to_timestamp_us(utc),
        // Last resort, not expected to be reached.
        // NOTE(review): local wall-clock value is converted as if it were UTC.
        None => datetime_to_timestamp_us(local_ndt) + 3_600_000_000,
    }
}

/// Resolves a non-existent local time in `tz` to a millisecond timestamp by
/// shifting it forward in 30-minute steps until it exists (see
/// `probe_past_dst_gap`).
#[cfg(feature = "timezones")]
fn resolve_nonexistent_ms(local_ndt: NaiveDateTime, tz: &Tz) -> i64 {
    match probe_past_dst_gap(local_ndt, tz) {
        Some(utc) => datetime_to_timestamp_ms(utc),
        // Last resort, not expected to be reached.
        // NOTE(review): local wall-clock value is converted as if it were UTC.
        None => datetime_to_timestamp_ms(local_ndt) + 3_600_000,
    }
}
60+
61+ #[ cfg( feature = "timezones" ) ]
/// Returns the number of days in `month` (1-based) of the proleptic Gregorian
/// `year`.
///
/// February yields 29 in leap years and 28 otherwise. Any value that is not
/// one of the 31-day months or February yields 30, which includes month
/// numbers outside `1..=12`.
fn days_in_month(year: i32, month: u32) -> u32 {
    // Divisible by 4, except century years that are not divisible by 400.
    let is_leap_year = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);

    match month {
        2 if is_leap_year => 29,
        2 => 28,
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        _ => 30,
    }
}
76+
/// Adds `duration` to a naive local (wall-clock) datetime using calendar
/// arithmetic, without any time-zone lookup.
///
/// * When the duration has no month, week or day component, its total
///   `duration_ns()` is added directly.
/// * Otherwise months are applied first, with the day of month clamped to the
///   length of the target month (Jan 31 + 1 month -> last day of February),
///   then `days + 7 * weeks` whole days, then the `nanoseconds()` component.
///
/// If the shifted year does not fit in `i32`, or chrono rejects the resulting
/// date, the month shift is skipped and the remaining components are still
/// applied.
///
/// NOTE(review): the component accessors are used exactly as returned. If
/// `Duration` tracks its sign separately from these magnitudes, negative
/// durations are not handled here; confirm against the `Duration` type.
#[cfg(feature = "timezones")]
fn add_duration_in_local_time(local_dt: NaiveDateTime, duration: &Duration) -> NaiveDateTime {
    if duration.months() == 0 && duration.weeks() == 0 && duration.days() == 0 {
        return local_dt + chrono::Duration::nanoseconds(duration.duration_ns());
    }

    let months_to_add = duration.months();
    let days_to_add = duration.days() + duration.weeks() * 7;

    let mut result = local_dt;

    if months_to_add != 0 {
        // Zero-based month index counted from year 0.
        let total_months =
            i64::from(result.year()) * 12 + i64::from(result.month()) - 1 + months_to_add;
        // Euclidean division keeps the month in 0..12 even for negative totals.
        let new_month = total_months.rem_euclid(12) as u32 + 1;

        let shifted = i32::try_from(total_months.div_euclid(12))
            .ok()
            .and_then(|new_year| {
                let day = result.day().min(days_in_month(new_year, new_month));
                // Pin the day to 1 before touching year/month: `with_year` and
                // `with_month` return `None` when the current day does not
                // exist in the intermediate date (Feb 29 -> non-leap year,
                // Jan 31 -> February), which would discard the whole shift.
                result
                    .with_day(1)
                    .and_then(|d| d.with_year(new_year))
                    .and_then(|d| d.with_month(new_month))
                    .and_then(|d| d.with_day(day))
            });
        result = shifted.unwrap_or(result);
    }

    if days_to_add != 0 {
        result = result + chrono::Duration::days(days_to_add);
    }

    result + chrono::Duration::nanoseconds(duration.nanoseconds())
}
108+
109+ // Add duration, handling DST transitions:
110+ // - Ambiguous times (fall back): use earliest
111+ // - Non-existent times (spring forward): find next valid time
112+ fn add_duration_ns_with_dst_handling ( duration : & Duration , t : i64 , tz : Option < & Tz > ) -> i64 {
113+ match duration. add_ns ( t, tz) {
114+ Ok ( result) => result,
115+ Err ( _) => {
116+ #[ cfg( feature = "timezones" ) ]
117+ if let Some ( tz) = tz {
118+ let utc_dt = timestamp_ns_to_datetime ( t) ;
119+ let local_dt = unlocalize_datetime ( utc_dt, tz) ;
120+ let result_local = add_duration_in_local_time ( local_dt, duration) ;
121+
122+ return match tz. from_local_datetime ( & result_local) {
123+ LocalResult :: Single ( dt) => datetime_to_timestamp_ns ( dt. naive_utc ( ) ) ,
124+ LocalResult :: Ambiguous ( earliest, _) => {
125+ datetime_to_timestamp_ns ( earliest. naive_utc ( ) )
126+ } ,
127+ LocalResult :: None => resolve_nonexistent_ns ( result_local, tz) ,
128+ } ;
129+ }
130+ t + duration. duration_ns ( )
131+ } ,
132+ }
133+ }
134+
135+ fn add_duration_us_with_dst_handling ( duration : & Duration , t : i64 , tz : Option < & Tz > ) -> i64 {
136+ match duration. add_us ( t, tz) {
137+ Ok ( result) => result,
138+ Err ( _) => {
139+ #[ cfg( feature = "timezones" ) ]
140+ if let Some ( tz) = tz {
141+ let utc_dt = timestamp_us_to_datetime ( t) ;
142+ let local_dt = unlocalize_datetime ( utc_dt, tz) ;
143+ let result_local = add_duration_in_local_time ( local_dt, duration) ;
144+
145+ return match tz. from_local_datetime ( & result_local) {
146+ LocalResult :: Single ( dt) => datetime_to_timestamp_us ( dt. naive_utc ( ) ) ,
147+ LocalResult :: Ambiguous ( earliest, _) => {
148+ datetime_to_timestamp_us ( earliest. naive_utc ( ) )
149+ } ,
150+ LocalResult :: None => resolve_nonexistent_us ( result_local, tz) ,
151+ } ;
152+ }
153+ t + duration. duration_us ( )
154+ } ,
155+ }
156+ }
157+
158+ fn add_duration_ms_with_dst_handling ( duration : & Duration , t : i64 , tz : Option < & Tz > ) -> i64 {
159+ match duration. add_ms ( t, tz) {
160+ Ok ( result) => result,
161+ Err ( _) => {
162+ #[ cfg( feature = "timezones" ) ]
163+ if let Some ( tz) = tz {
164+ let utc_dt = timestamp_ms_to_datetime ( t) ;
165+ let local_dt = unlocalize_datetime ( utc_dt, tz) ;
166+ let result_local = add_duration_in_local_time ( local_dt, duration) ;
167+
168+ return match tz. from_local_datetime ( & result_local) {
169+ LocalResult :: Single ( dt) => datetime_to_timestamp_ms ( dt. naive_utc ( ) ) ,
170+ LocalResult :: Ambiguous ( earliest, _) => {
171+ datetime_to_timestamp_ms ( earliest. naive_utc ( ) )
172+ } ,
173+ LocalResult :: None => resolve_nonexistent_ms ( result_local, tz) ,
174+ } ;
175+ }
176+ t + duration. duration_ms ( )
177+ } ,
178+ }
179+ }
10180
11181/// Ensure that earliest datapoint (`t`) is in, or in front of, first window.
12182///
@@ -346,19 +516,41 @@ impl Iterator for BoundsIter<'_> {
346516 if self . bi . start < self . boundary . stop {
347517 let out = self . bi ;
348518 match self . tu {
349- // TODO: find some way to propagate error instead of unwrapping?
350- // Issue is that `next` needs to return `Option`.
351519 TimeUnit :: Nanoseconds => {
352- self . bi . start = self . window . every . add_ns ( self . bi . start , self . tz ) . unwrap ( ) ;
353- self . bi . stop = self . window . period . add_ns ( self . bi . start , self . tz ) . unwrap ( ) ;
520+ self . bi . start = add_duration_ns_with_dst_handling (
521+ & self . window . every ,
522+ self . bi . start ,
523+ self . tz ,
524+ ) ;
525+ self . bi . stop = add_duration_ns_with_dst_handling (
526+ & self . window . period ,
527+ self . bi . start ,
528+ self . tz ,
529+ ) ;
354530 } ,
355531 TimeUnit :: Microseconds => {
356- self . bi . start = self . window . every . add_us ( self . bi . start , self . tz ) . unwrap ( ) ;
357- self . bi . stop = self . window . period . add_us ( self . bi . start , self . tz ) . unwrap ( ) ;
532+ self . bi . start = add_duration_us_with_dst_handling (
533+ & self . window . every ,
534+ self . bi . start ,
535+ self . tz ,
536+ ) ;
537+ self . bi . stop = add_duration_us_with_dst_handling (
538+ & self . window . period ,
539+ self . bi . start ,
540+ self . tz ,
541+ ) ;
358542 } ,
359543 TimeUnit :: Milliseconds => {
360- self . bi . start = self . window . every . add_ms ( self . bi . start , self . tz ) . unwrap ( ) ;
361- self . bi . stop = self . window . period . add_ms ( self . bi . start , self . tz ) . unwrap ( ) ;
544+ self . bi . start = add_duration_ms_with_dst_handling (
545+ & self . window . every ,
546+ self . bi . start ,
547+ self . tz ,
548+ ) ;
549+ self . bi . stop = add_duration_ms_with_dst_handling (
550+ & self . window . period ,
551+ self . bi . start ,
552+ self . tz ,
553+ ) ;
362554 } ,
363555 }
364556 Some ( out)
@@ -372,22 +564,40 @@ impl Iterator for BoundsIter<'_> {
372564 if self . bi . start < self . boundary . stop {
373565 match self . tu {
374566 TimeUnit :: Nanoseconds => {
375- self . bi . start = ( self . window . every * n)
376- . add_ns ( self . bi . start , self . tz )
377- . unwrap ( ) ;
378- self . bi . stop = ( self . window . period ) . add_ns ( self . bi . start , self . tz ) . unwrap ( ) ;
567+ self . bi . start = add_duration_ns_with_dst_handling (
568+ & ( self . window . every * n) ,
569+ self . bi . start ,
570+ self . tz ,
571+ ) ;
572+ self . bi . stop = add_duration_ns_with_dst_handling (
573+ & self . window . period ,
574+ self . bi . start ,
575+ self . tz ,
576+ ) ;
379577 } ,
380578 TimeUnit :: Microseconds => {
381- self . bi . start = ( self . window . every * n)
382- . add_us ( self . bi . start , self . tz )
383- . unwrap ( ) ;
384- self . bi . stop = ( self . window . period ) . add_us ( self . bi . start , self . tz ) . unwrap ( ) ;
579+ self . bi . start = add_duration_us_with_dst_handling (
580+ & ( self . window . every * n) ,
581+ self . bi . start ,
582+ self . tz ,
583+ ) ;
584+ self . bi . stop = add_duration_us_with_dst_handling (
585+ & self . window . period ,
586+ self . bi . start ,
587+ self . tz ,
588+ ) ;
385589 } ,
386590 TimeUnit :: Milliseconds => {
387- self . bi . start = ( self . window . every * n)
388- . add_ms ( self . bi . start , self . tz )
389- . unwrap ( ) ;
390- self . bi . stop = ( self . window . period ) . add_ms ( self . bi . start , self . tz ) . unwrap ( ) ;
591+ self . bi . start = add_duration_ms_with_dst_handling (
592+ & ( self . window . every * n) ,
593+ self . bi . start ,
594+ self . tz ,
595+ ) ;
596+ self . bi . stop = add_duration_ms_with_dst_handling (
597+ & self . window . period ,
598+ self . bi . start ,
599+ self . tz ,
600+ ) ;
391601 } ,
392602 }
393603 self . next ( )
0 commit comments