SourceProvider - eventsFromSQLQuery #595

Open
gfduszynski opened this issue Aug 28, 2024 · 2 comments
Comments

@gfduszynski

Short description

The goal of this feature is to let users use custom event tables as sources for projections.

In our case, alongside the standard database schema for akka-persistence-r2dbc, we have a TimescaleDB hypertable
holding device metrics (for the sake of this discussion, it is just a regular table).

Implementing a SourceProvider for this use case is inconvenient because the "live query" logic would essentially
duplicate what liveBySlices already provides.

Details

I envision it working by letting the user provide a function with the following signature:
asOfYetUnknownFunctionName(offset: Option[Offset])(session: R2dbcSession): Future[Seq[EventEnvelope[T]]]

Ultimately, this would let users reuse a lot of good, well-tested code.
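
To make the idea concrete, here is a minimal sketch of how a library-side poller might drive such a user-supplied function. It uses only the Scala standard library: `Envelope`, `Query`, `pollOnce`, `table` and `demoQuery` are hypothetical stand-ins invented for illustration, not Akka Projection types, and a plain `Option[Long]` sequence number plays the role of the offset.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Stand-in type for illustration only; not the Akka Projection EventEnvelope.
final case class Envelope[T](seqNr: Long, event: T)

object PollingSketch {
  // Hypothetical shape of the user-supplied query: given the last seen
  // sequence number (if any), return the next batch of envelopes.
  type Query[T] = Option[Long] => Future[Seq[Envelope[T]]]

  // One polling step: run the query and compute the offset for the next poll.
  // An empty batch leaves the offset unchanged, so the next poll retries
  // from the same position.
  def pollOnce[T](query: Query[T], offset: Option[Long])(
      implicit ec: ExecutionContext): Future[(Option[Long], Seq[Envelope[T]])] =
    query(offset).map { batch =>
      val next = batch.lastOption.map(_.seqNr).orElse(offset)
      (next, batch)
    }

  // In-memory "table" with sequence numbers 1 to 5, standing in for the database.
  val table: Vector[Envelope[String]] =
    (1L to 5L).map(n => Envelope(n, s"metric-$n")).toVector

  // Example query: at most 2 rows strictly after the given sequence number.
  val demoQuery: Query[String] = offset =>
    Future.successful(table.filter(_.seqNr > offset.getOrElse(0L)).take(2))
}
```

Calling `PollingSketch.pollOnce(PollingSketch.demoQuery, None)` yields the first two rows together with the offset to use for the next poll. Once the table is exhausted, the same offset is handed back, which is what lets a live query keep polling without skipping rows.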

Previous attempt using source provider

Before I realized that a live query is a requirement, this is what I was doing:

class DemoSourceProvider(implicit system: ActorSystem[_]) extends SourceProvider[Offset, EventEnvelope[DeviceEvent]]:
  implicit val executionContext: ExecutionContext = system.executionContext

  override def source(offset: () => Future[Option[Offset]]): Future[Source[EventEnvelope[DeviceEvent], NotUsed]] = {

    offset().flatMap { offsetOpt =>
      val offset = offsetOpt.getOrElse(NoOffset)
      val sqlOffset = offset match
        case NoOffset => 0
        case Sequence(n) => n


      R2dbcSession.withSession(system)(session => {
        val mapper = new ObjectMapper().registerModule(DefaultScalaModule).disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
        val stm = session.createStatement("SELECT * FROM tenant_1.event_series_test OFFSET $1 LIMIT 1000;").bind(0,sqlOffset);
        session.select(stm)(row => {
          val seqNr = row.get(0,    classOf[java.lang.Long])
          val dbTime = row.get(1,   classOf[Instant])
          val devTime = row.get(2,  classOf[Instant])
          val deviceId = row.get(3, classOf[Integer]).intValue()
          val event = row.get(4,    classOf[String])
          val json = row.get(5,     classOf[io.r2dbc.postgresql.codec.Json])

          val map = mapper.readValue(json.asString(), classOf[Map[String, AnyRef]])
          val evt = DeviceEvent(seqNr, dbTime, devTime, deviceId, event, map)

          EventEnvelope(Offset.sequence(seqNr), s"device-${deviceId}", seqNr, evt, dbTime.toEpochMilli)
        }).map(rows => Source(rows))
      })

    }
  }

  override def extractOffset(envelope: EventEnvelope[DeviceEvent]): Offset = envelope.offset

  override def extractCreationTime(envelope: EventEnvelope[DeviceEvent]): Long = envelope.timestamp
@patriknw
Member

I don't understand how we would reuse the existing eventsBySlices live query to implement this, since that is very specific to slices and timestamp offsets.

If you have a seqNr that can be used as offset the R2dbcOffsetStore will be able to handle that, so I think you were on the right track to implement this as a custom SourceProvider.

Regarding the live query, I think you can look at eventsByPersistenceId instead of bySlices.

@gfduszynski
Author

gfduszynski commented Aug 29, 2024

Thank you for your reply.

You are correct about eventsBySlices; I was just trying to point at the existing code that polls the DB.

It still seems like an interesting proposition to me, since a custom SourceProvider would have to follow pretty much the same logic.
Having this feature in some form would prevent duplication of code and configuration.

Below is a fragment of code that I intend to use for the time being.
In this fragment, DeviceEventDAO is what the user would have to implement to take advantage of the requested feature.
I was looking for something like this before I started writing it, so maybe somebody will find it helpful :)

trait EventEnvelopeDAO[T] {
  def query(session: R2dbcSession, offsetOpt: Option[Offset], limit: Long): Future[scala.collection.immutable.Iterable[EventEnvelope[T]]]
}

object DeviceEventDAO extends EventEnvelopeDAO[DeviceEvent]:
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule).disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)

  override def query(session: R2dbcSession, offsetOpt: Option[Offset], limit: Long): Future[scala.collection.immutable.Iterable[EventEnvelope[DeviceEvent]]] = {
    val queryOffset = offsetOpt.getOrElse(NoOffset) match
      case NoOffset => 0
      case Sequence(n) => n

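    // Note: OFFSET skips rows by position, so it lines up with seqNr only if rows come back in seqNr order without gaps.
    val stm = session.createStatement("SELECT * FROM tenant_1.event_series_test OFFSET $1 LIMIT $2;").bind(0, queryOffset).bind(1, limit);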
    session.select(stm)(row => {
      val seqNr     = row.get(0, classOf[java.lang.Long])
      val dbTime    = row.get(1, classOf[Instant])
      val devTime   = row.get(2, classOf[Instant])
      val deviceId  = row.get(3, classOf[Integer]).intValue()
      val event     = row.get(4, classOf[String])
      val json      = row.get(5, classOf[io.r2dbc.postgresql.codec.Json])

      val map = mapper.readValue(json.asString(), classOf[Map[String, AnyRef]])
      val evt = DeviceEvent(seqNr, dbTime, devTime, deviceId, event, map)

      EventEnvelope(Offset.sequence(seqNr), s"device-${deviceId}", seqNr, evt, dbTime.toEpochMilli)
    })
  }

class DemoSourceProvider[T](dao: EventEnvelopeDAO[T])(implicit system: ActorSystem[_]) extends SourceProvider[Offset, EventEnvelope[T]]:
  implicit val executionContext: ExecutionContext = system.executionContext

  private val bufferSize = 1000
  private val refreshInterval = 3.seconds

  override def source(offset: () => Future[Option[Offset]]): Future[Source[EventEnvelope[T], NotUsed]] = {
    offset().flatMap { offsetOpt =>
        Future.successful {
         Source.unfoldAsync(offsetOpt) { offset =>
              R2dbcSession.withSession(system)(implicit session => {
                dao.query(session, offset, bufferSize).map { eventEnvelopes =>
                  if (eventEnvelopes.nonEmpty) then Some((Some(eventEnvelopes.last.offset), eventEnvelopes))
                  else Some((offset, eventEnvelopes))
                }
              })
            }
            .throttle(2, refreshInterval, -1, { eventEnvelopes =>
              // Convert to Double first: Int / Int truncates to 0 for every partially filled batch.
              eventEnvelopes.size.toDouble / bufferSize match {
                case fill if fill < 0.1 => 2
                case fill if fill < 0.9 => 1
                case _ => 0
              }
            },ThrottleMode.shaping)
            .flatMapConcat {
              eventEnvelopes => Source(eventEnvelopes)
            }
        }
    }
  }

  override def extractOffset(envelope: EventEnvelope[T]): Offset = envelope.offset

  override def extractCreationTime(envelope: EventEnvelope[T]): Long = envelope.timestamp
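
The cost function passed to `throttle` above depends on how full each batch is relative to `bufferSize`. In Scala, dividing one `Int` by another truncates, so the ratio has to be computed in floating point for the 0.1 and 0.9 thresholds to mean anything. A minimal standalone sketch of that calculation follows; `ThrottleCostSketch` and `cost` are names made up for illustration.

```scala
object ThrottleCostSketch {
  // Cost charged to the throttle for a batch of `size` rows out of at most `bufferSize`.
  // With a budget of 2 cost units per refresh interval, a higher cost means a
  // longer pause before the next poll.
  def cost(size: Int, bufferSize: Int): Int = {
    val fill = size.toDouble / bufferSize // floating-point fill ratio
    if (fill < 0.1) 2      // nearly empty batch: back off the most
    else if (fill < 0.9) 1 // partially filled batch: moderate pause
    else 0                 // (almost) full batch: more rows are likely waiting, no pause
  }
}
```

With `bufferSize = 1000`, a batch of 50 rows costs 2, a batch of 500 rows costs 1, and a full batch of 1000 rows costs 0.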
