@@ -360,6 +360,7 @@ def __init__(
360360 tags : Optional [Dict [str , str ]] = None ,
361361 owner : Optional [str ] = "" ,
362362 timestamp_field : Optional [str ] = "" ,
363+ batch_source : Optional [DataSource ] = None ,
363364 ):
364365 super ().__init__ (
365366 event_timestamp_column = event_timestamp_column ,
@@ -372,6 +373,7 @@ def __init__(
372373 name = name ,
373374 timestamp_field = timestamp_field ,
374375 )
376+ self .batch_source = batch_source
375377 self .kafka_options = KafkaOptions (
376378 bootstrap_servers = bootstrap_servers ,
377379 message_format = message_format ,
@@ -411,6 +413,7 @@ def from_proto(data_source: DataSourceProto):
411413 description = data_source .description ,
412414 tags = dict (data_source .tags ),
413415 owner = data_source .owner ,
416+ batch_source = DataSource .from_proto (data_source .batch_source ),
414417 )
415418
416419 def to_proto (self ) -> DataSourceProto :
@@ -427,6 +430,8 @@ def to_proto(self) -> DataSourceProto:
427430 data_source_proto .timestamp_field = self .timestamp_field
428431 data_source_proto .created_timestamp_column = self .created_timestamp_column
429432 data_source_proto .date_partition_column = self .date_partition_column
433+ if self .batch_source :
434+ data_source_proto .batch_source .MergeFrom (self .batch_source .to_proto ())
430435 return data_source_proto
431436
432437 @staticmethod
@@ -546,6 +551,7 @@ def from_proto(data_source: DataSourceProto):
546551 description = data_source .description ,
547552 tags = dict (data_source .tags ),
548553 owner = data_source .owner ,
554+ batch_source = DataSource .from_proto (data_source .batch_source ),
549555 )
550556
551557 @staticmethod
@@ -569,6 +575,7 @@ def __init__(
569575 tags : Optional [Dict [str , str ]] = None ,
570576 owner : Optional [str ] = "" ,
571577 timestamp_field : Optional [str ] = "" ,
578+ batch_source : Optional [DataSource ] = None ,
572579 ):
573580 super ().__init__ (
574581 name = name ,
@@ -581,6 +588,7 @@ def __init__(
581588 owner = owner ,
582589 timestamp_field = timestamp_field ,
583590 )
591+ self .batch_source = batch_source
584592 self .kinesis_options = KinesisOptions (
585593 record_format = record_format , region = region , stream_name = stream_name
586594 )
@@ -618,6 +626,8 @@ def to_proto(self) -> DataSourceProto:
618626 data_source_proto .timestamp_field = self .timestamp_field
619627 data_source_proto .created_timestamp_column = self .created_timestamp_column
620628 data_source_proto .date_partition_column = self .date_partition_column
629+ if self .batch_source :
630+ data_source_proto .batch_source .MergeFrom (self .batch_source .to_proto ())
621631
622632 return data_source_proto
623633
@@ -634,6 +644,7 @@ class PushSource(DataSource):
634644
635645 def __init__ (
636646 self ,
647+ * ,
637648 name : str ,
638649 schema : Dict [str , ValueType ],
639650 batch_source : DataSource ,
@@ -693,8 +704,8 @@ def from_proto(data_source: DataSourceProto):
693704 for key , val in schema_pb .items ():
694705 schema [key ] = ValueType (val )
695706
696- assert data_source .push_options . HasField ("batch_source" )
697- batch_source = DataSource .from_proto (data_source .push_options . batch_source )
707+ assert data_source .HasField ("batch_source" )
708+ batch_source = DataSource .from_proto (data_source .batch_source )
698709
699710 return PushSource (
700711 name = data_source .name ,
@@ -714,9 +725,7 @@ def to_proto(self) -> DataSourceProto:
714725 if self .batch_source :
715726 batch_source_proto = self .batch_source .to_proto ()
716727
717- options = DataSourceProto .PushOptions (
718- schema = schema_pb , batch_source = batch_source_proto
719- )
728+ options = DataSourceProto .PushOptions (schema = schema_pb ,)
720729 data_source_proto = DataSourceProto (
721730 name = self .name ,
722731 type = DataSourceProto .PUSH_SOURCE ,
@@ -725,6 +734,7 @@ def to_proto(self) -> DataSourceProto:
725734 description = self .description ,
726735 tags = self .tags ,
727736 owner = self .owner ,
737+ batch_source = batch_source_proto ,
728738 )
729739
730740 return data_source_proto
0 commit comments