Skip to content

Commit

Permalink
Use scalafmt (close #58)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Aug 2, 2021
1 parent a94f471 commit e9e859f
Show file tree
Hide file tree
Showing 44 changed files with 1,075 additions and 829 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ jobs:
java-version: 11
- name: Run tests
run: sbt +test
- name: Check Scala formatting
if: ${{ always() }}
run: sbt scalafmtCheckAll
- name: Check assets can be published
if: ${{ always() }}
run: sbt publishLocal
Expand Down
17 changes: 17 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
version = 2.4.2
style = default
maxColumn = 120
optIn.breakChainOnFirstMethodDot = false
assumeStandardLibraryStripMargin = true
align = more
continuationIndent.defnSite = 2
rewrite.rules = [
AsciiSortImports,
AvoidInfix,
PreferCurlyFors,
RedundantBraces,
RedundantParens,
SortModifiers
]
project.git = true
includeNoParensInSelectChains = true
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")
142 changes: 79 additions & 63 deletions src/main/scala/com/snowplowanalytics/iglu/server/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,20 @@ import generated.BuildInfo.version
* be skipped if a schema with the same key already exists
* @param webhooks List of webhooks triggered by specific actions or endpoints
*/
case class Config(database: Config.StorageConfig,
repoServer: Config.Http,
debug: Option[Boolean],
patchesAllowed: Option[Boolean],
webhooks: Option[List[Webhook]])
case class Config(
database: Config.StorageConfig,
repoServer: Config.Http,
debug: Option[Boolean],
patchesAllowed: Option[Boolean],
webhooks: Option[List[Webhook]]
)

object Config {

sealed trait ThreadPool extends Product with Serializable
object ThreadPool {
case object Global extends ThreadPool
case object Cached extends ThreadPool
case object Global extends ThreadPool
case object Cached extends ThreadPool
case class Fixed(size: Int) extends ThreadPool

implicit val threadPoolReader: ConfigReader[ThreadPool] =
Expand All @@ -64,7 +66,7 @@ object Config {
case Right(s) if s.toLowerCase == "cached" => Right(Cached)
case Left(err) =>
for {
obj <- cur.asObjectCursor
obj <- cur.asObjectCursor
typeCur <- obj.atKey("type")
typeStr <- typeCur.asString
pool <- typeStr.toLowerCase match {
Expand All @@ -75,7 +77,7 @@ object Config {
} yield Fixed(sizeInt)
case "global" => Right(Global)
case "cached" => Right(Cached)
case _ => Left(err)
case _ => Left(err)
}
} yield pool
}
Expand All @@ -99,27 +101,31 @@ object Config {
sealed trait ConnectionPool extends Product with Serializable
object ConnectionPool {
case class NoPool(threadPool: ThreadPool = ThreadPool.Cached) extends ConnectionPool
case class Hikari(connectionTimeout: Option[Int],
maxLifetime: Option[Int],
minimumIdle: Option[Int],
maximumPoolSize: Option[Int],
// Provided by doobie
connectionPool: ThreadPool = ThreadPool.Fixed(2),
transactionPool: ThreadPool = ThreadPool.Cached) extends ConnectionPool
case class Hikari(
connectionTimeout: Option[Int],
maxLifetime: Option[Int],
minimumIdle: Option[Int],
maximumPoolSize: Option[Int],
// Provided by doobie
connectionPool: ThreadPool = ThreadPool.Fixed(2),
transactionPool: ThreadPool = ThreadPool.Cached
) extends ConnectionPool

implicit val circePoolEncoder: Encoder[ConnectionPool] =
Encoder.instance {
case NoPool(threadPool) =>
Json.fromFields(List(
("type", Json.fromString("NoPool")),
("threadPool", threadPool.asJson(ThreadPool.threadPoolCirceEncoder))
))
Json.fromFields(
List(
("type", Json.fromString("NoPool")),
("threadPool", threadPool.asJson(ThreadPool.threadPoolCirceEncoder))
)
)
case h: Hikari =>
deriveEncoder[Hikari].apply(h)
}

implicit val nopoolHint = ProductHint[NoPool](ConfigFieldMapping(CamelCase, CamelCase))
implicit val hikariHint = ProductHint[Hikari](ConfigFieldMapping(CamelCase, CamelCase))
implicit val nopoolHint = ProductHint[NoPool](ConfigFieldMapping(CamelCase, CamelCase))
implicit val hikariHint = ProductHint[Hikari](ConfigFieldMapping(CamelCase, CamelCase))
implicit val noPoolReader = deriveReader[NoPool]
implicit val hikariReader = deriveReader[Hikari]

Expand All @@ -129,15 +135,16 @@ object Config {
objCur <- cur.asObjectCursor
typeCur = objCur.atKeyOrUndefined("type")
pool <- if (typeCur.isUndefined) Right(ConfigReader[NoPool].from(cur).getOrElse(ConnectionPool.NoPool()))
else for {
typeStr <- typeCur.asString
result <- typeStr.toLowerCase match {
case "hikari" =>
ConfigReader[Hikari].from(cur)
case "nopool" =>
ConfigReader[NoPool].from(cur)
}
} yield result
else
for {
typeStr <- typeCur.asString
result <- typeStr.toLowerCase match {
case "hikari" =>
ConfigReader[Hikari].from(cur)
case "nopool" =>
ConfigReader[NoPool].from(cur)
}
} yield result
} yield pool
}
}
Expand All @@ -150,15 +157,17 @@ object Config {
/**
* Configuration for PostgreSQL state storage.
*/
case class Postgres(host: String,
port: Int,
dbname: String,
username: String,
password: String,
driver: String,
connectThreads: Option[Int],
maxPoolSize: Option[Int], // deprecated
pool: ConnectionPool = ConnectionPool.NoPool(ThreadPool.Cached)) extends StorageConfig {
case class Postgres(
host: String,
port: Int,
dbname: String,
username: String,
password: String,
driver: String,
connectThreads: Option[Int],
maxPoolSize: Option[Int], // deprecated
pool: ConnectionPool = ConnectionPool.NoPool(ThreadPool.Cached)
) extends StorageConfig {

/** Backward-compatibility */
val maximumPoolSize: Int = pool match {
Expand All @@ -168,19 +177,26 @@ object Config {
}

val postgresReader: ConfigReader[Postgres] =
ConfigReader.forProduct9("host", "port","dbname", "username",
"password", "driver", "connectThreads", "maxPoolSize", "pool")(StorageConfig.Postgres.apply)
ConfigReader.forProduct9(
"host",
"port",
"dbname",
"username",
"password",
"driver",
"connectThreads",
"maxPoolSize",
"pool"
)(StorageConfig.Postgres.apply)

implicit val storageConfigCirceEncoder: Encoder[StorageConfig] =
deriveEncoder[StorageConfig].mapJson { json =>
json.hcursor
.downField("Postgres")
.focus
.getOrElse(Json.Null)
.mapObject { o => JsonObject.fromMap(o.toMap.map {
json.hcursor.downField("Postgres").focus.getOrElse(Json.Null).mapObject { o =>
JsonObject.fromMap(o.toMap.map {
case ("password", _) => ("password", Json.fromString("******"))
case (k, v) => (k, v)
})}
case (k, v) => (k, v)
})
}
}
}

Expand All @@ -200,13 +216,13 @@ object Config {

implicit val pureWebhookReader: ConfigReader[Webhook] = ConfigReader.fromCursor { cur =>
for {
objCur <- cur.asObjectCursor
objCur <- cur.asObjectCursor
uriCursor <- objCur.atKey("uri")
uri <- ConfigReader[org.http4s.Uri].from(uriCursor)
uri <- ConfigReader[org.http4s.Uri].from(uriCursor)

prefixes <- objCur.atKeyOrUndefined("vendor-prefixes") match {
case keyCur if keyCur.isUndefined => List.empty.asRight
case keyCur => keyCur.asList.flatMap(_.traverse(cur => cur.asString))
case keyCur => keyCur.asList.flatMap(_.traverse(cur => cur.asString))
}
} yield Webhook.SchemaPublished(uri, Some(prefixes))
}
Expand All @@ -216,9 +232,9 @@ object Config {
objCur <- cur.asObjectCursor
typeCur <- objCur.atKey("type")
typeStr <- typeCur.asString
result <- typeStr match {
result <- typeStr match {
case "postgres" => StorageConfig.postgresReader.from(cur)
case "dummy" => StorageConfig.Dummy.asRight
case "dummy" => StorageConfig.Dummy.asRight
case _ =>
val message = s"type has value $typeStr instead of class1 or class2"
objCur.failed[StorageConfig](error.CannotConvert(objCur.objValue.toString, "StorageConfig", message))
Expand All @@ -230,9 +246,9 @@ object Config {

implicit val pureWebhooksReader: ConfigReader[List[Webhook]] = ConfigReader.fromCursor { cur =>
for {
objCur <- cur.asObjectCursor
objCur <- cur.asObjectCursor
schemaPublishedCursors <- objCur.atKeyOrUndefined("schema-published").asList
webhooks <- schemaPublishedCursors.traverse(cur => pureWebhookReader.from(cur))
webhooks <- schemaPublishedCursors.traverse(cur => pureWebhookReader.from(cur))
} yield webhooks
}

Expand All @@ -244,26 +260,26 @@ object Config {
sealed trait ServerCommand {
def config: Path
def read: Either[String, Config] =
ConfigSource
.default(ConfigSource.file(config))
.load[Config]
.leftMap(_.toList.map(_.description).mkString("\n"))
ConfigSource.default(ConfigSource.file(config)).load[Config].leftMap(_.toList.map(_.description).mkString("\n"))
}

object ServerCommand {
case class Run(config: Path) extends ServerCommand
case class Run(config: Path) extends ServerCommand
case class Setup(config: Path, migrate: Option[MigrateFrom]) extends ServerCommand
}

val configOpt = Opts.option[Path]("config", "Path to server configuration HOCON")
val migrateOpt = Opts
.option[String]("migrate", "Migrate the DB from a particular version")
.mapValidated { s => MigrateFrom.parse(s).toValid(s"Cannot perform migration from version $s to $version").toValidatedNel }
.mapValidated { s =>
MigrateFrom.parse(s).toValid(s"Cannot perform migration from version $s to $version").toValidatedNel
}
.orNone

val runCommand: Opts[ServerCommand] = configOpt.map(ServerCommand.Run.apply)
val setupCommand: Opts[ServerCommand] =
Opts.subcommand("setup", "Setup Iglu Server")((configOpt, migrateOpt).mapN(ServerCommand.Setup.apply))

val serverCommand = Command[ServerCommand](generated.BuildInfo.name, generated.BuildInfo.version)(runCommand.orElse(setupCommand))
val serverCommand =
Command[ServerCommand](generated.BuildInfo.name, generated.BuildInfo.version)(runCommand.orElse(setupCommand))
}
6 changes: 3 additions & 3 deletions src/main/scala/com/snowplowanalytics/iglu/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ object Main extends SafeIOApp {
val cli = for {
command <- EitherT.fromEither[IO](Config.serverCommand.parse(args).leftMap(_.toString))
config <- EitherT.fromEither[IO](command.read)
result <- command match {
result <- command match {
case _: Config.ServerCommand.Run =>
EitherT.liftF[IO, String, ExitCode](Server.run(config).compile.lastOrError )
EitherT.liftF[IO, String, ExitCode](Server.run(config).compile.lastOrError)
case Config.ServerCommand.Setup(_, migration) =>
EitherT.liftF[IO, String, ExitCode](Server.setup(config, migration))
}
} yield result

cli.value.flatMap {
case Right(code) => IO.pure(code)
case Right(code) => IO.pure(code)
case Left(cliError) => IO(System.err.println(cliError)).as(ExitCode.Error)
}
}
Expand Down
20 changes: 11 additions & 9 deletions src/main/scala/com/snowplowanalytics/iglu/server/SafeIOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import scala.concurrent.ExecutionContext
trait SafeIOApp extends IOApp.WithContext {

private lazy val ec: ExecutionContext =
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(List(2, Runtime.getRuntime.availableProcessors()).max, DaemonThreadFactory))
ExecutionContext.fromExecutorService(
Executors.newFixedThreadPool(List(2, Runtime.getRuntime.availableProcessors()).max, DaemonThreadFactory)
)

private final val log: Logger = LoggerFactory.getLogger(Main.getClass)
final private val log: Logger = LoggerFactory.getLogger(Main.getClass)

// To overcome https://github.com/typelevel/cats-effect/issues/515
private final val exitingEC: ExecutionContext = new ExecutionContext {
final private val exitingEC: ExecutionContext = new ExecutionContext {
def execute(r: Runnable): Unit =
ec.execute { () =>
try r.run()
Expand All @@ -48,14 +50,14 @@ trait SafeIOApp extends IOApp.WithContext {
Resource.eval(SyncIO(exitingEC))

private object DaemonThreadFactory extends ThreadFactory {
private val s: SecurityManager = System.getSecurityManager
private val poolNumber: AtomicInteger = new AtomicInteger(1)
private val group: ThreadGroup = if (s == null) Thread.currentThread().getThreadGroup else s.getThreadGroup
private val threadNumber: AtomicInteger = new AtomicInteger(1)
private val namePrefix: String = "ioapp-pool-" + poolNumber.getAndIncrement() + "-thread-"
private val s: SecurityManager = System.getSecurityManager
private val poolNumber: AtomicInteger = new AtomicInteger(1)
private val group: ThreadGroup = if (s == null) Thread.currentThread().getThreadGroup else s.getThreadGroup
private val threadNumber: AtomicInteger = new AtomicInteger(1)
private val namePrefix: String = "ioapp-pool-" + poolNumber.getAndIncrement() + "-thread-"

def newThread(r: Runnable): Thread = {
val t: Thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0)
val t: Thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0)
t.setDaemon(true)
if (t.getPriority != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY)
t
Expand Down
Loading

0 comments on commit e9e859f

Please sign in to comment.