Skip to content

Commit bb57fa5

Browse files
authored
chore: read partition id from message properties and propagate it to router jobs (#6518)
🔒 Scanned for secrets using gitleaks 8.28.0 # Description - Filling the gateway job's `PartitionID` field from `MessageProperties`. - Propagating gateway's job `PartitionID` field to router/batchrouter job through `Metadata`. - Refactoring event metadata handling and reporting in the processor pipeline, improving clarity and consistency. ## Linear Ticket Resolves PIPE-2570 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 95f7cb9 commit bb57fa5

File tree

13 files changed

+546
-498
lines changed

13 files changed

+546
-498
lines changed

gateway/gateway_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2089,6 +2089,67 @@ var _ = Describe("Gateway", func() {
20892089
Expect(err).To(BeNil())
20902090
})
20912091

2092+
It("fills message properties in job metadata", func() {
2093+
properties := stream.MessageProperties{
2094+
RequestType: "dummyRequestType",
2095+
RoutingKey: "anonymousId_header<<>>anonymousId_1<<>>identified_user_id",
2096+
WorkspaceID: "workspaceID",
2097+
SourceID: "sourceID",
2098+
SourceJobRunID: "sourceJobRunID",
2099+
SourceTaskRunID: "sourceTaskRunID",
2100+
UserID: "userID",
2101+
TraceID: "traceId",
2102+
DestinationID: "destinationID",
2103+
SourceType: "sourceType",
2104+
ReceivedAt: time.Date(2024, 1, 1, 1, 1, 1, 1, time.UTC),
2105+
RequestIP: "dummyIP",
2106+
PartitionID: "partitionID",
2107+
IsBot: true,
2108+
BotAction: "none",
2109+
BotName: "botName",
2110+
BotURL: "botURL",
2111+
BotIsInvalidBrowser: true,
2112+
}
2113+
msg := stream.Message{
2114+
Properties: properties,
2115+
Payload: []byte(`{}`),
2116+
}
2117+
messages := []stream.Message{msg}
2118+
payload, err := jsonrs.Marshal(messages)
2119+
Expect(err).To(BeNil())
2120+
2121+
req := &webRequestT{
2122+
reqType: "batch",
2123+
authContext: rCtxEnabled,
2124+
done: make(chan<- string),
2125+
requestPayload: payload,
2126+
}
2127+
jobsWithStats, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload)
2128+
Expect(err).To(BeNil())
2129+
Expect(jobsWithStats).To(HaveLen(1))
2130+
2131+
job := jobsWithStats[0].job
2132+
Expect(job.UserID).To(Equal(properties.RoutingKey))
2133+
Expect(job.WorkspaceId).To(Equal(properties.WorkspaceID))
2134+
Expect(job.PartitionID).To(Equal(properties.PartitionID))
2135+
var params jobParams
2136+
err = jsonrs.Unmarshal(job.Parameters, &params)
2137+
Expect(err).To(BeNil())
2138+
2139+
Expect(params.SourceID).To(Equal(properties.SourceID))
2140+
Expect(params.SourceJobRunID).To(Equal(properties.SourceJobRunID))
2141+
Expect(params.SourceTaskRunID).To(Equal(properties.SourceTaskRunID))
2142+
Expect(params.UserID).To(Equal(properties.UserID))
2143+
Expect(params.TraceParent).To(Equal(properties.TraceID))
2144+
Expect(params.DestinationID).To(Equal(properties.DestinationID))
2145+
Expect(params.SourceCategory).To(Equal(properties.SourceType))
2146+
Expect(params.IsBot).To(Equal(properties.IsBot))
2147+
Expect(params.BotName).To(Equal(properties.BotName))
2148+
Expect(params.BotURL).To(Equal(properties.BotURL))
2149+
Expect(params.BotIsInvalidBrowser).To(Equal(properties.BotIsInvalidBrowser))
2150+
Expect(params.BotAction).To(Equal(properties.BotAction))
2151+
})
2152+
20922153
It("doesn't override if receivedAt or request_ip already exists in payload", func() {
20932154
properties := stream.MessageProperties{
20942155
RequestType: "dummyRequestType",

gateway/handle.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -791,26 +791,26 @@ type jobWithMetadata struct {
791791
skipLiveEventRecording bool
792792
}
793793

794+
type jobParams struct {
795+
MessageID string `json:"message_id"`
796+
SourceID string `json:"source_id"`
797+
SourceJobRunID string `json:"source_job_run_id"`
798+
SourceTaskRunID string `json:"source_task_run_id"`
799+
UserID string `json:"user_id"`
800+
TraceParent string `json:"traceparent"`
801+
DestinationID string `json:"destination_id,omitempty"`
802+
SourceCategory string `json:"source_category"`
803+
IsBot bool `json:"is_bot,omitempty"`
804+
BotName string `json:"bot_name,omitempty"`
805+
BotURL string `json:"bot_url,omitempty"`
806+
BotIsInvalidBrowser bool `json:"bot_is_invalid_browser,omitempty"`
807+
BotAction string `json:"bot_action,omitempty"`
808+
IsEventBlocked bool `json:"is_event_blocked,omitempty"`
809+
}
810+
794811
func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byte) (
795812
[]jobWithMetadata, error,
796813
) {
797-
type params struct {
798-
MessageID string `json:"message_id"`
799-
SourceID string `json:"source_id"`
800-
SourceJobRunID string `json:"source_job_run_id"`
801-
SourceTaskRunID string `json:"source_task_run_id"`
802-
UserID string `json:"user_id"`
803-
TraceParent string `json:"traceparent"`
804-
DestinationID string `json:"destination_id,omitempty"`
805-
SourceCategory string `json:"source_category"`
806-
IsBot bool `json:"is_bot,omitempty"`
807-
BotName string `json:"bot_name,omitempty"`
808-
BotURL string `json:"bot_url,omitempty"`
809-
BotIsInvalidBrowser bool `json:"bot_is_invalid_browser,omitempty"`
810-
BotAction string `json:"bot_action,omitempty"`
811-
IsEventBlocked bool `json:"is_event_blocked,omitempty"`
812-
}
813-
814814
type singularEventBatch struct {
815815
Batch []json.RawMessage `json:"batch"`
816816
ReceivedAt string `json:"receivedAt"`
@@ -1006,7 +1006,7 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
10061006
"workspaceId": msg.Properties.WorkspaceID,
10071007
}).Since(msg.Properties.ReceivedAt)
10081008

1009-
jobsDBParams := params{
1009+
jobsDBParams := jobParams{
10101010
MessageID: messageID,
10111011
SourceID: msg.Properties.SourceID,
10121012
SourceJobRunID: msg.Properties.SourceJobRunID,
@@ -1088,6 +1088,7 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
10881088
EventPayload: payload,
10891089
EventCount: len(eventBatch.Batch),
10901090
WorkspaceId: msg.Properties.WorkspaceID,
1091+
PartitionID: msg.Properties.PartitionID,
10911092
},
10921093
})
10931094
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ require (
100100
github.com/rudderlabs/keydb v1.3.0
101101
github.com/rudderlabs/rudder-go-kit v0.67.0
102102
github.com/rudderlabs/rudder-observability-kit v0.0.6
103-
github.com/rudderlabs/rudder-schemas v0.7.0
103+
github.com/rudderlabs/rudder-schemas v0.8.0
104104
github.com/rudderlabs/rudder-transformer/go v0.0.0-20250707171833-9cd525669b1b
105105
github.com/rudderlabs/sql-tunnels v0.1.7
106106
github.com/rudderlabs/sqlconnect-go v1.20.3

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,8 +1210,8 @@ github.com/rudderlabs/rudder-go-kit v0.67.0 h1:4gSkAGaFE/v5axRjntUNVbn8snvxACnQS
12101210
github.com/rudderlabs/rudder-go-kit v0.67.0/go.mod h1:5w7w1e38HB3bAYh7d4+jrSrk57pYNvLcCadFI0+R26o=
12111211
github.com/rudderlabs/rudder-observability-kit v0.0.6 h1:xIA/1Sp38B542EYzxR7qUfNGJwsQCpmBnl6h5ul0AHA=
12121212
github.com/rudderlabs/rudder-observability-kit v0.0.6/go.mod h1:nR3GvY7HvuBaBqOKFfzLP9uYZu7OpzMqW2eeT2ikXtU=
1213-
github.com/rudderlabs/rudder-schemas v0.7.0 h1:hKShHYpbIldE1Q591vodI6iaAZ/IUOyC1DqUUJZysNU=
1214-
github.com/rudderlabs/rudder-schemas v0.7.0/go.mod h1:iZXMmwHMTuFUAdLgt2zzMLeHdQ3u/saIhXPzPP6dmM8=
1213+
github.com/rudderlabs/rudder-schemas v0.8.0 h1:xvBxP5JLuCROHgJ01tE77U8UJS/wpsGBUPfeeddTi9I=
1214+
github.com/rudderlabs/rudder-schemas v0.8.0/go.mod h1:iZXMmwHMTuFUAdLgt2zzMLeHdQ3u/saIhXPzPP6dmM8=
12151215
github.com/rudderlabs/rudder-transformer/go v0.0.0-20250707171833-9cd525669b1b h1:ZzGJA+3/rVIK9RUJDCSRHmDzaa8aVaKmR1A1BpXcHk0=
12161216
github.com/rudderlabs/rudder-transformer/go v0.0.0-20250707171833-9cd525669b1b/go.mod h1:3NGitPz4pYRRZ6Xt09S+8hb0tHK/9pZcKJ3OgOTaSmE=
12171217
github.com/rudderlabs/sonnet v1.0.2 h1:nPfmDKD9gUwT571Dwtcsx0VIglSchvyNjuRLju4Xs3s=

processor/internal/transformer/destination_transformer/embedded/warehouse/benchmark/deployment.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ spec:
102102
"destinationType": "POSTGRES",
103103
"destinationName": "",
104104
"messageId": "messageId",
105-
"oauthAccessToken": "",
106105
"traceparent": "",
107106
"messageIds": null,
108107
"rudderId": "",

processor/internal/transformer/trackingplan_validation/trackingplan_validation.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,20 +92,14 @@ func (t *Client) Validate(ctx context.Context, clientEvents []types.TransformerE
9292
ctx, cancel := context.WithCancel(ctx)
9393
defer cancel()
9494

95-
trackWg.Add(1)
96-
go func() {
95+
trackWg.Go(func() {
9796
l := t.log.Withn(labels.ToLoggerFields()...)
9897
transformerutils.TrackLongRunningTransformation(ctx, "trackingPlan_validation", t.config.timeoutDuration, l)
99-
trackWg.Done()
100-
}()
98+
})
10199

102100
batches := lo.Chunk(clientEvents, batchSize)
103101

104-
t.stat.NewTaggedStat(
105-
"processor.transformer_request_batch_count",
106-
stats.HistogramType,
107-
labels.ToStatsTag(),
108-
).Observe(float64(len(batches)))
102+
t.stat.NewTaggedStat("processor.transformer_request_batch_count", stats.HistogramType, labels.ToStatsTag()).Observe(float64(len(batches)))
109103

110104
transformResponse := make([][]types.TransformerResponse, len(batches))
111105

processor/partition_worker_handle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ type workerHandle interface {
2222
markExecuting(ctx context.Context, partition string, jobs []*jobsdb.JobT) error
2323
jobSplitter(ctx context.Context, jobs []*jobsdb.JobT, rsourcesStats rsources.StatsCollector) []subJob
2424
preprocessStage(partition string, subJobs subJob, delay time.Duration) (*srcHydrationMessage, error)
25+
srcHydrationStage(partition string, in *srcHydrationMessage) (*preTransformationMessage, error)
2526
pretransformStage(partition string, preTrans *preTransformationMessage) (*transformationMessage, error)
2627
userTransformStage(partition string, in *transformationMessage) *userTransformData
2728
destinationTransformStage(partition string, in *userTransformData) *storeMessage
2829
storeStage(partition string, pipelineIndex int, in *storeMessage)
29-
srcHydrationStage(partition string, in *srcHydrationMessage) (*preTransformationMessage, error)
3030
}
3131

3232
// workerHandleConfig is a struct containing the processor.Handle configuration relevant for workers

0 commit comments

Comments
 (0)