Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion k8s/exceptionless/templates/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ spec:
annotations:
checksum/config: {{ include (print $.Template.BasePath "/config.yaml") . | sha256sum }}
spec:
# SSE connections are long-lived; give the pod enough time to drain before SIGTERM.
# The preStop sleep lets the ALB/ingress controller deregister the pod before traffic stops,
# then the remaining window allows ASP.NET Core to cancel RequestAborted tokens and clean up.
# When push is eventually enabled behind a Gateway API RoutePolicy, revisit this value.
terminationGracePeriodSeconds: 60
topologySpreadConstraints:
- maxSkew: 1
topologyKey: kubernetes.io/hostname
Expand All @@ -39,6 +44,13 @@ spec:
- name: {{ template "exceptionless.name" . }}-api
image: "{{ .Values.api.image.repository }}:{{ .Values.version }}"
imagePullPolicy: {{ .Values.api.image.pullPolicy }}
lifecycle:
preStop:
# Give the ALB ~15s to deregister this pod before SIGTERM fires.
# The total graceful window is terminationGracePeriodSeconds (60s) minus
# this sleep, leaving ~45s for ASP.NET Core to drain active SSE connections.
exec:
command: ["sleep", "15"]
livenessProbe:
httpGet:
path: /health
Expand Down Expand Up @@ -82,7 +94,10 @@ spec:
{{- include "exceptionless.otel-env" . | indent 12 }}
- name: RunJobsInProcess
value: 'false'
- name: EnableWebSockets
# SSE rollout prerequisite: Azure Application Gateway for Containers Ingress API
# does not support the routeTimeout=0s override required for long-lived SSE streams.
# Keep push disabled here until this route moves to Gateway API + RoutePolicy.
- name: EnablePush
value: 'false'
{{- if (empty .Values.storage.connectionString) }}
volumeMounts:
Expand Down Expand Up @@ -163,6 +178,8 @@ metadata:
alb.networking.azure.io/alb-namespace: {{ .Values.ingress.albNamespace }}
alb.networking.azure.io/alb-frontend: {{ template "exceptionless.fullname" . }}-fe
cert-manager.io/cluster-issuer: {{ .Values.ingress.clusterIssuer }}
# SSE is not safe to enable behind the current AGC Ingress API path.
# Migrate to Gateway API and attach a RoutePolicy with routeTimeout: 0s before enabling push.
spec:
ingressClassName: azure-alb-external
tls:
Expand Down
4 changes: 2 additions & 2 deletions src/Exceptionless.Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ public static void LogConfiguration(IServiceProvider serviceProvider, AppOptions
if (String.IsNullOrEmpty(appOptions.StorageOptions.Provider))
logger.LogWarning("Distributed storage is NOT enabled on {MachineName}", Environment.MachineName);

if (!appOptions.EnableWebSockets)
logger.LogWarning("Web Sockets is NOT enabled on {MachineName}", Environment.MachineName);
if (!appOptions.EnablePush)
logger.LogWarning("Real-time push (SSE) is NOT enabled on {MachineName}", Environment.MachineName);

if (String.IsNullOrEmpty(appOptions.EmailOptions.SmtpHost))
logger.LogWarning("Emails will NOT be sent until the SmtpHost is configured on {MachineName}", Environment.MachineName);
Expand Down
9 changes: 7 additions & 2 deletions src/Exceptionless.Core/Configuration/AppOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ public class AppOptions

public bool EnableRepositoryNotifications { get; internal set; }

public bool EnableWebSockets { get; internal set; }
/// <summary>
/// Controls whether real-time push (SSE) is enabled. Reads from either 'EnablePush'
/// or legacy 'EnableWebSockets' config key for backward compatibility.
/// </summary>
public bool EnablePush { get; internal set; }

public string? Version { get; internal set; }

Expand Down Expand Up @@ -111,7 +115,8 @@ public static AppOptions ReadFromConfiguration(IConfiguration config)
options.BulkBatchSize = config.GetValue(nameof(options.BulkBatchSize), 1000);

options.EnableRepositoryNotifications = config.GetValue(nameof(options.EnableRepositoryNotifications), true);
options.EnableWebSockets = config.GetValue(nameof(options.EnableWebSockets), true);
// Support both new 'EnablePush' and legacy 'EnableWebSockets' config keys
options.EnablePush = config.GetValue(nameof(options.EnablePush), config.GetValue("EnableWebSockets", true));
Comment thread
niemyjski marked this conversation as resolved.

try
{
Expand Down
5 changes: 5 additions & 0 deletions src/Exceptionless.Core/Utility/AppDiagnostics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public GaugeInfo(Meter meter, string name)

internal static readonly Counter<int> SavedViewsSize = Meter.CreateCounter<int>("ex.savedviews.size", description: "Size of user saved views");
internal static readonly Counter<int> SavedViewsViewTypeSize = Meter.CreateCounter<int>("ex.savedviews.viewtype.size", description: "Size of user saved views by view type");

internal static readonly Counter<int> PushSseConnectionsOpened = Meter.CreateCounter<int>("ex.push.connections.sse.opened", description: "SSE push connections opened");
internal static readonly Counter<int> PushSseConnectionsClosed = Meter.CreateCounter<int>("ex.push.connections.sse.closed", description: "SSE push connections closed");
internal static readonly Counter<int> PushWebSocketConnectionsOpened = Meter.CreateCounter<int>("ex.push.connections.websocket.opened", description: "WebSocket push connections opened");
internal static readonly Counter<int> PushWebSocketConnectionsClosed = Meter.CreateCounter<int>("ex.push.connections.websocket.closed", description: "WebSocket push connections closed");
}

public static class MetricsClientExtensions
Expand Down
1 change: 1 addition & 0 deletions src/Exceptionless.Web/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class Bootstrapper
{
public static void RegisterServices(IServiceCollection services, AppOptions appOptions, ILoggerFactory loggerFactory)
{
services.AddSingleton<SseConnectionManager>();
services.AddSingleton<WebSocketConnectionManager>();
services.AddSingleton<MessageBusBroker>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
this.timeoutInterval = 2000;
this.forcedClose = false;
this.timedOut = false;
this.hasConnectedOnce = false;
this.protocols = [];
this.onopen = function (event) {};
this.onclose = function (event) {};
this.onconnecting = function () {};
this.onmessage = function (event) {};
this.onerror = function (event) {};
this.ontransportfallback = function (event) {
return false;
};
this.url = url;
this.protocols = protocols;
this.readyState = WebSocket.CONNECTING;
Expand All @@ -38,6 +42,7 @@
this.ws.onopen = function (event) {
clearTimeout(timeout);
_this.readyState = WebSocket.OPEN;
_this.hasConnectedOnce = true;
reconnectAttempt = false;
_this.onopen(event);
};
Expand All @@ -47,6 +52,8 @@
if (_this.forcedClose) {
_this.readyState = WebSocket.CLOSED;
_this.onclose(event);
} else if (!_this.hasConnectedOnce && _this.ontransportfallback(event) === true) {
_this.readyState = WebSocket.CLOSED;
} else {
_this.readyState = WebSocket.CONNECTING;
_this.onconnecting();
Expand Down Expand Up @@ -101,27 +108,13 @@

function startDelayed(delay) {
function startImpl() {
_connection = new ResilientWebSocket(getPushUrl());
_connection.onmessage = function (ev) {
var data = ev.data ? JSON.parse(ev.data) : null;
if (!data || !data.type) {
return;
}

if (data.message && data.message.change_type >= 0) {
data.message.added = data.message.change_type === 0;
data.message.updated = data.message.change_type === 1;
data.message.deleted = data.message.change_type === 2;
}

$rootScope.$emit(data.type, data.message);
if (supportsWebSocket() && startWebSocket()) {
return;
}

// This event is fired when a user is added or removed from an organization.
if (data.type === "UserMembershipChanged" && data.message && data.message.organization_id) {
$rootScope.$emit("OrganizationChanged", data.message);
$rootScope.$emit("ProjectChanged", data.message);
}
};
if (!startSse()) {
$ExceptionlessClient.submitLog("No supported push transport is available.", "warn", source);
}
}

if (_connection || _websocketTimeout) {
Expand All @@ -131,20 +124,82 @@
_websocketTimeout = $timeout(startImpl, delay || 1000);
}

function startWebSocket() {
// Keep WebSocket as the preferred Angular transport during rollout so existing
// release notification refresh behavior stays unchanged until SSE fully replaces it.
try {
_connection = new ResilientWebSocket(getWebSocketPushUrl());
} catch (error) {
_connection = null;
return false;
}

_connection.ontransportfallback = function () {
return startSse();
};
_connection.onmessage = function (ev) {
handleMessage(ev.data);
};

return true;
}

function startSse() {
if (typeof EventSource === "undefined") {
return false;
}

_connection = new EventSource(getSsePushUrl());
_connection.onmessage = function (ev) {
handleMessage(ev.data);
};

return true;
}

function handleMessage(payload) {
var data = payload ? JSON.parse(payload) : null;
if (!data || !data.type) {
return;
}

if (data.message && data.message.change_type >= 0) {
data.message.added = data.message.change_type === 0;
data.message.updated = data.message.change_type === 1;
data.message.deleted = data.message.change_type === 2;
}

$rootScope.$emit(data.type, data.message);

// This event is fired when a user is added or removed from an organization.
if (data.type === "UserMembershipChanged" && data.message && data.message.organization_id) {
$rootScope.$emit("OrganizationChanged", data.message);
$rootScope.$emit("ProjectChanged", data.message);
}
}

function stop() {
if (_websocketTimeout) {
$timeout.cancel(_websocketTimeout);
_websocketTimeout = null;
}

if (_connection) {
_connection.close();
var connection = _connection;
_connection = null;

if (connection.close) {
connection.close();
}
}
}

function getPushUrl() {
var pushUrl = BASE_URL + "/api/v2/push?access_token=" + authService.getToken();
function supportsWebSocket() {
return typeof WebSocket !== "undefined";
}

function getWebSocketPushUrl() {
var pushUrl = getSsePushUrl();
var protoMatch = /^(https?):\/\//;
if (BASE_URL.startsWith("https:")) {
return pushUrl.replace(protoMatch, "wss://");
Expand All @@ -153,6 +208,10 @@
return pushUrl.replace(protoMatch, "ws://");
}

function getSsePushUrl() {
return BASE_URL + "/api/v2/push?access_token=" + authService.getToken();
}

var service = {
start: start,
startDelayed: startDelayed,
Expand Down
Loading
Loading