Skip to content

Commit a928d3c

Browse files
authored
Add Agni Feature (HemeraProtocol#138)
1 parent 743457a commit a928d3c

14 files changed

+1079
-2
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[5000]
2+
nft_address = 0x218bf598d1453383e2f4aa7b14ffb9bfb102d637
3+
factory_address = 0x25780dc8fc3cfbd75f33bfdab65e969b603b2035
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
import configparser
2+
import json
3+
import logging
4+
import os
5+
from collections import defaultdict
6+
from dataclasses import fields
7+
from itertools import groupby
8+
from operator import attrgetter
9+
from typing import Dict, List
10+
11+
import eth_abi
12+
from web3 import Web3
13+
14+
from indexer.domain.log import Log
15+
from indexer.domain.transaction import Transaction
16+
from indexer.executors.batch_work_executor import BatchWorkExecutor
17+
from indexer.jobs import FilterTransactionDataJob
18+
from indexer.modules.custom import common_utils
19+
from indexer.modules.custom.uniswap_v3 import constants, util
20+
from indexer.modules.custom.uniswap_v3.constants import AGNI_ABI
21+
from indexer.modules.custom.uniswap_v3.domain.feature_uniswap_v3 import (
22+
AgniV3Pool,
23+
AgniV3PoolCurrentPrice,
24+
AgniV3PoolPrice,
25+
AgniV3SwapEvent,
26+
)
27+
from indexer.modules.custom.uniswap_v3.models.feature_uniswap_v3_pools import UniswapV3Pools
28+
from indexer.specification.specification import TopicSpecification, TransactionFilterByLogs
29+
from indexer.utils.json_rpc_requests import generate_eth_call_json_rpc
30+
from indexer.utils.utils import rpc_response_to_result, zip_rpc_response
31+
32+
logger = logging.getLogger(__name__)
33+
34+
35+
class AgniPoolJob(FilterTransactionDataJob):
36+
dependency_types = [Transaction, Log]
37+
output_types = [
38+
AgniV3Pool,
39+
AgniV3PoolPrice,
40+
AgniV3PoolCurrentPrice,
41+
AgniV3SwapEvent,
42+
]
43+
able_to_reorg = True
44+
45+
def __init__(self, **kwargs):
46+
super().__init__(**kwargs)
47+
self._batch_work_executor = BatchWorkExecutor(
48+
kwargs["batch_size"],
49+
kwargs["max_workers"],
50+
job_name=self.__class__.__name__,
51+
)
52+
self._is_batch = kwargs["batch_size"] > 1
53+
self._service = kwargs["config"].get("db_service")
54+
self._chain_id = common_utils.get_chain_id(self._web3)
55+
self._load_config("agni_config.ini", self._chain_id)
56+
self._exist_pools = get_exist_pools(self._service, self._position_token_address)
57+
self._batch_size = kwargs["batch_size"]
58+
self._max_worker = kwargs["max_workers"]
59+
self._abi_list = AGNI_ABI
60+
self._create_pool_topic0 = constants.UNISWAP_V3_CREATE_POOL_TOPIC0
61+
self._pool_price_topic0_list = constants.AGNI_POOL_PRICE_TOPIC0_LIST
62+
63+
def get_filter(self):
64+
return TransactionFilterByLogs(
65+
[
66+
TopicSpecification(addresses=[self._factory_address], topics=[self._create_pool_topic0]),
67+
TopicSpecification(topics=self._pool_price_topic0_list),
68+
]
69+
)
70+
71+
def _load_config(self, filename, chain_id):
72+
base_path = os.path.dirname(os.path.abspath(__file__))
73+
full_path = os.path.join(base_path, filename)
74+
config = configparser.ConfigParser()
75+
config.read(full_path)
76+
chain_id_str = str(chain_id)
77+
try:
78+
chain_config = config[chain_id_str]
79+
except KeyError:
80+
return
81+
try:
82+
self._position_token_address = chain_config.get("nft_address").lower()
83+
self._factory_address = chain_config.get("factory_address").lower()
84+
except (configparser.NoOptionError, configparser.NoSectionError) as e:
85+
raise ValueError(f"Missing required configuration in {filename}: {str(e)}")
86+
87+
def _collect(self, **kwargs):
88+
logs = self._data_buff[Log.type()]
89+
self._batch_work_executor.execute(logs, self._collect_pool_batch, len(logs))
90+
self._batch_work_executor.wait()
91+
92+
collected_pools = self._data_buff[AgniV3Pool.type()]
93+
for data in collected_pools:
94+
self._exist_pools[data.pool_address] = data
95+
transactions = self._data_buff[Transaction.type()]
96+
self._transaction_hash_from_dict = {}
97+
for transaction in transactions:
98+
self._transaction_hash_from_dict[transaction.hash] = transaction.from_address
99+
self._batch_work_executor.execute(logs, self._collect_price_batch, len(logs))
100+
self._batch_work_executor.wait()
101+
self._transaction_hash_from_dict = {}
102+
self._process_current_pool_prices()
103+
104+
def _collect_pool_batch(self, logs):
105+
for log in logs:
106+
address = log.address
107+
current_topic0 = log.topic0
108+
if self._factory_address != address or self._create_pool_topic0 != current_topic0:
109+
continue
110+
entity = decode_pool_created(self._position_token_address, self._factory_address, log)
111+
self._collect_item(AgniV3Pool.type(), entity)
112+
113+
def _collect_price_batch(self, logs):
114+
unique_logs = set()
115+
for log in logs:
116+
if log.address not in self._exist_pools:
117+
continue
118+
# Collect swap logs
119+
if log.topic0 == constants.UNISWAP_V3_POOL_SWAP_TOPIC0:
120+
transaction_hash = log.transaction_hash
121+
part1, part2, part3, part4, part5 = split_swap_data_hex_string(log.data)
122+
amount0 = util.parse_hex_to_int256(part1)
123+
amount1 = util.parse_hex_to_int256(part2)
124+
sqrt_price_x96 = util.parse_hex_to_int256(part3)
125+
liquidity = util.parse_hex_to_int256(part4)
126+
tick = util.parse_hex_to_int256(part5)
127+
pool_data = self._exist_pools[log.address]
128+
self._collect_item(
129+
AgniV3SwapEvent.type(),
130+
AgniV3SwapEvent(
131+
pool_address=log.address,
132+
position_token_address=self._position_token_address,
133+
transaction_hash=transaction_hash,
134+
transaction_from_address=self._transaction_hash_from_dict[transaction_hash],
135+
log_index=log.log_index,
136+
block_number=log.block_number,
137+
block_timestamp=log.block_timestamp,
138+
sender=util.parse_hex_to_address(log.topic1),
139+
recipient=util.parse_hex_to_address(log.topic2),
140+
amount0=amount0,
141+
amount1=amount1,
142+
liquidity=liquidity,
143+
tick=tick,
144+
sqrt_price_x96=sqrt_price_x96,
145+
token0_address=pool_data.token0_address,
146+
token1_address=pool_data.token1_address,
147+
),
148+
)
149+
log_tuple = (log.address, log.block_number, log.block_timestamp)
150+
unique_logs.add(log_tuple)
151+
requests = [
152+
{"pool_address": address, "block_number": block_number, "block_timestamp": block_timestamp}
153+
for address, block_number, block_timestamp in unique_logs
154+
]
155+
pool_prices = slot0_rpc_requests(
156+
self._web3,
157+
self._batch_web3_provider.make_request,
158+
requests,
159+
self._is_batch,
160+
self._abi_list,
161+
self._batch_size,
162+
self._max_worker,
163+
)
164+
for data in pool_prices:
165+
detail = AgniV3PoolPrice(
166+
factory_address=self._factory_address,
167+
pool_address=data["pool_address"],
168+
sqrt_price_x96=data["sqrtPriceX96"],
169+
tick=data["tick"],
170+
block_number=data["block_number"],
171+
block_timestamp=data["block_timestamp"],
172+
)
173+
self._collect_item(AgniV3PoolPrice.type(), detail)
174+
175+
def _process(self, **kwargs):
176+
self._data_buff[AgniV3Pool.type()].sort(key=lambda x: x.block_number)
177+
self._data_buff[AgniV3PoolPrice.type()].sort(key=lambda x: x.block_number)
178+
self._data_buff[AgniV3PoolCurrentPrice.type()].sort(key=lambda x: x.block_number)
179+
self._data_buff[AgniV3SwapEvent.type()].sort(key=lambda x: x.block_number)
180+
181+
def _process_current_pool_prices(self):
182+
prices = self._data_buff[AgniV3PoolPrice.type()]
183+
self._data_buff[AgniV3PoolPrice.type()] = []
184+
unique_prices = {}
185+
for price in prices:
186+
key = (price.pool_address, price.block_number)
187+
unique_prices[key] = price
188+
189+
for price in unique_prices.values():
190+
self._collect_item(AgniV3PoolPrice.type(), price)
191+
192+
sorted_prices = sorted(unique_prices.values(), key=lambda x: (x.pool_address, x.block_number))
193+
current_prices = [
194+
max(group, key=attrgetter("block_number"))
195+
for _, group in groupby(sorted_prices, key=attrgetter("pool_address"))
196+
]
197+
for data in current_prices:
198+
self._collect_item(AgniV3PoolCurrentPrice.type(), self.create_current_price_status(data))
199+
200+
@staticmethod
201+
def create_current_price_status(detail: AgniV3PoolPrice) -> AgniV3PoolPrice:
202+
return AgniV3PoolCurrentPrice(
203+
**{field.name: getattr(detail, field.name) for field in fields(AgniV3PoolCurrentPrice)}
204+
)
205+
206+
207+
def decode_pool_created(position_token_address, factory_address, log):
208+
token0_address = util.parse_hex_to_address(log.topic1)
209+
token1_address = util.parse_hex_to_address(log.topic2)
210+
fee = util.parse_hex_to_int256(log.topic3)
211+
tick_hex, pool_hex = split_hex_string(log.data)
212+
pool_address = util.parse_hex_to_address(pool_hex)
213+
tick_spacing = util.parse_hex_to_int256(tick_hex)
214+
return AgniV3Pool(
215+
position_token_address=position_token_address,
216+
factory_address=factory_address,
217+
pool_address=pool_address,
218+
token0_address=token0_address,
219+
token1_address=token1_address,
220+
fee=fee,
221+
tick_spacing=tick_spacing,
222+
block_number=log.block_number,
223+
block_timestamp=log.block_timestamp,
224+
)
225+
226+
227+
def split_hex_string(hex_string):
228+
if hex_string.startswith("0x"):
229+
hex_string = hex_string[2:]
230+
231+
if len(hex_string) == 128:
232+
part1 = hex_string[:64]
233+
part2 = hex_string[64:]
234+
return part1, part2
235+
else:
236+
raise ValueError("The data is not belong to Agni Factory")
237+
238+
239+
def get_exist_pools(db_service, position_token_address):
240+
if not db_service:
241+
return {}
242+
243+
session = db_service.get_service_session()
244+
try:
245+
result = (
246+
session.query(UniswapV3Pools)
247+
.filter(UniswapV3Pools.position_token_address == bytes.fromhex(position_token_address[2:]))
248+
.all()
249+
)
250+
history_pools = {}
251+
if result is not None:
252+
for item in result:
253+
pool_key = "0x" + item.pool_address.hex()
254+
history_pools[pool_key] = AgniV3Pool(
255+
position_token_address="0x" + item.position_token_address.hex(),
256+
pool_address=pool_key,
257+
token0_address="0x" + item.token0_address.hex(),
258+
token1_address="0x" + item.token1_address.hex(),
259+
factory_address="0x" + item.factory_address.hex(),
260+
fee=item.fee,
261+
tick_spacing=item.tick_spacing,
262+
block_number=item.block_number,
263+
block_timestamp=item.block_timestamp,
264+
)
265+
except Exception as e:
266+
raise e
267+
finally:
268+
session.close()
269+
270+
return history_pools
271+
272+
273+
def slot0_rpc_requests(web3, make_requests, requests, is_batch, abi_list, batch_size, max_worker):
274+
if len(requests) == 0:
275+
return []
276+
fn_name = "slot0"
277+
function_abi = next((abi for abi in abi_list if abi["name"] == fn_name and abi["type"] == "function"), None)
278+
outputs = function_abi["outputs"]
279+
output_types = [output["type"] for output in outputs]
280+
281+
parameters = common_utils.build_no_input_method_data(web3, requests, fn_name, abi_list)
282+
token_name_rpc = list(generate_eth_call_json_rpc(parameters))
283+
if is_batch:
284+
response = make_requests(params=json.dumps(token_name_rpc))
285+
else:
286+
response = [make_requests(params=json.dumps(token_name_rpc[0]))]
287+
288+
token_infos = []
289+
for data in list(zip_rpc_response(parameters, response)):
290+
result = rpc_response_to_result(data[1])
291+
pool = data[0]
292+
value = result[2:] if result is not None else None
293+
try:
294+
part1, part2 = get_price_and_tick_from_hex(value)
295+
pool["sqrtPriceX96"] = part1
296+
pool["tick"] = part2
297+
except Exception as e:
298+
logger.error(f"Decoding {fn_name} failed. " f"rpc response: {result}. " f"exception: {e}")
299+
token_infos.append(pool)
300+
return token_infos
301+
302+
303+
def get_price_and_tick_from_hex(hex_string):
304+
if hex_string.startswith("0x"):
305+
hex_string = hex_string[2:]
306+
part1 = hex_string[:64]
307+
part2 = hex_string[64:128]
308+
return util.parse_hex_to_int256(part1), util.parse_hex_to_int256(part2)
309+
310+
311+
def split_swap_data_hex_string(hex_string):
312+
if hex_string.startswith("0x"):
313+
hex_string = hex_string[2:]
314+
if len(hex_string) == 320:
315+
part1 = hex_string[:64]
316+
part2 = hex_string[64:128]
317+
part3 = hex_string[128:192]
318+
part4 = hex_string[192:256]
319+
part5 = hex_string[256:]
320+
return part1, part2, part3, part4, part5
321+
else:
322+
raise ValueError("The data length is not suitable for this operation.")

0 commit comments

Comments
 (0)