diff --git a/proxy/internal/env/env.go b/proxy/internal/env/env.go index c18edbd7a76..0b4895033e4 100644 --- a/proxy/internal/env/env.go +++ b/proxy/internal/env/env.go @@ -59,6 +59,12 @@ type Environment interface { DefaultBackendRTC() string // Default backend SRT port (UDP) DefaultBackendSRT() string + // Server alive duration for load balancer + ServerAliveDuration() string + // HLS stream alive duration for load balancer + HLSAliveDuration() string + // RTC stream alive duration for load balancer + RTCAliveDuration() string } type environment struct{} @@ -160,6 +166,18 @@ func (e *environment) DefaultBackendSRT() string { return os.Getenv("PROXY_DEFAULT_BACKEND_SRT") } +func (e *environment) ServerAliveDuration() string { + return os.Getenv("PROXY_SERVER_ALIVE_DURATION") +} + +func (e *environment) HLSAliveDuration() string { + return os.Getenv("PROXY_HLS_ALIVE_DURATION") +} + +func (e *environment) RTCAliveDuration() string { + return os.Getenv("PROXY_RTC_ALIVE_DURATION") +} + // loadEnvFile loads the environment variables from .env file. func loadEnvFile(ctx context.Context) error { if err := godotenv.Load(); err != nil { @@ -222,6 +240,11 @@ func buildDefaultEnvironmentVariables(ctx context.Context) { // Default backend udp srt port, for debugging. setEnvDefault("PROXY_DEFAULT_BACKEND_SRT", "10080") + // Load balancer TTL configurations. + setEnvDefault("PROXY_SERVER_ALIVE_DURATION", "300s") + setEnvDefault("PROXY_HLS_ALIVE_DURATION", "120s") + setEnvDefault("PROXY_RTC_ALIVE_DURATION", "120s") + logger.Df(ctx, "load .env as GO_PPROF=%v, "+ "PROXY_FORCE_QUIT_TIMEOUT=%v, PROXY_GRACE_QUIT_TIMEOUT=%v, "+ "PROXY_HTTP_API=%v, PROXY_HTTP_SERVER=%v, PROXY_RTMP_SERVER=%v, "+ @@ -231,7 +254,8 @@ func buildDefaultEnvironmentVariables(ctx context.Context) { "PROXY_DEFAULT_BACKEND_HTTP=%v, PROXY_DEFAULT_BACKEND_API=%v, "+ "PROXY_DEFAULT_BACKEND_RTC=%v, PROXY_DEFAULT_BACKEND_SRT=%v, "+ "PROXY_LOAD_BALANCER_TYPE=%v, PROXY_REDIS_HOST=%v, PROXY_REDIS_PORT=%v, "+ - "PROXY_REDIS_PASSWORD=%v, PROXY_REDIS_DB=%v", + "PROXY_REDIS_PASSWORD=%v, PROXY_REDIS_DB=%v, "+ + "PROXY_SERVER_ALIVE_DURATION=%v, PROXY_HLS_ALIVE_DURATION=%v, PROXY_RTC_ALIVE_DURATION=%v", os.Getenv("GO_PPROF"), os.Getenv("PROXY_FORCE_QUIT_TIMEOUT"), os.Getenv("PROXY_GRACE_QUIT_TIMEOUT"), os.Getenv("PROXY_HTTP_API"), os.Getenv("PROXY_HTTP_SERVER"), os.Getenv("PROXY_RTMP_SERVER"), @@ -242,6 +266,7 @@ func buildDefaultEnvironmentVariables(ctx context.Context) { os.Getenv("PROXY_DEFAULT_BACKEND_RTC"), os.Getenv("PROXY_DEFAULT_BACKEND_SRT"), os.Getenv("PROXY_LOAD_BALANCER_TYPE"), os.Getenv("PROXY_REDIS_HOST"), os.Getenv("PROXY_REDIS_PORT"), os.Getenv("PROXY_REDIS_PASSWORD"), os.Getenv("PROXY_REDIS_DB"), + os.Getenv("PROXY_SERVER_ALIVE_DURATION"), os.Getenv("PROXY_HLS_ALIVE_DURATION"), os.Getenv("PROXY_RTC_ALIVE_DURATION"), ) } diff --git a/proxy/internal/lb/lb.go b/proxy/internal/lb/lb.go index eae62a57420..facf30fb90e 100644 --- a/proxy/internal/lb/lb.go +++ b/proxy/internal/lb/lb.go @@ -8,16 +8,32 @@ import ( "fmt" "strings" "time" + + "proxy/internal/env" ) -// If server heartbeat in this duration, it's alive. -const ServerAliveDuration = 300 * time.Second +// ParseDurationOrDefault parses a duration string and returns the default if parsing fails. +func ParseDurationOrDefault(s string, defaultDuration time.Duration) time.Duration { + if d, err := time.ParseDuration(s); err == nil { + return d + } + return defaultDuration +} -// If HLS streaming update in this duration, it's alive. -const HLSAliveDuration = 120 * time.Second +// GetServerAliveDuration returns the configurable server alive duration from environment. +func GetServerAliveDuration(environment env.Environment) time.Duration { + return ParseDurationOrDefault(environment.ServerAliveDuration(), 300*time.Second) +} -// If WebRTC streaming update in this duration, it's alive. -const RTCAliveDuration = 120 * time.Second +// GetHLSAliveDuration returns the configurable HLS alive duration from environment. +func GetHLSAliveDuration(environment env.Environment) time.Duration { + return ParseDurationOrDefault(environment.HLSAliveDuration(), 120*time.Second) +} + +// GetRTCAliveDuration returns the configurable RTC alive duration from environment. +func GetRTCAliveDuration(environment env.Environment) time.Duration { + return ParseDurationOrDefault(environment.RTCAliveDuration(), 120*time.Second) +} // SRSServer represents a backend origin server. type SRSServer struct { diff --git a/proxy/internal/lb/mem.go b/proxy/internal/lb/mem.go index f3b9fe3ce5f..e6b8c2043d6 100644 --- a/proxy/internal/lb/mem.go +++ b/proxy/internal/lb/mem.go @@ -80,10 +80,13 @@ func (v *MemoryLoadBalancer) Pick(ctx context.Context, streamURL string) (*SRSSe return server, nil } + // Get the configurable server alive duration. + serverAliveDuration := GetServerAliveDuration(v.environment) + // Gather all servers that were alive within the last few seconds. var servers []*SRSServer v.servers.Range(func(key string, server *SRSServer) bool { - if time.Since(server.UpdatedAt) < ServerAliveDuration { + if time.Since(server.UpdatedAt) < serverAliveDuration { servers = append(servers, server) } return true diff --git a/proxy/internal/lb/redis.go b/proxy/internal/lb/redis.go index 936e6fc84bf..e6258b6eed5 100644 --- a/proxy/internal/lb/redis.go +++ b/proxy/internal/lb/redis.go @@ -86,8 +86,11 @@ func (v *RedisLoadBalancer) Update(ctx context.Context, server *SRSServer) error return errors.Wrapf(err, "marshal server %+v", server) } + // Get the configurable server alive duration. + serverAliveDuration := GetServerAliveDuration(v.environment) + key := v.redisKeyServer(server.ID()) - if err = v.rdb.Set(ctx, key, b, ServerAliveDuration).Err(); err != nil { + if err = v.rdb.Set(ctx, key, b, serverAliveDuration).Err(); err != nil { return errors.Wrapf(err, "set key=%v server %+v", key, server) } @@ -214,14 +217,17 @@ func (v *RedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL string return nil, errors.Wrapf(err, "marshal HLS %v", value) } + // Get the configurable HLS alive duration. + hlsAliveDuration := GetHLSAliveDuration(v.environment) + key := v.redisKeyHLS(streamURL) - if err = v.rdb.Set(ctx, key, b, HLSAliveDuration).Err(); err != nil { + if err = v.rdb.Set(ctx, key, b, hlsAliveDuration).Err(); err != nil { return nil, errors.Wrapf(err, "set key=%v HLS %v", key, value) } // Get SPBHID from value key2 := v.redisKeySPBHID(value.GetSPBHID()) - if err := v.rdb.Set(ctx, key2, b, HLSAliveDuration).Err(); err != nil { + if err := v.rdb.Set(ctx, key2, b, hlsAliveDuration).Err(); err != nil { return nil, errors.Wrapf(err, "set key=%v HLS %v", key2, value) } @@ -235,14 +241,17 @@ func (v *RedisLoadBalancer) StoreWebRTC(ctx context.Context, streamURL string, v return errors.Wrapf(err, "marshal WebRTC %v", value) } + // Get the configurable RTC alive duration. + rtcAliveDuration := GetRTCAliveDuration(v.environment) + key := v.redisKeyRTC(streamURL) - if err = v.rdb.Set(ctx, key, b, RTCAliveDuration).Err(); err != nil { + if err = v.rdb.Set(ctx, key, b, rtcAliveDuration).Err(); err != nil { return errors.Wrapf(err, "set key=%v WebRTC %v", key, value) } // Get Ufrag from value key2 := v.redisKeyUfrag(value.GetUfrag()) - if err := v.rdb.Set(ctx, key2, b, RTCAliveDuration).Err(); err != nil { + if err := v.rdb.Set(ctx, key2, b, rtcAliveDuration).Err(); err != nil { return errors.Wrapf(err, "set key=%v WebRTC %v", key2, value) }