# api_ext.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Core API extensions for specialization in specific use-cases and for simplified experience.
For finer granular control, more flexibility core API should be used.
"""
import copy
import json
import logging
import uuid
from enum import Enum
from typing import Any, Dict, Iterable, Iterator, List, Mapping, Optional, Sequence, Tuple, Union, cast, overload
from intelliflow.core.application.context.node.filtered_views import FilteredView
from intelliflow.core.application.context.node.marshaling.nodes import MarshalingView
from intelliflow.core.entity import CoreData
from intelliflow.core.platform.compute_targets.descriptor import ComputeDescriptor
from intelliflow.core.platform.definitions.aws.common import CommonParams as AWSCommonParams
from intelliflow.core.platform.definitions.aws.glue.client_wrapper import GlueVersion, GlueWorkerType
from intelliflow.core.platform.definitions.aws.s3.bucket_wrapper import get_bucket
from intelliflow.core.platform.definitions.aws.s3.object_wrapper import list_objects
from intelliflow.core.signal_processing.signal import Signal, SignalDimensionTuple
from intelliflow.core.signal_processing.signal_source import (
PRIMARY_KEYS_KEY,
DataType,
ModelSignalSourceFormat,
DatasetSignalSourceAccessSpec,
S3SignalSourceAccessSpec,
SignalSourceType,
)
from intelliflow.core.signal_processing.slot import SlotCode, SlotCodeMetadata, SlotCodeType, SlotType
from .api import *
from .core.application.context.node.base import DataNode
from .core.platform.constructs import RoutingTable
from .core.platform.definitions.aws.sagemaker.client_wrapper import (
BUILTIN_ALGO_MODEL_ARTIFACT_RESOURCE_NAME,
SagemakerBuiltinAlgo,
convert_image_uri_to_arn,
)
from .core.platform.definitions.compute import ComputeExecutionDetails, ComputeSessionStateType
from .core.platform.drivers.compute.aws_emr import InstanceConfig, RuntimeConfig
from .core.signal_processing.definitions.dimension_defs import DEFAULT_DATETIME_GRANULARITY, DatetimeGranularity, Type
from .core.signal_processing.definitions.metric_alarm_defs import AlarmType, MetricSubDimensionMap, MetricSubDimensionMapType
from .core.signal_processing.dimension_constructs import (
AnyVariant,
DateVariant,
Dimension,
DimensionFilter,
DimensionSpec,
DimensionVariant,
DimensionVariantMapFunc,
)
from .core.signal_processing.routing_runtime_constructs import Route, RouteID, RuntimeLinkNode
from .utils.spark import SparkType
logger = logging.getLogger(__name__)
class S3DatasetExt(CoreData):
def __init__(self, account_id: str, bucket: str, folder: str, dimension_vars: List[DimensionVariant], attrs: Dict[str, Any]) -> None:
self.account_id = account_id
self.bucket = bucket
self.folder = folder
self.dimension_vars = dimension_vars
self.attrs = attrs
def S3(account_id: str, bucket: str, key_prefix: str, *dimension: DimensionVariant, **kwargs):
bucket = bucket[len("s3://") :] if bucket.startswith("s3://") else bucket
return S3DatasetExt(account_id, bucket, key_prefix, list(dimension), kwargs)
class AnyDate(AnyVariant):
def __init__(self, name: DimensionNameType, params: Optional[Dict[str, Any]] = None) -> None:
if params:
if DateVariant.FORMAT_PARAM not in params:
params.update({DateVariant.FORMAT_PARAM: "%Y-%m-%d"})
if DateVariant.GRANULARITY_PARAM not in params:
# DateVariant already takes care of the granularity, but at this level we should
# still set it explicitly for user convenience (and to be immune to future changes in that core module).
params.update({DateVariant.GRANULARITY_PARAM: DEFAULT_DATETIME_GRANULARITY})
super().__init__(name, Type.DATETIME, params)
self._value = self.ANY_DIMENSION_VALUE_SPECIAL_CHAR
class AnyString(AnyVariant):
def __init__(self, name: DimensionNameType, params: Optional[Dict[str, Any]] = None) -> None:
super().__init__(name, Type.STRING, params)
self._value = self.ANY_DIMENSION_VALUE_SPECIAL_CHAR
class AnyLong(AnyVariant):
def __init__(self, name: DimensionNameType, params: Optional[Dict[str, Any]] = None) -> None:
super().__init__(name, Type.LONG, params)
self._value = self.ANY_DIMENSION_VALUE_SPECIAL_CHAR
# protocols
HADOOP_SUCCESS_FILE = completion_file("_SUCCESS")
class GlueBatchCompute(InternalDataNode.BatchComputeDescriptor):
def __init__(
self,
code: str,
lang: Lang = Lang.PYTHON,
abi: ABI = ABI.GLUE_EMBEDDED,
extra_permissions: List[Permission] = None,
retry_count: int = 0,
**kwargs,
) -> None:
"""Does basic check on AWS Glue Parametrization
Defaults:
Workers -> 25
Timeout -> 10 hours
"""
if not kwargs:
kwargs = dict()
if "Timeout" not in kwargs:
kwargs["Timeout"] = 600
if "GlueVersion" not in kwargs:
# let BatchCompute driver choose the right version.
# existence of this definition provides a hint to the platform as to what runtime conf is desired, impacting
# which BatchCompute to be used eventually.
kwargs["GlueVersion"] = GlueVersion.AUTO.value
if "MaxCapacity" not in kwargs:
if "WorkerType" not in kwargs:
kwargs["WorkerType"] = GlueWorkerType.G_1X.value
if "NumberOfWorkers" not in kwargs:
kwargs["NumberOfWorkers"] = 25
super().__init__(code, lang, abi, extra_permissions, retry_count, **kwargs)
Glue = GlueBatchCompute
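# Illustrative usage sketch for GlueBatchCompute; the inline PySpark code and worker counts here are
# hypothetical, but the kwargs (WorkerType, NumberOfWorkers, Timeout, GlueVersion) are the same ones
# defaulted above. The embedded code assigns the result to `output`, following the convention used
# elsewhere in this module.
#
#   compute = GlueBatchCompute(
#       code="output = spark.sql('SELECT * FROM my_input')",
#       WorkerType=GlueWorkerType.G_1X.value,
#       NumberOfWorkers=50,
#       Timeout=120,  # minutes
#   )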
class EmrBatchCompute(InternalDataNode.BatchComputeDescriptor):
def __init__(
self,
code: str,
extra_permissions: List[Permission] = None,
retry_count: int = 0,
**kwargs,
) -> None:
"""Does basic check on AWS EMR Parametrization
Defaults:
Workers -> 25 m5.xlarge
"""
lang: Lang = Lang.PYTHON
abi: ABI = ABI.GLUE_EMBEDDED
if not kwargs:
kwargs = dict()
if "InstanceConfig" not in kwargs:
kwargs["InstanceConfig"] = InstanceConfig(25)
if "RuntimeConfig" not in kwargs:
kwargs["RuntimeConfig"] = RuntimeConfig.AUTO
super().__init__(code, lang, abi, extra_permissions, retry_count, **kwargs)
EMR = EmrBatchCompute
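# Illustrative usage sketch for EmrBatchCompute with the defaults applied above made explicit.
# `InstanceConfig` and `RuntimeConfig` come from the aws_emr driver imported at the top of this module;
# the code string is hypothetical.
#
#   emr_compute = EmrBatchCompute(
#       code="output = spark.sql('SELECT * FROM my_input')",
#       InstanceConfig=InstanceConfig(25),
#       RuntimeConfig=RuntimeConfig.AUTO,
#   )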
class Spark(InternalDataNode.BatchComputeDescriptor):
def __init__(
self,
code: str,
lang: Lang = Lang.PYTHON,
abi: ABI = ABI.GLUE_EMBEDDED,
extra_permissions: List[Permission] = None,
retry_count: int = 0,
**kwargs,
) -> None:
super().__init__(code, lang, abi, extra_permissions, retry_count, **kwargs)
class SparkSQL(InternalDataNode.BatchComputeDescriptor):
def __init__(self, code: str, extra_permissions: List[Permission] = None, retry_count: int = 0, **kwargs) -> None:
# Temporarily use a typical PySpark conf so that any of the drivers would pick this up (until SPARK_SQL support).
# The following defaulting is not actually a coupling with Glue-based BatchCompute drivers:
# AWS Glue and its API parametrization are adopted as a good basis for the overall serverless model in IF.
# We expect the following parameters to be supported by all of the BatchCompute drivers.
# They are hints to the drivers, which can still overwrite or map them.
if "WorkerType" not in kwargs:
kwargs["WorkerType"] = GlueWorkerType.G_1X.value
if "NumberOfWorkers" not in kwargs:
kwargs["NumberOfWorkers"] = 25
if "GlueVersion" not in kwargs and "RuntimeConfig" not in kwargs:
kwargs["GlueVersion"] = GlueVersion.AUTO.value
## means "run in PySpark as inlined/embedded code"
super().__init__(
f'''
import re
queries = [query.strip() for query in re.split(";\\s+|;$", """{code}""")]
for query in queries:
if query:
output = spark.sql(query)
''',
Lang.PYTHON,
ABI.GLUE_EMBEDDED,
extra_permissions,
retry_count,
**kwargs,
)
## TODO support this Lang + ABI pair in Spark based BatchCompute drivers (Glue, EMR).
# we need this for two things:
# - parametrization of code with runtime 'dimensions' in BatchCompute::compute
# - enforcing language and ABI as PYTHON and EMBEDDED at this level keeps drivers from handling SQL code in a
# flexible way
# super().__init__(code, Lang.SPARK_SQL, ABI.PARAMETRIZED_QUERY, extra_permissions, retry_count, **kwargs)
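# Illustrative usage sketch for SparkSQL: one or more ';'-separated SQL statements are wrapped into
# embedded PySpark code by the template above. The table/column names are hypothetical.
#
#   sql_compute = SparkSQL(
#       "SELECT item_id, quantity FROM my_input WHERE region = 'NA'",
#       NumberOfWorkers=50,
#   )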
class PrestoSQL(InternalDataNode.BatchComputeDescriptor):
def __init__(self, code: str, extra_permissions: List[Permission] = None, retry_count: int = 0, **kwargs) -> None:
super().__init__(code, Lang.PRESTO_SQL, ABI.PARAMETRIZED_QUERY, extra_permissions, retry_count, **kwargs)
class SagemakerTrainingJob(InternalDataNode.BatchComputeDescriptor):
def __init__(
self,
extra_permissions: List[Permission] = None,
retry_count: int = 0,
**kwargs,
) -> None:
if not kwargs:
raise ValueError(
"AWS Sagemaker training job parameters cannot be empty! " "Please provide them as keyword args to this compute descriptor."
)
# TODO do validations on kwargs based on
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_training_job
super().__init__("", Lang.AWS_SAGEMAKER_TRAINING_JOB, ABI.NONE, extra_permissions, retry_count, **kwargs)
class SagemakerTransformJob(InternalDataNode.BatchComputeDescriptor):
def __init__(
self,
extra_permissions: List[Permission] = None,
retry_count: int = 0,
**kwargs,
) -> None:
if not kwargs:
raise ValueError(
"AWS Sagemaker transform job parameters cannot be empty! " "Please provide them as keyword args to this compute descriptor."
)
super().__init__("", Lang.AWS_SAGEMAKER_TRANSFORM_JOB, ABI.NONE, extra_permissions, retry_count, **kwargs)
class AWSBatchJob(InternalDataNode.BatchComputeDescriptor):
def __init__(
self,
jobDefinition: Union[str, Dict],
jobQueue: Union[str, Dict],
extra_permissions: List[Permission] = None,
retry_count: int = 0,
**kwargs,
) -> None:
"""Define a AWS Batch Job compute with at least jobDefinition and jobQueue parameters.
Parameter naming and values/types are compatible with
https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.htmlo
with convenience exceptions of options;
- to define `jobDefinition` or `jobQueue` as strings (names) pointing to unmanaged
external resources (define `OrchestratorRoleARN` if they are from another account)
- or to define them as dictionaries for managed resources to be provisioned by the framework
- jobQueues can have "compute environment"s declared in nested dictionaries or again as unmanaged / external
resource strings (ARNs)
"""
if not jobDefinition:
raise ValueError("AWS Batch job must have 'jobDefinition' defined!")
if not jobQueue:
raise ValueError("AWS Batch job must have 'jobQueue' defined!")
kwargs.update({"jobDefinition": jobDefinition})
kwargs.update({"jobQueue": jobQueue})
super().__init__("", Lang.AWS_BATCH_JOB, ABI.NONE, extra_permissions, retry_count, **kwargs)
class AWSSSFNStateMachine(InternalDataNode.BatchComputeDescriptor):
def __init__(
self, definition: Union[str, Dict], extra_permissions: List[Permission] = None, retry_count: int = 0, **other_api_params
) -> None:
"""Creates or updates a framework managed AWS SFN State Machine using the API from
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions/client/create_state_machine.html
- do not define `name` as the resource is managed by the framework per node
- use of `publish` is not allowed as the new version is always auto-published during
application activation
- if `roleArn` is not defined, exec role of the application will be used automatically
Since the create API is the common interface for both initial creation and update, the framework takes care of corner
cases such as a change in `type` (which would normally not be possible) by resetting the resource and recreating
it if needed.
@param definition: The only "required" (bare minimum) parameter: the AWS SFN state-machine definition in the Amazon States Language.
Refer to
https://docs.aws.amazon.com/step-functions/latest/dg/concepts-states.html
As an input to state-machine executions, the framework will provide `inputs` and `output` maps within the input JSON that can be used via JSONPath in the
definition, e.g. `$.output.dimension_map.region`
@param extra_permissions: For `Task`s relying on invocations to resources not managed by this workflow, please define permissions
accordingly in `extra_permissions` so that the runtime exec role will have them in its policy.
@param retry_count: High-level retry logic (on top of any retry strategy used within the state-machine) applied
by the framework against transient issues communicating with AWS SFN APIs or transient execution errors.
@param other_api_params: For all the other parameters from
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions/client/create_state_machine.html
"""
if not definition:
raise ValueError("AWS SFN state-machine must have 'definition' parameter defined!")
other_api_params.update({"definition": definition})
super().__init__("", Lang.AWS_SFN_STATE_MACHINE, ABI.NONE, extra_permissions, retry_count, **other_api_params)
AWSStateMachine = AWSSSFNStateMachine
StateMachine = AWSSSFNStateMachine
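# Illustrative usage sketch for a framework-managed state-machine compute with a minimal Amazon States
# Language definition. The state name is hypothetical; at runtime the framework passes the `inputs`/`output`
# maps in the execution input (e.g. `$.output.dimension_map.region`), as noted in the docstring above.
#
#   sfn_compute = AWSStateMachine(
#       definition={
#           "StartAt": "Process",
#           "States": {"Process": {"Type": "Pass", "End": True}},
#       },
#   )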
class ApplicationExt(Application):
@classmethod
def _create_spec(cls, variants: List[DimensionVariant]) -> DimensionSpec:
if not variants:
return None
spec: DimensionSpec = DimensionSpec()
dimension_variant = variants[0]
sub_spec = cls._create_spec(variants[1:])
spec.add_dimension(Dimension(dimension_variant.name, dimension_variant.type, dimension_variant.params), sub_spec)
return spec
@classmethod
def _create_filter(cls, variants: List[DimensionVariant]) -> DimensionFilter:
if not variants:
return None
filter: DimensionFilter = DimensionFilter()
dimension_variant = variants[0]
sub_filter = cls._create_filter(variants[1:])
filter.add_dimension(dimension_variant, sub_filter)
return filter
def add_external_data(self, data_id: str, s3_dataset: S3DatasetExt, completion_file: str = "_SUCCESS"):
# use the same declaration order to set up the hierarchy for DimensionSpec and DimensionFilter
spec: DimensionSpec = self._create_spec(s3_dataset.dimension_vars)
filter: DimensionFilter = self._create_filter(s3_dataset.dimension_vars)
return self.marshal_external_data(
S3Dataset(
s3_dataset.account_id,
s3_dataset.bucket,
s3_dataset.folder,
*["{}" for dim in s3_dataset.dimension_vars],
**s3_dataset.attrs,
),
data_id,
spec,
filter,
SignalIntegrityProtocol("FILE_CHECK", {"file": completion_file}) if completion_file else None,
)
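# Illustrative usage sketch for add_external_data, assuming an AWSApplication instance `app`. The account
# id, bucket and prefix are hypothetical; the single `AnyDate` dimension becomes both the DimensionSpec and
# the DimensionFilter of the external signal.
#
#   external_data = app.add_external_data(
#       "my_external_data",
#       S3("111122223333", "my-external-bucket", "datasets/my_data", AnyDate("day")),
#   )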
def add_timer(
self,
id: str,
schedule_expression: str,
time_dimension_id: str = "time",
time_dimension_format: str = "%Y-%m-%d",
time_dimension_granularity: DatetimeGranularity = None,
**kwargs,
) -> MarshalerNode:
"""Create new timer signal within this application. The signal can be bound to other Application APIs most
notably to create_data to achieve time based scheduling, etc.
This API is actually a convenience wrapper around "Application::create_timer".
:param id: internal ID of the new signal, can be used to retrieve this signal using get_timer API. It will be
used as the default alias for this signal if it is used as an input in downstream create_data calls.
:param schedule_expression: expression supported by the underlying platform configuration. E.g. in AWSConfiguration
this parameter is an AWS CloudWatch schedule expression, which can be either in CRON format or "rate(x [minute(s)|day(s)|...])".
:param time_dimension_id: Just as other RheocerOS signals, this timer will have a DimensionSpec with only one
dimension of DATETIME type implicitly. Provide the 'id' ('name') of the dimension (e.g dataset partition) using
this parameter. Partition name will be defaulted to 'time' if not defined by the user.
:param time_dimension_format: Just as other RheocerOS signals, this timer will have a DimensionSpec with only one
dimension of DATETIME type implicitly. Provide datetime 'format' of the dimension (e.g '%Y-%m-%d %H-%M') using
this parameter. Partition format will be set as '%Y-%m-%d' if not defined by the user.
:param time_dimension_granularity: Just as other RheocerOS signals, this timer will have a DimensionSpec with only one
dimension of DATETIME type implicitly. Provide the datetime 'granularity' of the dimension (e.g. DatetimeGranularity.DAY)
using this parameter. Partition granularity will be set as DatetimeGranularity.DAY implicitly if not defined by
the user.
:param kwargs: user provided metadata (for purposes such as cataloguing, etc)
:return: A new MarshalerNode that encapsulates the timer in development time. Returned value can be used
as an input to many other Application APIs in a convenient way. Most important of them is 'create_data'
which can use MarshalerNode and its filtered version (FilteredView) as inputs.
"""
time_dim_params = {DateVariant.FORMAT_PARAM: time_dimension_format, DateVariant.TIMEZONE_PARAM: "UTC"}
if time_dimension_granularity:
time_dim_params.update({DateVariant.GRANULARITY_PARAM: time_dimension_granularity.value})
time_dim = AnyDate(time_dimension_id, time_dim_params)
return self.create_timer(id, schedule_expression, time_dim, **kwargs)
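# Illustrative usage sketch for add_timer, assuming an AWSApplication instance `app`: a daily timer that can
# later be used as an input to create_data. The schedule expression follows the CloudWatch format described
# in the docstring; the id and dimension name are hypothetical.
#
#   daily_timer = app.add_timer(
#       "daily_trigger",
#       schedule_expression="rate(1 day)",
#       time_dimension_id="day",
#   )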
def data(self, data_id: str) -> Optional[MarshalerNode]:
# check ALL applications (self + upstreams) from the active context
marshalers: List[MarshalerNode] = self.get_data(data_id)
if marshalers:
if len(marshalers) > 1:
raise ValueError(
f"Cannot resolve DataNode! Application::data API cannot be used when the ID maps to multiple data nodes!"
f" Entity id: {data_id}, type: <data>, # of mapped entities: {len(marshalers)}, entities: {marshalers}"
)
return marshalers[0]
def timer(self, timer_id: str) -> Optional[MarshalerNode]:
marshalers: List[MarshalerNode] = self.get_timer(timer_id)
if marshalers:
if len(marshalers) > 1:
raise ValueError(
f"Application::timer API cannot be used when the ID maps to multiple nodes of same type!"
f" Entity id: {timer_id}, type: <timer>, # of mapped entities: {len(marshalers)}"
)
return marshalers[0]
def metric(self, metric_id: str, sub_dimensions: Optional[MetricSubDimensionMapType] = None) -> Optional[MarshalerNode]:
marshalers: List[MarshalerNode] = self.get_metric(metric_id, sub_dimensions)
if marshalers:
if len(marshalers) > 1:
raise ValueError(
f"Application::metric API cannot be used when the ID (and sub_dimensions) map to "
f"multiple metric nodes!"
f" Metric id: {metric_id}, sub_dimensions: {sub_dimensions!r},"
f" type: <metric>, # of mapped entities: {len(marshalers)}"
)
return marshalers[0]
def alarm(self, alarm_id: str, alarm_type: AlarmType = AlarmType.ALL) -> Optional[MarshalerNode]:
marshalers: List[MarshalerNode] = self.get_alarm(alarm_id, alarm_type)
if marshalers:
if len(marshalers) > 1:
raise ValueError(
f"Application::alarm API cannot be used when the ID (and alarm_type) map to "
f"multiple alarm nodes!"
f" Alarm id: {alarm_id}, alarm_type: {alarm_type!r}"
f" type: <alarm>, # of mapped entities: {len(marshalers)}"
)
return marshalers[0]
def __getitem__(self, entity_id_or_slice) -> MarshalerNode:
# FUTURE when other node types are introduced call their respective retrieval API
# ex: model(model_id)
if isinstance(entity_id_or_slice, str):
entity_id = entity_id_or_slice
marshaler = self.data(entity_id)
if marshaler:
return marshaler
marshaler = self.timer(entity_id)
if marshaler:
return marshaler
marshaler = self.metric(entity_id)
if marshaler:
return marshaler
marshaler = self.alarm(entity_id)
if marshaler:
return marshaler
elif isinstance(entity_id_or_slice, slice):
# exceptional treatment for metrics since 'sub_dimensions' are very likely to be used to uniquely identify
# a metric. in other terms, it is allowed among metrics nodes to share the same ID.
entity_id = entity_id_or_slice.start
resolver_param = entity_id_or_slice.stop
if not (isinstance(entity_id, str) and isinstance(resolver_param, (MetricSubDimensionMap, AlarmType))):
raise ValueError(
f"Wrong types in <slice> ({entity_id_or_slice!r}) for Application::__getitem__!"
f" Must be in the form of [<METRIC_ID | ALARM_ID> : <SUB_DIMENSIONS_MAP | AlarmType>],"
f" Metric example: ['my_custom_metric', {{'sub_dim': 'value'}}], "
f" Alarm example: ['my_alarm' : AlarmType.COMPOSITE]"
)
if isinstance(resolver_param, MetricSubDimensionMap):
marshaler = self.metric(entity_id, sub_dimensions=resolver_param)
if marshaler:
return marshaler
elif isinstance(resolver_param, AlarmType):
marshaler = self.alarm(entity_id, alarm_type=resolver_param)
if marshaler:
return marshaler
else:
raise ValueError(
f"Please provide a <str> or a <slice> (e.g app['metric_id':{{'sub_dim1': 'value'}}]) for"
f" Application::__getitem__ API! Parameter {entity_id_or_slice!r} is not valid."
)
raise ValueError(
f"Cannot find any active node with ID '{entity_id}' within the application '{self._id}'."
f" If you are looking for a node from the dev context (which was added to the application in"
f" this development session but not activated yet), then you cannot use this API. PLease"
f" use respective getter APIs depending on the node type (e.g get_data for data nodes, "
f"get_timer for timer nodes, get_metric for metric nodes, etc). These APIs allow you to"
f"specify 'context' parameter which can be set as either QueryContext.DEV_CONTEXT or "
f"QueryContext.ALL."
)
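# Illustrative usage sketch for __getitem__, assuming an active application instance `app`. A plain string
# resolves across data/timer/metric/alarm nodes; the slice form disambiguates metrics by sub_dimensions or
# alarms by AlarmType, mirroring the examples in the error messages above. The ids are hypothetical.
#
#   node = app["my_data_node"]
#   metric_node = app["my_custom_metric": {"sub_dim": "value"}]
#   alarm_node = app["my_alarm": AlarmType.COMPOSITE]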
class AWSApplication(ApplicationExt):
def __init__(
self,
app_name: str,
region_or_platform: Union[str, HostPlatform] = None,
dev_role_account_id: str = None,
access_id: str = None,
access_key: str = None,
enforce_runtime_compatibility: bool = True,
**kwargs,
) -> None:
from intelliflow.core.deployment import is_on_remote_dev_env
# limiting application name length to fail quickly and to avoid creation of dev role and storage account.
# this is a temporary fix and should eventually be handled in the respective cloud services drivers
assert len(app_name) <= 16, "App name should be of length 16 or less"
conf_builder = AWSConfiguration.builder()
if isinstance(region_or_platform, str) or "region" in kwargs:
region = region_or_platform if isinstance(region_or_platform, str) else kwargs["region"]
if dev_role_account_id and not is_on_remote_dev_env():
conf_builder = conf_builder.with_dev_role_credentials(dev_role_account_id)
elif access_id is not None and access_key is not None:
conf_builder = conf_builder.with_admin_access_pair(access_id, access_key)
else:
conf_builder = conf_builder.with_default_credentials(as_admin=True)
for k, v in kwargs.items():
conf_builder.with_param(k, v)
super().__init__(app_name, HostPlatform(conf_builder.with_region(region).build()), enforce_runtime_compatibility)
elif isinstance(region_or_platform, HostPlatform):
platform = region_or_platform
for k, v in kwargs.items():
platform.conf.add_param(k, v)
super().__init__(app_name, platform, enforce_runtime_compatibility)
else:
raise ValueError(f"Please provide HostPlatform object or 'region' parameter for AWSApplication!")
def get_upstream(
self, remote_app_name: str, aws_acc_id: str, region: str
) -> Mapping[Application.QueryContext, List["RemoteApplication"]]:
return self.get_upstream_applications(
remote_app_name,
AWSConfiguration.builder().with_account_id(aws_acc_id).with_region(region).build(),
Application.QueryContext.ALL,
)
def import_upstream(self, remote_app_name: str, aws_acc_id: str, region: str) -> "RemoteApplication":
"""Make a remote application's artifacts (data, model, etc) available to this application.
Remote app must have authorized this caller app before via 'authorize_downstream' API.
"""
return self.import_upstream_application(
remote_app_name, AWSConfiguration.builder().with_account_id(aws_acc_id).with_region(region).build()
)
def authorize_downstream(self, remote_app_name, aws_acc_id: str, region: str, **params: Dict[str, Any]) -> None:
"""Authorize another RheocerOS application so that it will be able to do
'import_upstream' for this application and be able to use/connect to the artifacts
(such as data, model, etc) seamlessly."""
self.export_to_downstream_application(
remote_app_name, AWSConfiguration.builder().with_account_id(aws_acc_id).with_region(region).build(), **params
)
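# Illustrative usage sketch for cross-application sharing: the upstream application authorizes the
# downstream one first; the downstream application then imports it and can reference its nodes. The
# application names, account ids and region are hypothetical.
#
#   # in the upstream application (owner of the shared artifacts):
#   upstream_app.authorize_downstream("downstream_app", "111122223333", "us-east-1")
#   # in the downstream application:
#   upstream = downstream_app.import_upstream("upstream_app", "444455556666", "us-east-1")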
def glue_table(
self,
database: str,
table_name: str,
source_account_id: str = None, # might be required or recommended to provide for S3 datasets
table_type: Optional[Union["DatasetType", str]] = None,
primary_keys: Optional[List[str]] = None,
id: Optional[str] = None,
dimension_spec: Optional[Union[DimensionSpec, Dict[str, Any]]] = None,
dimension_filter: Optional[Union[DimensionFilter, Dict[str, Any]]] = None,
protocol: SignalIntegrityProtocol = None,
tags: str = None,
**metadata, # catalog metadata, etc
) -> MarshalerNode:
"""Convenience API that flattens out all of the parameters that *might* be required based on the information
available on catalog and table type.
If stars are aligned, this API provides the best UX as shown below:
ucsi_table= app.glue_table('booker', 'unified_customer_shipment_items')
"""
if source_account_id is not None:
metadata["account_id"] = source_account_id
if table_type is not None:
metadata["table_type"] = table_type
if primary_keys is not None:
metadata[PRIMARY_KEYS_KEY] = primary_keys
return self.marshal_external_data(GlueTable(database, table_name, **metadata), id, dimension_spec, dimension_filter, protocol, tags)
def create_data(
self,
id: str,
inputs: Union[List[Union[FilteredView, MarshalerNode]], Dict[str, Union[FilteredView, MarshalerNode]]] = None,
input_dim_links: Optional[Sequence[Tuple[SignalDimensionTuple, DimensionVariantMapFunc, SignalDimensionTuple]]] = None,
output_dimension_spec: Optional[Union[Dict[str, Any], DimensionSpec]] = None,
output_dim_links: Optional[
Sequence[
Tuple[
Union[OutputDimensionNameType, SignalDimensionTuple],
DimensionVariantMapFunc,
Union[OutputDimensionNameType, Tuple[OutputDimensionNameType, ...], SignalDimensionTuple],
]
]
] = None,
compute_targets: Optional[Union[Sequence[ComputeDescriptor], str]] = None,
execution_hook: RouteExecutionHook = None,
pending_node_hook: RoutePendingNodeHook = None,
pending_node_expiration_ttl_in_secs: int = None,
auto_input_dim_linking_enabled=True,
auto_output_dim_linking_enabled=True,
auto_backfilling_enabled=False,
protocol: SignalIntegrityProtocol = InternalDataNode.DEFAULT_DATA_COMPLETION_PROTOCOL,
**kwargs,
) -> MarshalerNode:
"""Overwrites Application::create_data in order to provide defaulting on parameters, specific to AWS use-case.
:param id: id/route_id/data_id of the new internal data node to be created. this ID will be used to find/get this
node while using the other APIs of the Application. It is also used as part of the route_id to be used in runtime.
:param inputs: Filtered or unfiltered references of other data nodes which are the return values of previous calls
to node generating APIs such as marshal_external_data or again the same API 'create_data'.
:param input_dim_links: How should the 'inputs' be linked to each other over their dimensions? This is important
to determine executions at runtime. The DEFAULT value is EMPTY. While empty, if 'auto_input_dim_linking_enabled' is set to False,
then any combination of input signals would yield an execution. Input dimensions on either side of the link must be referred
using the input signals from 'inputs' ( e.g input('input_dimension_name') or input.dimension('dim_name')).
For multiple dimension use of inputs on the right hand side, add extra dimensions as new arguments to __call__
call or 'dimension' method call on the input signal object (e.g input('dim1', 'dim2') or input.dimension('dim1', 'dim2')).
Multiple dimensions can only be used on the right hand side. Signature (argument count) of the mapper function
for the link should be compatible with the number of dimensions used on the right hand side.
:param output_dimension_spec: What are the dimensions of the signal that would represent this new node? And what is the
structure/order? If left unset, then the DEFAULT value is equivalent to the spec of the first input.
:param output_dim_links: How should the output and the inputs relate to each other? Which dimensions of the output can
be retrieved from which input dimensions at runtime? if output_dimension_spec input is empty, then the DEFAULT value
is (aligned with the default behaviour for the spec) equality/linking on the dimensions of the first input. If
'output_dimension_spec' is not empty but this param is left empty, then the outcome is determined by
'auto_output_dim_linking_enabled' param. Output dimensions on either side of the link must be referred using
type str aliased as <OutputDimensionNameType>. Input dimensions on either side of the link must be referred
using the input signals from 'inputs' ( e.g input('input_dimension_name') or input.dimension('dim_name')).
To use multiple dimensions on the right hand side for output use a tuple of <OutputDimensionNameType>s. But for
multiple dimension use of inputs on the right hand side, add extra dimensions as new arguments to __call__ call
or 'dimension' method call on the input signal object (e.g input('dim1', 'dim2') or
input.dimension('dim1', 'dim2')). Multiple dimensions can only be used on the right hand side.
Signature (argument count) of the mapper function for the link should be compatible with the number of
dimensions used on the right hand side.
:param compute_targets: When incoming signals for the inputs are linked successfully and a new execution context is created
at runtime, which compute targets should be run using those signals and their material dimension values? For the AWS case,
the user can optionally provide the code as a plain string. In that case, this API will by DEFAULT wrap it with GlueBatchCompute.
:param execution_hook: Provide an instance of <ExecutionHook> (or <RouteExecutionHook>) to have runtime hooks
into your own code along with remote execution and compute actions. Each callback/hook can either be pure Python
Callable or a Callable wrapped by InlinedCompute type. RheocerOS provides interfaces for each hook type. Please
see the internal types from class <RoutingHookInterface.Execution>: <IExecutionBeginHook>, <IExecutionSkippedHook>,
<IExecutionSuccessHook>, <IExecutionFailureHook>, ...
:param pending_node_hook: Provide an instance of <PendingNodeHook> (or <RoutePendingNodeHook>) to have runtime hooks
into your own code when pending event-trigger groups (pending nodes) are created (first ever event is received), expired or
when a checkpoint is hit. For expiration hook to be called, the next param 'pending_node_expiration_ttl_in_secs' must be
defined. Defining expiration hook without an expiration TTL is not allowed. Each callback/hook can either be pure Python
Callable or a Callable wrapped by InlinedCompute type. RheocerOS provides interfaces for each hook type. Please
see the internal types from class <RoutingHookInterface.PendingNode>: <IPendingNodeCreationHook>, <IPendingNodeExpirationHook>,
<IPendingCheckpointHook>
:param pending_node_expiration_ttl_in_secs: Determine how long the system should keep track of a pending event trigger
group. For example: an event was received a week ago on a particular dimension values (e.g date partition), but for the
other inputs of your data node, there has been no events so far. This forms a Pending Node and without a TTL RheocerOS
persists and tracks them forever until a routing data reset (incompatible update), termination or an internal error occurs.
:param auto_input_dim_linking_enabled: Enables the convenience functionality to link inputs to each other over
same 'dimensions'. Unique dimensions are still left unlinked.
:param auto_output_dim_linking_enabled: Enables the convenience functionality to link output dimensions to any
of the inputs based on the assumption of dimension name equality.
:param auto_backfilling_enabled: TODO
:param protocol: completion protocol for the output. The default value is a "_SUCCESS" file based primitive
protocol (also used by Hadoop, etc).
:param kwargs: Provide metadata. Format and content are up to the client and they are guaranteed to be preserved.
:return: A new MarshalerNode that encapsulates the internal data on the client side. Returned value can be used
as an input to many other Application APIs in a convenient way. Most important of them is again this same API
'create_data' which can use MarshalerNode and its filtered version (FilteredView) as inputs.
"""
return self._create_or_update_data_with_defaults(
id,
inputs,
input_dim_links,
output_dimension_spec,
output_dim_links,
compute_targets,
execution_hook,
pending_node_hook,
pending_node_expiration_ttl_in_secs,
auto_input_dim_linking_enabled,
auto_output_dim_linking_enabled,
auto_backfilling_enabled,
protocol,
is_update=False,
**kwargs,
)
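# Illustrative usage sketch for create_data, assuming an AWSApplication instance `app` and an existing
# external/internal node `external_data`. When `compute_targets` is a plain string it is wrapped in a
# GlueBatchCompute by the defaulting logic in _create_or_update_data_with_defaults below; the embedded
# code references inputs by their alias and assigns the result to `output`. Ids/aliases are hypothetical.
#
#   new_node = app.create_data(
#       id="filtered_data",
#       inputs={"my_input": external_data},
#       compute_targets="output = my_input.limit(100)",
#   )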
def update_data(
self,
id: str,
inputs: Union[List[Union[FilteredView, MarshalerNode]], Dict[str, Union[FilteredView, MarshalerNode]]] = None,
input_dim_links: Optional[Sequence[Tuple[SignalDimensionTuple, DimensionVariantMapFunc, SignalDimensionTuple]]] = None,
output_dimension_spec: Optional[Union[Dict[str, Any], DimensionSpec]] = None,
output_dim_links: Optional[
Sequence[
Tuple[
Union[OutputDimensionNameType, SignalDimensionTuple],
DimensionVariantMapFunc,
Union[OutputDimensionNameType, Tuple[OutputDimensionNameType, ...], SignalDimensionTuple],
]
]
] = None,
compute_targets: Optional[Union[Sequence[ComputeDescriptor], str]] = None,
execution_hook: RouteExecutionHook = None,
pending_node_hook: RoutePendingNodeHook = None,
pending_node_expiration_ttl_in_secs: int = None,
auto_input_dim_linking_enabled=True,
auto_output_dim_linking_enabled=True,
auto_backfilling_enabled=False,
protocol: SignalIntegrityProtocol = InternalDataNode.DEFAULT_DATA_COMPLETION_PROTOCOL,
enforce_referential_integrity=True,
**kwargs,
) -> MarshalerNode:
"""See AWSApplication::create_data for parametrization.
Updates an existing data node using Application::update_data after applying some defaulting specific to AWS.
"""
return self._create_or_update_data_with_defaults(
id,
inputs,
input_dim_links,
output_dimension_spec,
output_dim_links,
compute_targets,
execution_hook,
pending_node_hook,
pending_node_expiration_ttl_in_secs,
auto_input_dim_linking_enabled,
auto_output_dim_linking_enabled,
auto_backfilling_enabled,
protocol,
enforce_referential_integrity,
is_update=True,
**kwargs,
)
def _create_or_update_data_with_defaults(
self,
id: str,
inputs: Union[List[Union[FilteredView, MarshalerNode]], Dict[str, Union[FilteredView, MarshalerNode]]] = None,
input_dim_links: Optional[Sequence[Tuple[SignalDimensionTuple, DimensionVariantMapFunc, SignalDimensionTuple]]] = None,
output_dimension_spec: Optional[Union[Dict[str, Any], DimensionSpec]] = None,
output_dim_links: Optional[
Sequence[
Tuple[
Union[OutputDimensionNameType, SignalDimensionTuple],
DimensionVariantMapFunc,
Union[OutputDimensionNameType, Tuple[OutputDimensionNameType, ...], SignalDimensionTuple],
]
]
] = None,
compute_targets: Optional[Union[Sequence[ComputeDescriptor], str]] = None,
execution_hook: RouteExecutionHook = None,
pending_node_hook: RoutePendingNodeHook = None,
pending_node_expiration_ttl_in_secs: int = None,
auto_input_dim_linking_enabled=True,
auto_output_dim_linking_enabled=True,
auto_backfilling_enabled=False,
protocol: SignalIntegrityProtocol = InternalDataNode.DEFAULT_DATA_COMPLETION_PROTOCOL,
enforce_referential_integrity=True,
is_update=False,
**kwargs,
) -> MarshalerNode:
if output_dimension_spec is None:
if inputs:
# set output dimension spec as first input's spec
if isinstance(inputs, Dict):
first_signal = self._get_input_signal(next(iter(inputs.items()))[1])
all_signals = [self._get_input_signal(input) for input in inputs.values()]
else:
first_signal = self._get_input_signal(inputs[0])
all_signals = [self._get_input_signal(input) for input in inputs]
output_dimension_spec = first_signal.domain_spec.dimension_spec
if not output_dim_links:
# if output dim links are not provided either, then again prefer linking to the first input. But
# check if that input is dependent or not. If dependent, then only already materialized dimensions
# from it can be mapped to the output, for other dimensions go over independent inputs and try to find
# the same dimension (with the same name) in them.
output_signal = first_signal.clone(None)
if not output_signal.is_dependent: # first signal -> reference, nearest, etc
output_dim_links = [
(
dim_name,
None,
SignalDimensionTuple(
output_signal, output_signal.domain_spec.dimension_spec.find_dimension_by_name(dim_name)
),
)
for dim_name in output_signal.domain_spec.dimension_spec.get_flattened_dimension_map().keys()
]
else:
# we have to make sure non-materialized dimensions are mapped from other signals.
# we cannot allow non-materialized dimension mapping to the output from a dependent signal.
output_dim_links = []
independent_signals = [input for input in all_signals if not input.is_dependent]
for dim_name, dim in output_signal.domain_spec.dimension_spec.get_flattened_dimension_map().items():
dim_variant = output_signal.domain_spec.dimension_filter_spec.find_dimension_by_name(dim_name)
if dim_variant.is_material_value():
# assign as literal value, otherwise output would require dependent signal at runtime to be
# materialized (until RuntimeLinkNode::materialized_output would support already materialized dimensions).
# RuntimeLinkNode::_check_dependents relies on materialized_output to compensate for dependents,
# which in turn would rely on a dependent if we don't use literal assignment.
# output_dim_links.append((dim_name, None, SignalDimensionTuple(output_signal, dim)))
output_dim_links.append((dim_name, None, dim_variant.value))
else:
# if not material, then try to link the dimension from any of the independent signals
link = None
# TODO once the graph analyzer is added, use it to retrieve the link even from dependent signals;
# just make sure that the link directly or transitively leads to a source dimension from
# an independent signal
for other_indep_input in independent_signals:
other_dim = other_indep_input.domain_spec.dimension_spec.find_dimension_by_name(dim_name)
if other_dim:
link = (dim_name, None, SignalDimensionTuple(other_indep_input.clone(None), other_dim))
break
if link is None:
raise ValueError(
f"Cannot link unmaterialized dimension {dim_name!r} from dependent "
f"input {first_signal.alias!r} to output {id!r}"
)
output_dim_links.append(link)
else:
output_dimension_spec = DimensionSpec()
output_dim_links = []
if not compute_targets:
raise ValueError(f"Cannot create data node {id!r} without compute targets! " f"Please define the param compute_targets!")
if isinstance(compute_targets, str):
# TODO add this default batch-compute generation to batch_compute driver as an abstract classmethod
# then remove this overwrite (keep everything in Application::create_data)
compute_targets = [
GlueBatchCompute(compute_targets, WorkerType=GlueWorkerType.G_1X.value, NumberOfWorkers=100, Timeout=12 * 60)  # 12 hours (Timeout is in minutes)
]
if not is_update:
return super(AWSApplication, self).create_data(
id,
inputs,
input_dim_links if input_dim_links else [],
output_dimension_spec,
output_dim_links if output_dim_links else [],
compute_targets,
execution_hook,
pending_node_hook,
pending_node_expiration_ttl_in_secs,
auto_input_dim_linking_enabled,
auto_output_dim_linking_enabled,
auto_backfilling_enabled,
protocol,
**kwargs,
)
else:
return super(AWSApplication, self).update_data(
id,
inputs,
input_dim_links if input_dim_links else [],
output_dimension_spec,
output_dim_links if output_dim_links else [],
compute_targets,
execution_hook,
pending_node_hook,
pending_node_expiration_ttl_in_secs,
auto_input_dim_linking_enabled,
auto_output_dim_linking_enabled,
auto_backfilling_enabled,
protocol,
enforce_referential_integrity,
**kwargs,
)
def _load_external_data(
self,
external_data_node: ExternalDataNode,
output: Union[MarshalingView, MarshalerNode],
materialized_output: Signal,
materialized_path: str,
limit: int = None,
) -> Iterator[Tuple[str, bytes]]:
"""Returns an iterator of Tuple[physical_path, BLOB] for the external data, in a platform specific way"""
platform = self._get_platform_for(external_data_node.signal())
aws_configuration: AWSConfiguration = cast(AWSConfiguration, platform.conf)
session = aws_configuration.get_param(AWSCommonParams.BOTO_SESSION)
region = aws_configuration.get_param(AWSCommonParams.REGION)
if materialized_output.resource_access_spec.source == SignalSourceType.S3:
s3_spec = cast(S3SignalSourceAccessSpec, materialized_output.resource_access_spec)
bucket_name = s3_spec.bucket
s3 = session.resource("s3", region_name=region)
bucket = get_bucket(s3, bucket_name)
materialized_paths = materialized_output.get_materialized_resource_paths()
for materialized_path in materialized_paths:
prefix = materialized_path.replace(f"s3://{bucket_name}/", "")
objects_in_folder = list_objects(bucket, prefix)
for object in objects_in_folder:
key = object.key
body = object.get()["Body"].read()
yield (f"{materialized_path}/{key}", body)
elif materialized_output.resource_access_spec.source == SignalSourceType.GLUE_TABLE:
if not self.get_data(external_data_node.data_id, self.QueryApplicationScope.ALL, self.QueryContext.DEV_CONTEXT):
error_str = (
f"Data node {external_data_node.data_id!r} does not exist in the current development "
f"context! If you refer it from the already active context of the application, try "
f"to attach it via Application::attach to pull the active nodes into your dev context."
)
logger.error(error_str)
raise ValueError(error_str)
# since we are about to temporarily modify the DAG, push the dev-state.
self.save_dev_state()
# we will be modifying the active DAG and at the end of this operation restore it, so save it for later.
active_state = copy.deepcopy(self._active_context) if self._active_context else None
temp_data = self.create_data(
id=str(uuid.uuid1()),
inputs=[output],
compute_targets=f"output={materialized_output.alias}{f'.limit({limit})' if limit else ''}",
)
try:
# 'output' is materialized, and so is 'temp_data' (since it by default adapts output's spec and filter).
self.execute(temp_data)
# data should be ready now
# since it is internal, we can do the following conveniently (same logic as internal data handling
# within 'load_data')
data_it = self.platform.storage.load_internal(temp_data.signal())
for data in data_it:
path, _ = data
schema_file = temp_data.signal().resource_access_spec.data_schema_file
if not schema_file or not path.lower().endswith(schema_file.lower()):
success_file = temp_data.signal().get_required_resource_name().lower()
if not success_file or not path.lower().endswith(success_file):
yield data
finally:
# restore previous active state
# TODO support Application::delete_data (with dependency check)
# after that, the following two operations won't be necessary
complete_cleanup = True
try:
# at this point, temp is created off of output and their specs are the same. cloning output's materialized
# filter into the temp signal's filter will make it materialized as well.
materialized_temp_output = temp_data.signal().filter(materialized_output.domain_spec.dimension_filter_spec)
if not platform.storage.delete_internal(materialized_temp_output):
complete_cleanup = False
except Exception:
complete_cleanup = False
if not complete_cleanup:
logger.critical(
f"There has been problems during the clean-up after the load data operation on {external_data_node.data_id!r}"
)
logger.critical(
f"Please try to clean-up following resources manually: {temp_data.signal().get_materialized_resource_paths()!r}"
)
if active_state:
self._dev_context = active_state
self.activate()