Skip to content

Commit

Permalink
GetAllIds for event, store, product [IA-246] (#73)
Browse files Browse the repository at this point in the history
* feat: fixing member service [IA-246]

* feat: GetAllEventIds [IA-246]

* feat: GetAllStoreIds [IA-246]

* feat: GetAllSkus [IA-246]

* feat: GetAllSkus [IA-246]

* feat: gateway & gatling [IA-246]

* feat: removing unnecessary imports [IA-246]

* feat: scalafmtAll [IA-246]
  • Loading branch information
AlexWeinstein92 authored Aug 16, 2023
1 parent dbbb439 commit d7bb123
Show file tree
Hide file tree
Showing 20 changed files with 172 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package com.improving.app.event.api;

import "com/improving/app/event/domain/eventEvents.proto";
import "com/improving/app/event/domain/eventRequests.proto";
import "google/protobuf/empty.proto";

//import "google/api/annotations.proto";

Expand Down Expand Up @@ -44,4 +45,8 @@ service EventService {
//get:"event/{eventId}/end"
}

rpc GetAllIds(google.protobuf.Empty) returns (domain.AllEventIds) {
//get:"event/allIds"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ message EventEnded {
EventMetaInfo meta = 2;
}

message AllEventIds {
repeated common.domain.EventId all_event_ids = 1;
}

message EventEvent{
option (scalapb.message).sealed_oneof_extends = "com.improving.app.common.serialize.PBMsgOneOfSerializable";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity}
import akka.cluster.typed.{Cluster, Join}
import akka.grpc.GrpcServiceException
import akka.pattern.StatusReply
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.util.Timeout
import com.google.protobuf.empty.Empty
import com.google.protobuf.timestamp.Timestamp
import com.google.rpc.Code
import com.improving.app.common.domain.EventId
Expand Down Expand Up @@ -230,4 +233,15 @@ class EventServiceImpl(implicit val system: ActorSystem[_]) extends EventService
response
}
)

