From 671fdfe5255af6e0dda5f68df9ca128af71b7765 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Sat, 20 Jun 2026 20:51:56 -0700 Subject: [PATCH 01/14] Allow static hosts for requested services --- temporal/fx.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/temporal/fx.go b/temporal/fx.go index 40a3b37b096..5489793ee31 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -261,10 +261,10 @@ func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error) { } } - // check that when static hosts are defined, they are defined for all required hosts + // check that when static hosts are defined, they are defined for all requested hosts if len(so.hostsByService) > 0 { - for _, service := range DefaultServices { - hosts := so.hostsByService[primitives.ServiceName(service)] + for service := range so.serviceNames { + hosts := so.hostsByService[service] if len(hosts.All) == 0 { return serverOptionsProvider{}, fmt.Errorf("%w: %v", missingServiceInStaticHosts, service) } From c618b0c5971d4b20df10903c622fcf5f4f9c6ab7 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Sat, 20 Jun 2026 22:39:13 -0700 Subject: [PATCH 02/14] Relax startup checks for custom server options --- temporal/fx.go | 10 ++-- temporal/fx_test.go | 103 ++++++++++++++++++++++++++++++++++++++ temporal/server_option.go | 2 +- 3 files changed, 111 insertions(+), 4 deletions(-) diff --git a/temporal/fx.go b/temporal/fx.go index 5489793ee31..55aded68d51 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -182,9 +182,13 @@ func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error) { } persistenceConfig := so.config.Persistence - err = verifyPersistenceCompatibleVersion(persistenceConfig, so.persistenceServiceResolver, logger) - if err != nil { - return serverOptionsProvider{}, err + // Custom persistence factories own their own schema/version compatibility + // contract; the built-in SQL/Cassandra checker only applies to built-in stores. + if so.customDataStoreFactory == nil { + err = verifyPersistenceCompatibleVersion(persistenceConfig, so.persistenceServiceResolver, logger) + if err != nil { + return serverOptionsProvider{}, err + } } stopChan := make(chan any) diff --git a/temporal/fx_test.go b/temporal/fx_test.go index fc4fe72d57f..c00d2ee8cae 100644 --- a/temporal/fx_test.go +++ b/temporal/fx_test.go @@ -11,7 +11,12 @@ import ( "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/config" "go.temporal.io/server/common/log" + "go.temporal.io/server/common/membership/static" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/common/primitives" + "go.temporal.io/server/common/resolver" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/tests/testutils" "go.uber.org/mock/gomock" @@ -113,6 +118,64 @@ func TestOverwriteCurrentClusterMetadataWithDBRecord(t *testing.T) { require.Equal(t, int32(1024), cfg.Persistence.NumHistoryShards) } +func TestServerOptionsProviderStaticHostsOnlyRequiresRequestedServices(t *testing.T) { + cfg := testServerConfig(testCustomDataStore("test-custom")) + + _, err := ServerOptionsProvider([]ServerOption{ + ForServices([]string{string(primitives.FrontendService)}), + WithConfig(cfg), + WithCustomDataStoreFactory(noopAbstractDataStoreFactory{}), + WithStaticHosts(map[primitives.ServiceName]static.Hosts{ + primitives.FrontendService: static.SingleLocalHost("127.0.0.1:7000"), + }), + WithLogger(log.NewNoopLogger()), + }) + require.NoError(t, err) +} + +func TestServerOptionsProviderStaticHostsRequiresEachRequestedService(t *testing.T) { + cfg := testServerConfig(testCustomDataStore("test-custom")) + + _, err := ServerOptionsProvider([]ServerOption{ + ForServices([]string{ + string(primitives.FrontendService), + string(primitives.HistoryService), + }), + WithConfig(cfg), + WithCustomDataStoreFactory(noopAbstractDataStoreFactory{}), + WithStaticHosts(map[primitives.ServiceName]static.Hosts{ + primitives.FrontendService: static.SingleLocalHost("127.0.0.1:7000"), + }), + WithLogger(log.NewNoopLogger()), + }) + require.ErrorIs(t, err, missingServiceInStaticHosts) + require.ErrorContains(t, err, string(primitives.HistoryService)) +} + +func TestServerOptionsProviderCustomDataStoreFactorySkipsBuiltInSchemaCheck(t *testing.T) { + cfg := testServerConfig(config.DataStore{ + SQL: &config.SQL{ + PluginName: "not-registered", + DatabaseName: "temporal", + ConnectAddr: "127.0.0.1:1", + ConnectProtocol: "tcp", + }, + }) + + _, err := ServerOptionsProvider([]ServerOption{ + WithConfig(cfg), + WithLogger(log.NewNoopLogger()), + }) + require.ErrorContains(t, err, "unknown plugin") + + _, err = ServerOptionsProvider([]ServerOption{ + WithConfig(cfg), + WithCustomDataStoreFactory(noopAbstractDataStoreFactory{}), + WithLogger(log.NewNoopLogger()), + }) + require.NoError(t, err) +} + func TestUpdateIndexSearchAttributes(t *testing.T) { testCases := []struct { name string @@ -377,6 +440,46 @@ func TestUpdateIndexSearchAttributes(t *testing.T) { } } +func testServerConfig(dataStore config.DataStore) *config.Config { + return &config.Config{ + Persistence: config.Persistence{ + DefaultStore: "default", + VisibilityStore: "visibility", + NumHistoryShards: 1, + DataStores: map[string]config.DataStore{ + "default": dataStore, + "visibility": dataStore, + }, + }, + Services: map[string]config.Service{ + string(primitives.FrontendService): {}, + string(primitives.HistoryService): {}, + }, + } +} + +func testCustomDataStore(name string) config.DataStore { + return config.DataStore{ + CustomDataStoreConfig: &config.CustomDatastoreConfig{ + Name: name, + IndexName: name, + }, + } +} + +type noopAbstractDataStoreFactory struct{} + +func (noopAbstractDataStoreFactory) NewFactory( + config.CustomDatastoreConfig, + resolver.ServiceResolver, + string, + log.Logger, + metrics.Handler, + serialization.Serializer, +) persistence.DataStoreFactory { + return nil +} + func TestTaskCategoryRegistryProvider(t *testing.T) { for _, tc := range []struct { name string diff --git a/temporal/server_option.go b/temporal/server_option.go index 54ffda44b48..716c0ef5b49 100644 --- a/temporal/server_option.go +++ b/temporal/server_option.go @@ -63,7 +63,7 @@ func ForServices(names []string) ServerOption { } // WithStaticHosts disables dynamic service membership and resolves service hosts statically. -// At least one host must be provided for all required services (frontend, history, matching, worker). +// At least one host must be provided for all services passed to ForServices. // And a self-address must be provided for all services passed to ForServices. func WithStaticHosts(hostsByService map[primitives.ServiceName]static.Hosts) ServerOption { return applyFunc(func(s *serverOptions) { From d34f44d908b657c246016c3ff50e20920e56c0d4 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Sat, 20 Jun 2026 22:45:07 -0700 Subject: [PATCH 03/14] Initialize test schema versions --- common/persistence/cassandra/test.go | 38 +++++++++++++++++++ .../persistence/sql/test_sql_persistence.go | 36 ++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/common/persistence/cassandra/test.go b/common/persistence/cassandra/test.go index 1adfc15651f..413cb7cd54a 100644 --- a/common/persistence/cassandra/test.go +++ b/common/persistence/cassandra/test.go @@ -17,6 +17,7 @@ import ( commongocql "go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql" "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/resolver" + cassandraschema "go.temporal.io/server/schema/cassandra" "go.temporal.io/server/temporal/environment" "go.temporal.io/server/tests/testutils" ) @@ -93,6 +94,7 @@ func (s *TestCluster) SetupTestDatabase() { } s.LoadSchema(path.Join(schemaDir, "temporal", "schema.cql")) + s.LoadSchemaVersion() } // TearDownTestDatabase from PersistenceTestCluster interface @@ -181,6 +183,42 @@ func (s *TestCluster) LoadSchema(schemaFile string) { s.logger.Info("loaded schema") } +// LoadSchemaVersion writes the schema metadata expected by server startup validation. +func (s *TestCluster) LoadSchemaVersion() { + for _, stmt := range []string{ + `CREATE TABLE IF NOT EXISTS schema_version(keyspace_name text PRIMARY KEY, creation_time timestamp, curr_version text, min_compatible_version text);`, + `CREATE TABLE IF NOT EXISTS schema_update_history(year int, month int, update_time timestamp, description text, manifest_md5 text, new_version text, old_version text, PRIMARY KEY ((year, month), update_time));`, + } { + if err := s.session.Query(stmt).Exec(); err != nil { + s.logger.Fatal("LoadSchemaVersion", tag.Error(err)) + } + } + + now := time.Now().UTC() + if err := s.session.Query( + `INSERT into schema_version(keyspace_name, creation_time, curr_version, min_compatible_version) VALUES (?,?,?,?)`, + s.keyspace, + now, + cassandraschema.Version, + cassandraschema.Version, + ).Exec(); err != nil { + s.logger.Fatal("LoadSchemaVersion", tag.Error(err)) + } + if err := s.session.Query( + `INSERT into schema_update_history(year, month, update_time, old_version, new_version, manifest_md5, description) VALUES(?,?,?,?,?,?,?)`, + now.Year(), + int(now.Month()), + now, + "0", + cassandraschema.Version, + "", + "initial version", + ).Exec(); err != nil { + s.logger.Fatal("LoadSchemaVersion", tag.Error(err)) + } + s.logger.Info("loaded schema version", tag.String("version", cassandraschema.Version)) +} + func (s *TestCluster) GetSession() commongocql.Session { return s.session } diff --git a/common/persistence/sql/test_sql_persistence.go b/common/persistence/sql/test_sql_persistence.go index b92589e9a1a..20abae23d6e 100644 --- a/common/persistence/sql/test_sql_persistence.go +++ b/common/persistence/sql/test_sql_persistence.go @@ -83,6 +83,7 @@ func (s *TestCluster) SetupTestDatabase() { } s.LoadSchema(path.Join(schemaDir, "temporal", "schema.sql")) s.LoadSchema(path.Join(schemaDir, "visibility", "schema.sql")) + s.LoadSchemaVersion() } // Config returns the persistence config for connecting to this test cluster @@ -220,3 +221,38 @@ func (s *TestCluster) LoadSchema(schemaFile string) { } s.logger.Info("loaded schema") } + +// LoadSchemaVersion writes the schema metadata expected by server startup validation. +func (s *TestCluster) LoadSchemaVersion() { + var db sqlplugin.AdminDB + var err error + err = backoff.ThrottleRetry( + func() error { + db, err = NewSQLAdminDB(sqlplugin.DbKindMain, &s.cfg, resolver.NewNoopResolver(), log.NewTestLogger(), metrics.NoopMetricsHandler) + return err + }, + backoff.NewExponentialRetryPolicy(time.Second).WithExpirationInterval(time.Minute), + nil, + ) + if err != nil { + panic(err) + } + defer func() { + err := db.Close() + if err != nil { + panic(err) + } + }() + + expectedVersion := db.ExpectedVersion() + if err = db.CreateSchemaVersionTables(); err != nil { + s.logger.Fatal("CreateSchemaVersionTables", tag.Error(err)) + } + if err = db.UpdateSchemaVersion(s.cfg.DatabaseName, expectedVersion, expectedVersion); err != nil { + s.logger.Fatal("UpdateSchemaVersion", tag.Error(err)) + } + if err = db.WriteSchemaUpdateLog("0", expectedVersion, "", "initial version"); err != nil { + s.logger.Fatal("WriteSchemaUpdateLog", tag.Error(err)) + } + s.logger.Info("loaded schema version", tag.String("version", expectedVersion)) +} From af9e7928f38ee2e5937c77107bf305a7a78204f7 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Sat, 20 Jun 2026 22:47:26 -0700 Subject: [PATCH 04/14] Keep schema validation in startup checks --- temporal/fx.go | 10 +++------- temporal/fx_test.go | 42 ------------------------------------------ 2 files changed, 3 insertions(+), 49 deletions(-) diff --git a/temporal/fx.go b/temporal/fx.go index 55aded68d51..5489793ee31 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -182,13 +182,9 @@ func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error) { } persistenceConfig := so.config.Persistence - // Custom persistence factories own their own schema/version compatibility - // contract; the built-in SQL/Cassandra checker only applies to built-in stores. - if so.customDataStoreFactory == nil { - err = verifyPersistenceCompatibleVersion(persistenceConfig, so.persistenceServiceResolver, logger) - if err != nil { - return serverOptionsProvider{}, err - } + err = verifyPersistenceCompatibleVersion(persistenceConfig, so.persistenceServiceResolver, logger) + if err != nil { + return serverOptionsProvider{}, err } stopChan := make(chan any) diff --git a/temporal/fx_test.go b/temporal/fx_test.go index c00d2ee8cae..dfc54fb0a37 100644 --- a/temporal/fx_test.go +++ b/temporal/fx_test.go @@ -12,11 +12,8 @@ import ( "go.temporal.io/server/common/config" "go.temporal.io/server/common/log" "go.temporal.io/server/common/membership/static" - "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence" - "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/primitives" - "go.temporal.io/server/common/resolver" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/tests/testutils" "go.uber.org/mock/gomock" @@ -124,7 +121,6 @@ func TestServerOptionsProviderStaticHostsOnlyRequiresRequestedServices(t *testin _, err := ServerOptionsProvider([]ServerOption{ ForServices([]string{string(primitives.FrontendService)}), WithConfig(cfg), - WithCustomDataStoreFactory(noopAbstractDataStoreFactory{}), WithStaticHosts(map[primitives.ServiceName]static.Hosts{ primitives.FrontendService: static.SingleLocalHost("127.0.0.1:7000"), }), @@ -142,7 +138,6 @@ func TestServerOptionsProviderStaticHostsRequiresEachRequestedService(t *testing string(primitives.HistoryService), }), WithConfig(cfg), - WithCustomDataStoreFactory(noopAbstractDataStoreFactory{}), WithStaticHosts(map[primitives.ServiceName]static.Hosts{ primitives.FrontendService: static.SingleLocalHost("127.0.0.1:7000"), }), @@ -152,30 +147,6 @@ func TestServerOptionsProviderStaticHostsRequiresEachRequestedService(t *testing require.ErrorContains(t, err, string(primitives.HistoryService)) } -func TestServerOptionsProviderCustomDataStoreFactorySkipsBuiltInSchemaCheck(t *testing.T) { - cfg := testServerConfig(config.DataStore{ - SQL: &config.SQL{ - PluginName: "not-registered", - DatabaseName: "temporal", - ConnectAddr: "127.0.0.1:1", - ConnectProtocol: "tcp", - }, - }) - - _, err := ServerOptionsProvider([]ServerOption{ - WithConfig(cfg), - WithLogger(log.NewNoopLogger()), - }) - require.ErrorContains(t, err, "unknown plugin") - - _, err = ServerOptionsProvider([]ServerOption{ - WithConfig(cfg), - WithCustomDataStoreFactory(noopAbstractDataStoreFactory{}), - WithLogger(log.NewNoopLogger()), - }) - require.NoError(t, err) -} - func TestUpdateIndexSearchAttributes(t *testing.T) { testCases := []struct { name string @@ -467,19 +438,6 @@ func testCustomDataStore(name string) config.DataStore { } } -type noopAbstractDataStoreFactory struct{} - -func (noopAbstractDataStoreFactory) NewFactory( - config.CustomDatastoreConfig, - resolver.ServiceResolver, - string, - log.Logger, - metrics.Handler, - serialization.Serializer, -) persistence.DataStoreFactory { - return nil -} - func TestTaskCategoryRegistryProvider(t *testing.T) { for _, tc := range []struct { name string From fdbdf4beb0237b4de00ce4c6578e60fe44539310 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Sun, 21 Jun 2026 00:18:23 -0700 Subject: [PATCH 05/14] Avoid panic in test schema setup --- common/persistence/sql/test_sql_persistence.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/persistence/sql/test_sql_persistence.go b/common/persistence/sql/test_sql_persistence.go index 20abae23d6e..85cb79771c9 100644 --- a/common/persistence/sql/test_sql_persistence.go +++ b/common/persistence/sql/test_sql_persistence.go @@ -123,12 +123,12 @@ func (s *TestCluster) CreateDatabase() { nil, ) if err != nil { - panic(err) + s.logger.Fatal("NewSQLAdminDB", tag.Error(err)) } defer func() { err := db.Close() if err != nil { - panic(err) + s.logger.Fatal("Close schema version DB", tag.Error(err)) } }() err = db.CreateDatabase(s.cfg.DatabaseName) From e01a658e0deed3ae71bc26b07cca71736197f660 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Sun, 21 Jun 2026 09:04:58 -0700 Subject: [PATCH 06/14] Avoid panic in schema version setup --- common/persistence/sql/test_sql_persistence.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/persistence/sql/test_sql_persistence.go b/common/persistence/sql/test_sql_persistence.go index 85cb79771c9..ecc38c141fa 100644 --- a/common/persistence/sql/test_sql_persistence.go +++ b/common/persistence/sql/test_sql_persistence.go @@ -235,12 +235,12 @@ func (s *TestCluster) LoadSchemaVersion() { nil, ) if err != nil { - panic(err) + s.logger.Fatal("NewSQLAdminDB", tag.Error(err)) } defer func() { err := db.Close() if err != nil { - panic(err) + s.logger.Fatal("Close schema version DB", tag.Error(err)) } }() From 91c5ab61a140ab190be6b9319d549f0739b74da5 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 22 Jun 2026 07:41:35 -0700 Subject: [PATCH 07/14] Remove static host startup change --- temporal/fx.go | 6 ++-- temporal/fx_test.go | 61 --------------------------------------- temporal/server_option.go | 2 +- 3 files changed, 4 insertions(+), 65 deletions(-) diff --git a/temporal/fx.go b/temporal/fx.go index 5489793ee31..40a3b37b096 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -261,10 +261,10 @@ func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error) { } } - // check that when static hosts are defined, they are defined for all requested hosts + // check that when static hosts are defined, they are defined for all required hosts if len(so.hostsByService) > 0 { - for service := range so.serviceNames { - hosts := so.hostsByService[service] + for _, service := range DefaultServices { + hosts := so.hostsByService[primitives.ServiceName(service)] if len(hosts.All) == 0 { return serverOptionsProvider{}, fmt.Errorf("%w: %v", missingServiceInStaticHosts, service) } diff --git a/temporal/fx_test.go b/temporal/fx_test.go index dfc54fb0a37..fc4fe72d57f 100644 --- a/temporal/fx_test.go +++ b/temporal/fx_test.go @@ -11,9 +11,7 @@ import ( "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/config" "go.temporal.io/server/common/log" - "go.temporal.io/server/common/membership/static" "go.temporal.io/server/common/persistence" - "go.temporal.io/server/common/primitives" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/tests/testutils" "go.uber.org/mock/gomock" @@ -115,38 +113,6 @@ func TestOverwriteCurrentClusterMetadataWithDBRecord(t *testing.T) { require.Equal(t, int32(1024), cfg.Persistence.NumHistoryShards) } -func TestServerOptionsProviderStaticHostsOnlyRequiresRequestedServices(t *testing.T) { - cfg := testServerConfig(testCustomDataStore("test-custom")) - - _, err := ServerOptionsProvider([]ServerOption{ - ForServices([]string{string(primitives.FrontendService)}), - WithConfig(cfg), - WithStaticHosts(map[primitives.ServiceName]static.Hosts{ - primitives.FrontendService: static.SingleLocalHost("127.0.0.1:7000"), - }), - WithLogger(log.NewNoopLogger()), - }) - require.NoError(t, err) -} - -func TestServerOptionsProviderStaticHostsRequiresEachRequestedService(t *testing.T) { - cfg := testServerConfig(testCustomDataStore("test-custom")) - - _, err := ServerOptionsProvider([]ServerOption{ - ForServices([]string{ - string(primitives.FrontendService), - string(primitives.HistoryService), - }), - WithConfig(cfg), - WithStaticHosts(map[primitives.ServiceName]static.Hosts{ - primitives.FrontendService: static.SingleLocalHost("127.0.0.1:7000"), - }), - WithLogger(log.NewNoopLogger()), - }) - require.ErrorIs(t, err, missingServiceInStaticHosts) - require.ErrorContains(t, err, string(primitives.HistoryService)) -} - func TestUpdateIndexSearchAttributes(t *testing.T) { testCases := []struct { name string @@ -411,33 +377,6 @@ func TestUpdateIndexSearchAttributes(t *testing.T) { } } -func testServerConfig(dataStore config.DataStore) *config.Config { - return &config.Config{ - Persistence: config.Persistence{ - DefaultStore: "default", - VisibilityStore: "visibility", - NumHistoryShards: 1, - DataStores: map[string]config.DataStore{ - "default": dataStore, - "visibility": dataStore, - }, - }, - Services: map[string]config.Service{ - string(primitives.FrontendService): {}, - string(primitives.HistoryService): {}, - }, - } -} - -func testCustomDataStore(name string) config.DataStore { - return config.DataStore{ - CustomDataStoreConfig: &config.CustomDatastoreConfig{ - Name: name, - IndexName: name, - }, - } -} - func TestTaskCategoryRegistryProvider(t *testing.T) { for _, tc := range []struct { name string diff --git a/temporal/server_option.go b/temporal/server_option.go index 716c0ef5b49..54ffda44b48 100644 --- a/temporal/server_option.go +++ b/temporal/server_option.go @@ -63,7 +63,7 @@ func ForServices(names []string) ServerOption { } // WithStaticHosts disables dynamic service membership and resolves service hosts statically. -// At least one host must be provided for all services passed to ForServices. +// At least one host must be provided for all required services (frontend, history, matching, worker). // And a self-address must be provided for all services passed to ForServices. func WithStaticHosts(hostsByService map[primitives.ServiceName]static.Hosts) ServerOption { return applyFunc(func(s *serverOptions) { From e079cf61e1db3df9c5f1b850e9277168c1aea155 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 22 Jun 2026 07:46:22 -0700 Subject: [PATCH 08/14] Simplify test schema version setup --- common/persistence/cassandra/test.go | 71 +++++++++----- .../persistence/sql/test_sql_persistence.go | 98 ++++++------------- 2 files changed, 77 insertions(+), 92 deletions(-) diff --git a/common/persistence/cassandra/test.go b/common/persistence/cassandra/test.go index 413cb7cd54a..7c1486bfd2b 100644 --- a/common/persistence/cassandra/test.go +++ b/common/persistence/cassandra/test.go @@ -24,6 +24,25 @@ import ( const ( testSchemaDir = "schema/cassandra/" + + createSchemaVersionTableCQL = `CREATE TABLE IF NOT EXISTS schema_version(keyspace_name text PRIMARY KEY, ` + + `creation_time timestamp, ` + + `curr_version text, ` + + `min_compatible_version text);` + + createSchemaUpdateHistoryTableCQL = `CREATE TABLE IF NOT EXISTS schema_update_history(` + + `year int, ` + + `month int, ` + + `update_time timestamp, ` + + `description text, ` + + `manifest_md5 text, ` + + `new_version text, ` + + `old_version text, ` + + `PRIMARY KEY ((year, month), update_time));` + + writeSchemaVersionCQL = `INSERT into schema_version(keyspace_name, creation_time, curr_version, min_compatible_version) VALUES (?,?,?,?)` + + writeSchemaUpdateHistoryCQL = `INSERT into schema_update_history(year, month, update_time, old_version, new_version, manifest_md5, description) VALUES(?,?,?,?,?,?,?)` ) // TestCluster allows executing cassandra operations in testing. @@ -185,38 +204,40 @@ func (s *TestCluster) LoadSchema(schemaFile string) { // LoadSchemaVersion writes the schema metadata expected by server startup validation. func (s *TestCluster) LoadSchemaVersion() { - for _, stmt := range []string{ - `CREATE TABLE IF NOT EXISTS schema_version(keyspace_name text PRIMARY KEY, creation_time timestamp, curr_version text, min_compatible_version text);`, - `CREATE TABLE IF NOT EXISTS schema_update_history(year int, month int, update_time timestamp, description text, manifest_md5 text, new_version text, old_version text, PRIMARY KEY ((year, month), update_time));`, - } { - if err := s.session.Query(stmt).Exec(); err != nil { - s.logger.Fatal("LoadSchemaVersion", tag.Error(err)) - } - } + s.createSchemaVersionTables() + s.updateSchemaVersion(cassandraschema.Version, cassandraschema.Version) + s.writeSchemaUpdateLog("0", cassandraschema.Version, "", "initial version") + s.logger.Info("loaded schema version", tag.String("version", cassandraschema.Version)) +} + +func (s *TestCluster) createSchemaVersionTables() { + s.execSchemaVersionQuery(createSchemaVersionTableCQL) + s.execSchemaVersionQuery(createSchemaUpdateHistoryTableCQL) +} +func (s *TestCluster) updateSchemaVersion(newVersion string, minCompatibleVersion string) { now := time.Now().UTC() - if err := s.session.Query( - `INSERT into schema_version(keyspace_name, creation_time, curr_version, min_compatible_version) VALUES (?,?,?,?)`, - s.keyspace, - now, - cassandraschema.Version, - cassandraschema.Version, - ).Exec(); err != nil { - s.logger.Fatal("LoadSchemaVersion", tag.Error(err)) - } - if err := s.session.Query( - `INSERT into schema_update_history(year, month, update_time, old_version, new_version, manifest_md5, description) VALUES(?,?,?,?,?,?,?)`, + s.execSchemaVersionQuery(writeSchemaVersionCQL, s.keyspace, now, newVersion, minCompatibleVersion) +} + +func (s *TestCluster) writeSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, description string) { + now := time.Now().UTC() + s.execSchemaVersionQuery( + writeSchemaUpdateHistoryCQL, now.Year(), int(now.Month()), now, - "0", - cassandraschema.Version, - "", - "initial version", - ).Exec(); err != nil { + oldVersion, + newVersion, + manifestMD5, + description, + ) +} + +func (s *TestCluster) execSchemaVersionQuery(stmt string, args ...any) { + if err := s.session.Query(stmt, args...).Exec(); err != nil { s.logger.Fatal("LoadSchemaVersion", tag.Error(err)) } - s.logger.Info("loaded schema version", tag.String("version", cassandraschema.Version)) } func (s *TestCluster) GetSession() commongocql.Session { diff --git a/common/persistence/sql/test_sql_persistence.go b/common/persistence/sql/test_sql_persistence.go index ecc38c141fa..a46222f61b3 100644 --- a/common/persistence/sql/test_sql_persistence.go +++ b/common/persistence/sql/test_sql_persistence.go @@ -112,26 +112,9 @@ func (s *TestCluster) CreateDatabase() { cfg2.DatabaseName = "" } - var db sqlplugin.AdminDB - var err error - err = backoff.ThrottleRetry( - func() error { - db, err = NewSQLAdminDB(sqlplugin.DbKindUnknown, &cfg2, resolver.NewNoopResolver(), log.NewTestLogger(), metrics.NoopMetricsHandler) - return err - }, - backoff.NewExponentialRetryPolicy(time.Second).WithExpirationInterval(time.Minute), - nil, - ) - if err != nil { - s.logger.Fatal("NewSQLAdminDB", tag.Error(err)) - } - defer func() { - err := db.Close() - if err != nil { - s.logger.Fatal("Close schema version DB", tag.Error(err)) - } - }() - err = db.CreateDatabase(s.cfg.DatabaseName) + db := s.newAdminDB(sqlplugin.DbKindUnknown, &cfg2) + defer s.closeAdminDB(db) + err := db.CreateDatabase(s.cfg.DatabaseName) if err != nil { panic(err) } @@ -155,18 +138,9 @@ func (s *TestCluster) DropDatabase() { // NOTE need to connect with empty name to drop the database cfg2.DatabaseName = "" - db, err := NewSQLAdminDB(sqlplugin.DbKindUnknown, &cfg2, resolver.NewNoopResolver(), log.NewTestLogger(), metrics.NoopMetricsHandler) - if err != nil { - panic(err) - } - defer func() { - err := db.Close() - if err != nil { - panic(err) - } - }() - err = db.DropDatabase(s.cfg.DatabaseName) - if err != nil { + db := s.newAdminDB(sqlplugin.DbKindUnknown, &cfg2) + defer s.closeAdminDB(db) + if err := db.DropDatabase(s.cfg.DatabaseName); err != nil { panic(err) } s.logger.Info("dropped database", tag.String("database", s.cfg.DatabaseName)) @@ -184,24 +158,8 @@ func (s *TestCluster) LoadSchema(schemaFile string) { ) } - var db sqlplugin.AdminDB - err = backoff.ThrottleRetry( - func() error { - db, err = NewSQLAdminDB(sqlplugin.DbKindUnknown, &s.cfg, resolver.NewNoopResolver(), log.NewTestLogger(), metrics.NoopMetricsHandler) - return err - }, - backoff.NewExponentialRetryPolicy(time.Second).WithExpirationInterval(time.Minute), - nil, - ) - if err != nil { - panic(err) - } - defer func() { - err := db.Close() - if err != nil { - panic(err) - } - }() + db := s.newAdminDB(sqlplugin.DbKindUnknown, &s.cfg) + defer s.closeAdminDB(db) if rewriter, ok := db.(sqlplugin.SchemaStatementRewriter); ok { statements = rewriter.RewriteSchemaStatements(statements) @@ -224,11 +182,28 @@ func (s *TestCluster) LoadSchema(schemaFile string) { // LoadSchemaVersion writes the schema metadata expected by server startup validation. func (s *TestCluster) LoadSchemaVersion() { + db := s.newAdminDB(sqlplugin.DbKindMain, &s.cfg) + defer s.closeAdminDB(db) + + expectedVersion := db.ExpectedVersion() + if err := db.CreateSchemaVersionTables(); err != nil { + s.logger.Fatal("CreateSchemaVersionTables", tag.Error(err)) + } + if err := db.UpdateSchemaVersion(s.cfg.DatabaseName, expectedVersion, expectedVersion); err != nil { + s.logger.Fatal("UpdateSchemaVersion", tag.Error(err)) + } + if err := db.WriteSchemaUpdateLog("0", expectedVersion, "", "initial version"); err != nil { + s.logger.Fatal("WriteSchemaUpdateLog", tag.Error(err)) + } + s.logger.Info("loaded schema version", tag.String("version", expectedVersion)) +} + +func (s *TestCluster) newAdminDB(kind sqlplugin.DbKind, cfg *config.SQL) sqlplugin.AdminDB { var db sqlplugin.AdminDB var err error err = backoff.ThrottleRetry( func() error { - db, err = NewSQLAdminDB(sqlplugin.DbKindMain, &s.cfg, resolver.NewNoopResolver(), log.NewTestLogger(), metrics.NoopMetricsHandler) + db, err = NewSQLAdminDB(kind, cfg, resolver.NewNoopResolver(), log.NewTestLogger(), metrics.NoopMetricsHandler) return err }, backoff.NewExponentialRetryPolicy(time.Second).WithExpirationInterval(time.Minute), @@ -237,22 +212,11 @@ func (s *TestCluster) LoadSchemaVersion() { if err != nil { s.logger.Fatal("NewSQLAdminDB", tag.Error(err)) } - defer func() { - err := db.Close() - if err != nil { - s.logger.Fatal("Close schema version DB", tag.Error(err)) - } - }() + return db +} - expectedVersion := db.ExpectedVersion() - if err = db.CreateSchemaVersionTables(); err != nil { - s.logger.Fatal("CreateSchemaVersionTables", tag.Error(err)) - } - if err = db.UpdateSchemaVersion(s.cfg.DatabaseName, expectedVersion, expectedVersion); err != nil { - s.logger.Fatal("UpdateSchemaVersion", tag.Error(err)) +func (s *TestCluster) closeAdminDB(db sqlplugin.AdminDB) { + if err := db.Close(); err != nil { + s.logger.Fatal("Close schema DB", tag.Error(err)) } - if err = db.WriteSchemaUpdateLog("0", expectedVersion, "", "initial version"); err != nil { - s.logger.Fatal("WriteSchemaUpdateLog", tag.Error(err)) - } - s.logger.Info("loaded schema version", tag.String("version", expectedVersion)) } From b1308d1de52f20740f8b294069690b48799d94a4 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 22 Jun 2026 07:49:45 -0700 Subject: [PATCH 09/14] Update test_sql_persistence.go --- common/persistence/sql/test_sql_persistence.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/common/persistence/sql/test_sql_persistence.go b/common/persistence/sql/test_sql_persistence.go index a46222f61b3..4654f958c24 100644 --- a/common/persistence/sql/test_sql_persistence.go +++ b/common/persistence/sql/test_sql_persistence.go @@ -200,16 +200,14 @@ func (s *TestCluster) LoadSchemaVersion() { func (s *TestCluster) newAdminDB(kind sqlplugin.DbKind, cfg *config.SQL) sqlplugin.AdminDB { var db sqlplugin.AdminDB - var err error - err = backoff.ThrottleRetry( + err := backoff.ThrottleRetry( func() error { db, err = NewSQLAdminDB(kind, cfg, resolver.NewNoopResolver(), log.NewTestLogger(), metrics.NoopMetricsHandler) return err }, backoff.NewExponentialRetryPolicy(time.Second).WithExpirationInterval(time.Minute), nil, - ) - if err != nil { + ); err != nil { s.logger.Fatal("NewSQLAdminDB", tag.Error(err)) } return db From 20395a0326dc19b805bb68c6d5843b504cc521af Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 22 Jun 2026 07:54:10 -0700 Subject: [PATCH 10/14] Update test_sql_persistence.go --- common/persistence/sql/test_sql_persistence.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/common/persistence/sql/test_sql_persistence.go b/common/persistence/sql/test_sql_persistence.go index 4654f958c24..a46222f61b3 100644 --- a/common/persistence/sql/test_sql_persistence.go +++ b/common/persistence/sql/test_sql_persistence.go @@ -200,14 +200,16 @@ func (s *TestCluster) LoadSchemaVersion() { func (s *TestCluster) newAdminDB(kind sqlplugin.DbKind, cfg *config.SQL) sqlplugin.AdminDB { var db sqlplugin.AdminDB - err := backoff.ThrottleRetry( + var err error + err = backoff.ThrottleRetry( func() error { db, err = NewSQLAdminDB(kind, cfg, resolver.NewNoopResolver(), log.NewTestLogger(), metrics.NoopMetricsHandler) return err }, backoff.NewExponentialRetryPolicy(time.Second).WithExpirationInterval(time.Minute), nil, - ); err != nil { + ) + if err != nil { s.logger.Fatal("NewSQLAdminDB", tag.Error(err)) } return db From 908112c28789dec89cc8a4be8464d5344170dd89 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 22 Jun 2026 07:56:13 -0700 Subject: [PATCH 11/14] Update test.go --- common/persistence/cassandra/test.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/common/persistence/cassandra/test.go b/common/persistence/cassandra/test.go index 7c1486bfd2b..a061022c72f 100644 --- a/common/persistence/cassandra/test.go +++ b/common/persistence/cassandra/test.go @@ -222,16 +222,7 @@ func (s *TestCluster) updateSchemaVersion(newVersion string, minCompatibleVersio func (s *TestCluster) writeSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, description string) { now := time.Now().UTC() - s.execSchemaVersionQuery( - writeSchemaUpdateHistoryCQL, - now.Year(), - int(now.Month()), - now, - oldVersion, - newVersion, - manifestMD5, - description, - ) + s.execSchemaVersionQuery(writeSchemaUpdateHistoryCQL, now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, description) } func (s *TestCluster) execSchemaVersionQuery(stmt string, args ...any) { From 6fb30a40a3697f90e67525315e93ad9b034af203 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 22 Jun 2026 07:56:41 -0700 Subject: [PATCH 12/14] Update test.go --- common/persistence/cassandra/test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/common/persistence/cassandra/test.go b/common/persistence/cassandra/test.go index a061022c72f..9e59e3da778 100644 --- a/common/persistence/cassandra/test.go +++ b/common/persistence/cassandra/test.go @@ -217,12 +217,14 @@ func (s *TestCluster) createSchemaVersionTables() { func (s *TestCluster) updateSchemaVersion(newVersion string, minCompatibleVersion string) { now := time.Now().UTC() - s.execSchemaVersionQuery(writeSchemaVersionCQL, s.keyspace, now, newVersion, minCompatibleVersion) + s.execSchemaVersionQuery(writeSchemaVersionCQL, + s.keyspace, now, newVersion, minCompatibleVersion) } func (s *TestCluster) writeSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, description string) { now := time.Now().UTC() - s.execSchemaVersionQuery(writeSchemaUpdateHistoryCQL, now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, description) + s.execSchemaVersionQuery(writeSchemaUpdateHistoryCQL, + now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, description) } func (s *TestCluster) execSchemaVersionQuery(stmt string, args ...any) { From 3b335ed9b548bca450da9482ff1eb1d2168ae2d2 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 22 Jun 2026 07:56:49 -0700 Subject: [PATCH 13/14] Update test.go --- common/persistence/cassandra/test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/common/persistence/cassandra/test.go b/common/persistence/cassandra/test.go index 9e59e3da778..e4433866820 100644 --- a/common/persistence/cassandra/test.go +++ b/common/persistence/cassandra/test.go @@ -217,13 +217,15 @@ func (s *TestCluster) createSchemaVersionTables() { func (s *TestCluster) updateSchemaVersion(newVersion string, minCompatibleVersion string) { now := time.Now().UTC() - s.execSchemaVersionQuery(writeSchemaVersionCQL, + s.execSchemaVersionQuery( + writeSchemaVersionCQL, s.keyspace, now, newVersion, minCompatibleVersion) } func (s *TestCluster) writeSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, description string) { now := time.Now().UTC() - s.execSchemaVersionQuery(writeSchemaUpdateHistoryCQL, + s.execSchemaVersionQuery( + writeSchemaUpdateHistoryCQL, now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, description) } From 02bd1a6c226ff60282be19e805116524451d0de1 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 22 Jun 2026 07:58:13 -0700 Subject: [PATCH 14/14] Unexport test schema version setup --- common/persistence/cassandra/test.go | 7 +++---- common/persistence/sql/test_sql_persistence.go | 5 ++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/common/persistence/cassandra/test.go b/common/persistence/cassandra/test.go index e4433866820..1755faefae7 100644 --- a/common/persistence/cassandra/test.go +++ b/common/persistence/cassandra/test.go @@ -113,7 +113,7 @@ func (s *TestCluster) SetupTestDatabase() { } s.LoadSchema(path.Join(schemaDir, "temporal", "schema.cql")) - s.LoadSchemaVersion() + s.loadSchemaVersion() } // TearDownTestDatabase from PersistenceTestCluster interface @@ -202,8 +202,7 @@ func (s *TestCluster) LoadSchema(schemaFile string) { s.logger.Info("loaded schema") } -// LoadSchemaVersion writes the schema metadata expected by server startup validation. -func (s *TestCluster) LoadSchemaVersion() { +func (s *TestCluster) loadSchemaVersion() { s.createSchemaVersionTables() s.updateSchemaVersion(cassandraschema.Version, cassandraschema.Version) s.writeSchemaUpdateLog("0", cassandraschema.Version, "", "initial version") @@ -231,7 +230,7 @@ func (s *TestCluster) writeSchemaUpdateLog(oldVersion string, newVersion string, func (s *TestCluster) execSchemaVersionQuery(stmt string, args ...any) { if err := s.session.Query(stmt, args...).Exec(); err != nil { - s.logger.Fatal("LoadSchemaVersion", tag.Error(err)) + s.logger.Fatal("loadSchemaVersion", tag.Error(err)) } } diff --git a/common/persistence/sql/test_sql_persistence.go b/common/persistence/sql/test_sql_persistence.go index a46222f61b3..557afda9cf2 100644 --- a/common/persistence/sql/test_sql_persistence.go +++ b/common/persistence/sql/test_sql_persistence.go @@ -83,7 +83,7 @@ func (s *TestCluster) SetupTestDatabase() { } s.LoadSchema(path.Join(schemaDir, "temporal", "schema.sql")) s.LoadSchema(path.Join(schemaDir, "visibility", "schema.sql")) - s.LoadSchemaVersion() + s.loadSchemaVersion() } // Config returns the persistence config for connecting to this test cluster @@ -180,8 +180,7 @@ func (s *TestCluster) LoadSchema(schemaFile string) { s.logger.Info("loaded schema") } -// LoadSchemaVersion writes the schema metadata expected by server startup validation. -func (s *TestCluster) LoadSchemaVersion() { +func (s *TestCluster) loadSchemaVersion() { db := s.newAdminDB(sqlplugin.DbKindMain, &s.cfg) defer s.closeAdminDB(db)