- åæ¸ã
- 注æç¹
- åèãªã³ã¯
- ç°å¢
- å ¨ä½å
- 0. äºåæºå
- 1. CDKã®ããã¸ã§ã¯ãä½æ
- 2. S3
- 3. DynamoDB
- 4. IAM Role
- 5. Amazon OpenSearch Service
- 6. IAM Policy
- 7. Pipeline
- ãããã¤ã¨ãã¹ã
- åé¤
- ã¾ã¨ã
- CDKã®å®ç¾©å ¨ä½
åæ¸ã
æ¨å¹´æ«ã«å ¬éããããDynamoDBã¨OpenSearch Serviceã®zero-ETL integrationã AWS CDK ã§æ§ç¯ããä¾ã§ãã
æ¬è¨äºã¯ããã¡ãã®CLIæé ã®CDKãã¼ã¸ã§ã³ã§ãã
CLIã¨ã»ã¼åãã§ãããCDKã§ã¯ãªã½ã¼ã¹éã«ä¾åé¢ä¿ãããå ´åã«ã¯addDependencyã¡ã½ããã使ã£ã¦æ示ãã¦ãããå¿ è¦ãããã¾ããä½æãããªã½ã¼ã¹ã®IDãå ã«policyããã£ããç¸ãããæã«ã¯ç¹ã«éè¦ã§ãã ã¾ããpipelineã®å¦çå®ç¾©(yml)é¨åã¯ç¾å¨æååã§æ¸¡ããã¨ã«ãªã£ã¦ãã¾ããCLIã¨TypeScriptã§ã¯åãè¾¼ã¿æ¹ãç°ãªãã®ã§ããã®é¨åã«ã注æãã¦ãã ããã
â»å ã«CLIçã®æé ã試ãã¦ã¿ããã¨ããå§ããã¾ããCLIçã®ãªã½ã¼ã¹éã®é¢ä¿ãç解ãã¦ããã°ãä¾åé¢ä¿ã®ç解ã¯é£ãããªãã¨æãã¾ãã
注æç¹
ãªã½ã¼ã¹ã¯èª²éããã¾ãã ãã¹ããçµãã£ããåé¤ãã¦ããã¾ãããã以ä¸ã®æé ãå®è¡ãã¦çºçããåé¡ã«ã¤ãã¦ãçè ã¯ä¸åã®è²¬ä»»ãåããã¨ãã§ãã¾ãããèªå·±è²¬ä»»ã§ãé¡ããã¾ãã
åèãªã³ã¯
AWSå ¬å¼ã®ç´¹ä»è¨äºã§ãã
Amazon DynamoDB の Amazon OpenSearch Service とのゼロ ETL 統合が利用可能になりました | Amazon Web Services ブログ
å ¬å¼ãã¥ã¼ããªã¢ã«ã§ãããã¡ãã¯GUIãã¼ã¹ã§ããæ¬è¨äºã§ã¯ãä¸ã®collection(serverless)çã®ãªã½ã¼ã¹ããAWS CLIã使ã£ã¦æ§ç¯ãã¾ãã
Tutorial: Ingesting data into a domain using Amazon OpenSearch Ingestion - Amazon OpenSearch Service
DynamoDB zero-ETL integration with Amazon OpenSearch Service - Amazon DynamoDB
ç°å¢
ãã¼ã¸ã§ã³ | |
---|---|
MacOS Sonoma | 14.4.1 |
AWS CLI | 2.15.34 |
AWS CDK | 2.145.0 |
awscurl | 0.33 |
å ¨ä½å
以ä¸ã®ãªã½ã¼ã¹ãä½ãã¾ãã
- Amazon S3
- Amazon DynamoDBã®Table
- Amazon OpenSearch Service ã® collection
- 3種ã®ããªã·ã¼
- Data access policies
- Encryption policies
- Network policies
- 3種ã®ããªã·ã¼
- Pipeline
- IAM Role
- IAM Policy : OpenSearchã¨DynamoDBã¸ã®ã¢ã¯ã»ã¹æ¨©
Pipelineã¯ä»¥ä¸ã®åãããã¾ãã
- DynamoDBã®ç£è¦(ãã¼ã¿ãæå ¥ããããã¨ãæç¥)
- OpenSearchã¸ã®ãã¼ã¿æå ¥/åé¤/æ´æ°
- S3ã¸ããã¯ã¢ãããªã©ãã¢ãããã¼ã
ã¾ããAmazon OpenSearch Serviceã¯ãªã½ã¼ã¹ãã¼ã¹ã®ããªã·ã¼ãæã¡ã¾ããIAM Roleã§Amazon OpenSearch Serviceã¸ã®ã¢ã¯ã»ã¹ã許å¯ããã ãã§ã¯ãã¡ã§ãAmazon OpenSearch Serviceã® Data access policies å´ã§ãIAM Roleã«å¯¾ãã¦è¨±å¯ãåºãå¿ è¦ãããã¾ã(å¾è¿°)ã
0. äºåæºå
AWS CDKãå©ç¨ã§ããããã«ãã¦ããã¾ãã ã¾ããCDKãå®è¡ããã¦ã¼ã¶ã«å¿ è¦ãªæ¨©éãã¤ãã¦ããã¾ãã
1. CDKã®ããã¸ã§ã¯ãä½æ
mkdir zero-etl-dynamodb-aoss cd $_ cdk init app --language typescript
lib/zero-etl-dynamodb-aoss-stack.ts ã«ã³ã¼ããæ¸ãã¦ããã¾ãã
importé¨åãªã©ã¯æå¾ã«ã¾ã¨ããã³ã¼ããè¨è¼ããã®ã§ããã¡ããåèã«ãã¦ãã ããã
2. S3
cdkãdestroyããæã«åé¤ãããããã«ãremovalPolicy(ãã±ãç ´å£)ã¨autoDeleteObjects(ä¸èº«ç ´å£)ãè¨å®ãã¦ããã¾ãã
const s3bucket = new Bucket(this, 'S3Bucket', { bucketName: 'ingestion-dynamodb', removalPolicy: cdk.RemovalPolicy.DESTROY, autoDeleteObjects: true, });
3. DynamoDB
ãã®DynamoDBã«æå ¥ãããã¼ã¿ãPipelineã«ãã£ã¦OpenSearchã«èªåã§æå ¥ãããäºå®ã§ãã
partitionKeyã¨ãã¦æååã®nameãsortKeyã¨ãã¦æ°å¤ã®ageã使ããã¨ã«ãã¾ãã
両è
ãåãããã¨ä¸æã«ãªãã¾ããOpenSearchã®idã¯name:age
ã¨ããå½¢ã«ãªãæ³å®ã§ãã
pointInTimeRecoveryã¨streamã®è¨å®ã¯å¿ é ã§ã(å³å¯ã«ã¯å¾è ã ãã§ã以éã®å®é¨ã¯ã§ãã¾ã)ããã¡ãã®è¨å®ã«ãã£ã¦ãDynamoDBã®å 容ãpipelineãæ¤ç¥ãã¦OpenSearchã«æå ¥ã§ããããã«ãªãã¾ãã
cdkã§stackãåé¤ããéã«dynamoDBã¯ç ´å£ãããã®ã§ãremovalPolicyãã¤ãã¦ããã¾ãã
ã¾ããreadCapacityã¨writeCapacityãæå°ã®1ã«è¨å®ãã¾ãã(ããªãã¦ãããã§ã)
const table = new Table(this, 'DynamoDBTable', { tableName: 'ingestion-table', partitionKey: {name: 'name', type: AttributeType.STRING}, sortKey: {name: 'age', type: AttributeType.NUMBER}, pointInTimeRecovery: true, stream: StreamViewType.NEW_IMAGE, readCapacity: 1, writeCapacity: 1, removalPolicy: cdk.RemovalPolicy.DESTROY });
4. IAM Role
Pipelineã®Roleãå
ã«ä½ã£ã¦ããã¾ããOpenSearchã®Data Access Policyã«å¯¾ãã¦ããã®Roleã®Arnãæå®ãã¦ã¢ã¯ã»ã¹è¨±å¯ãåºãããããã§ãã
Pipelineç¨ã®Roleãªã®ã§ãtrust policyã®å¯¾è±¡ãosis-pipelines.amazonaws.com
ã«ãã¦ããã¾ãã
const pipelineRole = new Role(this, 'pipelineRole', { roleName: 'PipelineRole', assumedBy: new ServicePrincipal('osis-pipelines.amazonaws.com') });
5. Amazon OpenSearch Service
3種ã®ããªã·ã¼ã¨ãcollectionæ¬ä½ãä½æãã¾ãã ã¾ãã2024å¹´6æ18æ¥ç¾å¨ãL2ã³ã³ã¹ãã©ã¯ã¿ãç¡ããããªã®ã§ãL1(Cfnãé ã«ã¤ãã¦ããã¯ã©ã¹)ã使ã£ã¦ãã¾ãã
Data Access Policy
ãªã½ã¼ã¹ããªã·ã¼ã§ããå ã»ã©ä½ã£ãRoleã«å ããCLIã®ã¦ã¼ã¶ãprincipalã«å ¥ãã¦ããã¨è¯ãã§ã(ãã¹ãæã«CLIããOpenSearchã«ãªã¯ã¨ã¹ãã§ãã¾ã)ã
CLIã¦ã¼ã¶åãããããªããã°ãCLI㧠aws sts get-caller-identity
ãå®è¡ããã¨è¯ãã§ãã
const cliUser = User.fromUserName(this, 'existingUser', 'local-cli-user'); const dataAccessPolicy = new CfnAccessPolicy(this, "OpenSearchAccessPolicy", { name: "cdk-access-policy", type: "data", policy: JSON.stringify([ { "Rules": [ { "Resource": [`index/${collection.name}/*`], "Permission": [ "aoss:CreateIndex", "aoss:UpdateIndex", "aoss:DescribeIndex", "aoss:ReadDocument", "aoss:WriteDocument" ], "ResourceType": "index" } ], "Principal": [ cliUser.userArn, pipelineRole.roleArn] } ]), });
Encryption Policy
collectionã®ãã¼ã¿æå·åã®è¨å®ã®ãããEncryption Policyãå®ç¾©ãã¾ããResourceé ç®ã§å ã»ã©å®ç¾©ããcollection.nameãå©ç¨ãã¦ãã¾ãã
const encryptionPolicy = new CfnSecurityPolicy(this, 'OpenSearchEncryptionPolicy', { name: 'encryption-policy', type: 'encryption', policy: JSON.stringify({ "Rules": [ { "ResourceType": "collection", "Resource": [`collection/${collection.name}`] } ], "AWSOwnedKey": true }), });
encryption policyã¯ãcollectionä½æåã«åå¨ãã¦ããå¿ è¦ãããã¾ãã ãã®ãããaddDependencyã¡ã½ãããå®ç¾©ãã¦ä¾åé¢ä¿ãæ示ãã¦ããã¾ãããã
// NOTE Collectionã¯encryptionPolicyã«ä¾åãã¦ããã
collection.addDependency(encryptionPolicy)
Network Policy
å®é¨ãããããããpublicã¢ã¯ã»ã¹ãã§ããããã«ãã¦ããã¾ãã
const networkPolicy = new CfnSecurityPolicy(this, 'OpenSearchNetworkPolicy', { name: "network-policy", type: "network", policy: JSON.stringify([ { "Rules": [ { "ResourceType": "dashboard", "Resource": [ `collection/${collection.name}`] }, { "ResourceType": "collection", "Resource": [ `collection/${collection.name}`] }, ], "AllowFromPublic": true, } ]) });
6. IAM Policy
pipelineã®Roleã«ä»ä¸ããPolicyãä½æãã¾ãã(ä¸è¨ã§ä½æããcollectionã®idãå¿ è¦ãªã®ã§ãpolicyã¯ãã®ã¿ã¤ãã³ã°ã¾ã§ä½ããã«ãã¾ããã)
å ¬å¼tutorialã«å¾ã£ã¦ããå³ããã«è¨å®ãã¦ãã¾ãããå®é¨ç¨ãªããã£ã¨ã©ãã§ãè¯ãæ°ããã¾ãã
const pipelinePolicy = new Policy(this, 'pipelinePolicy', { policyName: 'pipelinePolicy', statements: [ new PolicyStatement({ effect: Effect.ALLOW, actions: [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime" ], resources: [`${table.tableArn}`] }), new PolicyStatement({ effect: Effect.ALLOW, actions: [ "dynamodb:DescribeExport" ], resources: [`${table.tableArn}/export/*`] }), new PolicyStatement({ effect: Effect.ALLOW, actions: [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator" ], resources: [`${table.tableArn}/stream/*`] }), new PolicyStatement({ effect: Effect.ALLOW, actions: [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl" ], resources: [ `${s3bucket.bucketArn}/*` ] }), new PolicyStatement({ effect: Effect.ALLOW, actions: [ "aoss:BatchGetCollection", "aoss:APIAccessAll" ], resources: [ `${collection.attrArn}` ] }), new PolicyStatement({ effect: Effect.ALLOW, actions: [ "aoss:CreateSecurityPolicy", "aoss:GetSecurityPolicy", "aoss:UpdateSecurityPolicy" ], resources: ['*'], conditions: { StringEquals: { "aoss:collection": collection.name } } }), ] });
(éè¦) policyã¯Roleã«ã¢ã¿ãããã¦ããã¾ã
pipelinePolicy.attachToRole(pipelineRole)
7. Pipeline
pipelineãå®ç¾©ãã¾ãã
ETLå¦çã®å®ç¾©(pipelineConfigurationBody)ã¯ãæ®å¿µãªããæååã¨ãã¦å®ç¾©ããããç¡ãããã§ãã ãã£ããTypeScriptãªã®ã«... L2ã®ã³ã³ã¹ãã©ã¯ã¿ãåºãã®ãå¾ ã¡ã¾ãããã
éå½¢ã¨ãªãæååã欲ãããã°ã以ä¸ã§åå¾ãã¦ãã ããã詳ããã¯ãèªåãæ¸ããCLIçã®è¨äºã«è¨è¼ãã¦ããã¾ãã
aws osis get-pipeline-blueprint --blueprint-name AWS-DynamoDBChangeDataCapturePipeline --query Blueprint.PipelineConfigurationBody --output text > blueprint.yml
ã³ã¼ãã¯ä»¥ä¸ã®ããã«ãªãã¾ãã
CLIçã¨æ¯ã¹ã¦indexã¾ããã®ã¨ã¹ã±ã¼ãã®æ¸ãæ¹ãå¤æ´ãã¦ãã¾ãã yamlããã®ããã«æ¸ãã®ã¯ãã¹ãçºçããããå³ããã®ã§ãobjectã§æ¸ããå¾ã«å¤æãããªã©ããæ¹ãè¯ãããã§ã(æªæ¤è¨)
const pipelineConfiguration = ` version: "2" dynamodb-pipeline: source: dynamodb: acknowledgments: true tables: - table_arn: "${table.tableArn}" stream: start_position: "LATEST" export: s3_bucket: "${s3bucket.bucketName}" s3_region: "${this.region}" s3_prefix: "opensearch-export/" aws: sts_role_arn: "${pipelineRole.roleArn}" region: "${this.region}" sink: - opensearch: hosts: - "${collection.attrCollectionEndpoint}" index: '\${getMetadata("table_name")}' index_type: "custom" normalize_index: true document_id: '\${getMetadata("primary_key")}' action: '\${getMetadata("opensearch_action")}' document_version: '\${getMetadata("document_version")}' document_version_type: "external" aws: sts_role_arn: "${pipelineRole.roleArn}" region: "${this.region}" serverless: true dlq: s3: bucket: "${s3bucket.bucketName}" key_path_prefix: "dynamodb-pipeline/dlq" region: "${this.region}" sts_role_arn: "${pipelineRole.roleArn}" `; const pipeline = new CfnPipeline(this, "pipeline", { pipelineConfigurationBody: pipelineConfiguration, pipelineName: 'serverless-ingestion', minUnits: 1, maxUnits: 2, });
ãããã¤ã¨ãã¹ã
ãããã¤ãã¾ãã確èªç»é¢ãåºãããyãæ¼ãã¦é²ã¿ã¾ãããã
cdk deploy
å®äºããããDynamoDBã«ãã¼ã¿ãæå ¥ãã¦ã¿ã¾ãã
TABLE_NAME=ingestion-table aws dynamodb put-item \ --table-name $TABLE_NAME \ --item '{"name": {"S": "saki"}, "age": {"N": "16"}, "height": {"N": "152"}}' aws dynamodb put-item \ --table-name $TABLE_NAME \ --item '{"name": {"S": "temari"}, "age": {"N": "15"}, "height": {"N": "162"}}' aws dynamodb put-item \ --table-name $TABLE_NAME \ --item '{"name": {"S": "kotone"}, "age": {"N": "15"}, "height": {"N": "156"}}'
OpenSearchã確èªãã¾ããåæ ãããã¾ã§ãå°ãæéããããããããã¾ããã
export AWS_DEFAULT_REGION='ap-northeast-1' COLLECTION_NAME=ingestion-collection HOST=$(aws opensearchserverless batch-get-collection --names $COLLECTION_NAME --query 'collectionDetails[].collectionEndpoint' --output text) && echo $HOST awscurl --service aoss --region $AWS_DEFAULT_REGION -X GET ${HOST}/_cat/indices awscurl --service aoss --region $AWS_DEFAULT_REGION -X GET ${HOST}/${TABLE_NAME}/_search | jq .
DynamoDBã«å°å ¥ãããã¼ã¿ããOpenSearchã«åæ ããã¦ããã°æåã§ãï¼
åé¤
以ä¸ã®ã³ãã³ãã§åé¤ãã¾ããS3ãDynamoDBããCDKã®ã¹ã¿ãã¯ã¨ã¨ãã«åé¤ãããè¨å®ã«ãªã£ã¦ããã®ã§åé¤ãããã¯ãã§ããä»åã¯ãã°ãè¨å®ãã¦ãªãã®ã§ãä½ãæ®ããªãã¨æãã¾ãã(ä½ããªã½ã¼ã¹ãæ®ã£ã¦ãããæãã¦ãã ãã)
cdk destroy
ã¾ã¨ã
Amazon Dynamodbã¨Amazon OpenSearch Serviceã®zero-ETL integrationãAWS CDKã§æ§ç¯ããæé ãã¾ã¨ãã¾ããã
pipelineã®yamlãã¾ã æååã§ããæå®ã§ããªãã®ãå°ãä¸ä¾¿ã§ããã L2ã®ã³ã³ã¹ãã©ã¯ã¿ãåºãã°ãobjectã§æå®ã§ãã¦TypeScriptã®åã®è£å©ãåããããã®ããªã¨æã£ã¦ãã¾ãã
CDKã®å®ç¾©å ¨ä½
以ä¸ã¯ã³ã¼ãå ¨ä½ã§ãã
import * as cdk from 'aws-cdk-lib'; import { AttributeType, StreamViewType, Table } from 'aws-cdk-lib/aws-dynamodb'; import { Effect, Policy, PolicyStatement, Role, ServicePrincipal, User } from 'aws-cdk-lib/aws-iam'; import { CfnAccessPolicy, CfnCollection, CfnSecurityPolicy } from 'aws-cdk-lib/aws-opensearchserverless'; import { CfnPipeline } from 'aws-cdk-lib/aws-osis'; import { Bucket } from 'aws-cdk-lib/aws-s3'; import { Construct } from 'constructs'; export class ZeroEtlDynamodbAossStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); const s3bucket = new Bucket(this, 'S3Bucket', { bucketName: 'ingestion-dynamodb', removalPolicy: cdk.RemovalPolicy.DESTROY, autoDeleteObjects: true, }); const table = new Table(this, 'DynamoDBTable', { tableName: 'ingestion-table', partitionKey: {name: 'name', type: AttributeType.STRING}, sortKey: {name: 'age', type: AttributeType.NUMBER}, readCapacity: 1, writeCapacity: 1, pointInTimeRecovery: true, stream: StreamViewType.NEW_IMAGE, removalPolicy: cdk.RemovalPolicy.DESTROY }); // Pieplineç¨ã®Role const pipelineRole = new Role(this, 'pipelineRole', { roleName: 'PipelineRole', assumedBy: new ServicePrincipal('osis-pipelines.amazonaws.com'), }); // AOSS Collection const collection = new CfnCollection(this, "OpenSearchCollection", { name: "ingestion-collection", type: "SEARCH", standbyReplicas: "DISABLED", }); // AOSS Data Access Policy const cliUser = User.fromUserName(this, 'existingUser', 'local-cli-user'); const dataAccessPolicy = new CfnAccessPolicy(this, "OpenSearchAccessPolicy", { name: "access-policy", type: "data", policy: JSON.stringify([ { "Rules": [ { "Resource": [`index/${collection.name}/*`], "Permission": [ "aoss:CreateIndex", "aoss:UpdateIndex", "aoss:DescribeIndex", "aoss:ReadDocument", "aoss:WriteDocument" ], "ResourceType": "index" } ], "Principal": [ cliUser.userArn, pipelineRole.roleArn] } ]), }); // AOSS Encryption Policy const encryptionPolicy = new CfnSecurityPolicy(this, 'OpenSearchEncryptionPolicy', { name: 'encryption-policy', type: 'encryption', policy: JSON.stringify({ "Rules": [ { "ResourceType": "collection", "Resource": [`collection/${collection.name}`] } ], "AWSOwnedKey": true }), }); // NOTE Collectionã¯encryptionPolicyã«ä¾åãã¦ããã collection.addDependency(encryptionPolicy) // AOSS Network Policy const networkPolicy = new CfnSecurityPolicy(this, 'OpenSearchNetworkPolicy', { name: "network-policy", type: "network", policy: JSON.stringify([ { "Rules": [ { "ResourceType": "dashboard", "Resource": [ `collection/${collection.name}`] }, { "ResourceType": "collection", "Resource": [ `collection/${collection.name}`] }, ], "AllowFromPublic": true, } ]) }); const pipelinePolicy = new Policy(this, 'pipelinePolicy', { policyName: 'pipelinePolicy', statements: [ new PolicyStatement({ effect: Effect.ALLOW, actions: [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime" ], resources: [`${table.tableArn}`] }), new PolicyStatement({ effect: Effect.ALLOW, actions: [ "dynamodb:DescribeExport" ], resources: [`${table.tableArn}/export/*`] }), new PolicyStatement({ effect: Effect.ALLOW, actions: [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator" ], resources: [`${table.tableArn}/stream/*`] }), new PolicyStatement({ effect: Effect.ALLOW, actions: [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl" ], resources: [ `${s3bucket.bucketArn}/*` ] }), new PolicyStatement({ effect: Effect.ALLOW, actions: [ "aoss:BatchGetCollection", "aoss:APIAccessAll" ], resources: [ `${collection.attrArn}` ] }), new PolicyStatement({ effect: Effect.ALLOW, actions: [ "aoss:CreateSecurityPolicy", "aoss:GetSecurityPolicy", "aoss:UpdateSecurityPolicy" ], resources: ['*'], conditions: { StringEquals: { "aoss:collection": collection.name } } }), ] }); pipelinePolicy.attachToRole(pipelineRole) const pipelineConfiguration = ` version: "2" dynamodb-pipeline: source: dynamodb: acknowledgments: true tables: - table_arn: "${table.tableArn}" stream: start_position: "LATEST" export: s3_bucket: "${s3bucket.bucketName}" s3_region: "${this.region}" s3_prefix: "opensearch-export/" aws: sts_role_arn: "${pipelineRole.roleArn}" region: "${this.region}" sink: - opensearch: hosts: - "${collection.attrCollectionEndpoint}" index: '\${getMetadata("table_name")}' index_type: "custom" normalize_index: true document_id: '\${getMetadata("primary_key")}' action: '\${getMetadata("opensearch_action")}' document_version: '\${getMetadata("document_version")}' document_version_type: "external" aws: sts_role_arn: "${pipelineRole.roleArn}" region: "${this.region}" serverless: true dlq: s3: bucket: "${s3bucket.bucketName}" key_path_prefix: "dynamodb-pipeline/dlq" region: "${this.region}" sts_role_arn: "${pipelineRole.roleArn}" `; const pipeline = new CfnPipeline(this, "pipeline", { pipelineConfigurationBody: pipelineConfiguration, pipelineName: 'serverless-ingestion', minUnits: 1, maxUnits: 2, }); pipeline.node.addDependency(pipelinePolicy); pipeline.node.addDependency(collection); } }