Skip to content

Commit

Permalink
refactor some code out of VaultWatcherService.kt
Browse files Browse the repository at this point in the history
  • Loading branch information
roastario committed Dec 7, 2021
1 parent db195dd commit ff99df7
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.r3.corda.lib.tokens.selection.memory.config
import co.paralleluniverse.fibers.Suspendable
import com.r3.corda.lib.tokens.selection.api.StateSelectionConfig
import com.r3.corda.lib.tokens.selection.memory.selector.LocalTokenSelector
import com.r3.corda.lib.tokens.selection.memory.services.IndexingType
import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService
import net.corda.core.cordapp.CordappConfig
import net.corda.core.cordapp.CordappConfigException
Expand All @@ -13,12 +14,12 @@ const val CACHE_SIZE_DEFAULT = 1024
const val PAGE_SIZE_DEFAULT = 1024

data class InMemorySelectionConfig @JvmOverloads constructor(
val enabled: Boolean,
val indexingStrategies: List<VaultWatcherService.IndexingType>,
val cacheSize: Int = CACHE_SIZE_DEFAULT,
val pageSize: Int = 1000,
val sleep: Int = 0,
val loadingThreads: Int = 4
val enabled: Boolean,
val indexingStrategies: List<IndexingType>,
val cacheSize: Int = CACHE_SIZE_DEFAULT,
val pageSize: Int = 1000,
val sleep: Int = 0,
val loadingThreads: Int = 4
) : StateSelectionConfig {
companion object {
private val logger = LoggerFactory.getLogger("inMemoryConfigSelectionLogger")
Expand All @@ -37,13 +38,13 @@ data class InMemorySelectionConfig @JvmOverloads constructor(
val loadingSleep: Int = config.getIntOrNull("stateSelection.inMemory.loadingSleepSeconds")?: 0
val loadingThreads: Int = config.getIntOrNull("stateSelection.inMemory.loadingThreads")?: 4
val indexingType = try {
(config.get("stateSelection.inMemory.indexingStrategies") as List<Any>).map { VaultWatcherService.IndexingType.valueOf(it.toString()) }
(config.get("stateSelection.inMemory.indexingStrategies") as List<Any>).map { IndexingType.valueOf(it.toString()) }
} catch (e: CordappConfigException) {
logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens")
emptyList<VaultWatcherService.IndexingType>()
emptyList<IndexingType>()
} catch (e: ClassCastException) {
logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens")
emptyList<VaultWatcherService.IndexingType>()
emptyList<IndexingType>()
}
logger.info("Found in memory token selection configuration with values indexing strategy: $indexingType, cacheSize: $cacheSize")
return InMemorySelectionConfig(enabled, indexingType, cacheSize, pageSize, loadingSleep, loadingThreads)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.r3.corda.lib.tokens.selection.memory.services

import com.r3.corda.lib.tokens.selection.memory.internal.Holder

enum class IndexingType(val ownerType: Class<out Holder>) {

EXTERNAL_ID(Holder.MappedIdentity::class.java),
PUBLIC_KEY(Holder.KeyIdentity::class.java);

companion object {
fun fromHolder(holder: Class<out Holder>): IndexingType {
return when (holder) {
Holder.MappedIdentity::class.java -> {
EXTERNAL_ID
}

Holder.KeyIdentity::class.java -> {
PUBLIC_KEY
}
else -> throw IllegalArgumentException("Unknown Holder type: $holder")
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.r3.corda.lib.tokens.selection.memory.services

import com.r3.corda.lib.tokens.contracts.states.FungibleToken
import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
import com.r3.corda.lib.tokens.selection.sortByTimeStampAscending
import io.github.classgraph.ClassGraph
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.Vault
import net.corda.core.node.services.queryBy
import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.utilities.contextLogger
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

class ServiceHubAsyncLoader(private val appServiceHub: AppServiceHub,
private val configOptions: InMemorySelectionConfig) : ((Vault.Update<FungibleToken>) -> Unit) -> Unit {


override fun invoke(
onVaultUpdate: (Vault.Update<FungibleToken>) -> Unit
) {
LOG.info("Starting async token loading from vault")

val classGraph = ClassGraph()
classGraph.enableClassInfo()

val scanResultFuture = CompletableFuture.supplyAsync {
classGraph.scan()
}

scanResultFuture.thenApplyAsync { scanResult ->
val subclasses: Set<Class<out FungibleToken>> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName)
.map { it.name }
.map { Class.forName(it) as Class<out FungibleToken> }.toSet()

val enrichedClasses = (subclasses - setOf(FungibleToken::class.java))
LOG.info("Enriching token query with types: $enrichedClasses")

val shouldLoop = AtomicBoolean(true)
val pageNumber = AtomicInteger(DEFAULT_PAGE_NUM - 1)
val loadingFutures: List<CompletableFuture<Void>> = 0.until(configOptions.loadingThreads).map {
CompletableFuture.runAsync {
try {
while (shouldLoop.get()) {
val newlyLoadedStates = appServiceHub.vaultService.queryBy<FungibleToken>(
paging = PageSpecification(pageNumber = pageNumber.addAndGet(1), pageSize = configOptions.pageSize),
criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses),
sorting = sortByTimeStampAscending()
).states.toSet()
onVaultUpdate(Vault.Update(emptySet(), newlyLoadedStates))
LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback")
shouldLoop.compareAndSet(true, newlyLoadedStates.isNotEmpty())
LOG.debug("shouldLoop=${shouldLoop}")
if (configOptions.sleep > 0) {
Thread.sleep(configOptions.sleep.toLong() * 1000)
}
}
} catch (t: Throwable) {
LOG.error("Token Loading Failed due to: ", t)
}
}
}
CompletableFuture.allOf(*loadingFutures.toTypedArray()).thenRunAsync {
LOG.info("finished token loading")
}
}
}

companion object {
val LOG = contextLogger()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,12 @@ import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
import com.r3.corda.lib.tokens.selection.memory.internal.Holder
import com.r3.corda.lib.tokens.selection.memory.internal.lookupExternalIdFromKey
import com.r3.corda.lib.tokens.selection.sortByStateRefAscending
import com.r3.corda.lib.tokens.selection.sortByTimeStampAscending
import io.github.classgraph.ClassGraph
import io.github.classgraph.ScanResult
import net.corda.core.contracts.Amount
import net.corda.core.contracts.StateAndRef
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.Vault
import net.corda.core.node.services.queryBy
import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
Expand All @@ -28,12 +24,8 @@ import net.corda.core.utilities.contextLogger
import rx.Observable
import java.time.Duration
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.function.Function
import java.util.function.Supplier
import kotlin.concurrent.read
import kotlin.concurrent.write

Expand All @@ -55,41 +47,28 @@ class VaultWatcherService(
private val indexViewCreationLock: ReentrantReadWriteLock = ReentrantReadWriteLock()
private val UPDATER = Executors.newSingleThreadScheduledExecutor()

enum class IndexingType(val ownerType: Class<out Holder>) {

EXTERNAL_ID(Holder.MappedIdentity::class.java),
PUBLIC_KEY(Holder.KeyIdentity::class.java);

companion object {
fun fromHolder(holder: Class<out Holder>): IndexingType {
return when (holder) {
Holder.MappedIdentity::class.java -> {
EXTERNAL_ID
}

Holder.KeyIdentity::class.java -> {
PUBLIC_KEY
}
else -> throw IllegalArgumentException("Unknown Holder type: $holder")
}
}
}

}

constructor(appServiceHub: AppServiceHub) : this(
getObservableFromAppServiceHub(appServiceHub),
InMemorySelectionConfig.parse(appServiceHub.getAppContext().config)
)

init {
addTokensToCache(tokenObserver.initialValues)
tokenObserver.source.doOnError {
LOG.error("received error from observable", it)
}
tokenObserver.source.subscribe(::onVaultUpdate)
tokenObserver.startLoading(::onVaultUpdate)
}

companion object {
val LOG = contextLogger()

private fun getObservableFromAppServiceHub(appServiceHub: AppServiceHub): TokenObserver {
val config = appServiceHub.cordappProvider.getAppContext().config
val configOptions: InMemorySelectionConfig = InMemorySelectionConfig.parse(config)
val rawConfig = appServiceHub.cordappProvider.getAppContext().config
val parsedConfig: InMemorySelectionConfig = InMemorySelectionConfig.parse(rawConfig)

if (!configOptions.enabled) {
if (!parsedConfig.enabled) {
LOG.info("Disabling inMemory token selection - refer to documentation on how to enable")
return TokenObserver(emptyList(), Observable.empty(), { _, _ ->
Holder.UnmappedIdentity()
Expand All @@ -105,76 +84,19 @@ class VaultWatcherService(
}
}
}

val (_, vaultObservable) = appServiceHub.vaultService.trackBy(
contractStateType = FungibleToken::class.java,
paging = PageSpecification(pageNumber = DEFAULT_PAGE_NUM, pageSize = 1),
criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL),
sorting = sortByStateRefAscending()
)

val pageSize = configOptions.pageSize
val asyncLoader = object : ((Vault.Update<FungibleToken>) -> Unit) -> Unit {
override fun invoke(callback: (Vault.Update<FungibleToken>) -> Unit) {
LOG.info("Starting async token loading from vault")

val classGraph = ClassGraph()
classGraph.enableClassInfo()

val scanResultFuture = CompletableFuture.supplyAsync {
classGraph.scan()
}

scanResultFuture.thenApplyAsync { scanResult ->
val subclasses: Set<Class<out FungibleToken>> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName)
.map { it.name }
.map { Class.forName(it) as Class<out FungibleToken> }.toSet()

val enrichedClasses = (subclasses - setOf(FungibleToken::class.java))
LOG.info("Enriching token query with types: $enrichedClasses")

val shouldLoop = AtomicBoolean(true)
val pageNumber = AtomicInteger(DEFAULT_PAGE_NUM - 1)
val loadingFutures: List<CompletableFuture<Void>> = 0.until(configOptions.loadingThreads).map {
CompletableFuture.runAsync {
try {
while (shouldLoop.get()) {
val newlyLoadedStates = appServiceHub.vaultService.queryBy<FungibleToken>(
paging = PageSpecification(pageNumber = pageNumber.addAndGet(1), pageSize = pageSize),
criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses),
sorting = sortByTimeStampAscending()
).states.toSet()
callback(Vault.Update(emptySet(), newlyLoadedStates))
LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback")
shouldLoop.compareAndSet(newlyLoadedStates.isNotEmpty(), true)
LOG.debug("shouldLoop=${shouldLoop}")
if (configOptions.sleep > 0) {
Thread.sleep(configOptions.sleep.toLong() * 1000)
}

}
LOG.info("finished token loading")
} catch (t: Throwable) {
LOG.error("Token Loading Failed due to: ", t)
}
}
}
}
}
}

val asyncLoader = ServiceHubAsyncLoader(appServiceHub, parsedConfig)
return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader)
}
}

init {
addTokensToCache(tokenObserver.initialValues)
tokenObserver.source.doOnError {
LOG.error("received error from observable", it)
}
tokenObserver.startLoading(::onVaultUpdate)
tokenObserver.source.subscribe(::onVaultUpdate)
}


private fun processToken(token: StateAndRef<FungibleToken>, indexingType: IndexingType): TokenIndex {
val owner = tokenObserver.ownerProvider(token, indexingType)
Expand Down Expand Up @@ -375,10 +297,10 @@ class VaultWatcherService(
}

class TokenObserver(
val initialValues: List<StateAndRef<FungibleToken>>,
val source: Observable<Vault.Update<FungibleToken>>,
val ownerProvider: ((StateAndRef<FungibleToken>, VaultWatcherService.IndexingType) -> Holder),
inline val asyncLoader: ((Vault.Update<FungibleToken>) -> Unit) -> Unit = { _ -> }
val initialValues: List<StateAndRef<FungibleToken>>,
val source: Observable<Vault.Update<FungibleToken>>,
val ownerProvider: ((StateAndRef<FungibleToken>, IndexingType) -> Holder),
inline val asyncLoader: ((Vault.Update<FungibleToken>) -> Unit) -> Unit = { _ -> }
) {

fun startLoading(loadingCallBack: (Vault.Update<FungibleToken>) -> Unit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import net.corda.core.utilities.unwrap
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.*
import javax.swing.plaf.nimbus.State

// This is very simple test flow for DvP.
@CordaSerializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.r3.corda.lib.tokens.selection.database.selector.DatabaseTokenSelectio
import com.r3.corda.lib.tokens.selection.memory.config.CACHE_SIZE_DEFAULT
import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
import com.r3.corda.lib.tokens.selection.memory.selector.LocalTokenSelector
import com.r3.corda.lib.tokens.selection.memory.services.IndexingType
import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService
import com.typesafe.config.ConfigFactory
import net.corda.core.identity.CordaX500Name
Expand Down Expand Up @@ -58,14 +59,14 @@ class ConfigSelectionTest {
val config = ConfigFactory.parseString("stateSelection {\n" +
"inMemory {\n" +
"cacheSize: 9000\n" +
"indexingStrategies: [${VaultWatcherService.IndexingType.PUBLIC_KEY}]\n" +
"indexingStrategies: [${IndexingType.PUBLIC_KEY}]\n" +
"}\n" +
"}")
val cordappConfig = TypesafeCordappConfig(config)

val inMemoryConfig = InMemorySelectionConfig.parse(cordappConfig)
assertThat(inMemoryConfig.cacheSize).isEqualTo(9000)
assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(VaultWatcherService.IndexingType.PUBLIC_KEY))
assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(IndexingType.PUBLIC_KEY))

val selection = ConfigSelection.getPreferredSelection(services, cordappConfig)
assertThat(selection).isInstanceOf(LocalTokenSelector::class.java)
Expand All @@ -76,13 +77,13 @@ class ConfigSelectionTest {
val config = ConfigFactory.parseString("stateSelection {\n" +
"inMemory {\n" +
"cacheSize: 9000\n" +
"indexingStrategies: [\"${VaultWatcherService.IndexingType.EXTERNAL_ID}\", \"${VaultWatcherService.IndexingType.PUBLIC_KEY}\"]\n" +
"indexingStrategies: [\"${IndexingType.EXTERNAL_ID}\", \"${IndexingType.PUBLIC_KEY}\"]\n" +
"}\n" +
"}")
val cordappConfig = TypesafeCordappConfig(config)
val inMemoryConfig = InMemorySelectionConfig.parse(cordappConfig)
assertThat(inMemoryConfig.cacheSize).isEqualTo(9000)
assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(VaultWatcherService.IndexingType.EXTERNAL_ID, VaultWatcherService.IndexingType.PUBLIC_KEY))
assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(IndexingType.EXTERNAL_ID, IndexingType.PUBLIC_KEY))
}

@Test
Expand Down
Loading

0 comments on commit ff99df7

Please sign in to comment.