-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: Make Sql.Interpolation backwards compatible #514
Changes from all commits
03372ec
f4944bb
8224add
8743aaa
6d30fdf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,12 @@ | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.this") | ||
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.Sql$Interpolation$") | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.Sql.Interpolation") | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.Sql.Interpolation") | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.Sql#Interpolation.this") | ||
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.h2.H2Utils") | ||
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.h2.H2Utils$") | ||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.R2dbcSettings.journalPayloadCodec") | ||
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$RichStatement") | ||
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$RichRow") | ||
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$JsonCodec$") | ||
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$ByteArrayCodec$") | ||
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$") | ||
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec") | ||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.R2dbcSettings.durableStatePayloadCodec") | ||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.R2dbcSettings.snapshotPayloadCodec") | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.journalPayloadCodec") | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.snapshotPayloadCodec") | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.durableStatePayloadCodec") |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,21 +54,12 @@ object R2dbcSettings { | |
s"Expected akka.persistence.r2dbc.$prefix.payload-column-type to be one of 'BYTEA', 'JSON' or 'JSONB' but found '$t'") | ||
} | ||
|
||
val journalPayloadCodec: PayloadCodec = | ||
if (useJsonPayload("journal")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec | ||
|
||
val journalPublishEvents: Boolean = config.getBoolean("journal.publish-events") | ||
|
||
val snapshotsTable: String = config.getString("snapshot.table") | ||
|
||
val snapshotPayloadCodec: PayloadCodec = | ||
if (useJsonPayload("snapshot")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec | ||
|
||
val durableStateTable: String = config.getString("state.table") | ||
|
||
val durableStatePayloadCodec: PayloadCodec = | ||
if (useJsonPayload("state")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec | ||
|
||
val durableStateTableByEntityType: Map[String, String] = | ||
configToMap(config.getConfig("state.custom-table")) | ||
|
||
|
@@ -88,18 +79,6 @@ object R2dbcSettings { | |
|
||
val connectionFactorySettings = ConnectionFactorySettings(config.getConfig("connection-factory")) | ||
|
||
val (tagsCodec: TagsCodec, timestampCodec: TimestampCodec, queryAdapter: QueryAdapter) = { | ||
connectionFactorySettings.dialect.name match { | ||
case "sqlserver" => | ||
( | ||
new TagsCodec.SqlServerTagsCodec(connectionFactorySettings.config), | ||
TimestampCodec.SqlServerTimestampCodec, | ||
SqlServerQueryAdapter) | ||
case "h2" => (TagsCodec.H2TagsCodec, TimestampCodec.H2TimestampCodec, IdentityAdapter) | ||
case _ => (TagsCodec.PostgresTagsCodec, TimestampCodec.PostgresTimestampCodec, IdentityAdapter) | ||
} | ||
} | ||
|
||
val querySettings = new QuerySettings(config.getConfig("query")) | ||
|
||
val dbTimestampMonotonicIncreasing: Boolean = config.getBoolean("db-timestamp-monotonic-increasing") | ||
|
@@ -112,24 +91,55 @@ object R2dbcSettings { | |
case _ => config.getDuration("log-db-calls-exceeding").asScala | ||
} | ||
|
||
val codecSettings = { | ||
val journalPayloadCodec: PayloadCodec = | ||
if (useJsonPayload("journal")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec | ||
val snapshotPayloadCodec: PayloadCodec = | ||
if (useJsonPayload("snapshot")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec | ||
val durableStatePayloadCodec: PayloadCodec = | ||
if (useJsonPayload("state")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec | ||
|
||
connectionFactorySettings.dialect.name match { | ||
case "sqlserver" => | ||
new CodecSettings( | ||
journalPayloadCodec, | ||
snapshotPayloadCodec, | ||
durableStatePayloadCodec, | ||
tagsCodec = new TagsCodec.SqlServerTagsCodec(connectionFactorySettings.config), | ||
timestampCodec = TimestampCodec.SqlServerTimestampCodec, | ||
queryAdapter = SqlServerQueryAdapter) | ||
case "h2" => | ||
new CodecSettings( | ||
journalPayloadCodec, | ||
snapshotPayloadCodec, | ||
durableStatePayloadCodec, | ||
tagsCodec = TagsCodec.H2TagsCodec, | ||
timestampCodec = TimestampCodec.H2TimestampCodec, | ||
queryAdapter = IdentityAdapter) | ||
case _ => | ||
new CodecSettings( | ||
journalPayloadCodec, | ||
snapshotPayloadCodec, | ||
durableStatePayloadCodec, | ||
tagsCodec = TagsCodec.PostgresTagsCodec, | ||
timestampCodec = TimestampCodec.PostgresTimestampCodec, | ||
queryAdapter = IdentityAdapter) | ||
} | ||
} | ||
|
||
val cleanupSettings = new CleanupSettings(config.getConfig("cleanup")) | ||
val settingsFromConfig = new R2dbcSettings( | ||
schema, | ||
journalTable, | ||
journalPayloadCodec, | ||
journalPublishEvents, | ||
snapshotsTable, | ||
snapshotPayloadCodec, | ||
tagsCodec, | ||
timestampCodec, | ||
queryAdapter, | ||
durableStateTable, | ||
durableStatePayloadCodec, | ||
durableStateAssertSingleWriter, | ||
logDbCallsExceeding, | ||
querySettings, | ||
dbTimestampMonotonicIncreasing, | ||
cleanupSettings, | ||
codecSettings, | ||
connectionFactorySettings, | ||
durableStateTableByEntityType, | ||
durableStateAdditionalColumnClasses, | ||
|
@@ -153,20 +163,16 @@ object R2dbcSettings { | |
final class R2dbcSettings private ( | ||
val schema: Option[String], | ||
val journalTable: String, | ||
val journalPayloadCodec: PayloadCodec, | ||
val journalPublishEvents: Boolean, | ||
val snapshotsTable: String, | ||
val snapshotPayloadCodec: PayloadCodec, | ||
val tagsCodec: TagsCodec, | ||
val timestampCodec: TimestampCodec, | ||
val queryAdapter: QueryAdapter, | ||
val durableStateTable: String, | ||
val durableStatePayloadCodec: PayloadCodec, | ||
val durableStateAssertSingleWriter: Boolean, | ||
val logDbCallsExceeding: FiniteDuration, | ||
val querySettings: QuerySettings, | ||
val dbTimestampMonotonicIncreasing: Boolean, | ||
val cleanupSettings: CleanupSettings, | ||
/** INTERNAL API */ | ||
@InternalApi private[akka] val codecSettings: CodecSettings, | ||
_connectionFactorySettings: ConnectionFactorySettings, | ||
_durableStateTableByEntityType: Map[String, String], | ||
_durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]], | ||
|
@@ -234,20 +240,15 @@ final class R2dbcSettings private ( | |
private def copy( | ||
schema: Option[String] = schema, | ||
journalTable: String = journalTable, | ||
journalPayloadCodec: PayloadCodec = journalPayloadCodec, | ||
journalPublishEvents: Boolean = journalPublishEvents, | ||
snapshotsTable: String = snapshotsTable, | ||
snapshotPayloadCodec: PayloadCodec = snapshotPayloadCodec, | ||
tagsCodec: TagsCodec = tagsCodec, | ||
timestampCodec: TimestampCodec = timestampCodec, | ||
queryAdapter: QueryAdapter = queryAdapter, | ||
durableStateTable: String = durableStateTable, | ||
durableStatePayloadCodec: PayloadCodec = durableStatePayloadCodec, | ||
durableStateAssertSingleWriter: Boolean = durableStateAssertSingleWriter, | ||
logDbCallsExceeding: FiniteDuration = logDbCallsExceeding, | ||
querySettings: QuerySettings = querySettings, | ||
dbTimestampMonotonicIncreasing: Boolean = dbTimestampMonotonicIncreasing, | ||
cleanupSettings: CleanupSettings = cleanupSettings, | ||
codecSettings: CodecSettings = codecSettings, | ||
connectionFactorySettings: ConnectionFactorySettings = connectionFactorySettings, | ||
durableStateTableByEntityType: Map[String, String] = _durableStateTableByEntityType, | ||
durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]] = | ||
|
@@ -257,20 +258,15 @@ final class R2dbcSettings private ( | |
new R2dbcSettings( | ||
schema, | ||
journalTable, | ||
journalPayloadCodec, | ||
journalPublishEvents, | ||
snapshotsTable, | ||
snapshotPayloadCodec, | ||
tagsCodec, | ||
timestampCodec, | ||
queryAdapter, | ||
durableStateTable, | ||
durableStatePayloadCodec, | ||
durableStateAssertSingleWriter, | ||
logDbCallsExceeding, | ||
querySettings, | ||
dbTimestampMonotonicIncreasing, | ||
cleanupSettings, | ||
codecSettings, | ||
connectionFactorySettings, | ||
_durableStateTableByEntityType, | ||
_durableStateAdditionalColumnClasses, | ||
|
@@ -328,6 +324,39 @@ final class PublishEventsDynamicSettings(config: Config) { | |
val throughputCollectInterval: FiniteDuration = config.getDuration("throughput-collect-interval").asScala | ||
} | ||
|
||
/** | ||
* INTERNAL API | ||
*/ | ||
@InternalStableApi | ||
final class CodecSettings( | ||
val journalPayloadCodec: PayloadCodec, | ||
val snapshotPayloadCodec: PayloadCodec, | ||
val durableStatePayloadCodec: PayloadCodec, | ||
val tagsCodec: TagsCodec, | ||
val timestampCodec: TimestampCodec, | ||
val queryAdapter: QueryAdapter) { | ||
|
||
// implicits that can be imported | ||
object JournalImplicits { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have not seen this very often, objects within a case class. But I guess it is fine. Alternative could be something based on inheritance or so, this this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CodecSettings.this.journalPayloadCodec was just because I used the same names There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, the object is only for name scoping |
||
implicit def journalPayloadCodec: PayloadCodec = CodecSettings.this.journalPayloadCodec | ||
implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec | ||
implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec | ||
implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter | ||
} | ||
object SnapshotImplicits { | ||
implicit def snapshotPayloadCodec: PayloadCodec = CodecSettings.this.snapshotPayloadCodec | ||
implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec | ||
implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec | ||
implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter | ||
} | ||
object DurableStateImplicits { | ||
implicit def durableStatePayloadCodec: PayloadCodec = CodecSettings.this.durableStatePayloadCodec | ||
implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec | ||
implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec | ||
implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter | ||
} | ||
} | ||
|
||
/** | ||
* INTERNAL API | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,8 @@ | |
package akka.persistence.r2dbc.internal | ||
|
||
import scala.annotation.varargs | ||
|
||
import akka.annotation.InternalApi | ||
import akka.annotation.InternalStableApi | ||
import akka.persistence.r2dbc.internal.codec.IdentityAdapter | ||
import akka.persistence.r2dbc.internal.codec.QueryAdapter | ||
|
@@ -21,7 +23,19 @@ object Sql { | |
* to include a literal `?`. Trims whitespace, including line breaks. Standard string interpolation arguments `$` can | ||
* be used. | ||
*/ | ||
implicit class Interpolation(val sc: StringContext)(implicit adapter: QueryAdapter) extends AnyRef { | ||
implicit class Interpolation(val sc: StringContext) extends AnyVal { | ||
def sql(args: Any*): String = | ||
fillInParameterNumbers(trimLineBreaks(sc.s(args: _*))) | ||
} | ||
|
||
/** | ||
* INTERNAL API: Scala string interpolation with `sql` prefix. Replaces `?` with numbered `\$1`, `\$2` for bind | ||
* parameters. Use `??` to include a literal `?`. Trims whitespace, including line breaks. Standard string | ||
* interpolation arguments `$` can be used. | ||
*/ | ||
@InternalApi private[akka] implicit class InterpolationWithAdapter(val sc: StringContext)(implicit | ||
adapter: QueryAdapter) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Note that this is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, we don't override all sql, which is good. I'll think a little more about this, but this PR can be merged seperately There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, we do not override all sql statements, which is why this QueryAdaper was introduced. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I leave it as this for now. Projections will still be able to use QueryAdapter since it's |
||
extends AnyRef { | ||
def sql(args: Any*): String = | ||
adapter(fillInParameterNumbers(trimLineBreaks(sc.s(args: _*)))) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the file should be latest release, 1.2.1, not 1.2.2