/**
* get:"event/allIds"
*/
override def getAllIds(in: Empty): Future[AllEventIds] = {
val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
readJournal.currentPersistenceIds().runFold(Seq[EventId]())(_ :+ EventId(_)).map { seq =>
AllEventIds(seq)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ message EventCancelled {
EventMetaInfo meta = 2;
}

message AllEventIds {
repeated common.domain.EventId all_event_ids = 1;
}

message EventEvent{
option (scalapb.message).sealed_oneof_extends = "com.improving.app.common.serialize.PBMsgOneOfSerializable";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ message ProductDeleted {
ProductMetaInfo meta = 3;
}

message AllSkus {
repeated common.domain.Sku all_skus = 1;
}

message ProductEvent{
option (scalapb.message).sealed_oneof_extends = "com.improving.app.common.serialize.PBMsgOneOfSerializable";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,8 @@ message ProductsRemovedFromStore {
StoreOrEditableInfo info = 2;
StoreMetaInfo meta_info = 3;
}

message AllStoreIds {
option (scalapb.message).extends = "com.improving.app.common.serialize.PBMsgSerializable";
repeated common.domain.StoreId all_store_ids = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.grpc.GrpcClientSettings
import akka.util.Timeout
import com.improving.app.gateway.domain.common.util.getHostAndPortForService
import com.improving.app.gateway.domain.event.{
AllEventIds => GatewayAllEventIds,
CancelEvent => GatewayCancelEvent,
CreateEvent => GatewayCreateEvent,
EventCancelled,
Expand Down Expand Up @@ -86,4 +87,7 @@ class EventGatewayHandler(grpcClientSettingsOpt: Option[GrpcClientSettings] = No
response.meta.map(_.toGatewayEventMeta)
)
}

def getAllIds: Future[GatewayAllEventIds] =
eventClient.getAllIds(com.google.protobuf.empty.Empty()).map(response => GatewayAllEventIds(response.allEventIds))
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.improving.app.product.domain.{ActivateProduct, CreateProduct, DeleteP
import com.improving.app.gateway.domain.common.util.getHostAndPortForService
import com.improving.app.gateway.domain.product.{
ActivateProduct => GatewayActivateProduct,
AllSkus => GatewayAllSkus,
CreateProduct => GatewayCreateProduct,
DeleteProduct => GatewayDeleteProduct,
ProductActivated,
Expand Down Expand Up @@ -85,4 +86,7 @@ class ProductGatewayHandler(grpcClientSettingsOpt: Option[GrpcClientSettings] =
response.meta.map(_.toGatewayProductMeta)
)
}

def getAllSkus: Future[GatewayAllSkus] =
productClient.getAllSkus(com.google.protobuf.empty.Empty()).map(response => GatewayAllSkus(response.allSkus))
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.util.Timeout
import com.improving.app.store.domain.{CloseStore, CreateStore, DeleteStore, MakeStoreReady, OpenStore}
import com.improving.app.gateway.domain.common.util.getHostAndPortForService
import com.improving.app.gateway.domain.store.{
AllStoreIds => GatewayAllStoreIds,
CloseStore => GatewayCloseStore,
CreateStore => GatewayCreateStore,
DeleteStore => GatewayDeleteStore,
Expand Down Expand Up @@ -122,4 +123,7 @@ class StoreGatewayHandler(grpcClientSettingsOpt: Option[GrpcClientSettings] = No
response.metaInfo.map(_.toGatewayStoreMeta)
)
}

def getAllIds: Future[GatewayAllStoreIds] =
storeClient.getAllIds(com.google.protobuf.empty.Empty()).map(response => GatewayAllStoreIds(response.allStoreIds))
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,27 @@ trait EventGatewayRoutes extends ErrorAccumulatingCirceSupport with StrictLoggin
}
}
}
} ~
post {
entity(Directives.as[String]) { data =>
onSuccess(
handler
.createEvent(
fromJsonString[GatewayCreateEvent](data)
)
) { eventCreated =>
complete(JsonFormat.toJsonString(eventCreated))
}
} ~ pathPrefix("allIds") {
get {
onSuccess(
handler.getAllIds
) { allIds =>
complete(JsonFormat.toJsonString(allIds))
}

}
} ~ post {
entity(Directives.as[String]) { data =>
onSuccess(
handler
.createEvent(
fromJsonString[GatewayCreateEvent](data)
)
) { eventCreated =>
complete(JsonFormat.toJsonString(eventCreated))
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,27 @@ trait ProductGatewayRoutes extends ErrorAccumulatingCirceSupport with StrictLogg
}
}
}
} ~
post {
entity(Directives.as[String]) { data =>
onSuccess(
handler
.createProduct(
fromJsonString[GatewayCreateProduct](data)
)
) { productCreated =>
complete(JsonFormat.toJsonString(productCreated))
}
} ~ pathPrefix("allSkus") {
get {
onSuccess(
handler.getAllSkus
) { allSkus =>
complete(JsonFormat.toJsonString(allSkus))
}

}
} ~ post {
entity(Directives.as[String]) { data =>
onSuccess(
handler
.createProduct(
fromJsonString[GatewayCreateProduct](data)
)
) { productCreated =>
complete(JsonFormat.toJsonString(productCreated))
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,27 @@ trait StoreGatewayRoutes extends ErrorAccumulatingCirceSupport with StrictLoggin
}
}
}
} ~
post {
entity(Directives.as[String]) { data =>
onSuccess(
handler
.createStore(
fromJsonString[GatewayCreateStore](data)
)
) { storeCreated =>
complete(JsonFormat.toJsonString(storeCreated))
}
} ~ pathPrefix("allIds") {
get {
onSuccess(
handler.getAllIds
) { allIds =>
complete(JsonFormat.toJsonString(allIds))
}

}
} ~ post {
entity(Directives.as[String]) { data =>
onSuccess(
handler
.createStore(
fromJsonString[GatewayCreateStore](data)
)
) { storeCreated =>
complete(JsonFormat.toJsonString(storeCreated))
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.improving.app.gatling.demoScenario
import io.gatling.core.Predef._
import io.gatling.core.controller.inject.open.OpenInjectionStep
import io.gatling.core.structure.ScenarioBuilder
import io.gatling.http.Predef.http
import io.gatling.http.Predef._
import io.gatling.http.protocol.HttpProtocolBuilder

class GetAllIds extends Simulation {
Expand All @@ -22,10 +22,24 @@ class GetAllIds extends Simulation {

val getAllMembersScn: ScenarioBuilder = getAllScnForService("member")

val getAllEventsScn: ScenarioBuilder = getAllScnForService("event")

val getAllStoresScn: ScenarioBuilder = getAllScnForService("store")

val getAllProductsScn: ScenarioBuilder = scenario(
s"GetAllProducts"
).exec(
http(s"StartScenario - GetAllProducts")
.get(s"/product/allSkus")
)

val injectionProfile: OpenInjectionStep = atOnceUsers(1)
setUp(
getAllTenantsScn.inject(injectionProfile),
getAllOrgsScn.inject(injectionProfile),
getAllMembersScn.inject(injectionProfile)
getAllMembersScn.inject(injectionProfile),
getAllEventsScn.inject(injectionProfile),
getAllStoresScn.inject(injectionProfile),
getAllProductsScn.inject(injectionProfile),
).protocols(httpProtocol)
}
2 changes: 1 addition & 1 deletion member/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ datastax-java-driver {
basic.load-balancing-policy.local-datacenter = "datacenter1"
}

akka.cluster.sharding.remember-entities = on
akka.cluster.sharding.remember-entities = off
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package com.improving.app.product.api;

import "com/improving/app/product/domain/productEvents.proto";
import "com/improving/app/product/domain/productRequests.proto";
import "google/protobuf/empty.proto";

//import "google/api/annotations.proto";

Expand All @@ -15,4 +16,5 @@ service ProductService {
rpc ActivateProduct(domain.ActivateProduct) returns (domain.ProductActivated){}
rpc InactivateProduct(domain.InactivateProduct) returns (domain.ProductInactivated){}
rpc GetProductInfo(domain.GetProductInfo) returns (domain.ProductData){}
rpc GetAllSkus(google.protobuf.Empty) returns (domain.AllSkus) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ message ProductEventResponse {
ProductEvent product_event = 1;
}

message AllSkus {
option (scalapb.message).extends = "com.improving.app.common.serialize.PBMsgSerializable";
repeated common.domain.Sku all_skus = 1;
}

message ProductResponse {
option (scalapb.message).sealed_oneof_extends = "com.improving.app.common.serialize.PBMsgOneOfSerializable";
oneof sealed_value {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity}
import akka.cluster.typed.{Cluster, Join}
import akka.grpc.GrpcServiceException
import akka.pattern.StatusReply
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.util.Timeout
import com.google.protobuf.empty.Empty
import com.google.rpc.Code
import com.improving.app.common.domain.Sku
import com.improving.app.product.domain.{ProductResponse, _}
import com.improving.app.product.domain.Product.{ProductEntityKey, ProductEnvelope}

import scala.:+
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.language.{existentials, postfixOps}
Expand Down Expand Up @@ -138,4 +143,12 @@ class ProductServiceImpl()(implicit val system: ActorSystem[_]) extends ProductS
response
}
)

override def getAllSkus(in: Empty): Future[AllSkus] = {
val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
readJournal.currentPersistenceIds().runFold(Seq[Sku]())(_ :+ Sku(_)).map { seq =>
AllSkus(seq)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,27 @@ package com.improving.app.store.api;

import "com/improving/app/store/domain/storeEvents.proto";
import "com/improving/app/store/domain/storeRequest.proto";
import "google/protobuf/empty.proto";

service StoreService {

rpc CreateStore (domain.CreateStore) returns (domain.StoreCreated) {}
rpc CreateStore (domain.CreateStore) returns (domain.StoreCreated) {}

rpc MakeStoreReady (domain.MakeStoreReady) returns (domain.StoreIsReady) {}
rpc MakeStoreReady (domain.MakeStoreReady) returns (domain.StoreIsReady) {}

rpc DeleteStore (domain.DeleteStore) returns (domain.StoreDeleted) {}
rpc DeleteStore (domain.DeleteStore) returns (domain.StoreDeleted) {}

rpc OpenStore (domain.OpenStore) returns (domain.StoreOpened) {}
rpc OpenStore (domain.OpenStore) returns (domain.StoreOpened) {}

rpc CloseStore (domain.CloseStore) returns (domain.StoreClosed) {}
rpc CloseStore (domain.CloseStore) returns (domain.StoreClosed) {}

rpc TerminateStore (domain.TerminateStore) returns (domain.StoreTerminated) {}
rpc TerminateStore (domain.TerminateStore) returns (domain.StoreTerminated) {}

rpc EditStoreInfo (domain.EditStoreInfo) returns (domain.StoreInfoEdited) {}
rpc EditStoreInfo (domain.EditStoreInfo) returns (domain.StoreInfoEdited) {}

rpc AddProductsToStore (domain.AddProductsToStore) returns (domain.ProductsAddedToStore) {}
rpc AddProductsToStore (domain.AddProductsToStore) returns (domain.ProductsAddedToStore) {}

rpc RemoveProductsFromStore (domain.RemoveProductsFromStore) returns (domain.ProductsRemovedFromStore) {}
rpc RemoveProductsFromStore (domain.RemoveProductsFromStore) returns (domain.ProductsRemovedFromStore) {}

rpc GetAllIds(google.protobuf.Empty) returns (domain.AllStoreIds) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,8 @@ message ProductsRemovedFromStore {
StoreOrEditableInfo info = 2;
StoreMetaInfo meta_info = 3;
}

message AllStoreIds {
option (scalapb.message).extends = "com.improving.app.common.serialize.PBMsgSerializable";
repeated common.domain.StoreId all_store_ids = 1;
}
Loading

0 comments on commit d7bb123

Please sign in to comment.