Commit 0a89bb3

Add safer path operators and improve documentation

The documentation no longer presents the unsafe `raw` operators first, and better explains which operators exist and what to be careful about. This also adds a new `through` operator, which allows for streaming handling of path matches.

1 parent 2de68df commit 0a89bb3
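In practice, the new `through` operator removes the need to manage the inner match streams manually: each match is fed to a user-supplied draining `Pipe`, and all matches are joined in parallel internally. A minimal sketch of the intended call site (assuming fs2-data ≥ 1.12.0; the `countTokens` pipe and the literals import path are illustrative, not part of this commit):

```scala
import cats.effect._
import cats.effect.unsafe.implicits.global
import fs2._
import fs2.data.json._
import fs2.data.json.jsonpath.filter
import fs2.data.json.jsonpath.literals._

// Illustrative sink: consumes one match and prints its token count.
// Any Pipe[IO, Token, Nothing] (writing to a file, a queue, ...) works here.
def countTokens(ts: Stream[IO, Token]): Stream[IO, Nothing] =
  Stream.exec(ts.compile.count.flatMap(n => IO.println(s"match of $n tokens")))

Stream.emits("""{"a": {"a": 1}}""".toList)
  .through(tokens[IO, Char])
  .through(filter.through(jsonpath"$$..a", countTokens(_)))
  .compile
  .drain
  .unsafeRunSync()
```

Unlike `unsafeRaw`, there is no inner stream left for the caller to forget to consume; the operator is safe by construction.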

File tree

5 files changed: +178 −35 lines

finite-state/shared/src/main/scala/fs2/data/pfsa/TreeQueryPipe.scala (+3)

```diff
@@ -138,6 +138,9 @@ private[data] abstract class TreeQueryPipe[F[_]: Concurrent, T, O <: T, Matcher,
   final def topmost(s: Stream[F, T]): Stream[F, T] =
     raw(Int.MaxValue, 0)(s).parJoinUnbounded
 
+  final def through(s: Stream[F, T], pipe: Pipe[F, T, Nothing], maxMatch: Int, maxNest: Int): Stream[F, Nothing] =
+    raw(maxMatch = maxMatch, maxNest = maxNest)(s).map(_.through(pipe)).parJoinUnbounded
+
   final def aggregate[U](s: Stream[F, T],
                          f: Stream[F, T] => F[U],
                          deterministic: Boolean,
```
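The new `TreeQueryPipe.through` builds on `raw`: every inner match stream is routed through a draining pipe, and the resulting streams are run concurrently with `parJoinUnbounded`. The same pattern can be sketched with plain fs2, independent of fs2-data (all names here, such as `sink`, are illustrative):

```scala
import cats.effect.{IO, IOApp}
import fs2.{Pipe, Stream}

object ParJoinPattern extends IOApp.Simple {
  // A draining pipe: consumes its input and emits nothing (output type Nothing).
  def sink(id: Int): Pipe[IO, Int, Nothing] =
    _.foldMonoid.evalMap(sum => IO.println(s"stream $id summed to $sum")).drain

  // A stream of inner streams, standing in for the matches produced by `raw`.
  val inner: Stream[IO, Stream[IO, Int]] =
    Stream.range(0, 3).map(i => Stream.range(0, 10).map(_ * i).covary[IO])

  // The pattern used by `through`: feed each inner stream to its pipe,
  // then join them all in parallel so none is left unconsumed.
  val run: IO[Unit] =
    inner.zipWithIndex
      .map { case (s, idx) => s.through(sink(idx.toInt)) }
      .parJoinUnbounded
      .compile
      .drain
}
```

Because every inner stream is consumed by its pipe before joining, this construction avoids the hanging-program hazard that `raw` leaves to the caller.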

json/src/main/scala/fs2/data/json/jsonpath/package.scala (+27 −2)

```diff
@@ -49,14 +49,20 @@ package object jsonpath {
       * E.g., if you want to emit only the top most matches, set it to `0`.
       *
       * '''Warning''': make sure you actually consume all the emitted streams otherwise
-      * this can lead to memory problems.
+      * this can lead to memory problems. The streams must all be consumed in parallel
+      * to avoid hanging programs.
       */
-    def raw(path: JsonPath, maxMatch: Int = Int.MaxValue, maxNest: Int = Int.MaxValue)(implicit
+    def unsafeRaw(path: JsonPath, maxMatch: Int = Int.MaxValue, maxNest: Int = Int.MaxValue)(implicit
         F: Concurrent[F]): Pipe[F, Token, Stream[F, Token]] =
       _.through(JsonTagger.pipe)
         .through(new JsonQueryPipe(compileJsonPath(path)).raw(maxMatch, maxNest)(_))
         .map(_.map(untag(_)).unNone)
 
+    @deprecated(message = "Use `filter.unsafeRaw()` instead", since = "fs2-data 1.12.0")
+    def raw(path: JsonPath, maxMatch: Int = Int.MaxValue, maxNest: Int = Int.MaxValue)(implicit
+        F: Concurrent[F]): Pipe[F, Token, Stream[F, Token]] =
+      unsafeRaw(path = path, maxMatch = maxMatch, maxNest = maxNest)
+
     /** Selects the first match in the input stream. The tokens of the first matching
       * value are emitted as they are read.
       *
@@ -96,6 +102,25 @@ package object jsonpath {
           maxNest))
         .flatMap(Stream.emits(_))
 
+    /** Selects all matching elements in the input stream, feeding them to the provided [[fs2.Pipe]] in parallel.
+      * Each match results in a new stream of [[fs2.data.json.Token Token]] fed to the `pipe`.
+      * All the matches are processed in parallel as soon as new tokens are available.
+      *
+      * The `maxMatch` parameter controls how many matches are to be emitted at most.
+      * Any further matches won't be emitted.
+      *
+      * The `maxNest` parameter controls the maximum level of match nesting to be emitted.
+      * E.g., if you want to emit only the top most matches, set it to `0`.
+      */
+    def through(path: JsonPath,
+                pipe: Pipe[F, Token, Nothing],
+                maxMatch: Int = Int.MaxValue,
+                maxNest: Int = Int.MaxValue)(implicit F: Concurrent[F]): Pipe[F, Token, Nothing] =
+      _.through(JsonTagger.pipe)
+        .through(
+          new JsonQueryPipe(compileJsonPath(path))
+            .through(_, _.map(untag(_)).unNone.through(pipe), maxMatch, maxNest))
+
     /** Selects all matching elements in the input stream, and applies the [[fs2.Collector]] to it.
       *
       * If `deterministic` is set to `true` (default value), elements are emitted in the order they
```

site/documentation/json/jsonpath.md (+63 −19)

````diff
@@ -72,25 +72,28 @@ The supported JSONPath features are:
 Using the path defined above, we can filter the stream of tokens, to only emit selected tokens downstream. This can be used to drastically reduce the amount of emitted data, to only the parts that are of interest for you.
 The filtering pipes are located in the `fs2.data.json.jsonpath.filter` namespace.
 
-Since JSONPath includes a recursive descent operator, there can be nested matches for your path.
-The `filter.raw` emits a stream of all matches.
-Each match is represented as a nested stream of JSON tokens which must be consumed.
+The main operators in the namespace are:
 
-```scala mdoc
-import fs2.data.json.jsonpath.filter
+ - `filter.first(path)` which is a `Pipe` returning the tokens of the first match only.
+ - `filter.collect(path, collector)` which uses the provided `collector` to aggregate the tokens of each match, and emits all the aggregated results.
+ - `filter.values[Json](path)` which builds the AST for each match for any type `Json` with a [`Builder`][json-builder] in scope.
+ - `filter.through(path, pipe)` which sends all matches as a stream through the provided `pipe`.
 
-import cats.effect._
-import cats.effect.unsafe.implicits.global
+@:callout(info)
+Since JSONPath includes a recursive descent operator, there can be nested matches for your path.
+The matches are returned in the order their first matching token is encountered in the input.
+This means that for nested matches, the first stream returned is the ancestor element.
+@:@
 
-val filtered = stream.lift[IO].through(filter.raw(path)).parEvalMapUnbounded(_.compile.toList)
-filtered.compile.toList.unsafeRunSync()
-```
-
-The matching streams are returned in the order their matching element is encountered in the input.
-This means that for nested matches, the first stream returned is the ancestor element.
+Using `filter.collect`, you can build a stream that collects each match for the provided collector and emits the aggregated result. For instance, to build the list of string representations of the matches, you can run the following code.
 
 ```scala mdoc
 import fs2.data.json.literals._
+import fs2.data.json.jsonpath.filter
+
+import cats.effect._
+import cats.effect.unsafe.implicits.global
 
 val recursive = jsonpath"$$..a"
 
@@ -106,34 +109,75 @@ val json = json"""{
 
 json
   .lift[IO]
-  .through(filter.raw(recursive))
-  .parEvalMapUnbounded(_.compile.toList)
+  .through(filter.collect(recursive, List))
   .compile
   .toList
   .unsafeRunSync()
 ```
 
-This is actually a common use case, so the library offers `filter.collect` to have this behavior for any collector.
+If you want to have results emitted as early as possible instead of in order, you can set the `deterministic` parameter to `false`.
 
 ```scala mdoc
 json
   .lift[IO]
-  .through(filter.collect(recursive, List))
+  .through(filter.collect(recursive, List, deterministic = false))
   .compile
   .toList
   .unsafeRunSync()
 ```
 
-If you want to have results emitted as early as possible instead of in order, you can set the `deterministic` parameter to `false`.
+The `filter.through` operator allows for handling each match in a streaming fashion.
+For instance, let's say you want to save each match in a file, incrementing a counter on each match. You can run the following code.
+
+```scala mdoc
+import fs2.io.file.{Files, Path}
+
+def saveJson(counter: Ref[IO, Int], tokens: Stream[IO, Token]): Stream[IO, Nothing] =
+  Stream.eval(counter.getAndUpdate(_ + 1)).flatMap { index =>
+    tokens
+      .through(render.compact)
+      .through(Files[IO].writeUtf8(Path(s"match-$index.json")))
+  }
+
+val program =
+  for {
+    counter <- Ref[IO].of(0)
+    _ <- json
+      .lift[IO]
+      .through(filter.through(recursive, saveJson(counter, _)))
+      .compile
+      .drain
+  } yield ()
+
+program.unsafeRunSync()
+
+Files[IO].readUtf8(Path("match-0.json")).compile.string.unsafeRunSync()
+Files[IO].readUtf8(Path("match-1.json")).compile.string.unsafeRunSync()
+```
+
+@:callout(warning)
+The operator described below is unsafe and should be used carefully only if none of the above operators fits your purpose.
+When using it, please ensure that you:
+
+ - consume **all** inner `Stream`s
+ - consume them in **parallel** (e.g. with a variant of `parEvalMap` and parallelism >1, or with a variant of `parJoin`).
+
+Failure to do so might result in memory leaks or hanging programs.
+@:@
+
+The `filter.unsafeRaw` operator emits a stream of all matches.
+Each match is represented as a nested stream of JSON tokens which must be consumed.
 
 ```scala mdoc
+
 json
   .lift[IO]
-  .through(filter.collect(recursive, List, deterministic = false))
+  .through(filter.unsafeRaw(recursive))
+  .parEvalMapUnbounded(_.compile.toList)
   .compile
   .toList
   .unsafeRunSync()
 ```
-
 [monad-error]: https://typelevel.org/cats/api/cats/MonadError.html
 [jsonpath]: https://goessner.net/articles/JsonPath/index.html
+[json-builder]: index.md#ast-builder-and-tokenizer
````

site/documentation/xml/xpath.md (+61 −12)

````diff
@@ -79,47 +79,96 @@ You can use parentheses to associate differently, for instance `!(p1 && p2) || p
 Using the path defined above, we can filter the stream of events, to only emit selected tokens downstream. This can be used to drastically reduce the amount of emitted data, to only the parts that are of interest for you.
 The filtering pipes are located in the `fs2.data.xml.xpath.filter` namespace.
 
-Since XPath includes a recursive descent operator, there can be nested matches for your path.
-The `filter.raw` emits a stream of all matches.
-Each match is represented as a nested stream of XML events which must be consumed.
+The main operators in the namespace are:
+
+ - `filter.first(xpath)` which is a `Pipe` returning the events of the first match only.
+ - `filter.collect(xpath, collector)` which uses the provided `collector` to aggregate the events of each match, and emits all the aggregated results.
+ - `filter.dom[Node](xpath)` which builds the DOM for each match for any DOM type `Node` with a [`DocumentBuilder`][dom-builder] in scope.
+ - `filter.through(xpath, pipe)` which sends all matches as a stream through the provided `pipe`.
+
+@:callout(info)
+Since XPath includes a recursive descent operator, there can be nested matches for your xpath.
+The matches are returned in the order their opening matching element is encountered in the input by default.
+This means that for nested matches, the first stream returned is the ancestor element.
+@:@
+
+Using `filter.collect`, you can build a stream that collects each match for the provided collector and emits the aggregated result. For instance, to build the list of string representations of the matches, you can run the following code.
 
 ```scala mdoc
 import cats.effect._
 import cats.effect.unsafe.implicits.global
 
 stream
   .lift[IO]
-  .through(filter.raw(path))
-  .parEvalMapUnbounded(_.through(render.raw()).compile.foldMonoid)
+  .through(filter.collect(path, collector.raw()))
   .compile
   .toList
   .unsafeRunSync()
 ```
 
-The matching streams are returned in the order their matching element is encountered in the input.
-This means that for nested matches, the first stream returned is the ancestor element.
-
-The library offers `filter.collect` to collect each match for any collector.
+If you want to have results emitted as early as possible instead of in order, you can set the `deterministic` parameter to `false`.
 
 ```scala mdoc
 stream
   .lift[IO]
-  .through(filter.collect(path, collector.raw()))
+  .through(filter.collect(path, collector.raw(), deterministic = false))
   .compile
   .toList
   .unsafeRunSync()
 ```
 
-If you want to have results emitted as early as possible instead of in order, you can set the `deterministic` parameter to `false`.
+The `filter.through` operator allows for handling each match in a streaming fashion.
+For instance, let's say you want to save each match in a file, incrementing a counter on each match. You can run the following code.
+
+```scala mdoc
+import fs2.io.file.{Files, Path}
+
+def saveXml(counter: Ref[IO, Int], events: Stream[IO, XmlEvent]): Stream[IO, Nothing] =
+  Stream.eval(counter.getAndUpdate(_ + 1)).flatMap { index =>
+    events
+      .through(render.raw())
+      .through(Files[IO].writeUtf8(Path(s"match-$index.xml")))
+  }
+
+val program =
+  for {
+    counter <- Ref[IO].of(0)
+    _ <- stream
+      .lift[IO]
+      .through(filter.through(path, saveXml(counter, _)))
+      .compile
+      .drain
+  } yield ()
+
+program.unsafeRunSync()
+
+Files[IO].readUtf8(Path("match-0.xml")).compile.string.unsafeRunSync()
+Files[IO].readUtf8(Path("match-1.xml")).compile.string.unsafeRunSync()
+```
+
+@:callout(warning)
+The operator described below is unsafe and should be used carefully only if none of the above operators fits your purpose.
+When using it, please ensure that you:
+
+ - consume **all** inner `Stream`s
+ - consume them in **parallel** (e.g. with a variant of `parEvalMap` and parallelism >1, or with a variant of `parJoin`).
+
+Failure to do so might result in memory leaks or hanging programs.
+@:@
+
+The `filter.unsafeRaw` operator emits a stream of all matches.
+Each match is represented as a nested stream of XML events which must be consumed.
 
 ```scala mdoc
 stream
   .lift[IO]
-  .through(filter.collect(path, collector.raw(), deterministic = false))
+  .through(filter.unsafeRaw(path))
+  .parEvalMapUnbounded(_.through(render.raw()).compile.foldMonoid)
   .compile
   .toList
   .unsafeRunSync()
 ```
 
 [monad-error]: https://typelevel.org/cats/api/cats/MonadError.html
 [xpath]: https://www.w3.org/TR/xpath/
+[dom-builder]: index.md#dom-builder-and-eventifier
````

xml/src/main/scala/fs2/data/xml/xpath/package.scala (+24 −2)

```diff
@@ -48,12 +48,18 @@ package object xpath {
       * E.g., if you want to emit only the top most matches, set it to `0`.
       *
      * '''Warning''': make sure you actually consume all the emitted streams otherwise
-      * this can lead to memory problems.
+      * this can lead to memory problems. The streams must all be consumed in parallel
+      * to avoid hanging programs.
       */
-    def raw(path: XPath, maxMatch: Int = Int.MaxValue, maxNest: Int = Int.MaxValue)(implicit
+    def unsafeRaw(path: XPath, maxMatch: Int = Int.MaxValue, maxNest: Int = Int.MaxValue)(implicit
         F: Concurrent[F]): Pipe[F, XmlEvent, Stream[F, XmlEvent]] =
       new XmlQueryPipe(compileXPath(path)).raw(maxMatch, maxNest)(_)
 
+    @deprecated(message = "Use `filter.unsafeRaw()` instead", since = "fs2-data 1.12.0")
+    def raw(path: XPath, maxMatch: Int = Int.MaxValue, maxNest: Int = Int.MaxValue)(implicit
+        F: Concurrent[F]): Pipe[F, XmlEvent, Stream[F, XmlEvent]] =
+      unsafeRaw(path = path, maxMatch = maxMatch, maxNest = maxNest)
+
     /** Selects the first match only. First is meant as in: opening tag appears first in the input, no matter the depth.
       * Tokens of the first match are emitted as they are read from the input.
      *
@@ -84,6 +90,22 @@ package object xpath {
         .aggregate(_, _.through(xml.dom.elements).compile.toList, deterministic, maxMatch, maxNest)
         .flatMap(Stream.emits(_))
 
+    /** Selects all matching elements in the input stream, feeding them to the provided [[fs2.Pipe]] in parallel.
+      * Each match results in a new stream of [[fs2.data.xml.XmlEvent XmlEvent]] fed to the `pipe`.
+      * All the matches are processed in parallel as soon as new events are available.
+      *
+      * The `maxMatch` parameter controls how many matches are to be emitted at most.
+      * Any further matches won't be emitted.
+      *
+      * The `maxNest` parameter controls the maximum level of match nesting to be emitted.
+      * E.g., if you want to emit only the top most matches, set it to `0`.
+      */
+    def through(path: XPath,
+                pipe: Pipe[F, XmlEvent, Nothing],
+                maxMatch: Int = Int.MaxValue,
+                maxNest: Int = Int.MaxValue)(implicit F: Concurrent[F]): Pipe[F, XmlEvent, Nothing] =
+      new XmlQueryPipe(compileXPath(path)).through(_, pipe, maxMatch, maxNest)
+
     /** Selects all matching elements in the input stream, and applies the [[fs2.Collector]] to it.
       *
       * If `deterministic` is set to `true` (default value), elements are emitted in the order they
```

0 commit comments