Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.103
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip/chains/evm v0.0.0-20260608180601-efa81bfdfda9
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260622160845-86b9f94f3650
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260622205851-5fe4f9bdcd4a
github.com/smartcontractkit/chainlink-common/keystore v1.2.0
github.com/smartcontractkit/chainlink-data-streams v0.1.15-0.20260522094612-5f9f748bd87a
github.com/smartcontractkit/chainlink-deployments-framework v0.105.0
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions core/services/workflows/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ type EngineMetrics struct {
triggerQueueToExecutionStartSeconds metric.Float64Histogram
triggerPayloadBytes metric.Int64Histogram
executionSemaphoreWaitSeconds metric.Float64Histogram

donTimeCallsCounter metric.Int64Counter
donTimeTimeoutsCounter metric.Int64Counter
donTimeErrorsCounter metric.Int64Counter
}

func InitMonitoringResources() (em *EngineMetrics, err error) {
Expand Down Expand Up @@ -401,6 +405,30 @@ func InitMonitoringResources() (em *EngineMetrics, err error) {
return nil, fmt.Errorf("failed to register execution semaphore wait histogram: %w", err)
}

em.donTimeCallsCounter, err = beholder.GetMeter().Int64Counter(
"platform_engine_dontime_calls_total",
metric.WithDescription("Total GetDONTime calls made by the workflow engine"),
)
if err != nil {
return nil, fmt.Errorf("failed to register don time calls counter: %w", err)
}

em.donTimeTimeoutsCounter, err = beholder.GetMeter().Int64Counter(
"platform_engine_dontime_timeouts_total",
metric.WithDescription("GetDONTime calls that timed out waiting for a response"),
)
if err != nil {
return nil, fmt.Errorf("failed to register don time timeouts counter: %w", err)
}

em.donTimeErrorsCounter, err = beholder.GetMeter().Int64Counter(
"platform_engine_dontime_errors_total",
metric.WithDescription("GetDONTime calls that failed without DON consensus recovery"),
)
if err != nil {
return nil, fmt.Errorf("failed to register don time errors counter: %w", err)
}

return em, nil
}

Expand Down Expand Up @@ -743,3 +771,18 @@ func (c WorkflowsMetricLabeler) RecordExecutionSemaphoreWaitSeconds(ctx context.
otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes()
c.em.executionSemaphoreWaitSeconds.Record(ctx, waitSeconds, metric.WithAttributes(otelLabels...))
}

// IncrementDonTimeCallsCounter adds one to the counter of GetDONTime calls,
// attaching the labeler's labels as OTel string attributes.
func (c WorkflowsMetricLabeler) IncrementDonTimeCallsCounter(ctx context.Context) {
	attrs := metric.WithAttributes(beholder.OtelAttributes(c.Labels).AsStringAttributes()...)
	c.em.donTimeCallsCounter.Add(ctx, 1, attrs)
}

// IncrementDonTimeTimeoutsCounter adds one to the counter of GetDONTime calls
// that timed out, attaching the labeler's labels as OTel string attributes.
func (c WorkflowsMetricLabeler) IncrementDonTimeTimeoutsCounter(ctx context.Context) {
	attrs := metric.WithAttributes(beholder.OtelAttributes(c.Labels).AsStringAttributes()...)
	c.em.donTimeTimeoutsCounter.Add(ctx, 1, attrs)
}

// IncrementDonTimeErrorsCounter adds one to the counter of failed GetDONTime
// calls, attaching the labeler's labels as OTel string attributes.
func (c WorkflowsMetricLabeler) IncrementDonTimeErrorsCounter(ctx context.Context) {
	attrs := metric.WithAttributes(beholder.OtelAttributes(c.Labels).AsStringAttributes()...)
	c.em.donTimeErrorsCounter.Add(ctx, 1, attrs)
}
17 changes: 15 additions & 2 deletions core/services/workflows/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type EngineLimiters struct {
UserMetricLabelValueLength limits.BoundLimiter[int]

ExecutionTimestampsEnabled limits.GateLimiter
DONTimeRequestTimeout limits.TimeLimiter
}

