Skip to content

Commit

Permalink
lint codes
Browse files Browse the repository at this point in the history
  • Loading branch information
paer committed Jul 13, 2023
1 parent ae6599c commit cd08616
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 99 deletions.
97 changes: 3 additions & 94 deletions fed/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ def __init__(self, raw_bytes: bytes) -> None:

@property
def cross_silo_comm_config(self):
return self._data.get(fed_constants.KEY_OF_CROSS_SILO_COMM_CONFIG, CrossSiloCommConfig())
return self._data.get(
fed_constants.KEY_OF_CROSS_SILO_COMM_CONFIG,
CrossSiloCommConfig())


# A module level cache for the cluster configurations.
Expand Down Expand Up @@ -71,98 +73,6 @@ def get_job_config():
return _job_config


# class CrossSiloCommConfig:
# """A class to store parameters used for Proxy Actor

# Attributes:
# proxy_max_restarts: The max restart times for the send proxy.
# serializing_allowed_list: The package or class list allowed for
# serializing(deserializating) cross silos. It's used for avoiding pickle
# deserializing execution attack when crossing solis.
# send_resource_label: Customized resource label, the SendProxyActor
# will be scheduled based on the declared resource label. For example,
# when setting to `{"my_label": 1}`, then the SendProxyActor will be started
# only on Nodes with `{"resource": {"my_label": $NUM}}` where $NUM >= 1.
# recv_resource_label: Customized resource label, the RecverProxyActor
# will be scheduled based on the declared resource label. For example,
# when setting to `{"my_label": 1}`, then the RecverProxyActor will be started
# only on Nodes with `{"resource": {"my_label": $NUM}}` where $NUM >= 1.
# exit_on_sending_failure: whether exit when failure on
# cross-silo sending. If True, a SIGTERM will be signaled to self
# if failed to sending cross-silo data.
# messages_max_size_in_bytes: The maximum length in bytes of
# cross-silo messages.
# If None, the default value of 500 MB is specified.
# timeout_in_seconds: The timeout in seconds of a cross-silo RPC call.
# It's 60 by default.
# http_header: The HTTP header, e.g. metadata in grpc, sent with the RPC request.
# This won't override basic tcp headers, such as `user-agent`, but concat
# them together.
# """
# def __init__(
# self,
# proxy_max_restarts: int = None,
# timeout_in_seconds: int = 60,
# messages_max_size_in_bytes: int = None,
# exit_on_sending_failure: Optional[bool] = False,
# serializing_allowed_list: Optional[Dict[str, str]] = None,
# send_resource_label: Optional[Dict[str, str]] = None,
# recv_resource_label: Optional[Dict[str, str]] = None,
# http_header: Optional[Dict[str, str]] = None) -> None:
# self.proxy_max_restarts = proxy_max_restarts
# self.timeout_in_seconds = timeout_in_seconds
# self.messages_max_size_in_bytes = messages_max_size_in_bytes
# self.exit_on_sending_failure = exit_on_sending_failure
# self.serializing_allowed_list = serializing_allowed_list
# self.send_resource_label = send_resource_label
# self.recv_resource_label = recv_resource_label
# self.http_header = http_header

# def __json__(self):
# return json.dumps(self.__dict__)

# @classmethod
# def from_json(cls, json_str):
# data = json.loads(json_str)
# return cls(**data)


# class CrossSiloGrpcCommConfig(CrossSiloCommConfig):
# """A class to store parameters used for GRPC communication

# Attributes:
# grpc_retry_policy: a dict descibes the retry policy for
# cross silo rpc call. If None, the following default retry policy
# will be used. More details please refer to
# `retry-policy <https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy>`_. # noqa

# .. code:: python
# {
# "maxAttempts": 4,
# "initialBackoff": "0.1s",
# "maxBackoff": "1s",
# "backoffMultiplier": 2,
# "retryableStatusCodes": [
# "UNAVAILABLE"
# ]
# }
# grpc_channel_options: A list of tuples to store GRPC channel options,
# e.g. [
# ('grpc.enable_retries', 1),
# ('grpc.max_send_message_length', 50 * 1024 * 1024)
# ]
# """
# def __init__(self,
# grpc_channel_options: List = None,
# grpc_retry_policy: Dict[str, str] = None,
# *args,
# **kwargs):
# super().__init__(*args, **kwargs)
# self.grpc_retry_policy = grpc_retry_policy
# self.grpc_channel_options = grpc_channel_options



@dataclass
class CrossSiloCommConfig:
"""A class to store parameters used for Proxy Actor
Expand Down Expand Up @@ -226,7 +136,6 @@ def from_dict(cls, data: Dict):
return cls(**filtered_data)



@dataclass
class CrossSiloGrpcCommConfig(CrossSiloCommConfig):
"""A class to store parameters used for GRPC communication
Expand Down
5 changes: 3 additions & 2 deletions fed/proxy/barriers.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(
cluster: Dict,
party: str,
tls_config: Dict,
proxy_config: CrossSiloCommConfig=None
proxy_config: CrossSiloCommConfig = None
) -> None:
self._cluster = cluster
self._party = party
Expand Down Expand Up @@ -92,7 +92,7 @@ def __init__(
listen_addr: str,
party: str,
tls_config: Dict,
proxy_config: CrossSiloCommConfig=None
proxy_config: CrossSiloCommConfig = None
) -> None:
self._listen_addr = listen_addr
self._party = party
Expand Down Expand Up @@ -184,6 +184,7 @@ async def _get_cluster_info(self):
async def _get_proxy_config(self, dest_party=None):
return await self._proxy_instance.get_proxy_config(dest_party)


@ray.remote
class RecverProxyActor:
def __init__(
Expand Down
7 changes: 4 additions & 3 deletions fed/proxy/grpc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def parse_grpc_options(proxy_config: CrossSiloCommConfig):
if proxy_config is not None:
if proxy_config.messages_max_size_in_bytes is not None:
grpc_channel_options.update({
'grpc.max_send_message_length':
'grpc.max_send_message_length':
proxy_config.messages_max_size_in_bytes,
'grpc.max_receive_message_length':
proxy_config.messages_max_size_in_bytes
Expand Down Expand Up @@ -70,7 +70,7 @@ def __init__(
cluster: Dict,
party: str,
tls_config: Dict,
proxy_config: CrossSiloCommConfig=None
proxy_config: CrossSiloCommConfig = None
) -> None:
super().__init__(cluster, party, tls_config, proxy_config)
self._grpc_metadata = proxy_config.http_header or {}
Expand Down Expand Up @@ -100,7 +100,8 @@ async def send(
channel = grpc.aio.secure_channel(
dest_addr, credentials, options=grpc_channel_options)
else:
channel = grpc.aio.insecure_channel(dest_addr, options=grpc_channel_options)
channel = grpc.aio.insecure_channel(
dest_addr, options=grpc_channel_options)
stub = fed_pb2_grpc.GrpcServiceStub(channel)
self._stubs[dest_party] = stub

Expand Down
1 change: 1 addition & 0 deletions tests/test_transport_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from fed.grpc import fed_pb2_in_protobuf3 as fed_pb2
from fed.grpc import fed_pb2_grpc_in_protobuf3 as fed_pb2_grpc


def test_n_to_1_transport():
"""This case is used to test that we have N send_op barriers,
sending data to the target recver proxy, and there also have
Expand Down

0 comments on commit cd08616

Please sign in to comment.