Skip to content
This repository was archived by the owner on Feb 7, 2026. It is now read-only.

Commit 1435f00

Browse files
committed
Move Decryptor and Constants to Utils
1 parent 2b0e129 commit 1435f00

File tree

2 files changed

+57
-55
lines changed

2 files changed

+57
-55
lines changed

src/main.py

Lines changed: 1 addition & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
from typing import Iterator
2-
from Crypto.Cipher import AES
32
import xmltodict
43
import requests
54
import json
65
import os
76

87
# Helper modules
98
from src.keyholder import Keyholder
10-
from src.constants import Constants
11-
from src.crypto import Crypto
9+
from src.utils import Constants, Decryptor
1210

1311
# FastAPI
1412
from fastapi import FastAPI, HTTPException
@@ -48,56 +46,6 @@
4846
)
4947

5048

51-
# A custom iterator to decrypt the bytes without writing the whole file to the disk.
52-
class Decryptor:
53-
"""
54-
A custom iterator to decrypt the bytes without writing the whole file to the disk.
55-
"""
56-
57-
def __init__(self, response: requests.Response, key: str):
58-
self.iterator : Iterator = response.iter_content(chunk_size = 0x10000)
59-
# We need to unpad (basically modify) the last chunk,
60-
# so we need to learn when will the iterator end.
61-
# Because of that, we hold next chunks too.
62-
self.chunks = [next(self.iterator), None]
63-
self.cipher = AES.new(key, AES.MODE_ECB)
64-
65-
def __iter__(self):
66-
return self
67-
68-
def move(self):
69-
chunk = next(self.iterator, None)
70-
self.chunks = [chunk, self.chunks[0]]
71-
72-
def is_end(self):
73-
return self.chunks[0] == None
74-
75-
def is_start(self):
76-
return self.chunks[1] == None
77-
78-
def is_end_exceed(self):
79-
return self.chunks[0] == None and self.chunks[1] == None
80-
81-
def __next__(self):
82-
# Get the current chunk.
83-
current = self.chunks[1]
84-
returned = None
85-
# Check if ending point exceed.
86-
if self.is_end_exceed():
87-
raise StopIteration
88-
# Check if the chunk is starting point.
89-
if self.is_start():
90-
returned = b""
91-
# Check if the chunk is ending point.
92-
elif self.is_end():
93-
returned = Crypto.unpad(self.cipher.decrypt(current))
94-
else:
95-
returned = self.cipher.decrypt(current)
96-
# Shift to the next chunk and keep the previous one.
97-
self.move()
98-
return returned
99-
100-
10149
CSC_CODES = json.loads(open(os.path.join("src", "csc_list.json"), "r").read())
10250

10351

src/constants.py renamed to src/utils.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
from typing import Iterator
12
import dicttoxml
3+
from src.crypto import Crypto
4+
from Crypto.Cipher import AES
25

36

47
class Constants:
5-
68
# Get firmware information url
79
GET_FIRMWARE_URL = "http://fota-cloud-dn.ospserver.net/firmware/{0}/{1}/version.xml"
810

@@ -71,4 +73,56 @@ def parse_firmware(firmware: str) -> str:
7173
l[2] = l[0]
7274
return "/".join(l)
7375
else:
74-
return None
76+
return None
77+
78+
79+
# A custom iterator to decrypt the bytes without writing the whole file to the disk
80+
# for downloading firmwares.
81+
class Decryptor:
82+
"""
83+
A custom iterator to decrypt the bytes without writing the whole file to the disk
84+
for downloading firmwares.
85+
"""
86+
87+
def __init__(self, response, key: str):
88+
self.iterator : Iterator = response.iter_content(chunk_size = 0x10000)
89+
# We need to unpad (basically modify) the last chunk,
90+
# so we need to learn when will the iterator end.
91+
# Because of that, we hold next chunks too.
92+
self.chunks = [next(self.iterator), None]
93+
self.cipher = AES.new(key, AES.MODE_ECB)
94+
95+
def __iter__(self):
96+
return self
97+
98+
def move(self):
99+
chunk = next(self.iterator, None)
100+
self.chunks = [chunk, self.chunks[0]]
101+
102+
def is_end(self):
103+
return self.chunks[0] == None
104+
105+
def is_start(self):
106+
return self.chunks[1] == None
107+
108+
def is_end_exceed(self):
109+
return self.chunks[0] == None and self.chunks[1] == None
110+
111+
def __next__(self):
112+
# Get the current chunk.
113+
current = self.chunks[1]
114+
returned = None
115+
# Check if ending point exceed.
116+
if self.is_end_exceed():
117+
raise StopIteration
118+
# Check if the chunk is starting point.
119+
if self.is_start():
120+
returned = b""
121+
# Check if the chunk is ending point.
122+
elif self.is_end():
123+
returned = Crypto.unpad(self.cipher.decrypt(current))
124+
else:
125+
returned = self.cipher.decrypt(current)
126+
# Shift to the next chunk and keep the previous one.
127+
self.move()
128+
return returned

0 commit comments

Comments
 (0)