Last active
December 14, 2024 16:46
-
-
Save agatemosu/a23a82b0e4c677541d3ac5bc5f00b417 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from enum import Enum | |
HOST = "127.0.0.1" | |
PORT = 62775 | |
class Commands(Enum): | |
UPDATE = 0 | |
LOGOUT = 1 | |
SHUTDOWN = 2 | |
REBOOT = 3 | |
HIBERNATE = 4 | |
async def send(choice: int): | |
try: | |
_, writer = await asyncio.open_connection(HOST, PORT) | |
writer.write(int.to_bytes(choice)) | |
await writer.drain() | |
writer.close() | |
await writer.wait_closed() | |
except OSError as error: | |
print(error.strerror) | |
async def main(): | |
print("Options:") | |
for cmd in Commands: | |
print(f"{cmd.value}: {cmd.name}") | |
str_choice = input("Select an option: ") | |
try: | |
choice = int(str_choice) | |
command = Commands(choice) | |
except ValueError: | |
print("Invalid option.") | |
return | |
print(f"Sending {command.name} to {HOST}:{PORT}.") | |
await send(choice) | |
if __name__ == "__main__": | |
try: | |
asyncio.run(main()) | |
except KeyboardInterrupt: | |
print("Aborted.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import server | |
import update | |
update.main() | |
python = sys.executable | |
os.execl(python, python, server.__file__) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
import subprocess | |
import sys | |
from enum import Enum | |
import update | |
HOST = "0.0.0.0" | |
PORT = 62775 | |
class Commands(Enum): | |
UPDATE = 0 | |
LOGOUT = 1 | |
SHUTDOWN = 2 | |
REBOOT = 3 | |
HIBERNATE = 4 | |
def restart_script(): | |
python = sys.executable | |
os.execl(python, python, *sys.argv) | |
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): | |
addr = writer.get_extra_info("peername") | |
client_ip = f"{addr[0]}:{addr[1]}" | |
print(f"Connection with {client_ip} opened.") | |
while True: | |
data = await reader.read(1024) | |
if not data: | |
break | |
match Commands(int.from_bytes(data)): | |
case Commands.UPDATE: | |
update.main() | |
restart_script() | |
case Commands.LOGOUT: | |
subprocess.run(["shutdown", "/l"]) | |
case Commands.SHUTDOWN: | |
subprocess.run(["shutdown", "/s", "/t", "0"]) | |
case Commands.REBOOT: | |
subprocess.run(["shutdown", "/r", "/t", "0"]) | |
case Commands.HIBERNATE: | |
subprocess.run(["shutdown", "/h"]) | |
print(f"Connection with {client_ip} closed.") | |
writer.close() | |
async def main(): | |
server = await asyncio.start_server(handle_client, HOST, PORT) | |
for socket in server.sockets: | |
addr = socket.getsockname() | |
server_ip = f"{addr[0]}:{addr[1]}" | |
print(f"Serving on {server_ip}.") | |
async with server: | |
await server.serve_forever() | |
if __name__ == "__main__": | |
try: | |
asyncio.run(main()) | |
except KeyboardInterrupt: | |
print("Server closed.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function New-Shortcut { | |
param ( | |
[string]$TargetPath, | |
[string]$ShortcutLocation | |
) | |
$shell = New-Object -ComObject WScript.Shell | |
$shortcut = $shell.CreateShortcut($ShortcutLocation) | |
$shortcut.TargetPath = $TargetPath | |
$shortcut.Save() | |
} | |
$localAppData = [System.Environment]::GetFolderPath("LocalApplicationData") | |
$startupFolder = [System.Environment]::GetFolderPath("Startup") | |
$files = Resolve-Path -Path "*" | |
$targetDirectory = Join-Path -Path $localAppData -ChildPath "shutdown" | |
$targetPath = Join-Path -Path $targetDirectory -ChildPath "run.pyw" | |
$shortcutLocation = Join-Path -Path $startupFolder -ChildPath "run.pyw.lnk" | |
If (-Not (Test-Path -Path $targetDirectory)) { | |
New-Item -Path $targetDirectory -ItemType Directory | |
} | |
Copy-Item -Path $files -Destination $targetDirectory | |
New-Shortcut -TargetPath $targetPath -ShortcutLocation $shortcutLocation |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import http.client | |
import json | |
import os | |
import tempfile | |
import urllib.parse | |
import zipfile | |
from dataclasses import dataclass | |
from typing import Optional | |
@dataclass | |
class Version: | |
version: Optional[int] | |
owner: Optional[str] | |
id: Optional[str] | |
@dataclass | |
class Response: | |
status: int | |
reason: str | |
content: bytes | |
def request_get(url: str) -> Response: | |
parsed_url = urllib.parse.urlparse(url) | |
host = parsed_url.netloc | |
path = parsed_url.path | |
conn = http.client.HTTPSConnection(host) | |
conn.request("GET", path) | |
response = conn.getresponse() | |
response_obj = Response(response.status, response.reason, response.read()) | |
conn.close() | |
return response_obj | |
def get_local_version(version_path: str) -> Version: | |
with open(version_path, "r") as version_file: | |
data = version_file.read() | |
json_data: dict = json.loads(data) | |
return Version( | |
json_data.get("version"), json_data.get("owner"), json_data.get("id") | |
) | |
def get_remote_version(gist_id: str) -> Optional[Version]: | |
response = request_get( | |
f"https://gist.githubusercontent.com/{gist_id}/raw/version.json" | |
) | |
if response.status != 200: | |
print(f"Failed to fetch version: {response.reason}") | |
return None | |
json_data: dict = json.loads(response.content) | |
return Version( | |
json_data.get("version"), json_data.get("owner"), json_data.get("id") | |
) | |
def download_archive(gist_id: str) -> Optional[str]: | |
response = request_get(f"https://codeload.github.com/{gist_id}/zip/main") | |
if response.status != 200: | |
print(f"Failed to download update: {response.reason}") | |
return None | |
with tempfile.NamedTemporaryFile(delete=False) as temp_file: | |
temp_file.write(response.content) | |
return temp_file.name | |
def decompress_archive(archive_path: str, extract_to: str): | |
with zipfile.ZipFile(archive_path, "r") as zip_ref: | |
first_iteration = True | |
for member in zip_ref.namelist(): | |
if first_iteration: | |
first_iteration = False | |
continue | |
dest_path = os.path.join(extract_to, member[38:]) | |
with zip_ref.open(member) as source, open(dest_path, "wb") as target: | |
target.write(source.read()) | |
def main(): | |
this_directory = os.path.dirname(__file__) | |
version_path = os.path.join(this_directory, "version.json") | |
local_version = get_local_version(version_path) | |
if local_version.version is None: | |
print("Updates disabled.") | |
return | |
gist_id = f"{local_version.owner}/{local_version.id}" | |
remote_version = get_remote_version(gist_id) | |
if remote_version is None: | |
return | |
if remote_version.version == local_version.version: | |
print("No updates!") | |
return | |
archive_path = download_archive(gist_id) | |
if archive_path is None: | |
return | |
decompress_archive(archive_path, this_directory) | |
os.unlink(archive_path) | |
print("Updated!") | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"version": 4, | |
"owner": "agatemosu", | |
"id": "a23a82b0e4c677541d3ac5bc5f00b417" | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment