Skip to content

Commit bc4ffa5

Browse files
author
Tsotne Tabidze
authored
Improve aws lambda deployment (logging, idempotency, etc) (#1985)
* Improve aws lambda deployment (logging, idempotency, etc) Signed-off-by: Tsotne Tabidze <[email protected]> * Fix linter error Signed-off-by: Tsotne Tabidze <[email protected]>
1 parent 600d38e commit bc4ffa5

File tree

3 files changed

+159
-117
lines changed

3 files changed

+159
-117
lines changed

sdk/python/feast/cli.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@ def cli(ctx: click.Context, chdir: Optional[str], log_level: str):
8484
datefmt="%m/%d/%Y %I:%M:%S %p",
8585
level=level,
8686
)
87+
# Override the logging level for already-created loggers (because loggers are created at import time)
88+
# Note that format & datefmt do not need to be set, because by default child loggers don't override them
89+
90+
# Also note, that mypy complains that logging.root doesn't have "manager" because of the way it's written.
91+
# So we have to put a type ignore hint for mypy.
92+
for logger_name in logging.root.manager.loggerDict: # type: ignore
93+
if "feast" in logger_name:
94+
logger = logging.getLogger(logger_name)
95+
logger.setLevel(level)
96+
8797
except Exception as e:
8898
raise e
8999
pass

sdk/python/feast/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
MAX_WAIT_INTERVAL: str = "60"
1919

2020
AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server:aws"
21+
AWS_LAMBDA_FEATURE_SERVER_REPOSITORY = "feast-python-server"
2122

2223
# feature_store.yaml environment variable name for remote feature server
2324
FEATURE_STORE_YAML_ENV_NAME: str = "FEATURE_STORE_YAML_BASE64"

sdk/python/feast/infra/aws.py

Lines changed: 148 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from feast.constants import (
1414
AWS_LAMBDA_FEATURE_SERVER_IMAGE,
15+
AWS_LAMBDA_FEATURE_SERVER_REPOSITORY,
1516
FEAST_USAGE,
1617
FEATURE_STORE_YAML_ENV_NAME,
1718
)
@@ -29,6 +30,7 @@
2930
from feast.feature_view import FeatureView
3031
from feast.flags import FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME
3132
from feast.flags_helper import enable_aws_lambda_feature_server
33+
from feast.infra.feature_servers.aws_lambda.config import AwsLambdaFeatureServerConfig
3234
from feast.infra.passthrough_provider import PassthroughProvider
3335
from feast.infra.utils import aws_utils
3436
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
@@ -83,89 +85,108 @@ def update_infra(
8385
registry_store_class.__name__, S3RegistryStore.__name__
8486
)
8587

86-
image_uri = self._upload_docker_image(project)
87-
_logger.info("Deploying feature server...")
88+
ecr_client = boto3.client("ecr")
89+
repository_uri = self._create_or_get_repository_uri(ecr_client)
90+
version = _get_version_for_aws()
91+
# Only download & upload the docker image if it doesn't already exist in ECR
92+
if not ecr_client.batch_get_image(
93+
repositoryName=AWS_LAMBDA_FEATURE_SERVER_REPOSITORY,
94+
imageIds=[{"imageTag": version}],
95+
).get("images"):
96+
image_uri = self._upload_docker_image(
97+
ecr_client, repository_uri, version
98+
)
99+
else:
100+
image_uri = f"{repository_uri}:{version}"
88101

89-
if not self.repo_config.repo_path:
90-
raise RepoConfigPathDoesNotExist()
91-
with open(self.repo_config.repo_path / "feature_store.yaml", "rb") as f:
92-
config_bytes = f.read()
93-
config_base64 = base64.b64encode(config_bytes).decode()
102+
self._deploy_feature_server(project, image_uri)
94103

95-
resource_name = self._get_lambda_name(project)
96-
lambda_client = boto3.client("lambda")
97-
api_gateway_client = boto3.client("apigatewayv2")
98-
function = aws_utils.get_lambda_function(lambda_client, resource_name)
104+
def _deploy_feature_server(self, project: str, image_uri: str):
105+
_logger.info("Deploying feature server...")
106+
107+
if not self.repo_config.repo_path:
108+
raise RepoConfigPathDoesNotExist()
109+
with open(self.repo_config.repo_path / "feature_store.yaml", "rb") as f:
110+
config_bytes = f.read()
111+
config_base64 = base64.b64encode(config_bytes).decode()
99112

100-
if function is None:
101-
# If the Lambda function does not exist, create it.
102-
_logger.info(" Creating AWS Lambda...")
103-
lambda_client.create_function(
113+
resource_name = _get_lambda_name(project)
114+
lambda_client = boto3.client("lambda")
115+
api_gateway_client = boto3.client("apigatewayv2")
116+
function = aws_utils.get_lambda_function(lambda_client, resource_name)
117+
118+
if function is None:
119+
# If the Lambda function does not exist, create it.
120+
_logger.info(" Creating AWS Lambda...")
121+
assert isinstance(
122+
self.repo_config.feature_server, AwsLambdaFeatureServerConfig
123+
)
124+
lambda_client.create_function(
125+
FunctionName=resource_name,
126+
Role=self.repo_config.feature_server.execution_role_name,
127+
Code={"ImageUri": image_uri},
128+
PackageType="Image",
129+
MemorySize=1769,
130+
Environment={
131+
"Variables": {
132+
FEATURE_STORE_YAML_ENV_NAME: config_base64,
133+
FEAST_USAGE: "False",
134+
}
135+
},
136+
Tags={
137+
"feast-owned": "True",
138+
"project": project,
139+
"feast-sdk-version": get_version(),
140+
},
141+
)
142+
function = aws_utils.get_lambda_function(lambda_client, resource_name)
143+
if not function:
144+
raise AwsLambdaDoesNotExist(resource_name)
145+
else:
146+
# If the feature_store.yaml has changed, need to update the environment variable.
147+
env = function.get("Environment", {}).get("Variables", {})
148+
if env.get(FEATURE_STORE_YAML_ENV_NAME) != config_base64:
149+
# Note, that this does not update Lambda gracefully (e.g. no rolling deployment).
150+
# It's expected that feature_store.yaml is not regularly updated while the lambda
151+
# is serving production traffic. However, the update in registry (e.g. modifying
152+
# feature views, feature services, and other definitions) does not update lambda.
153+
_logger.info(" Updating AWS Lambda...")
154+
155+
lambda_client.update_function_configuration(
104156
FunctionName=resource_name,
105-
Role=self.repo_config.feature_server.execution_role_name,
106-
Code={"ImageUri": image_uri},
107-
PackageType="Image",
108-
MemorySize=1769,
109157
Environment={
110-
"Variables": {
111-
FEATURE_STORE_YAML_ENV_NAME: config_base64,
112-
FEAST_USAGE: "False",
113-
}
114-
},
115-
Tags={
116-
"feast-owned": "True",
117-
"project": project,
118-
"feast-sdk-version": get_version(),
158+
"Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64}
119159
},
120160
)
121-
function = aws_utils.get_lambda_function(lambda_client, resource_name)
122-
if not function:
123-
raise AwsLambdaDoesNotExist(resource_name)
124-
else:
125-
# If the feature_store.yaml has changed, need to update the environment variable.
126-
env = function.get("Environment", {}).get("Variables", {})
127-
if env.get(FEATURE_STORE_YAML_ENV_NAME) != config_base64:
128-
# Note, that this does not update Lambda gracefully (e.g. no rolling deployment).
129-
# It's expected that feature_store.yaml is not regularly updated while the lambda
130-
# is serving production traffic. However, the update in registry (e.g. modifying
131-
# feature views, feature services, and other definitions does not update lambda).
132-
_logger.info(" Updating AWS Lambda...")
133-
134-
lambda_client.update_function_configuration(
135-
FunctionName=resource_name,
136-
Environment={
137-
"Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64}
138-
},
139-
)
140161

141-
api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
162+
api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
163+
if not api:
164+
# If the API Gateway doesn't exist, create it
165+
_logger.info(" Creating AWS API Gateway...")
166+
api = api_gateway_client.create_api(
167+
Name=resource_name,
168+
ProtocolType="HTTP",
169+
Target=function["FunctionArn"],
170+
RouteKey="POST /get-online-features",
171+
Tags={
172+
"feast-owned": "True",
173+
"project": project,
174+
"feast-sdk-version": get_version(),
175+
},
176+
)
142177
if not api:
143-
# If the API Gateway doesn't exist, create it
144-
_logger.info(" Creating AWS API Gateway...")
145-
api = api_gateway_client.create_api(
146-
Name=resource_name,
147-
ProtocolType="HTTP",
148-
Target=function["FunctionArn"],
149-
RouteKey="POST /get-online-features",
150-
Tags={
151-
"feast-owned": "True",
152-
"project": project,
153-
"feast-sdk-version": get_version(),
154-
},
155-
)
156-
if not api:
157-
raise AwsAPIGatewayDoesNotExist(resource_name)
158-
# Make sure to give AWS Lambda a permission to be invoked by the newly created API Gateway
159-
api_id = api["ApiId"]
160-
region = lambda_client.meta.region_name
161-
account_id = aws_utils.get_account_id()
162-
lambda_client.add_permission(
163-
FunctionName=function["FunctionArn"],
164-
StatementId=str(uuid.uuid4()),
165-
Action="lambda:InvokeFunction",
166-
Principal="apigateway.amazonaws.com",
167-
SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features",
168-
)
178+
raise AwsAPIGatewayDoesNotExist(resource_name)
179+
# Make sure to give AWS Lambda a permission to be invoked by the newly created API Gateway
180+
api_id = api["ApiId"]
181+
region = lambda_client.meta.region_name
182+
account_id = aws_utils.get_account_id()
183+
lambda_client.add_permission(
184+
FunctionName=function["FunctionArn"],
185+
StatementId=str(uuid.uuid4()),
186+
Action="lambda:InvokeFunction",
187+
Principal="apigateway.amazonaws.com",
188+
SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features",
189+
)
169190

170191
def teardown_infra(
171192
self,
@@ -180,7 +201,7 @@ def teardown_infra(
180201
and self.repo_config.feature_server.enabled
181202
):
182203
_logger.info("Tearing down feature server...")
183-
resource_name = self._get_lambda_name(project)
204+
resource_name = _get_lambda_name(project)
184205
lambda_client = boto3.client("lambda")
185206
api_gateway_client = boto3.client("apigatewayv2")
186207

@@ -197,7 +218,7 @@ def teardown_infra(
197218

198219
def get_feature_server_endpoint(self) -> Optional[str]:
199220
project = self.repo_config.project
200-
resource_name = self._get_lambda_name(project)
221+
resource_name = _get_lambda_name(project)
201222
api_gateway_client = boto3.client("apigatewayv2")
202223
api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
203224

@@ -209,25 +230,15 @@ def get_feature_server_endpoint(self) -> Optional[str]:
209230
region = lambda_client.meta.region_name
210231
return f"https://{api_id}.execute-api.{region}.amazonaws.com"
211232

212-
def _upload_docker_image(self, project: str) -> str:
233+
def _upload_docker_image(
234+
self, ecr_client, repository_uri: str, version: str
235+
) -> str:
213236
"""
214237
Pulls the AWS Lambda docker image from Dockerhub and uploads it to AWS ECR.
215238
216-
Args:
217-
project: Feast project name
218-
219239
Returns:
220240
The URI of the uploaded docker image.
221241
"""
222-
import base64
223-
224-
try:
225-
import boto3
226-
except ImportError as e:
227-
from feast.errors import FeastExtrasDependencyImportError
228-
229-
raise FeastExtrasDependencyImportError("aws", str(e))
230-
231242
try:
232243
import docker
233244
from docker.errors import APIError
@@ -244,52 +255,72 @@ def _upload_docker_image(self, project: str) -> str:
244255
raise DockerDaemonNotRunning()
245256

246257
_logger.info(
247-
f"Pulling remote image {Style.BRIGHT + Fore.GREEN}{AWS_LAMBDA_FEATURE_SERVER_IMAGE}{Style.RESET_ALL}:"
258+
f"Pulling remote image {Style.BRIGHT + Fore.GREEN}{AWS_LAMBDA_FEATURE_SERVER_IMAGE}{Style.RESET_ALL}"
248259
)
249-
docker_client.images.pull(AWS_LAMBDA_FEATURE_SERVER_IMAGE)
250-
251-
version = self._get_version_for_aws()
252-
repository_name = f"feast-python-server-{project}-{version}"
253-
ecr_client = boto3.client("ecr")
254-
try:
255-
_logger.info(
256-
f"Creating remote ECR repository {Style.BRIGHT + Fore.GREEN}{repository_name}{Style.RESET_ALL}:"
257-
)
258-
response = ecr_client.create_repository(repositoryName=repository_name)
259-
repository_uri = response["repository"]["repositoryUri"]
260-
except ecr_client.exceptions.RepositoryAlreadyExistsException:
261-
response = ecr_client.describe_repositories(
262-
repositoryNames=[repository_name]
263-
)
264-
repository_uri = response["repositories"][0]["repositoryUri"]
260+
for line in docker_client.api.pull(
261+
AWS_LAMBDA_FEATURE_SERVER_IMAGE, stream=True, decode=True
262+
):
263+
_logger.debug(f" {line}")
265264

266265
auth_token = ecr_client.get_authorization_token()["authorizationData"][0][
267266
"authorizationToken"
268267
]
269268
username, password = base64.b64decode(auth_token).decode("utf-8").split(":")
270269

271270
ecr_address = repository_uri.split("/")[0]
272-
docker_client.login(username=username, password=password, registry=ecr_address)
271+
_logger.info(
272+
f"Logging in Docker client to {Style.BRIGHT + Fore.GREEN}{ecr_address}{Style.RESET_ALL}"
273+
)
274+
login_status = docker_client.login(
275+
username=username, password=password, registry=ecr_address
276+
)
277+
_logger.debug(f" {login_status}")
273278

274279
image = docker_client.images.get(AWS_LAMBDA_FEATURE_SERVER_IMAGE)
275280
image_remote_name = f"{repository_uri}:{version}"
276281
_logger.info(
277-
f"Pushing local image to remote {Style.BRIGHT + Fore.GREEN}{image_remote_name}{Style.RESET_ALL}:"
282+
f"Pushing local image to remote {Style.BRIGHT + Fore.GREEN}{image_remote_name}{Style.RESET_ALL}"
278283
)
279284
image.tag(image_remote_name)
280-
docker_client.api.push(repository_uri, tag=version)
285+
for line in docker_client.api.push(
286+
repository_uri, tag=version, stream=True, decode=True
287+
):
288+
_logger.debug(f" {line}")
289+
281290
return image_remote_name
282291

283-
def _get_lambda_name(self, project: str):
284-
return f"feast-python-server-{project}-{self._get_version_for_aws()}"
292+
def _create_or_get_repository_uri(self, ecr_client):
    """
    Looks up the URI of the feature server ECR repository, creating the repository if needed.

    Args:
        ecr_client: ECR client used for the describe/create calls.

    Returns:
        The URI of the repository named AWS_LAMBDA_FEATURE_SERVER_REPOSITORY.
    """
    repo_name = AWS_LAMBDA_FEATURE_SERVER_REPOSITORY
    try:
        described = ecr_client.describe_repositories(repositoryNames=[repo_name])
        existing_uri = described["repositories"][0]["repositoryUri"]
    except ecr_client.exceptions.RepositoryNotFoundException:
        # The repository is missing, so create it and report the new URI.
        _logger.info(
            f"Creating remote ECR repository {Style.BRIGHT + Fore.GREEN}{repo_name}{Style.RESET_ALL}"
        )
        created = ecr_client.create_repository(repositoryName=repo_name)
        return created["repository"]["repositoryUri"]
    return existing_uri
305+
285306

286-
@staticmethod
287-
def _get_version_for_aws():
288-
"""Returns Feast version with certain characters replaced.
307+
def _get_lambda_name(project: str):
    """Builds the AWS resource name of the feature server for a project.

    The name has the form ``<AWS_LAMBDA_FEATURE_SERVER_REPOSITORY>-<project>-<version>``,
    where the version comes from ``_get_version_for_aws()``. If that would be too long,
    the ``<project>-<version>`` part is replaced by the first 40 characters of its
    URL-safe base64 encoding.

    Args:
        project: Feast project name.

    Returns:
        The resource name as a string.
    """
    lambda_prefix = AWS_LAMBDA_FEATURE_SERVER_REPOSITORY
    lambda_suffix = f"{project}-{_get_version_for_aws()}"
    # AWS Lambda name can't be longer than 64 characters.
    # This usually occurs during integration tests or when feast is
    # installed in editable mode (pip install -e), where feast version is long
    if len(lambda_prefix) + len(lambda_suffix) >= 63:
        # Use the URL-safe alphabet: standard base64 may emit '+' and '/', which are
        # not accepted in Lambda function names, while '-' and '_' are. The output is
        # identical to standard base64 whenever neither character would have appeared,
        # so names generated before this change stay the same.
        encoded_suffix = base64.urlsafe_b64encode(lambda_suffix.encode()).decode()
        lambda_suffix = encoded_suffix[:40]
    return f"{lambda_prefix}-{lambda_suffix}"
289316

290-
This allows the version to be included in names for AWS resources.
291-
"""
292-
return get_version().replace(".", "_").replace("+", "_")
317+
318+
def _get_version_for_aws():
    """Returns the Feast version in a form usable inside AWS resource names.

    Every "." and "+" in the version string is replaced with "_".
    """
    # One translation pass maps both characters to an underscore.
    underscore_map = str.maketrans(".+", "__")
    return get_version().translate(underscore_map)
293324

294325

295326
class S3RegistryStore(RegistryStore):

0 commit comments

Comments
 (0)