Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
private lazy val confluentClient = schemaRegistryClientFactory.create(kafkaConfig)

test("all topic strategy test") {
val strategy = new TopicsWithExistingSubjectSelectionStrategy()
strategy.getTopics(confluentClient, kafkaConfig).toList.map(_.toSet) shouldBe List(
val strategy = new TopicsWithExistingSubjectSelectionStrategy(confluentClient)
strategy.getTopics.toList.map(_.toSet) shouldBe List(
Set(
RecordTopic,
RecordTopicWithKey,
Expand All @@ -40,8 +40,9 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
}

test("topic filtering strategy test") {
val strategy = new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile(".*Record.*"))
strategy.getTopics(confluentClient, kafkaConfig).toList shouldBe List(
val strategy =
new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(confluentClient, Pattern.compile(".*Record.*"))
strategy.getTopics.toList shouldBe List(
List(ArrayOfRecordsTopic, RecordTopic, RecordTopicWithKey)
)
}
Expand All @@ -53,8 +54,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
testModelDependencies,
new FlinkKafkaSourceImplFactory(None)
) {
override def topicSelectionStrategy =
new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile("test-.*"))
override lazy val topicSelectionStrategy =
new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(confluentClient, Pattern.compile("test-.*"))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, TopicN
import pl.touk.nussknacker.engine.api.validation.ValidationMode
import pl.touk.nussknacker.engine.api.{NodeId, Params}
import pl.touk.nussknacker.engine.kafka.validator.WithCachedTopicsExistenceValidator
import pl.touk.nussknacker.engine.kafka.{KafkaComponentsUtils, KafkaConfig, PreparedKafkaTopic, UnspecializedTopicName}
import pl.touk.nussknacker.engine.kafka.{
KafkaComponentsUtils,
KafkaConfig,
KafkaUtils,
PreparedKafkaTopic,
UnspecializedTopicName
}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry._
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupportDispatcher
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName._
Expand Down Expand Up @@ -47,10 +53,14 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
@transient protected lazy val schemaRegistryClient: SchemaRegistryClient =
schemaRegistryClientFactory.create(kafkaConfig)

protected def topicSelectionStrategy: TopicSelectionStrategy = {
@transient protected lazy val topicSelectionStrategy: TopicSelectionStrategy = {
if (kafkaConfig.showTopicsWithoutSchema) {
new AllNonHiddenTopicsSelectionStrategy
} else new TopicsWithExistingSubjectSelectionStrategy
new AllNonHiddenTopicsSelectionStrategy(
schemaRegistryClient,
KafkaUtils.createKafkaAdminClient(kafkaConfig),
kafkaConfig.topicsWithoutSchemaFetchTimeout
)
} else new TopicsWithExistingSubjectSelectionStrategy(schemaRegistryClient)
}

@transient protected lazy val kafkaConfig: KafkaConfig = prepareKafkaConfig
Expand All @@ -67,7 +77,7 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
protected def getTopicParam(
implicit nodeId: NodeId
): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = {
val topics = topicSelectionStrategy.getTopics(schemaRegistryClient, kafkaConfig)
val topics = topicSelectionStrategy.getTopics

(topics match {
case Valid(topics) => Writer[List[ProcessCompilationError], List[UnspecializedTopicName]](Nil, topics)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,56 +1,53 @@
package pl.touk.nussknacker.engine.schemedkafka

import cats.data.Validated
import org.apache.kafka.clients.admin.ListTopicsOptions
import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.TimeoutException
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils, UnspecializedTopicName}
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClient, SchemaRegistryError}

import java.util.concurrent.ExecutionException
import java.util.regex.Pattern
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

trait TopicSelectionStrategy extends Serializable {

def getTopics(
schemaRegistryClient: SchemaRegistryClient,
kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]]
def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]]

}

