Skip to content

Commit d98f544

Browse files
authored
feat: Add local sync summary (#17112)
#### Summary This pr adds in a sync summary that gets outputted to a user defined location. If the user doesn't define the location via the cli flag then the summary will not be persisted. The file is a .jsonl format
1 parent 2d00a46 commit d98f544

6 files changed

Lines changed: 286 additions & 10 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,4 @@ __debug_bin
3232
.vercel
3333
venv
3434
__pycache__
35+
cli/cloudquery-sync-summary.json

cli/cmd/summary.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package cmd
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"os"
7+
"path/filepath"
8+
)
9+
10+
type syncSummary struct {
11+
CliVersion string `json:"cli_version"`
12+
DestinationErrors uint64 `json:"destination_errors"`
13+
DestinationName string `json:"destination_name"`
14+
DestinationPath string `json:"destination_path"`
15+
DestinationVersion string `json:"destination_version"`
16+
DestinationWarnings uint64 `json:"destination_warnings"`
17+
Resources uint64 `json:"resources"`
18+
SourceErrors uint64 `json:"source_errors"`
19+
SourceName string `json:"source_name"`
20+
SourcePath string `json:"source_path"`
21+
SourceVersion string `json:"source_version"`
22+
SourceWarnings uint64 `json:"source_warnings"`
23+
SyncID string `json:"sync_id"`
24+
}
25+
26+
func persistSummary(filename string, summaries []syncSummary) error {
27+
// if filename is not specified then we don't need to persist the summary and can return
28+
if filename == "" {
29+
return nil
30+
}
31+
err := checkFilePath(filename)
32+
if err != nil {
33+
return fmt.Errorf("failed to validate summary file path: %w", err)
34+
}
35+
for _, summary := range summaries {
36+
dataBytes, err := json.Marshal(summary)
37+
if err != nil {
38+
return err
39+
}
40+
dataBytes = append(dataBytes, []byte("\n")...)
41+
err = appendToFile(filename, dataBytes)
42+
if err != nil {
43+
return fmt.Errorf("failed to append summary to file: %w", err)
44+
}
45+
}
46+
return nil
47+
}
48+
49+
func appendToFile(fileName string, data []byte) error {
50+
f, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
51+
if err != nil {
52+
return err
53+
}
54+
if _, err := f.Write(data); err != nil {
55+
f.Close()
56+
return err
57+
}
58+
return f.Close()
59+
}
60+
61+
func checkFilePath(filename string) error {
62+
dirPath := filepath.Dir(filename)
63+
return os.MkdirAll(dirPath, 0755)
64+
}

cli/cmd/sync.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func NewCmdSync() *cobra.Command {
3535
}
3636
cmd.Flags().Bool("no-migrate", false, "Disable auto-migration before sync. By default, sync runs a migration before syncing resources.")
3737
cmd.Flags().String("license", "", "set offline license file")
38+
cmd.Flags().String("summary-location", "", "Sync summary file location. This feature is in Preview. Please provide feedback to help us improve it.")
3839