// NewLimiters returns a new set of EngineLimiters based on the default configuration, and optionally modified by cfgFn.
Expand Down Expand Up @@ -264,6 +265,10 @@ func (l *EngineLimiters) init(lf limits.Factory, cfgFn func(*cresettings.Workflo
if err != nil {
return
}
l.DONTimeRequestTimeout, err = lf.MakeTimeLimiter(cfg.DONTime.RequestTimeout)
if err != nil {
return
}
return
}

Expand Down Expand Up @@ -302,6 +307,7 @@ func (l *EngineLimiters) EvictWorkflow(workflowID string) error {
l.ConfidentialHTTPCalls,
l.SecretsCalls,
l.ExecutionTimestampsEnabled,
l.DONTimeRequestTimeout,
}
var errs error
for _, e := range evictables {
Expand Down Expand Up @@ -343,11 +349,13 @@ func (l *EngineLimiters) Close() error {
l.ConfidentialHTTPCalls,
l.SecretsCalls,
l.ExecutionTimestampsEnabled,
l.DONTimeRequestTimeout,
)
}

type EngineFeatureFlags struct {
FeatureMultiTriggerExecutionIDs limits.RangeLimiter[config.Timestamp]
FeatureMultiTriggerExecutionIDs limits.RangeLimiter[config.Timestamp]
FeatureUseSingleDONTimeProviderPerExecution limits.RangeLimiter[config.Timestamp]
}

func NewFeatureFlags(lf limits.Factory, cfgFn func(*cresettings.Workflows)) (*EngineFeatureFlags, error) {
Expand All @@ -359,8 +367,13 @@ func NewFeatureFlags(lf limits.Factory, cfgFn func(*cresettings.Workflows)) (*En
if err != nil {
return nil, err
}
featureUseSingleDONTimeProviderPerExecution, err := limits.MakeRangeLimiter(lf, cfg.FeatureUseSingleDONTimeProviderPerExecutionActivePeriod)
if err != nil {
return nil, err
}
return &EngineFeatureFlags{
FeatureMultiTriggerExecutionIDs: featureMultiTriggerExecutionIDs,
FeatureMultiTriggerExecutionIDs: featureMultiTriggerExecutionIDs,
FeatureUseSingleDONTimeProviderPerExecution: featureUseSingleDONTimeProviderPerExecution,
}, nil
}

Expand Down
30 changes: 26 additions & 4 deletions core/services/workflows/v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (e *Engine) runTriggerSubscriptionPhase(ctx context.Context) error {

var timeProvider TimeProvider = &types.LocalTimeProvider{}
if !e.cfg.UseLocalTimeProvider {
timeProvider = NewDonTimeProvider(e.cfg.DonTimeStore, e.cfg.WorkflowID, e.logger())
timeProvider = NewDonTimeProvider(e.cfg.DonTimeStore, e.cfg.WorkflowID, e.donTimeRequestTimeout(subCtx, e.cfg.LocalLimiters.DONTimeRequestTimeout), e.logger(), e.metrics)
}

moduleExecuteMaxResponseSizeBytes, err := e.cfg.LocalLimiters.ExecutionResponse.Limit(ctx)
Expand Down Expand Up @@ -721,9 +721,10 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue
lggr := e.logger().With(platform.KeyOrganizationID, e.orgID)

var executionTimestamp time.Time
var executionDonTimeProvider TimeProvider
if tsErr := e.cfg.LocalLimiters.ExecutionTimestampsEnabled.AllowErr(ctx); tsErr == nil {
executionTimeProvider := NewDonTimeProvider(e.cfg.DonTimeStore, fullExecutionID, lggr)
donTime, dtErr := executionTimeProvider.GetDONTime()
executionDonTimeProvider = NewDonTimeProvider(e.cfg.DonTimeStore, fullExecutionID, e.donTimeRequestTimeout(ctx, e.cfg.LocalLimiters.DONTimeRequestTimeout), lggr, e.metrics)
donTime, dtErr := executionDonTimeProvider.GetDONTime()
if dtErr != nil {
executionTimestamp = e.cfg.Clock.Now()
lggr.Warnw("Failed to get DON time for execution timestamp, falling back to local time", "err", dtErr, "executionTimestamp", executionTimestamp)
Expand Down Expand Up @@ -924,7 +925,11 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue

var timeProvider TimeProvider = &types.LocalTimeProvider{}
if !e.cfg.UseLocalTimeProvider {
timeProvider = NewDonTimeProvider(e.cfg.DonTimeStore, executionID, lggr)
if e.cfg.FeatureFlags.FeatureUseSingleDONTimeProviderPerExecution.Check(ctx, config.NewTimestamp(executionTimestamp)) == nil && executionDonTimeProvider != nil {
timeProvider = executionDonTimeProvider
} else {
timeProvider = NewDonTimeProvider(e.cfg.DonTimeStore, executionID, e.donTimeRequestTimeout(execCtx, e.cfg.LocalLimiters.DONTimeRequestTimeout), lggr, e.metrics)
}
}

moduleExecuteMaxResponseSizeBytes, err := e.cfg.LocalLimiters.ExecutionResponse.Limit(ctx)
Expand Down Expand Up @@ -1206,3 +1211,20 @@ func (e *Engine) emitUserLogs(ctx context.Context, userLogChan chan *protoevents
}
}
}

// donTimeRequestTimeout resolves the timeout to use for DON time requests.
//
// The value comes from limiter when it is non-nil, yields no error, and
// reports a positive duration. In every other case the default request
// timeout from cresettings is returned; a limiter error is logged at error
// level and a non-positive limit at warn level.
func (e *Engine) donTimeRequestTimeout(ctx context.Context, limiter limits.TimeLimiter) time.Duration {
	fallback := cresettings.Default.PerWorkflow.DONTime.RequestTimeout.DefaultValue
	if limiter == nil {
		return fallback
	}

	configured, err := limiter.Limit(ctx)
	switch {
	case err != nil:
		e.logger().Errorw("Failed to get DON time request timeout", "err", err)
		return fallback
	case configured <= 0:
		e.logger().Warnw("DON time request timeout is less than or equal to 0, using default timeout", "defaultTimeout", fallback)
		return fallback
	}
	return configured
}
62 changes: 60 additions & 2 deletions core/services/workflows/v2/time_provider.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package v2

import (
"context"
"fmt"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/dontime"

"github.com/smartcontractkit/chainlink/v2/core/services/workflows/monitoring"
)

type TimeProvider interface {
Expand All @@ -18,15 +22,25 @@ type DonTimeProvider struct {
workflowExecutionID string
timeSeqNum int
donTimeStore *dontime.Store
requestTimeout time.Duration
lggr logger.Logger
metrics *monitoring.WorkflowsMetricLabeler
}

func NewDonTimeProvider(store *dontime.Store, workflowExecutionID string, lggr logger.Logger) TimeProvider {
// NewDonTimeProvider builds a TimeProvider backed by the given DON time store
// for a single workflow execution ID.
//
// requestTimeout is stored on the provider and used by GetDONTime as the
// duration of its wait timer. lggr is wrapped with the "TimeProvider" name.
// metrics may be nil, in which case the provider records no metrics.
func NewDonTimeProvider(
	store *dontime.Store,
	workflowExecutionID string,
	requestTimeout time.Duration,
	lggr logger.Logger,
	metrics *monitoring.WorkflowsMetricLabeler,
) TimeProvider {
	namedLggr := logger.Named(lggr, "TimeProvider")

	// timeSeqNum is left at its zero value: sequence numbers start from 0.
	provider := &DonTimeProvider{
		donTimeStore:        store,
		workflowExecutionID: workflowExecutionID,
		requestTimeout:      requestTimeout,
		lggr:                namedLggr,
		metrics:             metrics,
	}
	return provider
}

Expand All @@ -40,21 +54,65 @@ func (tp *DonTimeProvider) GetDONTime() (time.Time, error) {
tp.timeSeqNum++
}()

donTimeResp := <-tp.donTimeStore.RequestDonTime(tp.workflowExecutionID, tp.timeSeqNum)
tp.recordDonTimeCall()

ch := tp.donTimeStore.RequestDonTime(tp.workflowExecutionID, tp.timeSeqNum)
timer := time.NewTimer(tp.requestTimeout)
defer timer.Stop()

select {
case donTimeResp := <-ch:
return tp.donTimeFromResponse(donTimeResp)
case <-timer.C:
tp.donTimeStore.RemoveRequest(tp.workflowExecutionID)
tp.recordDonTimeTimeout()
return tp.donTimeFromResponse(dontime.Response{
WorkflowExecutionID: tp.workflowExecutionID,
SeqNum: tp.timeSeqNum,
Err: fmt.Errorf(
"timeout exceeded: could not process request before expiry, workflowExecutionID %s",
tp.workflowExecutionID),
})
}
}

// donTimeFromResponse turns a DON time store response into a timestamp.
//
// A response without an error yields its Timestamp, interpreted as Unix
// milliseconds. For a response carrying an error, the store is asked whether a
// DON time exists for the current sequence number; if so that value is
// returned. Otherwise the error metric is recorded, an error is logged, and
// the node's local time is returned. The returned error is always nil.
func (tp *DonTimeProvider) donTimeFromResponse(donTimeResp dontime.Response) (time.Time, error) {
	if donTimeResp.Err == nil {
		return fromUnixMilli(donTimeResp.Timestamp), nil
	}

	// This node's own request failed, so it did not contribute the request to
	// its observation. Other nodes may still have included it, in which case
	// the network-generated DON time is available from the store.
	consensusTime := tp.donTimeStore.GetDonTimeForSeqNum(tp.workflowExecutionID, tp.timeSeqNum)
	if consensusTime != nil {
		return fromUnixMilli(*consensusTime), nil
	}

	// No DON time is available: fall back to local node time.
	tp.recordDonTimeError()
	tp.lggr.Errorf("No DON time reached for time call sequence %d on executionID %s; returning local node time as fallback. "+
		"This may result in non-deterministic behavior across nodes for this workflow step", tp.timeSeqNum, tp.workflowExecutionID)
	return tp.GetNodeTime(), nil
}

// recordDonTimeCall increments the DON time calls counter. It is a no-op when
// the provider was built without a metrics labeler.
func (tp *DonTimeProvider) recordDonTimeCall() {
	if m := tp.metrics; m != nil {
		m.IncrementDonTimeCallsCounter(context.Background())
	}
}

// recordDonTimeTimeout increments the DON time timeouts counter. It is a no-op
// when the provider was built without a metrics labeler.
func (tp *DonTimeProvider) recordDonTimeTimeout() {
	if m := tp.metrics; m != nil {
		m.IncrementDonTimeTimeoutsCounter(context.Background())
	}
}

// recordDonTimeError increments the DON time errors counter. It is a no-op
// when the provider was built without a metrics labeler.
func (tp *DonTimeProvider) recordDonTimeError() {
	if m := tp.metrics; m != nil {
		m.IncrementDonTimeErrorsCounter(context.Background())
	}
}

// fromUnixMilli converts a Unix timestamp expressed in milliseconds into a
// time.Time in UTC.
//
// It relies on time.UnixMilli instead of scaling ms to nanoseconds by hand:
// ms*int64(time.Millisecond) overflows int64 for timestamps beyond roughly the
// year 2262 and would silently produce a wrong time.
func fromUnixMilli(ms int64) time.Time {
	return time.UnixMilli(ms).UTC()
}
35 changes: 35 additions & 0 deletions core/services/workflows/v2/time_provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package v2

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/dontime"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// TestDonTimeProvider_GetDONTime_requestTimeout checks that GetDONTime returns
// without error once the provider's own request timeout elapses, and that it
// does so on consecutive calls.
func TestDonTimeProvider_GetDONTime_requestTimeout(t *testing.T) {
	t.Parallel()

	const (
		requestTimeout = 50 * time.Millisecond
		maxElapsed     = 200 * time.Millisecond
	)

	store := dontime.NewStore(dontime.DefaultRequestTimeout)
	provider := NewDonTimeProvider(store, "exec-id", requestTimeout, logger.TestLogger(t), nil)

	// The second pass verifies that a timed-out request does not block
	// subsequent sequence numbers.
	for attempt := 0; attempt < 2; attempt++ {
		begin := time.Now()
		_, err := provider.GetDONTime()
		took := time.Since(begin)

		require.NoError(t, err)
		require.GreaterOrEqual(t, took, requestTimeout)
		require.Less(t, took, maxElapsed)
	}
}
2 changes: 1 addition & 1 deletion deployment/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260415165642-49f23e4d76cc
github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260511195239-0f6e1b177fc7
github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20260526183310-eb2e6d9cf68a
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260622160845-86b9f94f3650
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260622205851-5fe4f9bdcd4a
github.com/smartcontractkit/chainlink-common/keystore v1.2.0
github.com/smartcontractkit/chainlink-data-streams v0.1.15-0.20260522094612-5f9f748bd87a
github.com/smartcontractkit/chainlink-deployments-framework v0.105.0
Expand Down
4 changes: 2 additions & 2 deletions deployment/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ require (
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260415165642-49f23e4d76cc
github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260415165642-49f23e4d76cc
github.com/smartcontractkit/chainlink-ccv v0.0.2-0.20260617194919-3078e69b2aa0
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260622160845-86b9f94f3650
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260622205851-5fe4f9bdcd4a
github.com/smartcontractkit/chainlink-common/keystore v1.2.0
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0
github.com/smartcontractkit/chainlink-data-streams v0.1.15-0.20260522094612-5f9f748bd87a
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/smartcontractkit/chainlink-ccip/chains/evm v0.0.0-20260608180601-efa81bfdfda9
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260506144252-c100eabfda74
github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260511195239-0f6e1b177fc7
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260622160845-86b9f94f3650
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260622205851-5fe4f9bdcd4a
github.com/smartcontractkit/chainlink-common/keystore v1.2.0
github.com/smartcontractkit/chainlink-deployments-framework v0.105.0
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20260618132327-105433c1ac66
Expand Down
Loading
Loading