11import Atomics
22import NIOEmbedded
3- import Dispatch
3+ import NIOPosix
44import XCTest
55@testable import PostgresNIO
66import NIOCore
77import Logging
88
99final class PostgresRowSequenceTests : XCTestCase {
1010 let logger = Logger ( label: " PSQLRowStreamTests " )
11- let eventLoop = EmbeddedEventLoop ( )
1211
1312 func testBackpressureWorks( ) async throws {
1413 let dataSource = MockRowDataSource ( )
14+ let embeddedEventLoop = EmbeddedEventLoop ( )
1515 let stream = PSQLRowStream (
1616 source: . stream(
1717 [
1818 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
1919 ] ,
2020 dataSource
2121 ) ,
22- eventLoop: self . eventLoop ,
22+ eventLoop: embeddedEventLoop ,
2323 logger: self . logger
2424 )
2525
@@ -41,14 +41,15 @@ final class PostgresRowSequenceTests: XCTestCase {
4141
4242 func testCancellationWorksWhileIterating( ) async throws {
4343 let dataSource = MockRowDataSource ( )
44+ let embeddedEventLoop = EmbeddedEventLoop ( )
4445 let stream = PSQLRowStream (
4546 source: . stream(
4647 [
4748 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
4849 ] ,
4950 dataSource
5051 ) ,
51- eventLoop: self . eventLoop ,
52+ eventLoop: embeddedEventLoop ,
5253 logger: self . logger
5354 )
5455
@@ -72,14 +73,15 @@ final class PostgresRowSequenceTests: XCTestCase {
7273
7374 func testCancellationWorksBeforeIterating( ) async throws {
7475 let dataSource = MockRowDataSource ( )
76+ let embeddedEventLoop = EmbeddedEventLoop ( )
7577 let stream = PSQLRowStream (
7678 source: . stream(
7779 [
7880 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
7981 ] ,
8082 dataSource
8183 ) ,
82- eventLoop: self . eventLoop ,
84+ eventLoop: embeddedEventLoop ,
8385 logger: self . logger
8486 )
8587
@@ -97,14 +99,15 @@ final class PostgresRowSequenceTests: XCTestCase {
9799
98100 func testDroppingTheSequenceCancelsTheSource( ) async throws {
99101 let dataSource = MockRowDataSource ( )
102+ let embeddedEventLoop = EmbeddedEventLoop ( )
100103 let stream = PSQLRowStream (
101104 source: . stream(
102105 [
103106 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
104107 ] ,
105108 dataSource
106109 ) ,
107- eventLoop: self . eventLoop ,
110+ eventLoop: embeddedEventLoop ,
108111 logger: self . logger
109112 )
110113
@@ -117,14 +120,15 @@ final class PostgresRowSequenceTests: XCTestCase {
117120
118121 func testStreamBasedOnCompletedQuery( ) async throws {
119122 let dataSource = MockRowDataSource ( )
123+ let embeddedEventLoop = EmbeddedEventLoop ( )
120124 let stream = PSQLRowStream (
121125 source: . stream(
122126 [
123127 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
124128 ] ,
125129 dataSource
126130 ) ,
127- eventLoop: self . eventLoop ,
131+ eventLoop: embeddedEventLoop ,
128132 logger: self . logger
129133 )
130134
@@ -144,14 +148,15 @@ final class PostgresRowSequenceTests: XCTestCase {
144148
145149 func testStreamIfInitializedWithAllData( ) async throws {
146150 let dataSource = MockRowDataSource ( )
151+ let embeddedEventLoop = EmbeddedEventLoop ( )
147152 let stream = PSQLRowStream (
148153 source: . stream(
149154 [
150155 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
151156 ] ,
152157 dataSource
153158 ) ,
154- eventLoop: self . eventLoop ,
159+ eventLoop: embeddedEventLoop ,
155160 logger: self . logger
156161 )
157162
@@ -172,14 +177,15 @@ final class PostgresRowSequenceTests: XCTestCase {
172177
173178 func testStreamIfInitializedWithError( ) async throws {
174179 let dataSource = MockRowDataSource ( )
180+ let embeddedEventLoop = EmbeddedEventLoop ( )
175181 let stream = PSQLRowStream (
176182 source: . stream(
177183 [
178184 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
179185 ] ,
180186 dataSource
181187 ) ,
182- eventLoop: self . eventLoop ,
188+ eventLoop: embeddedEventLoop ,
183189 logger: self . logger
184190 )
185191
@@ -200,29 +206,30 @@ final class PostgresRowSequenceTests: XCTestCase {
200206
201207 func testSucceedingRowContinuationsWorks( ) async throws {
202208 let dataSource = MockRowDataSource ( )
209+ let eventLoop = NIOSingletons . posixEventLoopGroup. next ( )
203210 let stream = PSQLRowStream (
204211 source: . stream(
205212 [
206213 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
207214 ] ,
208215 dataSource
209216 ) ,
210- eventLoop: self . eventLoop,
217+ eventLoop: eventLoop,
211218 logger: self . logger
212219 )
213220
214- let rowSequence = stream. asyncSequence ( )
221+ let rowSequence = try await eventLoop . submit { stream. asyncSequence ( ) } . get ( )
215222 var rowIterator = rowSequence. makeAsyncIterator ( )
216223
217- DispatchQueue . main . asyncAfter ( deadline : . now ( ) + . seconds( 1 ) ) {
224+ eventLoop . scheduleTask ( in : . seconds( 1 ) ) {
218225 let dataRows : [ DataRow ] = ( 0 ..< 1 ) . map { [ ByteBuffer ( integer: Int64 ( $0) ) ] }
219226 stream. receive ( dataRows)
220227 }
221228
222229 let row1 = try await rowIterator. next ( )
223230 XCTAssertEqual ( try row1? . decode ( Int . self) , 0 )
224231
225- DispatchQueue . main . asyncAfter ( deadline : . now ( ) + . seconds( 1 ) ) {
232+ eventLoop . scheduleTask ( in : . seconds( 1 ) ) {
226233 stream. receive ( completion: . success( " SELECT 1 " ) )
227234 }
228235
@@ -232,29 +239,30 @@ final class PostgresRowSequenceTests: XCTestCase {
232239
233240 func testFailingRowContinuationsWorks( ) async throws {
234241 let dataSource = MockRowDataSource ( )
242+ let eventLoop = NIOSingletons . posixEventLoopGroup. next ( )
235243 let stream = PSQLRowStream (
236244 source: . stream(
237245 [
238246 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
239247 ] ,
240248 dataSource
241249 ) ,
242- eventLoop: self . eventLoop,
250+ eventLoop: eventLoop,
243251 logger: self . logger
244252 )
245253
246- let rowSequence = stream. asyncSequence ( )
254+ let rowSequence = try await eventLoop . submit { stream. asyncSequence ( ) } . get ( )
247255 var rowIterator = rowSequence. makeAsyncIterator ( )
248256
249- DispatchQueue . main . asyncAfter ( deadline : . now ( ) + . seconds( 1 ) ) {
257+ eventLoop . scheduleTask ( in : . seconds( 1 ) ) {
250258 let dataRows : [ DataRow ] = ( 0 ..< 1 ) . map { [ ByteBuffer ( integer: Int64 ( $0) ) ] }
251259 stream. receive ( dataRows)
252260 }
253261
254262 let row1 = try await rowIterator. next ( )
255263 XCTAssertEqual ( try row1? . decode ( Int . self) , 0 )
256264
257- DispatchQueue . main . asyncAfter ( deadline : . now ( ) + . seconds( 1 ) ) {
265+ eventLoop . scheduleTask ( in : . seconds( 1 ) ) {
258266 stream. receive ( completion: . failure( PSQLError . serverClosedConnection ( underlying: nil ) ) )
259267 }
260268
@@ -268,14 +276,15 @@ final class PostgresRowSequenceTests: XCTestCase {
268276
269277 func testAdaptiveRowBufferShrinksAndGrows( ) async throws {
270278 let dataSource = MockRowDataSource ( )
279+ let embeddedEventLoop = EmbeddedEventLoop ( )
271280 let stream = PSQLRowStream (
272281 source: . stream(
273282 [
274283 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
275284 ] ,
276285 dataSource
277286 ) ,
278- eventLoop: self . eventLoop ,
287+ eventLoop: embeddedEventLoop ,
279288 logger: self . logger
280289 )
281290
@@ -332,14 +341,15 @@ final class PostgresRowSequenceTests: XCTestCase {
332341
333342 func testAdaptiveRowShrinksToMin( ) async throws {
334343 let dataSource = MockRowDataSource ( )
344+ let embeddedEventLoop = EmbeddedEventLoop ( )
335345 let stream = PSQLRowStream (
336346 source: . stream(
337347 [
338348 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
339349 ] ,
340350 dataSource
341351 ) ,
342- eventLoop: self . eventLoop ,
352+ eventLoop: embeddedEventLoop ,
343353 logger: self . logger
344354 )
345355
@@ -386,14 +396,15 @@ final class PostgresRowSequenceTests: XCTestCase {
386396
387397 func testStreamBufferAcceptsNewRowsEventhoughItDidntAskForIt( ) async throws {
388398 let dataSource = MockRowDataSource ( )
399+ let embeddedEventLoop = EmbeddedEventLoop ( )
389400 let stream = PSQLRowStream (
390401 source: . stream(
391402 [
392403 . init( name: " test " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . int8, dataTypeSize: 8 , dataTypeModifier: 0 , format: . binary)
393404 ] ,
394405 dataSource
395406 ) ,
396- eventLoop: self . eventLoop ,
407+ eventLoop: embeddedEventLoop ,
397408 logger: self . logger
398409 )
399410
0 commit comments