3940
return cmd
4041
}
@@ -258,9 +259,16 @@ func sync(cmd *cobra.Command, args []string) error {
258259
spec: *destinationForSourceBackendSpec,
259260
}
260261
}
261-
if err := syncConnectionV3(ctx, src, dests, backend, invocationUUID.String(), noMigrate); err != nil {
262+
263+
summaryLocation, err := cmd.Flags().GetString("summary-location")
264+
if err != nil {
265+
return err
266+
}
267+
268+
if err := syncConnectionV3(ctx, src, dests, backend, invocationUUID.String(), noMigrate, summaryLocation); err != nil {
262269
return fmt.Errorf("failed to sync v3 source %s: %w", cl.Name(), err)
263270
}
271+
264272
case 2:
265273
destinationsVersions := make([][]int, 0, len(destinationClientsForSource))
266274
for _, destination := range destinationClientsForSource {

cli/cmd/sync_test.go

Lines changed: 186 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
11
package cmd
22

33
import (
4+
"bytes"
5+
"encoding/json"
46
"os"
57
"path"
68
"runtime"
9+
"slices"
710
"testing"
811

12+
"github.com/google/go-cmp/cmp"
13+
"github.com/google/go-cmp/cmp/cmpopts"
914
"github.com/google/uuid"
1015
"github.com/stretchr/testify/assert"
1116
"github.com/stretchr/testify/require"
1217
)
1318

1419
func TestSync(t *testing.T) {
1520
configs := []struct {
16-
name string
17-
config string
18-
err string
21+
name string
22+
config string
23+
err string
24+
summary []syncSummary
1925
}{
2026
{
2127
name: "sync_success_sourcev1_destv0",
@@ -24,18 +30,159 @@ func TestSync(t *testing.T) {
2430
{
2531
name: "multiple_sources",
2632
config: "multiple-sources.yml",
33+
summary: []syncSummary{
34+
{
35+
CliVersion: "development",
36+
DestinationErrors: 0,
37+
DestinationName: "test",
38+
DestinationPath: "cloudquery/test",
39+
DestinationVersion: "v2.2.14",
40+
Resources: 12,
41+
SourceName: "test",
42+
SourcePath: "cloudquery/test",
43+
SourceVersion: "v3.1.15",
44+
},
45+
{
46+
CliVersion: "development",
47+
DestinationErrors: 0,
48+
DestinationName: "test",
49+
DestinationPath: "cloudquery/test",
50+
DestinationVersion: "v2.2.14",
51+
Resources: 12,
52+
SourceName: "test2",
53+
SourcePath: "cloudquery/test",
54+
SourceVersion: "v3.1.15",
55+
},
56+
},
2757
},
2858
{
2959
name: "multiple_destinations",
3060
config: "multiple-destinations.yml",
61+
summary: []syncSummary{
62+
63+
{
64+
CliVersion: "development",
65+
DestinationName: "test",
66+
DestinationPath: "cloudquery/test",
67+
DestinationVersion: "v2.2.14",
68+
Resources: 12,
69+
SourceName: "test",
70+
SourcePath: "cloudquery/test",
71+
SourceVersion: "v3.1.15",
72+
},
73+
{
74+
CliVersion: "development",
75+
DestinationName: "test",
76+
DestinationPath: "cloudquery/test",
77+
DestinationVersion: "v2.2.14",
78+
Resources: 12,
79+
SourceName: "test2",
80+
SourcePath: "cloudquery/test",
81+
SourceVersion: "v3.1.15",
82+
},
83+
},
3184
},
3285
{
3386
name: "multiple_sources_destinations",
3487
config: "multiple-sources-destinations.yml",
88+
summary: []syncSummary{
89+
{
90+
CliVersion: "development",
91+
DestinationName: "test",
92+
DestinationPath: "cloudquery/test",
93+
DestinationVersion: "v2.2.14",
94+
Resources: 12,
95+
SourceName: "test",
96+
SourcePath: "cloudquery/test",
97+
SourceVersion: "v3.1.15",
98+
},
99+
{
100+
CliVersion: "development",
101+
DestinationName: "test",
102+
DestinationPath: "cloudquery/test",
103+
DestinationVersion: "v2.2.14",
104+
Resources: 12,
105+
SourceName: "test2",
106+
SourcePath: "cloudquery/test",
107+
SourceVersion: "v3.1.15",
108+
},
109+
{
110+
CliVersion: "development",
111+
DestinationName: "test-1",
112+
DestinationPath: "cloudquery/test",
113+
DestinationVersion: "v2.2.14",
114+
Resources: 12,
115+
SourceName: "test-1",
116+
SourcePath: "cloudquery/test",
117+
SourceVersion: "v3.1.15",
118+
},
119+
{
120+
CliVersion: "development",
121+
DestinationName: "test-2",
122+
DestinationPath: "cloudquery/test",
123+
DestinationVersion: "v2.2.14",
124+
Resources: 12,
125+
SourceName: "test-2",
126+
SourcePath: "cloudquery/test",
127+
SourceVersion: "v3.1.15",
128+
},
129+
},
35130
},
36131
{
37132
name: "different_backend_from_destination",
38133
config: "different-backend-from-destination.yml",
134+
summary: []syncSummary{
135+
{
136+
CliVersion: "development",
137+
DestinationName: "test",
138+
DestinationPath: "cloudquery/test",
139+
DestinationVersion: "v2.2.14",
140+
Resources: 12,
141+
SourceName: "test",
142+
SourcePath: "cloudquery/test",
143+
SourceVersion: "v3.1.15",
144+
},
145+
{
146+
CliVersion: "development",
147+
DestinationName: "test",
148+
DestinationPath: "cloudquery/test",
149+
DestinationVersion: "v2.2.14",
150+
Resources: 12,
151+
SourceName: "test2",
152+
SourcePath: "cloudquery/test",
153+
SourceVersion: "v3.1.15",
154+
},
155+
{
156+
CliVersion: "development",
157+
DestinationName: "test-1",
158+
DestinationPath: "cloudquery/test",
159+
DestinationVersion: "v2.2.14",
160+
Resources: 12,
161+
SourceName: "test-1",
162+
SourcePath: "cloudquery/test",
163+
SourceVersion: "v3.1.15",
164+
},
165+
{
166+
CliVersion: "development",
167+
DestinationName: "test-2",
168+
DestinationPath: "cloudquery/test",
169+
DestinationVersion: "v2.2.14",
170+
Resources: 12,
171+
SourceName: "test-2",
172+
SourcePath: "cloudquery/test",
173+
SourceVersion: "v3.1.15",
174+
},
175+
{
176+
CliVersion: "development",
177+
DestinationName: "test1",
178+
DestinationPath: "cloudquery/test",
179+
DestinationVersion: "v2.2.14",
180+
Resources: 12,
181+
SourceName: "test",
182+
SourcePath: "cloudquery/test",
183+
SourceVersion: "v3.1.15",
184+
},
185+
},
39186
},
40187
{
41188
name: "should fail with missing path error when path is missing",
@@ -54,14 +201,31 @@ func TestSync(t *testing.T) {
54201
testConfig := path.Join(currentDir, "testdata", tc.config)
55202
logFileName := path.Join(cqDir, "cloudquery.log")
56203
cmd := NewCmdRoot()
57-
cmd.SetArgs([]string{"sync", testConfig, "--cq-dir", cqDir, "--log-file-name", logFileName})
204+
205+
argList := []string{"sync", testConfig, "--cq-dir", cqDir, "--log-file-name", logFileName}
206+
summaryPath := ""
207+
if len(tc.summary) > 0 {
208+
summaryPath = path.Join(cqDir, "/test/cloudquery-summary.jsonl")
209+
argList = append(argList, "--summary-location", summaryPath)
210+
}
211+
212+
cmd.SetArgs(argList)
58213
err := cmd.Execute()
59214
if tc.err != "" {
60215
assert.Contains(t, err.Error(), tc.err)
61216
} else {
62217
assert.NoError(t, err)
63218
}
64219

220+
if len(tc.summary) > 0 {
221+
summaries := readSummaries(t, summaryPath)
222+
// have to ignore SyncID because it's random
223+
diff := cmp.Diff(tc.summary, summaries, cmpopts.IgnoreFields(syncSummary{}, "SyncID"))
224+
if diff != "" {
225+
t.Errorf("unexpected summaries: %v", diff)
226+
}
227+
}
228+
65229
// check that log was written and contains some lines from the plugin
66230
b, logFileError := os.ReadFile(path.Join(cqDir, "cloudquery.log"))
67231
logContent := string(b)
@@ -165,3 +329,21 @@ func TestSync_IsolatedPluginEnvironmentsInCloud(t *testing.T) {
165329
})
166330
}
167331
}
332+
333+
func readSummaries(t *testing.T, filename string) []syncSummary {
334+
p, err := os.ReadFile(filename)
335+
assert.NoError(t, err)
336+
337+
lines := bytes.Split(p, []byte{'\n'})
338+
summaries := make([]syncSummary, len(lines))
339+
for i, line := range lines {
340+
if len(line) == 0 {
341+
summaries = slices.Delete(summaries, i, i+1)
342+
continue
343+
}
344+
var v syncSummary
345+
assert.NoError(t, json.Unmarshal(line, &v))
346+
summaries[i] = v
347+
}
348+
return summaries
349+
}

cli/cmd/sync_v3.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func getProgressAPIClient() (*cloudquery_api.ClientWithResponses, error) {
5252
}
5353

5454
// nolint:dupl
55-
func syncConnectionV3(ctx context.Context, source v3source, destinations []v3destination, backend *v3destination, uid string, noMigrate bool) error {
55+
func syncConnectionV3(ctx context.Context, source v3source, destinations []v3destination, backend *v3destination, uid string, noMigrate bool, summaryLocation string) error {
5656
var mt metrics.Metrics
5757
var exitReason = ExitReasonStopped
5858
tablesForDeleteStale := make(map[string]bool, 0)
@@ -346,6 +346,7 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
346346
if err != nil {
347347
return err
348348
}
349+
totals := sourceClient.Metrics()
349350

350351
for i := range destinationsClients {
351352
if destinationSpecs[i].WriteMode == specs.WriteModeOverwriteDeleteStale {
@@ -361,11 +362,30 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
361362
}
362363
}
363364

364-
totals := sourceClient.Metrics()
365+
syncSummaries := make([]syncSummary, len(destinationsClients))
365366
for i := range destinationsClients {
366367
m := destinationsClients[i].Metrics()
367368
totals.Warnings += m.Warnings
368369
totals.Errors += m.Errors
370+
syncSummaries[i] = syncSummary{
371+
Resources: uint64(totalResources),
372+
SourceErrors: totals.Errors,
373+
SourceWarnings: totals.Warnings,
374+
SyncID: uid,
375+
SourceName: sourceSpec.Name,
376+
SourceVersion: sourceSpec.Version,
377+
SourcePath: sourceSpec.Path,
378+
CliVersion: Version,
379+
DestinationErrors: m.Errors,
380+
DestinationWarnings: m.Warnings,
381+
DestinationName: destinationSpecs[i].Name,
382+
DestinationVersion: destinationSpecs[i].Version,
383+
DestinationPath: destinationSpecs[i].Path,
384+
}
385+
}
386+
err = persistSummary(summaryLocation, syncSummaries)
387+
if err != nil {
388+
log.Warn().Err(err).Msg("Failed to persist sync summary")
369389
}
370390

371391
err = bar.Finish()

0 commit comments

Comments
 (0)