1+ #!/usr/bin/env python
2+ # -*- coding: utf-8 -*-
3+ """
4+ Time : 2024/10/12 下午2:13
5+ Author : xuzh
6+ Project : hemera_indexer
7+ """
8+ import logging
9+ from typing import Any , Dict , List , Optional , Sequence , Tuple , Union , cast
10+
11+ import eth_abi
12+ from ens .utils import get_abi_output_types
13+ from eth_abi import abi
14+ from eth_abi .codec import ABICodec
15+ from eth_typing import HexStr
16+ from eth_utils import encode_hex , to_hex
17+ from hexbytes import HexBytes
18+ from web3 ._utils .abi import (
19+ exclude_indexed_event_inputs ,
20+ get_abi_input_types ,
21+ get_indexed_event_inputs ,
22+ map_abi_data ,
23+ named_tree ,
24+ )
25+ from web3 ._utils .contracts import decode_transaction_data
26+ from web3 ._utils .normalizers import BASE_RETURN_NORMALIZERS
27+ from web3 .types import ABIEvent , ABIFunction
28+
29+ from common .utils .format_utils import bytes_to_hex_str , convert_bytes_to_hex , convert_dict , hex_str_to_bytes
30+ from indexer .utils .abi import (
31+ abi_address_to_hex ,
32+ abi_bytes_to_bytes ,
33+ abi_string_to_text ,
34+ codec ,
35+ event_log_abi_to_topic ,
36+ function_abi_to_4byte_selector_str ,
37+ get_types_from_abi_type_list ,
38+ )
39+
40+ abi_codec = ABICodec (eth_abi .registry .registry )
41+
42+
43+ class Event :
44+ def __init__ (self , event_abi : ABIEvent ):
45+ self ._event_abi = event_abi
46+ self ._signature = event_log_abi_to_topic (event_abi )
47+
48+ def get_abi (self ) -> ABIEvent :
49+ return self ._event_abi
50+
51+ def get_signature (self ) -> str :
52+ return self ._signature
53+
54+ def decode_log (self , log ) -> Optional [Dict [str , Any ]]:
55+ return decode_log (self ._event_abi , log )
56+
57+ def decode_log_ignore_indexed (self , log ) -> Optional [Dict [str , Any ]]:
58+ return decode_log_ignore_indexed (self ._event_abi , log )
59+
60+
61+ def decode_log_ignore_indexed (
62+ fn_abi : ABIEvent ,
63+ log ,
64+ ) -> Optional [Dict [str , Any ]]:
65+ from indexer .domain .log import Log
66+
67+ if not isinstance (log , Log ):
68+ raise ValueError (f"log: { log } is not a Log instance" )
69+
70+ data_types = get_indexed_event_inputs (fn_abi ) + exclude_indexed_event_inputs (fn_abi )
71+ decoded_data = decode_data ([t ["type" ] for t in data_types ], log .get_topic_with_data ())
72+ data = named_tree (data_types , decoded_data )
73+ return data
74+
75+
76+ def decode_log (
77+ fn_abi : ABIEvent ,
78+ log ,
79+ ) -> Optional [Dict [str , Any ]]:
80+ from indexer .domain .log import Log
81+
82+ if not isinstance (log , Log ):
83+ raise ValueError (f"log: { log } is not a Log instance" )
84+
85+ try :
86+ indexed_types = get_indexed_event_inputs (fn_abi )
87+ for indexed_type in indexed_types :
88+ if indexed_type ["type" ] == "string" :
89+ indexed_type ["type" ] = "bytes32"
90+
91+ data_types = exclude_indexed_event_inputs (fn_abi )
92+
93+ decode_indexed = decode_data (get_types_from_abi_type_list (indexed_types ), log .get_bytes_topics ())
94+ indexed = named_tree (indexed_types , decode_indexed )
95+
96+ decoded_data = decode_data (get_types_from_abi_type_list (data_types ), log .get_bytes_data ())
97+ data = named_tree (data_types , decoded_data )
98+ except Exception as e :
99+ logging .warning (f"Failed to decode log: { e } , log: { log } " )
100+ return None
101+
102+ return {** indexed , ** data }
103+
104+
105+ class Function :
106+ def __init__ (self , function_abi : ABIFunction ):
107+ self ._function_abi = function_abi
108+ self ._signature = function_abi_to_4byte_selector_str (function_abi )
109+ self ._inputs_type = get_abi_input_types (function_abi )
110+ self ._outputs_type = get_abi_output_types (function_abi )
111+
112+ def get_abi (self ) -> ABIFunction :
113+ return self ._function_abi
114+
115+ def get_signature (self ) -> str :
116+ return self ._signature
117+
118+ def get_inputs_type (self ) -> List [str ]:
119+ return self ._inputs_type
120+
121+ def get_outputs_type (self ) -> List [str ]:
122+ return self ._outputs_type
123+
124+ def decode_data (self , data : str ) -> Optional [Dict [str , Any ]]:
125+ try :
126+ decoded = decode_data (self ._inputs_type , hex_str_to_bytes (data )[4 :])
127+ decoded = named_tree (self ._function_abi ["inputs" ], decoded )
128+ return decoded
129+ except Exception as e :
130+ logging .warning (f"Failed to decode transaction input data: { e } , input data: { data } " )
131+ return None
132+
133+
134+ def decode_transaction_data (
135+ fn_abi : ABIFunction ,
136+ data : str ,
137+ ) -> Optional [Dict [str , Any ]]:
138+ try :
139+ types = get_abi_input_types (fn_abi )
140+ decoded = decode_data (types , hex_str_to_bytes (data [4 :]))
141+ decoded = named_tree (fn_abi ["inputs" ], decoded )
142+ return decoded
143+ except Exception as e :
144+ logging .warning (f"Failed to decode transaction input data: { e } , input data: { data } " )
145+ return None
146+
147+
148+ def decode_data (decode_type : Union [Sequence [str ], List [str ], str ], data : bytes ) -> Tuple [Any , ...]:
149+ if isinstance (decode_type , str ):
150+ data = abi_codec .decode ([decode_type ], data )
151+ elif isinstance (decode_type , list ):
152+ for tpe in decode_type :
153+ if not isinstance (tpe , str ):
154+ raise ValueError (f"Invalid decode_type: { decode_type } is not a List[str]" )
155+ try :
156+ data = abi_codec .decode (decode_type , data )
157+ except Exception as e :
158+ print (f"Failed to decode data: { e } " )
159+ else :
160+ raise ValueError (f"Invalid decode_type: { decode_type } , it should be str or list[str]" )
161+ return data
162+
163+
164+ def encode_data (
165+ abi : ABIFunction ,
166+ arguments : Sequence [Any ],
167+ data : str = None ,
168+ ) -> HexStr :
169+ argument_types = get_abi_input_types (abi )
170+
171+ normalizers = [
172+ abi_address_to_hex ,
173+ abi_bytes_to_bytes ,
174+ abi_string_to_text ,
175+ ]
176+
177+ normalized_arguments = map_abi_data (
178+ normalizers ,
179+ argument_types ,
180+ arguments ,
181+ )
182+ encoded_arguments = codec .encode (
183+ argument_types ,
184+ normalized_arguments ,
185+ )
186+ if data :
187+ return to_hex (HexBytes (data ) + encoded_arguments )
188+ else :
189+ return encode_hex (encoded_arguments )
190+
191+
192+ def decode_log_data (types , data_str ):
193+ data_hex_str = hex_str_to_bytes (data_str )
194+ decoded_abi = decode_data (types , data_hex_str )
195+
196+ encoded_abi = []
197+ decoded_abi_real = []
198+ for index in range (len (types )):
199+ encoded_abi .append (bytes_to_hex_str (abi .encode (types [index : index + 1 ], decoded_abi [index : index + 1 ])))
200+
201+ if types [index ].startswith ("byte" ):
202+ if type (decoded_abi [index ]) is tuple :
203+ encode_tuple = []
204+ for element in decoded_abi [index ]:
205+ encode_tuple .append (bytes_to_hex_str (element ))
206+ decoded_abi_real .append (encode_tuple )
207+ else :
208+ decoded_abi_real .append (bytes_to_hex_str (decoded_abi [index ]))
209+ else :
210+ decoded_abi_real .append (str (decoded_abi [index ]))
211+
212+ return decoded_abi_real , encoded_abi
213+
214+
215+ def decode_function (function_abi_json , data_str , output_str ):
216+ if data_str is not None and len (data_str ) > 0 :
217+ input = decode_transaction_data (
218+ cast (ABIFunction , function_abi_json ),
219+ data_str ,
220+ normalizers = BASE_RETURN_NORMALIZERS ,
221+ )
222+ input = convert_dict (convert_bytes_to_hex (input ))
223+ else :
224+ input = []
225+
226+ if output_str is not None and len (output_str ) > 0 :
227+ types = get_abi_output_types (cast (ABIFunction , function_abi_json ))
228+ data = hex_str_to_bytes (output_str )
229+ value = decode_data (types , data )
230+ output = named_tree (function_abi_json ["outputs" ], value )
231+ output = convert_dict (convert_bytes_to_hex (output ))
232+ else :
233+ output = []
234+ return input , output
0 commit comments