|
1 | 1 | import abc |
2 | 2 | import argparse |
3 | 3 | import json |
| 4 | +import logging |
| 5 | +import os |
4 | 6 | from base64 import b64decode |
5 | 7 | from datetime import timedelta |
| 8 | +from logging.config import dictConfig |
6 | 9 | from typing import Any, Dict, List, NamedTuple, Optional |
7 | 10 |
|
8 | 11 | from pyspark.sql import DataFrame, SparkSession, Window |
|
13 | 16 | CREATED_TIMESTAMP_ALIAS = "created_timestamp" |
14 | 17 |
|
15 | 18 |
|
def get_termination_log_path():
    """Return the Kubernetes termination-log path if writable, else stderr.

    On Kubernetes, text written to /dev/termination-log is surfaced as the
    container's termination message; outside that environment the path is
    typically absent/unwritable, so fall back to /dev/stderr.
    """
    termination_log = "/dev/termination-log"
    return termination_log if os.access(termination_log, os.W_OK) else "/dev/stderr"
| 23 | + |
| 24 | + |
# Logging configuration applied at import time via dictConfig below.
# INFO+ goes to stderr ("default" handler); ERROR+ is additionally appended
# to the Kubernetes termination log (or stderr fallback) so that fatal
# failures surface in the pod's termination message.
DEFAULT_LOGGING = {
    "version": 1,
    # Keep loggers created before this config (e.g. by imported libraries) enabled.
    "disable_existing_loggers": False,
    "formatters": {"standard": {"format": "%(asctime)s [%(levelname)s] %(message)s"}},
    "handlers": {
        "default": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "standard",
        },
        "file": {
            "class": "logging.FileHandler",
            "level": "ERROR",
            "formatter": "standard",
            # Resolved once at import time; dictConfig opens this file when applied.
            "filename": get_termination_log_path(),
            "mode": "a",
        },
    },
    # NOTE: only the "__main__" logger is configured, so the module-level
    # `logger` below is fully wired up only when this file runs as a script.
    "loggers": {"__main__": {"level": "INFO", "handlers": ["default", "file"]}},
}

# Side effect at import: installs the handlers defined above.
dictConfig(DEFAULT_LOGGING)
logger = logging.getLogger(__name__)
| 48 | + |
| 49 | + |
16 | 50 | class Source(abc.ABC): |
17 | 51 | """ |
18 | 52 | Source for an entity or feature dataframe. |
@@ -804,11 +838,15 @@ def json_b64_decode(s: str) -> Any: |
# Decode the base64-encoded JSON job configuration passed on the CLI.
feature_tables_sources_conf = json_b64_decode(args.feature_tables_sources)
entity_source_conf = json_b64_decode(args.entity_source)
destination_conf = json_b64_decode(args.destination)
try:
    start_job(
        spark,
        entity_source_conf,
        feature_tables_sources_conf,
        feature_tables_conf,
        destination_conf,
    )
except Exception:
    # logger.exception records the full traceback (including to the
    # termination-log handler); give it a descriptive message rather than
    # the exception object itself.
    logger.exception("Ingestion job failed")
    # Bare raise preserves the original exception and traceback.
    raise
finally:
    # Always release Spark resources, even when the job fails —
    # previously stop() was skipped on any exception.
    spark.stop()
0 commit comments