1212
1313from feast .constants import (
1414 AWS_LAMBDA_FEATURE_SERVER_IMAGE ,
15+ AWS_LAMBDA_FEATURE_SERVER_REPOSITORY ,
1516 FEAST_USAGE ,
1617 FEATURE_STORE_YAML_ENV_NAME ,
1718)
2930from feast .feature_view import FeatureView
3031from feast .flags import FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME
3132from feast .flags_helper import enable_aws_lambda_feature_server
33+ from feast .infra .feature_servers .aws_lambda .config import AwsLambdaFeatureServerConfig
3234from feast .infra .passthrough_provider import PassthroughProvider
3335from feast .infra .utils import aws_utils
3436from feast .protos .feast .core .Registry_pb2 import Registry as RegistryProto
@@ -83,89 +85,108 @@ def update_infra(
8385 registry_store_class .__name__ , S3RegistryStore .__name__
8486 )
8587
86- image_uri = self ._upload_docker_image (project )
87- _logger .info ("Deploying feature server..." )
88+ ecr_client = boto3 .client ("ecr" )
89+ repository_uri = self ._create_or_get_repository_uri (ecr_client )
90+ version = _get_version_for_aws ()
91+ # Only download & upload the docker image if it doesn't already exist in ECR
92+ if not ecr_client .batch_get_image (
93+ repositoryName = AWS_LAMBDA_FEATURE_SERVER_REPOSITORY ,
94+ imageIds = [{"imageTag" : version }],
95+ ).get ("images" ):
96+ image_uri = self ._upload_docker_image (
97+ ecr_client , repository_uri , version
98+ )
99+ else :
100+ image_uri = f"{ repository_uri } :{ version } "
88101
89- if not self .repo_config .repo_path :
90- raise RepoConfigPathDoesNotExist ()
91- with open (self .repo_config .repo_path / "feature_store.yaml" , "rb" ) as f :
92- config_bytes = f .read ()
93- config_base64 = base64 .b64encode (config_bytes ).decode ()
102+ self ._deploy_feature_server (project , image_uri )
94103
95- resource_name = self ._get_lambda_name (project )
96- lambda_client = boto3 .client ("lambda" )
97- api_gateway_client = boto3 .client ("apigatewayv2" )
98- function = aws_utils .get_lambda_function (lambda_client , resource_name )
104+ def _deploy_feature_server (self , project : str , image_uri : str ):
105+ _logger .info ("Deploying feature server..." )
106+
107+ if not self .repo_config .repo_path :
108+ raise RepoConfigPathDoesNotExist ()
109+ with open (self .repo_config .repo_path / "feature_store.yaml" , "rb" ) as f :
110+ config_bytes = f .read ()
111+ config_base64 = base64 .b64encode (config_bytes ).decode ()
99112
100- if function is None :
101- # If the Lambda function does not exist, create it.
102- _logger .info (" Creating AWS Lambda..." )
103- lambda_client .create_function (
113+ resource_name = _get_lambda_name (project )
114+ lambda_client = boto3 .client ("lambda" )
115+ api_gateway_client = boto3 .client ("apigatewayv2" )
116+ function = aws_utils .get_lambda_function (lambda_client , resource_name )
117+
118+ if function is None :
119+ # If the Lambda function does not exist, create it.
120+ _logger .info (" Creating AWS Lambda..." )
121+ assert isinstance (
122+ self .repo_config .feature_server , AwsLambdaFeatureServerConfig
123+ )
124+ lambda_client .create_function (
125+ FunctionName = resource_name ,
126+ Role = self .repo_config .feature_server .execution_role_name ,
127+ Code = {"ImageUri" : image_uri },
128+ PackageType = "Image" ,
129+ MemorySize = 1769 ,
130+ Environment = {
131+ "Variables" : {
132+ FEATURE_STORE_YAML_ENV_NAME : config_base64 ,
133+ FEAST_USAGE : "False" ,
134+ }
135+ },
136+ Tags = {
137+ "feast-owned" : "True" ,
138+ "project" : project ,
139+ "feast-sdk-version" : get_version (),
140+ },
141+ )
142+ function = aws_utils .get_lambda_function (lambda_client , resource_name )
143+ if not function :
144+ raise AwsLambdaDoesNotExist (resource_name )
145+ else :
146+ # If the feature_store.yaml has changed, need to update the environment variable.
147+ env = function .get ("Environment" , {}).get ("Variables" , {})
148+ if env .get (FEATURE_STORE_YAML_ENV_NAME ) != config_base64 :
149+ # Note, that this does not update Lambda gracefully (e.g. no rolling deployment).
150+ # It's expected that feature_store.yaml is not regularly updated while the lambda
151+ # is serving production traffic. However, the update in registry (e.g. modifying
152+ # feature views, feature services, and other definitions does not update lambda).
153+ _logger .info (" Updating AWS Lambda..." )
154+
155+ lambda_client .update_function_configuration (
104156 FunctionName = resource_name ,
105- Role = self .repo_config .feature_server .execution_role_name ,
106- Code = {"ImageUri" : image_uri },
107- PackageType = "Image" ,
108- MemorySize = 1769 ,
109157 Environment = {
110- "Variables" : {
111- FEATURE_STORE_YAML_ENV_NAME : config_base64 ,
112- FEAST_USAGE : "False" ,
113- }
114- },
115- Tags = {
116- "feast-owned" : "True" ,
117- "project" : project ,
118- "feast-sdk-version" : get_version (),
158+ "Variables" : {FEATURE_STORE_YAML_ENV_NAME : config_base64 }
119159 },
120160 )
121- function = aws_utils .get_lambda_function (lambda_client , resource_name )
122- if not function :
123- raise AwsLambdaDoesNotExist (resource_name )
124- else :
125- # If the feature_store.yaml has changed, need to update the environment variable.
126- env = function .get ("Environment" , {}).get ("Variables" , {})
127- if env .get (FEATURE_STORE_YAML_ENV_NAME ) != config_base64 :
128- # Note, that this does not update Lambda gracefully (e.g. no rolling deployment).
129- # It's expected that feature_store.yaml is not regularly updated while the lambda
130- # is serving production traffic. However, the update in registry (e.g. modifying
131- # feature views, feature services, and other definitions does not update lambda).
132- _logger .info (" Updating AWS Lambda..." )
133-
134- lambda_client .update_function_configuration (
135- FunctionName = resource_name ,
136- Environment = {
137- "Variables" : {FEATURE_STORE_YAML_ENV_NAME : config_base64 }
138- },
139- )
140161
141- api = aws_utils .get_first_api_gateway (api_gateway_client , resource_name )
162+ api = aws_utils .get_first_api_gateway (api_gateway_client , resource_name )
163+ if not api :
164+ # If the API Gateway doesn't exist, create it
165+ _logger .info (" Creating AWS API Gateway..." )
166+ api = api_gateway_client .create_api (
167+ Name = resource_name ,
168+ ProtocolType = "HTTP" ,
169+ Target = function ["FunctionArn" ],
170+ RouteKey = "POST /get-online-features" ,
171+ Tags = {
172+ "feast-owned" : "True" ,
173+ "project" : project ,
174+ "feast-sdk-version" : get_version (),
175+ },
176+ )
142177 if not api :
143- # If the API Gateway doesn't exist, create it
144- _logger .info (" Creating AWS API Gateway..." )
145- api = api_gateway_client .create_api (
146- Name = resource_name ,
147- ProtocolType = "HTTP" ,
148- Target = function ["FunctionArn" ],
149- RouteKey = "POST /get-online-features" ,
150- Tags = {
151- "feast-owned" : "True" ,
152- "project" : project ,
153- "feast-sdk-version" : get_version (),
154- },
155- )
156- if not api :
157- raise AwsAPIGatewayDoesNotExist (resource_name )
158- # Make sure to give AWS Lambda a permission to be invoked by the newly created API Gateway
159- api_id = api ["ApiId" ]
160- region = lambda_client .meta .region_name
161- account_id = aws_utils .get_account_id ()
162- lambda_client .add_permission (
163- FunctionName = function ["FunctionArn" ],
164- StatementId = str (uuid .uuid4 ()),
165- Action = "lambda:InvokeFunction" ,
166- Principal = "apigateway.amazonaws.com" ,
167- SourceArn = f"arn:aws:execute-api:{ region } :{ account_id } :{ api_id } /*/*/get-online-features" ,
168- )
178+ raise AwsAPIGatewayDoesNotExist (resource_name )
179+ # Make sure to give AWS Lambda a permission to be invoked by the newly created API Gateway
180+ api_id = api ["ApiId" ]
181+ region = lambda_client .meta .region_name
182+ account_id = aws_utils .get_account_id ()
183+ lambda_client .add_permission (
184+ FunctionName = function ["FunctionArn" ],
185+ StatementId = str (uuid .uuid4 ()),
186+ Action = "lambda:InvokeFunction" ,
187+ Principal = "apigateway.amazonaws.com" ,
188+ SourceArn = f"arn:aws:execute-api:{ region } :{ account_id } :{ api_id } /*/*/get-online-features" ,
189+ )
169190
170191 def teardown_infra (
171192 self ,
@@ -180,7 +201,7 @@ def teardown_infra(
180201 and self .repo_config .feature_server .enabled
181202 ):
182203 _logger .info ("Tearing down feature server..." )
183- resource_name = self . _get_lambda_name (project )
204+ resource_name = _get_lambda_name (project )
184205 lambda_client = boto3 .client ("lambda" )
185206 api_gateway_client = boto3 .client ("apigatewayv2" )
186207
@@ -197,7 +218,7 @@ def teardown_infra(
197218
198219 def get_feature_server_endpoint (self ) -> Optional [str ]:
199220 project = self .repo_config .project
200- resource_name = self . _get_lambda_name (project )
221+ resource_name = _get_lambda_name (project )
201222 api_gateway_client = boto3 .client ("apigatewayv2" )
202223 api = aws_utils .get_first_api_gateway (api_gateway_client , resource_name )
203224
@@ -209,25 +230,15 @@ def get_feature_server_endpoint(self) -> Optional[str]:
209230 region = lambda_client .meta .region_name
210231 return f"https://{ api_id } .execute-api.{ region } .amazonaws.com"
211232
212- def _upload_docker_image (self , project : str ) -> str :
233+ def _upload_docker_image (
234+ self , ecr_client , repository_uri : str , version : str
235+ ) -> str :
213236 """
214237 Pulls the AWS Lambda docker image from Dockerhub and uploads it to AWS ECR.
215238
216- Args:
217- project: Feast project name
218-
219239 Returns:
220240 The URI of the uploaded docker image.
221241 """
222- import base64
223-
224- try :
225- import boto3
226- except ImportError as e :
227- from feast .errors import FeastExtrasDependencyImportError
228-
229- raise FeastExtrasDependencyImportError ("aws" , str (e ))
230-
231242 try :
232243 import docker
233244 from docker .errors import APIError
@@ -244,52 +255,72 @@ def _upload_docker_image(self, project: str) -> str:
244255 raise DockerDaemonNotRunning ()
245256
246257 _logger .info (
247- f"Pulling remote image { Style .BRIGHT + Fore .GREEN } { AWS_LAMBDA_FEATURE_SERVER_IMAGE } { Style .RESET_ALL } : "
258+ f"Pulling remote image { Style .BRIGHT + Fore .GREEN } { AWS_LAMBDA_FEATURE_SERVER_IMAGE } { Style .RESET_ALL } "
248259 )
249- docker_client .images .pull (AWS_LAMBDA_FEATURE_SERVER_IMAGE )
250-
251- version = self ._get_version_for_aws ()
252- repository_name = f"feast-python-server-{ project } -{ version } "
253- ecr_client = boto3 .client ("ecr" )
254- try :
255- _logger .info (
256- f"Creating remote ECR repository { Style .BRIGHT + Fore .GREEN } { repository_name } { Style .RESET_ALL } :"
257- )
258- response = ecr_client .create_repository (repositoryName = repository_name )
259- repository_uri = response ["repository" ]["repositoryUri" ]
260- except ecr_client .exceptions .RepositoryAlreadyExistsException :
261- response = ecr_client .describe_repositories (
262- repositoryNames = [repository_name ]
263- )
264- repository_uri = response ["repositories" ][0 ]["repositoryUri" ]
260+ for line in docker_client .api .pull (
261+ AWS_LAMBDA_FEATURE_SERVER_IMAGE , stream = True , decode = True
262+ ):
263+ _logger .debug (f" { line } " )
265264
266265 auth_token = ecr_client .get_authorization_token ()["authorizationData" ][0 ][
267266 "authorizationToken"
268267 ]
269268 username , password = base64 .b64decode (auth_token ).decode ("utf-8" ).split (":" )
270269
271270 ecr_address = repository_uri .split ("/" )[0 ]
272- docker_client .login (username = username , password = password , registry = ecr_address )
271+ _logger .info (
272+ f"Logging in Docker client to { Style .BRIGHT + Fore .GREEN } { ecr_address } { Style .RESET_ALL } "
273+ )
274+ login_status = docker_client .login (
275+ username = username , password = password , registry = ecr_address
276+ )
277+ _logger .debug (f" { login_status } " )
273278
274279 image = docker_client .images .get (AWS_LAMBDA_FEATURE_SERVER_IMAGE )
275280 image_remote_name = f"{ repository_uri } :{ version } "
276281 _logger .info (
277- f"Pushing local image to remote { Style .BRIGHT + Fore .GREEN } { image_remote_name } { Style .RESET_ALL } : "
282+ f"Pushing local image to remote { Style .BRIGHT + Fore .GREEN } { image_remote_name } { Style .RESET_ALL } "
278283 )
279284 image .tag (image_remote_name )
280- docker_client .api .push (repository_uri , tag = version )
285+ for line in docker_client .api .push (
286+ repository_uri , tag = version , stream = True , decode = True
287+ ):
288+ _logger .debug (f" { line } " )
289+
281290 return image_remote_name
282291
283- def _get_lambda_name (self , project : str ):
284- return f"feast-python-server-{ project } -{ self ._get_version_for_aws ()} "
292+ def _create_or_get_repository_uri (self , ecr_client ):
293+ try :
294+ return ecr_client .describe_repositories (
295+ repositoryNames = [AWS_LAMBDA_FEATURE_SERVER_REPOSITORY ]
296+ )["repositories" ][0 ]["repositoryUri" ]
297+ except ecr_client .exceptions .RepositoryNotFoundException :
298+ _logger .info (
299+ f"Creating remote ECR repository { Style .BRIGHT + Fore .GREEN } { AWS_LAMBDA_FEATURE_SERVER_REPOSITORY } { Style .RESET_ALL } "
300+ )
301+ response = ecr_client .create_repository (
302+ repositoryName = AWS_LAMBDA_FEATURE_SERVER_REPOSITORY
303+ )
304+ return response ["repository" ]["repositoryUri" ]
305+
285306
286- @staticmethod
287- def _get_version_for_aws ():
288- """Returns Feast version with certain characters replaced.
307+ def _get_lambda_name (project : str ):
308+ lambda_prefix = AWS_LAMBDA_FEATURE_SERVER_REPOSITORY
309+ lambda_suffix = f"{ project } -{ _get_version_for_aws ()} "
310+ # AWS Lambda name can't have the length greater than 64 bytes.
311+ # This usually occurs during integration tests or when feast is
312+ # installed in editable mode (pip install -e), where feast version is long
313+ if len (lambda_prefix ) + len (lambda_suffix ) >= 63 :
314+ lambda_suffix = base64 .b64encode (lambda_suffix .encode ()).decode ()[:40 ]
315+ return f"{ lambda_prefix } -{ lambda_suffix } "
289316
290- This allows the version to be included in names for AWS resources.
291- """
292- return get_version ().replace ("." , "_" ).replace ("+" , "_" )
317+
318+ def _get_version_for_aws ():
319+ """Returns Feast version with certain characters replaced.
320+
321+ This allows the version to be included in names for AWS resources.
322+ """
323+ return get_version ().replace ("." , "_" ).replace ("+" , "_" )
293324
294325
295326class S3RegistryStore (RegistryStore ):
0 commit comments