diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TopicSelectionStrategySpec.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TopicSelectionStrategySpec.scala index 14e34f6c576..8999a6ec2db 100644 --- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TopicSelectionStrategySpec.scala +++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TopicSelectionStrategySpec.scala @@ -24,8 +24,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource private lazy val confluentClient = schemaRegistryClientFactory.create(kafkaConfig) test("all topic strategy test") { - val strategy = new TopicsWithExistingSubjectSelectionStrategy() - strategy.getTopics(confluentClient, kafkaConfig).toList.map(_.toSet) shouldBe List( + val strategy = new TopicsWithExistingSubjectSelectionStrategy(confluentClient) + strategy.getTopics.toList.map(_.toSet) shouldBe List( Set( RecordTopic, RecordTopicWithKey, @@ -40,8 +40,9 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource } test("topic filtering strategy test") { - val strategy = new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile(".*Record.*")) - strategy.getTopics(confluentClient, kafkaConfig).toList shouldBe List( + val strategy = + new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(confluentClient, Pattern.compile(".*Record.*")) + strategy.getTopics.toList shouldBe List( List(ArrayOfRecordsTopic, RecordTopic, RecordTopicWithKey) ) } @@ -53,8 +54,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource testModelDependencies, new FlinkKafkaSourceImplFactory(None) ) { - override def topicSelectionStrategy = - new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile("test-.*")) + override lazy val topicSelectionStrategy = + new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(confluentClient, Pattern.compile("test-.*")) } } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala index 9da6f208136..d98cca2f609 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala @@ -13,7 +13,13 @@ import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, TopicN import pl.touk.nussknacker.engine.api.validation.ValidationMode import pl.touk.nussknacker.engine.api.{NodeId, Params} import pl.touk.nussknacker.engine.kafka.validator.WithCachedTopicsExistenceValidator -import pl.touk.nussknacker.engine.kafka.{KafkaComponentsUtils, KafkaConfig, PreparedKafkaTopic, UnspecializedTopicName} +import pl.touk.nussknacker.engine.kafka.{ + KafkaComponentsUtils, + KafkaConfig, + KafkaUtils, + PreparedKafkaTopic, + UnspecializedTopicName +} import pl.touk.nussknacker.engine.schemedkafka.schemaregistry._ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupportDispatcher import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName._ @@ -47,10 +53,14 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid @transient protected lazy val schemaRegistryClient: SchemaRegistryClient = schemaRegistryClientFactory.create(kafkaConfig) - protected def topicSelectionStrategy: TopicSelectionStrategy = { + @transient protected lazy val topicSelectionStrategy: TopicSelectionStrategy = { if (kafkaConfig.showTopicsWithoutSchema) { - new AllNonHiddenTopicsSelectionStrategy - } else new TopicsWithExistingSubjectSelectionStrategy + new AllNonHiddenTopicsSelectionStrategy( + schemaRegistryClient, + KafkaUtils.createKafkaAdminClient(kafkaConfig), + kafkaConfig.topicsWithoutSchemaFetchTimeout + ) + } else new TopicsWithExistingSubjectSelectionStrategy(schemaRegistryClient) } @transient protected lazy val kafkaConfig: KafkaConfig = prepareKafkaConfig @@ -67,7 +77,7 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid protected def getTopicParam( implicit nodeId: NodeId ): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = { - val topics = topicSelectionStrategy.getTopics(schemaRegistryClient, kafkaConfig) + val topics = topicSelectionStrategy.getTopics (topics match { case Valid(topics) => Writer[List[ProcessCompilationError], List[UnspecializedTopicName]](Nil, topics) diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala index 1693ddea4fb..f832b414427 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala @@ -1,56 +1,53 @@ package pl.touk.nussknacker.engine.schemedkafka import cats.data.Validated -import org.apache.kafka.clients.admin.ListTopicsOptions +import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions} import org.apache.kafka.common.KafkaException import org.apache.kafka.common.errors.TimeoutException -import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils, UnspecializedTopicName} +import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClient, SchemaRegistryError} import java.util.concurrent.ExecutionException import java.util.regex.Pattern +import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ trait TopicSelectionStrategy extends Serializable { - def getTopics( - schemaRegistryClient: SchemaRegistryClient, - kafkaConfig: KafkaConfig - ): Validated[SchemaRegistryError, List[UnspecializedTopicName]] + def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] } -class TopicsWithExistingSubjectSelectionStrategy extends TopicSelectionStrategy { +class TopicsWithExistingSubjectSelectionStrategy(schemaRegistryClient: SchemaRegistryClient) + extends TopicSelectionStrategy { - override def getTopics( - schemaRegistryClient: SchemaRegistryClient, - kafkaConfig: KafkaConfig - ): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = { + override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = { schemaRegistryClient.getAllTopics } } -class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy { +// TODO: Close client +class AllNonHiddenTopicsSelectionStrategy( + schemaRegistryClient: SchemaRegistryClient, + kafkaAdminClient: Admin, + fetchTimeout: FiniteDuration +) extends TopicSelectionStrategy { - override def getTopics( - schemaRegistryClient: SchemaRegistryClient, - kafkaConfig: KafkaConfig - ): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = { + override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = { val topicsFromSchemaRegistry = schemaRegistryClient.getAllTopics val schemaLessTopics: List[UnspecializedTopicName] = { try { - KafkaUtils.usingAdminClient(kafkaConfig) { - _.listTopics(new ListTopicsOptions().timeoutMs(kafkaConfig.topicsWithoutSchemaFetchTimeout.toMillis.toInt)) - .names() - .get() - .asScala - .toSet - .map(UnspecializedTopicName.apply) - .filterNot(topic => topic.name.startsWith("_")) - .toList - } + kafkaAdminClient + .listTopics(new ListTopicsOptions().timeoutMs(fetchTimeout.toMillis.toInt)) + .names() + .get() + .asScala + .toSet + .map(UnspecializedTopicName.apply) + .filterNot(topic => topic.name.startsWith("_")) + .toList } catch { // In some tests we pass dummy kafka address, so when we try to get topics from kafka it fails case err: ExecutionException => @@ -68,13 +65,12 @@ class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy { } -class TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(val topicPattern: Pattern) - extends TopicSelectionStrategy { +class TopicsMatchingPatternWithExistingSubjectsSelectionStrategy( + schemaRegistryClient: SchemaRegistryClient, + topicPattern: Pattern +) extends TopicSelectionStrategy { - override def getTopics( - schemaRegistryClient: SchemaRegistryClient, - kafkaConfig: KafkaConfig - ): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = + override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = schemaRegistryClient.getAllTopics.map(_.filter(topic => topicPattern.matcher(topic.name).matches())) } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala index 8319b0c021f..71b5cd905a3 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala @@ -40,11 +40,12 @@ trait SchemaRegistryClient extends Serializable { def getAllVersions(topic: UnspecializedTopicName, isKey: Boolean): Validated[SchemaRegistryError, List[Integer]] + // FIXME: strategy created once def isTopicWithSchema(topic: String, kafkaConfig: KafkaConfig): Boolean = { if (!kafkaConfig.showTopicsWithoutSchema) { true } else { - val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy().getTopics(this, kafkaConfig) + val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy(this).getTopics topicsWithSchema.exists(_.map(_.name).contains(topic)) } } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureSchemaRegistryClientFactory.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureSchemaRegistryClientFactory.scala index 41e847c9835..7e318fc0cdb 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureSchemaRegistryClientFactory.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureSchemaRegistryClientFactory.scala @@ -44,6 +44,9 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends .credential(credential) .buildClient() + // TODO: close it + private val kafkaAdminClient = KafkaUtils.createKafkaAdminClient(KafkaConfig(Some(config.kafkaProperties), None)) + // We need to create our own schemas service because some operations like schema listing are not exposed by default client // or even its Schemas inner class. Others like listing of versions are implemented incorrectly (it has wrong json field name in model) private val enhancedSchemasService = new EnhancedSchemasImpl( @@ -105,7 +108,7 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends } override def getAllTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = { - val topics = fetchTopics(KafkaConfig(Some(config.kafkaProperties), None)) + val topics = fetchTopics val matchStrategy = SchemaNameTopicMatchStrategy(topics) getAllFullSchemaNames.map(matchStrategy.getAllMatchingTopics(_, isKey = false)) } @@ -117,13 +120,7 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends getOneMatchingSchemaName(topicName, isKey).andThen(getVersions) } - private def fetchTopics(kafkaConfig: KafkaConfig) = { - KafkaUtils - .usingAdminClient(kafkaConfig) { admin => - admin.listTopics().names().get().asScala.toList - } - .map(UnspecializedTopicName.apply) - } + private def fetchTopics = kafkaAdminClient.listTopics().names().get().asScala.toList.map(UnspecializedTopicName.apply) private def getOneMatchingSchemaName( topicName: UnspecializedTopicName,