Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion proxy/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ type Environment interface {
DefaultBackendRTC() string
// Default backend SRT port (UDP)
DefaultBackendSRT() string
// Server alive duration for load balancer
ServerAliveDuration() string
// HLS stream alive duration for load balancer
HLSAliveDuration() string
// RTC stream alive duration for load balancer
RTCAliveDuration() string
}

type environment struct{}
Expand Down Expand Up @@ -160,6 +166,18 @@ func (e *environment) DefaultBackendSRT() string {
return os.Getenv("PROXY_DEFAULT_BACKEND_SRT")
}

func (e *environment) ServerAliveDuration() string {
return os.Getenv("PROXY_SERVER_ALIVE_DURATION")
}

func (e *environment) HLSAliveDuration() string {
return os.Getenv("PROXY_HLS_ALIVE_DURATION")
}

func (e *environment) RTCAliveDuration() string {
return os.Getenv("PROXY_RTC_ALIVE_DURATION")
}

// loadEnvFile loads the environment variables from .env file.
func loadEnvFile(ctx context.Context) error {
if err := godotenv.Load(); err != nil {
Expand Down Expand Up @@ -222,6 +240,11 @@ func buildDefaultEnvironmentVariables(ctx context.Context) {
// Default backend udp srt port, for debugging.
setEnvDefault("PROXY_DEFAULT_BACKEND_SRT", "10080")

// Load balancer TTL configurations.
setEnvDefault("PROXY_SERVER_ALIVE_DURATION", "300s")
setEnvDefault("PROXY_HLS_ALIVE_DURATION", "120s")
setEnvDefault("PROXY_RTC_ALIVE_DURATION", "120s")

logger.Df(ctx, "load .env as GO_PPROF=%v, "+
"PROXY_FORCE_QUIT_TIMEOUT=%v, PROXY_GRACE_QUIT_TIMEOUT=%v, "+
"PROXY_HTTP_API=%v, PROXY_HTTP_SERVER=%v, PROXY_RTMP_SERVER=%v, "+
Expand All @@ -231,7 +254,8 @@ func buildDefaultEnvironmentVariables(ctx context.Context) {
"PROXY_DEFAULT_BACKEND_HTTP=%v, PROXY_DEFAULT_BACKEND_API=%v, "+
"PROXY_DEFAULT_BACKEND_RTC=%v, PROXY_DEFAULT_BACKEND_SRT=%v, "+
"PROXY_LOAD_BALANCER_TYPE=%v, PROXY_REDIS_HOST=%v, PROXY_REDIS_PORT=%v, "+
"PROXY_REDIS_PASSWORD=%v, PROXY_REDIS_DB=%v",
"PROXY_REDIS_PASSWORD=%v, PROXY_REDIS_DB=%v, "+
"PROXY_SERVER_ALIVE_DURATION=%v, PROXY_HLS_ALIVE_DURATION=%v, PROXY_RTC_ALIVE_DURATION=%v",
os.Getenv("GO_PPROF"),
os.Getenv("PROXY_FORCE_QUIT_TIMEOUT"), os.Getenv("PROXY_GRACE_QUIT_TIMEOUT"),
os.Getenv("PROXY_HTTP_API"), os.Getenv("PROXY_HTTP_SERVER"), os.Getenv("PROXY_RTMP_SERVER"),
Expand All @@ -242,6 +266,7 @@ func buildDefaultEnvironmentVariables(ctx context.Context) {
os.Getenv("PROXY_DEFAULT_BACKEND_RTC"), os.Getenv("PROXY_DEFAULT_BACKEND_SRT"),
os.Getenv("PROXY_LOAD_BALANCER_TYPE"), os.Getenv("PROXY_REDIS_HOST"), os.Getenv("PROXY_REDIS_PORT"),
os.Getenv("PROXY_REDIS_PASSWORD"), os.Getenv("PROXY_REDIS_DB"),
os.Getenv("PROXY_SERVER_ALIVE_DURATION"), os.Getenv("PROXY_HLS_ALIVE_DURATION"), os.Getenv("PROXY_RTC_ALIVE_DURATION"),
)
}

Expand Down
28 changes: 22 additions & 6 deletions proxy/internal/lb/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,32 @@ import (
"fmt"
"strings"
"time"

"proxy/internal/env"
)

// If server heartbeat in this duration, it's alive.
const ServerAliveDuration = 300 * time.Second
// ParseDurationOrDefault parses a duration string and returns the default if parsing fails.
func ParseDurationOrDefault(s string, defaultDuration time.Duration) time.Duration {
if d, err := time.ParseDuration(s); err == nil {
return d
}
return defaultDuration
}

// If HLS streaming update in this duration, it's alive.
const HLSAliveDuration = 120 * time.Second
// GetServerAliveDuration returns the configurable server alive duration from environment.
func GetServerAliveDuration(environment env.Environment) time.Duration {
return ParseDurationOrDefault(environment.ServerAliveDuration(), 300*time.Second)
}

// If WebRTC streaming update in this duration, it's alive.
const RTCAliveDuration = 120 * time.Second
// GetHLSAliveDuration returns the configurable HLS alive duration from environment.
func GetHLSAliveDuration(environment env.Environment) time.Duration {
return ParseDurationOrDefault(environment.HLSAliveDuration(), 120*time.Second)
}

// GetRTCAliveDuration returns the configurable RTC alive duration from environment.
func GetRTCAliveDuration(environment env.Environment) time.Duration {
return ParseDurationOrDefault(environment.RTCAliveDuration(), 120*time.Second)
}

// SRSServer represents a backend origin server.
type SRSServer struct {
Expand Down
5 changes: 4 additions & 1 deletion proxy/internal/lb/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,13 @@ func (v *MemoryLoadBalancer) Pick(ctx context.Context, streamURL string) (*SRSSe
return server, nil
}

// Get the configurable server alive duration.
serverAliveDuration := GetServerAliveDuration(v.environment)

// Gather all servers that were alive within the last few seconds.
var servers []*SRSServer
v.servers.Range(func(key string, server *SRSServer) bool {
if time.Since(server.UpdatedAt) < ServerAliveDuration {
if time.Since(server.UpdatedAt) < serverAliveDuration {
servers = append(servers, server)
}
return true
Expand Down
19 changes: 14 additions & 5 deletions proxy/internal/lb/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ func (v *RedisLoadBalancer) Update(ctx context.Context, server *SRSServer) error
return errors.Wrapf(err, "marshal server %+v", server)
}

// Get the configurable server alive duration.
serverAliveDuration := GetServerAliveDuration(v.environment)

key := v.redisKeyServer(server.ID())
if err = v.rdb.Set(ctx, key, b, ServerAliveDuration).Err(); err != nil {
if err = v.rdb.Set(ctx, key, b, serverAliveDuration).Err(); err != nil {
return errors.Wrapf(err, "set key=%v server %+v", key, server)
}

Expand Down Expand Up @@ -214,14 +217,17 @@ func (v *RedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL string
return nil, errors.Wrapf(err, "marshal HLS %v", value)
}

// Get the configurable HLS alive duration.
hlsAliveDuration := GetHLSAliveDuration(v.environment)

key := v.redisKeyHLS(streamURL)
if err = v.rdb.Set(ctx, key, b, HLSAliveDuration).Err(); err != nil {
if err = v.rdb.Set(ctx, key, b, hlsAliveDuration).Err(); err != nil {
return nil, errors.Wrapf(err, "set key=%v HLS %v", key, value)
}

// Get SPBHID from value
key2 := v.redisKeySPBHID(value.GetSPBHID())
if err := v.rdb.Set(ctx, key2, b, HLSAliveDuration).Err(); err != nil {
if err := v.rdb.Set(ctx, key2, b, hlsAliveDuration).Err(); err != nil {
return nil, errors.Wrapf(err, "set key=%v HLS %v", key2, value)
}

Expand All @@ -235,14 +241,17 @@ func (v *RedisLoadBalancer) StoreWebRTC(ctx context.Context, streamURL string, v
return errors.Wrapf(err, "marshal WebRTC %v", value)
}

// Get the configurable RTC alive duration.
rtcAliveDuration := GetRTCAliveDuration(v.environment)

key := v.redisKeyRTC(streamURL)
if err = v.rdb.Set(ctx, key, b, RTCAliveDuration).Err(); err != nil {
if err = v.rdb.Set(ctx, key, b, rtcAliveDuration).Err(); err != nil {
return errors.Wrapf(err, "set key=%v WebRTC %v", key, value)
}

// Get Ufrag from value
key2 := v.redisKeyUfrag(value.GetUfrag())
if err := v.rdb.Set(ctx, key2, b, RTCAliveDuration).Err(); err != nil {
if err := v.rdb.Set(ctx, key2, b, rtcAliveDuration).Err(); err != nil {
return errors.Wrapf(err, "set key=%v WebRTC %v", key2, value)
}

Expand Down
Loading