Skip to content

Commit c539478

Browse files
authored
Log spark retrieval error message (#74)
* Log spark retrieval error message Signed-off-by: Terence Lim <[email protected]>
* Update check Signed-off-by: Terence Lim <[email protected]>
* Shift logging to retrieval job Signed-off-by: Terence Lim <[email protected]>
* Fix termination log path and update logger Signed-off-by: Terence Lim <[email protected]>
* Fix logger dictConfig Signed-off-by: Terence Lim <[email protected]>
* Fix handler import Signed-off-by: Terence Lim <[email protected]>
* Create /dev directory Signed-off-by: Terence Lim <[email protected]>
* Add default logging handler Signed-off-by: Terence Lim <[email protected]>
* Reraise exception after logging Signed-off-by: Terence Lim <[email protected]>
* Update remote job Signed-off-by: Terence Lim <[email protected]>
* Some fixes Signed-off-by: Terence Lim <[email protected]>
* Address PR comments Signed-off-by: Terence Lim <[email protected]>
* Rename function Signed-off-by: Terence Lim <[email protected]>
* Catch permission error Signed-off-by: Terence Lim <[email protected]>
* Rename variables Signed-off-by: Terence Lim <[email protected]>
1 parent 740f5a9 commit c539478

File tree

8 files changed

+79
-9
lines changed

8 files changed

+79
-9
lines changed

infra/docker/spark/Dockerfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,7 @@ RUN mkdir -p /opt/spark/conf
3333
RUN echo 'spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"' >> $SPARK_HOME/conf/spark-defaults.conf
3434
RUN echo 'spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"' >> $SPARK_HOME/conf/spark-defaults.conf
3535

36+
# For logging to /dev/termination-log
37+
RUN mkdir -p /dev
38+
3639
ENTRYPOINT [ "/opt/entrypoint.sh" ]

protos/feast_spark/api/JobService.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ message Job {
104104

105105
// Path to Spark job logs, if available
106106
string log_uri = 9;
107+
108+
// Spark job error message, if available
109+
string error_message = 10;
107110
}
108111

109112
// Ingest data from offline store into online store

python/feast_spark/job_service.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ def _job_to_proto(spark_job: SparkJob) -> JobProto:
7171
job = JobProto()
7272
job.id = spark_job.get_id()
7373
job.log_uri = cast(str, spark_job.get_log_uri() or "")
74+
job.error_message = cast(str, spark_job.get_error_message() or "")
7475
status = spark_job.get_status()
7576
if status == SparkJobStatus.COMPLETED:
7677
job.status = JobStatus.JOB_STATUS_DONE

python/feast_spark/pyspark/abc.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ def get_log_uri(self) -> Optional[str]:
8080
"""
8181
return None
8282

83+
def get_error_message(self) -> Optional[str]:
    """
    Get Spark job error message, if applicable.

    Base implementation returns None; launcher-specific job classes
    (e.g. the k8s and remote jobs) override this to surface the real
    failure reason.
    """
    return None
88+
8389

8490
class SparkJobParameters(abc.ABC):
8591
@abc.abstractmethod

python/feast_spark/pyspark/historical_feature_retrieval_job.py

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import abc
22
import argparse
33
import json
4+
import logging
5+
import os
46
from base64 import b64decode
57
from datetime import timedelta
8+
from logging.config import dictConfig
69
from typing import Any, Dict, List, NamedTuple, Optional
710

811
from pyspark.sql import DataFrame, SparkSession, Window
@@ -13,6 +16,37 @@
1316
CREATED_TIMESTAMP_ALIAS = "created_timestamp"
1417

1518

19+
def get_termination_log_path():
    """Return a writable path for fatal-error output.

    Prefers /dev/termination-log — the file Kubernetes reads back into the
    container's termination status — and falls back to /dev/stderr when that
    file is absent or not writable (e.g. when running outside a pod, or when
    the process lacks permission).
    """
    if os.access("/dev/termination-log", os.W_OK):
        return "/dev/termination-log"
    return "/dev/stderr"
# Logging configuration for the retrieval job driver.  ERROR-level records
# are duplicated into the termination log (or stderr as a fallback) so that
# a failed Spark job's reason is visible in the pod's termination status.
DEFAULT_LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {"standard": {"format": "%(asctime)s [%(levelname)s] %(message)s"}},
    "handlers": {
        "default": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "standard",
        },
        "file": {
            "class": "logging.FileHandler",
            "level": "ERROR",
            "formatter": "standard",
            # Resolved at import time; /dev/stderr when the k8s
            # termination log is unavailable.
            "filename": get_termination_log_path(),
            "mode": "a",
        },
    },
    # Only the __main__ logger is configured; library loggers keep their
    # existing handlers (disable_existing_loggers=False).
    "loggers": {"__main__": {"level": "INFO", "handlers": ["default", "file"]}},
}

dictConfig(DEFAULT_LOGGING)
logger = logging.getLogger(__name__)
1650
class Source(abc.ABC):
1751
"""
1852
Source for an entity or feature dataframe.
@@ -804,11 +838,15 @@ def json_b64_decode(s: str) -> Any:
804838
feature_tables_sources_conf = json_b64_decode(args.feature_tables_sources)
805839
entity_source_conf = json_b64_decode(args.entity_source)
806840
destination_conf = json_b64_decode(args.destination)
807-
start_job(
808-
spark,
809-
entity_source_conf,
810-
feature_tables_sources_conf,
811-
feature_tables_conf,
812-
destination_conf,
813-
)
841+
try:
842+
start_job(
843+
spark,
844+
entity_source_conf,
845+
feature_tables_sources_conf,
846+
feature_tables_conf,
847+
destination_conf,
848+
)
849+
except Exception as e:
850+
logger.exception(e)
851+
raise e
814852
spark.stop()

python/feast_spark/pyspark/launchers/k8s/k8s.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ def __init__(self, api: CustomObjectsApi, namespace: str, job_id: str):
8686
def get_id(self) -> str:
8787
return self._job_id
8888

89+
def get_error_message(self) -> str:
    """Return the error message recorded on the SparkApplication CRD.

    Re-fetches the job resource on every call; empty string when the
    application has not reported an error.
    """
    job = _get_job_by_id(self._api, self._namespace, self._job_id)
    # Mirrors get_status(): the job was looked up by our own id, so a
    # missing resource indicates an internal inconsistency.
    assert job is not None
    return job.job_error_message
93+
8994
def get_status(self) -> SparkJobStatus:
9095
job = _get_job_by_id(self._api, self._namespace, self._job_id)
9196
assert job is not None
@@ -487,7 +492,7 @@ def start_stream_to_online_ingestion(
487492
def get_job_by_id(self, job_id: str) -> SparkJob:
488493
job_info = _get_job_by_id(self._api, self._namespace, job_id)
489494
if job_info is None:
490-
raise KeyError(f"Job iwth id {job_id} not found")
495+
raise KeyError(f"Job with id {job_id} not found")
491496
else:
492497
return self._job_from_job_info(job_info)
493498

python/feast_spark/pyspark/launchers/k8s/k8s_utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ def _scheduled_crd_args(namespace: str) -> Dict[str, str]:
232232
class JobInfo(NamedTuple):
233233
job_id: str
234234
job_type: str
235+
job_error_message: str
235236
namespace: str
236237
extra_metadata: Dict[str, str]
237238
state: SparkJobStatus
@@ -266,12 +267,17 @@ def _resource_to_job_info(resource: Dict[str, Any]) -> JobInfo:
266267

267268
if "status" in resource:
268269
state = _k8s_state_to_feast(resource["status"]["applicationState"]["state"])
270+
error_message = (
271+
resource["status"].get("applicationState", {}).get("errorMessage", "")
272+
)
269273
else:
270274
state = _k8s_state_to_feast("")
275+
error_message = ""
271276

272277
return JobInfo(
273278
job_id=labels[LABEL_JOBID],
274279
job_type=labels.get(LABEL_JOBTYPE, ""),
280+
job_error_message=error_message,
275281
namespace=resource["metadata"].get("namespace", "default"),
276282
extra_metadata={k: v for k, v in sparkConf.items() if k in METADATA_KEYS},
277283
state=state,

python/feast_spark/remote_job.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ def _wait_for_job_status(
8383
def get_log_uri(self) -> Optional[str]:
8484
return self._log_uri
8585

86+
def get_error_message(self) -> str:
    """Fetch this job's error message from the job service.

    Issues a GetJob RPC per call; returns empty string when the server
    reports no error for the job.
    """
    job = self._service.GetJob(
        GetJobRequest(job_id=self._job_id), **self._grpc_extra_param_provider()
    ).job
    return job.error_message
91+
8692

8793
class RemoteRetrievalJob(RemoteJobMixin, RetrievalJob):
8894
"""
@@ -117,7 +123,9 @@ def get_output_file_uri(self, timeout_sec=None):
117123
if status == SparkJobStatus.COMPLETED:
118124
return self._output_file_uri
119125
else:
120-
raise SparkJobFailure("Spark job failed")
126+
raise SparkJobFailure(
127+
f"Spark job failed; Reason:{self.get_error_message()}"
128+
)
121129

122130

123131
class RemoteBatchIngestionJob(RemoteJobMixin, BatchIngestionJob):

0 commit comments

Comments
 (0)