diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt index 12ab2b92..17826001 100644 --- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt @@ -3,6 +3,7 @@ package com.r3.corda.lib.tokens.selection.memory.config import co.paralleluniverse.fibers.Suspendable import com.r3.corda.lib.tokens.selection.api.StateSelectionConfig import com.r3.corda.lib.tokens.selection.memory.selector.LocalTokenSelector +import com.r3.corda.lib.tokens.selection.memory.services.IndexingType import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService import net.corda.core.cordapp.CordappConfig import net.corda.core.cordapp.CordappConfigException @@ -13,12 +14,12 @@ const val CACHE_SIZE_DEFAULT = 1024 const val PAGE_SIZE_DEFAULT = 1024 data class InMemorySelectionConfig @JvmOverloads constructor( - val enabled: Boolean, - val indexingStrategies: List, - val cacheSize: Int = CACHE_SIZE_DEFAULT, - val pageSize: Int = 1000, - val sleep: Int = 0, - val loadingThreads: Int = 4 + val enabled: Boolean, + val indexingStrategies: List, + val cacheSize: Int = CACHE_SIZE_DEFAULT, + val pageSize: Int = 1000, + val sleep: Int = 0, + val loadingThreads: Int = 4 ) : StateSelectionConfig { companion object { private val logger = LoggerFactory.getLogger("inMemoryConfigSelectionLogger") @@ -37,13 +38,13 @@ data class InMemorySelectionConfig @JvmOverloads constructor( val loadingSleep: Int = config.getIntOrNull("stateSelection.inMemory.loadingSleepSeconds")?: 0 val loadingThreads: Int = config.getIntOrNull("stateSelection.inMemory.loadingThreads")?: 4 val indexingType = try { - (config.get("stateSelection.inMemory.indexingStrategies") as List).map { VaultWatcherService.IndexingType.valueOf(it.toString()) } + (config.get("stateSelection.inMemory.indexingStrategies") as List).map { IndexingType.valueOf(it.toString()) } } catch (e: CordappConfigException) { logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens") - emptyList() + emptyList() } catch (e: ClassCastException) { logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens") - emptyList() + emptyList() } logger.info("Found in memory token selection configuration with values indexing strategy: $indexingType, cacheSize: $cacheSize") return InMemorySelectionConfig(enabled, indexingType, cacheSize, pageSize, loadingSleep, loadingThreads) diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/IndexingType.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/IndexingType.kt new file mode 100644 index 00000000..2bec1ccb --- /dev/null +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/IndexingType.kt @@ -0,0 +1,25 @@ +package com.r3.corda.lib.tokens.selection.memory.services + +import com.r3.corda.lib.tokens.selection.memory.internal.Holder + +enum class IndexingType(val ownerType: Class) { + + EXTERNAL_ID(Holder.MappedIdentity::class.java), + PUBLIC_KEY(Holder.KeyIdentity::class.java); + + companion object { + fun fromHolder(holder: Class): IndexingType { + return when (holder) { + Holder.MappedIdentity::class.java -> { + EXTERNAL_ID + } + + Holder.KeyIdentity::class.java -> { + PUBLIC_KEY + } + else -> throw IllegalArgumentException("Unknown Holder type: $holder") + } + } + } + +} \ No newline at end of file diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt new file mode 100644 index 00000000..c5f9568a --- /dev/null +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt @@ -0,0 +1,75 @@ +package com.r3.corda.lib.tokens.selection.memory.services + +import com.r3.corda.lib.tokens.contracts.states.FungibleToken +import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig +import com.r3.corda.lib.tokens.selection.sortByTimeStampAscending +import io.github.classgraph.ClassGraph +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.Vault +import net.corda.core.node.services.queryBy +import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM +import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.utilities.contextLogger +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger + +class ServiceHubAsyncLoader(private val appServiceHub: AppServiceHub, + private val configOptions: InMemorySelectionConfig) : ((Vault.Update) -> Unit) -> Unit { + + + override fun invoke( + onVaultUpdate: (Vault.Update) -> Unit + ) { + LOG.info("Starting async token loading from vault") + + val classGraph = ClassGraph() + classGraph.enableClassInfo() + + val scanResultFuture = CompletableFuture.supplyAsync { + classGraph.scan() + } + + scanResultFuture.thenApplyAsync { scanResult -> + val subclasses: Set> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName) + .map { it.name } + .map { Class.forName(it) as Class }.toSet() + + val enrichedClasses = (subclasses - setOf(FungibleToken::class.java)) + LOG.info("Enriching token query with types: $enrichedClasses") + + val shouldLoop = AtomicBoolean(true) + val pageNumber = AtomicInteger(DEFAULT_PAGE_NUM - 1) + val loadingFutures: List> = 0.until(configOptions.loadingThreads).map { + CompletableFuture.runAsync { + try { + while (shouldLoop.get()) { + val newlyLoadedStates = appServiceHub.vaultService.queryBy( + paging = PageSpecification(pageNumber = pageNumber.addAndGet(1), pageSize = configOptions.pageSize), + criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses), + sorting = sortByTimeStampAscending() + ).states.toSet() + onVaultUpdate(Vault.Update(emptySet(), newlyLoadedStates)) + LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback") + shouldLoop.compareAndSet(true, newlyLoadedStates.isNotEmpty()) + LOG.debug("shouldLoop=${shouldLoop}") + if (configOptions.sleep > 0) { + Thread.sleep(configOptions.sleep.toLong() * 1000) + } + } + } catch (t: Throwable) { + LOG.error("Token Loading Failed due to: ", t) + } + } + } + CompletableFuture.allOf(*loadingFutures.toTypedArray()).thenRunAsync { + LOG.info("finished token loading") + } + } + } + + companion object { + val LOG = contextLogger() + } +} \ No newline at end of file diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt index cccc7aef..0a48da64 100644 --- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt @@ -10,16 +10,12 @@ import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig import com.r3.corda.lib.tokens.selection.memory.internal.Holder import com.r3.corda.lib.tokens.selection.memory.internal.lookupExternalIdFromKey import com.r3.corda.lib.tokens.selection.sortByStateRefAscending -import com.r3.corda.lib.tokens.selection.sortByTimeStampAscending -import io.github.classgraph.ClassGraph -import io.github.classgraph.ScanResult import net.corda.core.contracts.Amount import net.corda.core.contracts.StateAndRef import net.corda.core.internal.uncheckedCast import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService import net.corda.core.node.services.Vault -import net.corda.core.node.services.queryBy import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.QueryCriteria @@ -28,12 +24,8 @@ import net.corda.core.utilities.contextLogger import rx.Observable import java.time.Duration import java.util.concurrent.* -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.locks.ReentrantReadWriteLock -import java.util.function.Function -import java.util.function.Supplier import kotlin.concurrent.read import kotlin.concurrent.write @@ -55,41 +47,28 @@ class VaultWatcherService( private val indexViewCreationLock: ReentrantReadWriteLock = ReentrantReadWriteLock() private val UPDATER = Executors.newSingleThreadScheduledExecutor() - enum class IndexingType(val ownerType: Class) { - - EXTERNAL_ID(Holder.MappedIdentity::class.java), - PUBLIC_KEY(Holder.KeyIdentity::class.java); - - companion object { - fun fromHolder(holder: Class): IndexingType { - return when (holder) { - Holder.MappedIdentity::class.java -> { - EXTERNAL_ID - } - - Holder.KeyIdentity::class.java -> { - PUBLIC_KEY - } - else -> throw IllegalArgumentException("Unknown Holder type: $holder") - } - } - } - - } - constructor(appServiceHub: AppServiceHub) : this( getObservableFromAppServiceHub(appServiceHub), InMemorySelectionConfig.parse(appServiceHub.getAppContext().config) ) + init { + addTokensToCache(tokenObserver.initialValues) + tokenObserver.source.doOnError { + LOG.error("received error from observable", it) + } + tokenObserver.source.subscribe(::onVaultUpdate) + tokenObserver.startLoading(::onVaultUpdate) + } + companion object { val LOG = contextLogger() private fun getObservableFromAppServiceHub(appServiceHub: AppServiceHub): TokenObserver { - val config = appServiceHub.cordappProvider.getAppContext().config - val configOptions: InMemorySelectionConfig = InMemorySelectionConfig.parse(config) + val rawConfig = appServiceHub.cordappProvider.getAppContext().config + val parsedConfig: InMemorySelectionConfig = InMemorySelectionConfig.parse(rawConfig) - if (!configOptions.enabled) { + if (!parsedConfig.enabled) { LOG.info("Disabling inMemory token selection - refer to documentation on how to enable") return TokenObserver(emptyList(), Observable.empty(), { _, _ -> Holder.UnmappedIdentity() @@ -105,7 +84,6 @@ class VaultWatcherService( } } } - val (_, vaultObservable) = appServiceHub.vaultService.trackBy( contractStateType = FungibleToken::class.java, paging = PageSpecification(pageNumber = DEFAULT_PAGE_NUM, pageSize = 1), @@ -113,68 +91,12 @@ class VaultWatcherService( sorting = sortByStateRefAscending() ) - val pageSize = configOptions.pageSize - val asyncLoader = object : ((Vault.Update) -> Unit) -> Unit { - override fun invoke(callback: (Vault.Update) -> Unit) { - LOG.info("Starting async token loading from vault") - - val classGraph = ClassGraph() - classGraph.enableClassInfo() - - val scanResultFuture = CompletableFuture.supplyAsync { - classGraph.scan() - } - - scanResultFuture.thenApplyAsync { scanResult -> - val subclasses: Set> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName) - .map { it.name } - .map { Class.forName(it) as Class }.toSet() - - val enrichedClasses = (subclasses - setOf(FungibleToken::class.java)) - LOG.info("Enriching token query with types: $enrichedClasses") - - val shouldLoop = AtomicBoolean(true) - val pageNumber = AtomicInteger(DEFAULT_PAGE_NUM - 1) - val loadingFutures: List> = 0.until(configOptions.loadingThreads).map { - CompletableFuture.runAsync { - try { - while (shouldLoop.get()) { - val newlyLoadedStates = appServiceHub.vaultService.queryBy( - paging = PageSpecification(pageNumber = pageNumber.addAndGet(1), pageSize = pageSize), - criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses), - sorting = sortByTimeStampAscending() - ).states.toSet() - callback(Vault.Update(emptySet(), newlyLoadedStates)) - LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback") - shouldLoop.compareAndSet(newlyLoadedStates.isNotEmpty(), true) - LOG.debug("shouldLoop=${shouldLoop}") - if (configOptions.sleep > 0) { - Thread.sleep(configOptions.sleep.toLong() * 1000) - } - - } - LOG.info("finished token loading") - } catch (t: Throwable) { - LOG.error("Token Loading Failed due to: ", t) - } - } - } - } - } - } - + val asyncLoader = ServiceHubAsyncLoader(appServiceHub, parsedConfig) return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader) } } - init { - addTokensToCache(tokenObserver.initialValues) - tokenObserver.source.doOnError { - LOG.error("received error from observable", it) - } - tokenObserver.startLoading(::onVaultUpdate) - tokenObserver.source.subscribe(::onVaultUpdate) - } + private fun processToken(token: StateAndRef, indexingType: IndexingType): TokenIndex { val owner = tokenObserver.ownerProvider(token, indexingType) @@ -375,10 +297,10 @@ class VaultWatcherService( } class TokenObserver( - val initialValues: List>, - val source: Observable>, - val ownerProvider: ((StateAndRef, VaultWatcherService.IndexingType) -> Holder), - inline val asyncLoader: ((Vault.Update) -> Unit) -> Unit = { _ -> } + val initialValues: List>, + val source: Observable>, + val ownerProvider: ((StateAndRef, IndexingType) -> Holder), + inline val asyncLoader: ((Vault.Update) -> Unit) -> Unit = { _ -> } ) { fun startLoading(loadingCallBack: (Vault.Update) -> Unit) { diff --git a/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt b/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt index 801bf32f..16723123 100644 --- a/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt +++ b/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt @@ -35,7 +35,6 @@ import net.corda.core.utilities.unwrap import java.time.Duration import java.time.temporal.ChronoUnit import java.util.* -import javax.swing.plaf.nimbus.State // This is very simple test flow for DvP. @CordaSerializable diff --git a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt index d598dd02..151f64f4 100644 --- a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt +++ b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt @@ -8,6 +8,7 @@ import com.r3.corda.lib.tokens.selection.database.selector.DatabaseTokenSelectio import com.r3.corda.lib.tokens.selection.memory.config.CACHE_SIZE_DEFAULT import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig import com.r3.corda.lib.tokens.selection.memory.selector.LocalTokenSelector +import com.r3.corda.lib.tokens.selection.memory.services.IndexingType import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService import com.typesafe.config.ConfigFactory import net.corda.core.identity.CordaX500Name @@ -58,14 +59,14 @@ class ConfigSelectionTest { val config = ConfigFactory.parseString("stateSelection {\n" + "inMemory {\n" + "cacheSize: 9000\n" + - "indexingStrategies: [${VaultWatcherService.IndexingType.PUBLIC_KEY}]\n" + + "indexingStrategies: [${IndexingType.PUBLIC_KEY}]\n" + "}\n" + "}") val cordappConfig = TypesafeCordappConfig(config) val inMemoryConfig = InMemorySelectionConfig.parse(cordappConfig) assertThat(inMemoryConfig.cacheSize).isEqualTo(9000) - assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(VaultWatcherService.IndexingType.PUBLIC_KEY)) + assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(IndexingType.PUBLIC_KEY)) val selection = ConfigSelection.getPreferredSelection(services, cordappConfig) assertThat(selection).isInstanceOf(LocalTokenSelector::class.java) @@ -76,13 +77,13 @@ class ConfigSelectionTest { val config = ConfigFactory.parseString("stateSelection {\n" + "inMemory {\n" + "cacheSize: 9000\n" + - "indexingStrategies: [\"${VaultWatcherService.IndexingType.EXTERNAL_ID}\", \"${VaultWatcherService.IndexingType.PUBLIC_KEY}\"]\n" + + "indexingStrategies: [\"${IndexingType.EXTERNAL_ID}\", \"${IndexingType.PUBLIC_KEY}\"]\n" + "}\n" + "}") val cordappConfig = TypesafeCordappConfig(config) val inMemoryConfig = InMemorySelectionConfig.parse(cordappConfig) assertThat(inMemoryConfig.cacheSize).isEqualTo(9000) - assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(VaultWatcherService.IndexingType.EXTERNAL_ID, VaultWatcherService.IndexingType.PUBLIC_KEY)) + assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(IndexingType.EXTERNAL_ID, IndexingType.PUBLIC_KEY)) } @Test diff --git a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt index bc8bdb48..c16b1088 100644 --- a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt +++ b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt @@ -12,6 +12,7 @@ import com.r3.corda.lib.tokens.money.GBP import com.r3.corda.lib.tokens.selection.InsufficientBalanceException import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig import com.r3.corda.lib.tokens.selection.memory.internal.Holder +import com.r3.corda.lib.tokens.selection.memory.services.IndexingType import com.r3.corda.lib.tokens.selection.memory.services.TokenObserver import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService import com.r3.corda.lib.tokens.workflows.flows.rpc.IssueTokens @@ -205,14 +206,14 @@ class VaultWatcherServiceTest { } }.toMap() - val ownerProvider = object : (StateAndRef, VaultWatcherService.IndexingType) -> Holder { - override fun invoke(tokenState: StateAndRef, indexingType: VaultWatcherService.IndexingType): Holder { + val ownerProvider = object : (StateAndRef, IndexingType) -> Holder { + override fun invoke(tokenState: StateAndRef, indexingType: IndexingType): Holder { return when (indexingType) { - VaultWatcherService.IndexingType.EXTERNAL_ID -> { + IndexingType.EXTERNAL_ID -> { Holder.MappedIdentity(keyToAccount[tokenState.state.data.holder.owningKey] ?: error("should never happen")) } - VaultWatcherService.IndexingType.PUBLIC_KEY -> { + IndexingType.PUBLIC_KEY -> { Holder.KeyIdentity(tokenState.state.data.holder.owningKey) } }