class TopicsWithExistingSubjectSelectionStrategy extends TopicSelectionStrategy {
class TopicsWithExistingSubjectSelectionStrategy(schemaRegistryClient: SchemaRegistryClient)
extends TopicSelectionStrategy {

override def getTopics(
schemaRegistryClient: SchemaRegistryClient,
kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
schemaRegistryClient.getAllTopics
}

}

class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy {
// TODO: Close client
class AllNonHiddenTopicsSelectionStrategy(
schemaRegistryClient: SchemaRegistryClient,
kafkaAdminClient: Admin,
fetchTimeout: FiniteDuration
) extends TopicSelectionStrategy {

override def getTopics(
schemaRegistryClient: SchemaRegistryClient,
kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
val topicsFromSchemaRegistry = schemaRegistryClient.getAllTopics

val schemaLessTopics: List[UnspecializedTopicName] = {
try {
KafkaUtils.usingAdminClient(kafkaConfig) {
_.listTopics(new ListTopicsOptions().timeoutMs(kafkaConfig.topicsWithoutSchemaFetchTimeout.toMillis.toInt))
.names()
.get()
.asScala
.toSet
.map(UnspecializedTopicName.apply)
.filterNot(topic => topic.name.startsWith("_"))
.toList
}
kafkaAdminClient
.listTopics(new ListTopicsOptions().timeoutMs(fetchTimeout.toMillis.toInt))
.names()
.get()
.asScala
.toSet
.map(UnspecializedTopicName.apply)
.filterNot(topic => topic.name.startsWith("_"))
.toList
} catch {
// In some tests we pass dummy kafka address, so when we try to get topics from kafka it fails
case err: ExecutionException =>
Expand All @@ -68,13 +65,12 @@ class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy {

}

class TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(val topicPattern: Pattern)
extends TopicSelectionStrategy {
class TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(
schemaRegistryClient: SchemaRegistryClient,
topicPattern: Pattern
) extends TopicSelectionStrategy {

override def getTopics(
schemaRegistryClient: SchemaRegistryClient,
kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] =
override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] =
schemaRegistryClient.getAllTopics.map(_.filter(topic => topicPattern.matcher(topic.name).matches()))

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ trait SchemaRegistryClient extends Serializable {

def getAllVersions(topic: UnspecializedTopicName, isKey: Boolean): Validated[SchemaRegistryError, List[Integer]]

// FIXME: strategy created once
def isTopicWithSchema(topic: String, kafkaConfig: KafkaConfig): Boolean = {
if (!kafkaConfig.showTopicsWithoutSchema) {
true
} else {
val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy().getTopics(this, kafkaConfig)
val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy(this).getTopics
topicsWithSchema.exists(_.map(_.name).contains(topic))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends
.credential(credential)
.buildClient()

// TODO: close it
private val kafkaAdminClient = KafkaUtils.createKafkaAdminClient(KafkaConfig(Some(config.kafkaProperties), None))

// We need to create our own schemas service because some operations like schema listing are not exposed by default client
// or even its Schemas inner class. Others like listing of versions are implemented incorrectly (it has wrong json field name in model)
private val enhancedSchemasService = new EnhancedSchemasImpl(
Expand Down Expand Up @@ -105,7 +108,7 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends
}

override def getAllTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
val topics = fetchTopics(KafkaConfig(Some(config.kafkaProperties), None))
val topics = fetchTopics
val matchStrategy = SchemaNameTopicMatchStrategy(topics)
getAllFullSchemaNames.map(matchStrategy.getAllMatchingTopics(_, isKey = false))
}
Expand All @@ -117,13 +120,7 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends
getOneMatchingSchemaName(topicName, isKey).andThen(getVersions)
}

private def fetchTopics(kafkaConfig: KafkaConfig) = {
KafkaUtils
.usingAdminClient(kafkaConfig) { admin =>
admin.listTopics().names().get().asScala.toList
}
.map(UnspecializedTopicName.apply)
}
private def fetchTopics = kafkaAdminClient.listTopics().names().get().asScala.toList.map(UnspecializedTopicName.apply)

private def getOneMatchingSchemaName(
topicName: UnspecializedTopicName,
Expand Down