Skip to content

Instantly share code, notes, and snippets.

@agatemosu
Last active December 14, 2024 16:46
Show Gist options
  • Save agatemosu/a23a82b0e4c677541d3ac5bc5f00b417 to your computer and use it in GitHub Desktop.
Save agatemosu/a23a82b0e4c677541d3ac5bc5f00b417 to your computer and use it in GitHub Desktop.
import asyncio
from enum import Enum
HOST = "127.0.0.1"
PORT = 62775
class Commands(Enum):
UPDATE = 0
LOGOUT = 1
SHUTDOWN = 2
REBOOT = 3
HIBERNATE = 4
async def send(choice: int):
try:
_, writer = await asyncio.open_connection(HOST, PORT)
writer.write(int.to_bytes(choice))
await writer.drain()
writer.close()
await writer.wait_closed()
except OSError as error:
print(error.strerror)
async def main():
print("Options:")
for cmd in Commands:
print(f"{cmd.value}: {cmd.name}")
str_choice = input("Select an option: ")
try:
choice = int(str_choice)
command = Commands(choice)
except ValueError:
print("Invalid option.")
return
print(f"Sending {command.name} to {HOST}:{PORT}.")
await send(choice)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Aborted.")
import os
import sys
import server
import update
update.main()
python = sys.executable
os.execl(python, python, server.__file__)
import asyncio
import os
import subprocess
import sys
from enum import Enum
import update
HOST = "0.0.0.0"
PORT = 62775
class Commands(Enum):
UPDATE = 0
LOGOUT = 1
SHUTDOWN = 2
REBOOT = 3
HIBERNATE = 4
def restart_script():
python = sys.executable
os.execl(python, python, *sys.argv)
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
addr = writer.get_extra_info("peername")
client_ip = f"{addr[0]}:{addr[1]}"
print(f"Connection with {client_ip} opened.")
while True:
data = await reader.read(1024)
if not data:
break
match Commands(int.from_bytes(data)):
case Commands.UPDATE:
update.main()
restart_script()
case Commands.LOGOUT:
subprocess.run(["shutdown", "/l"])
case Commands.SHUTDOWN:
subprocess.run(["shutdown", "/s", "/t", "0"])
case Commands.REBOOT:
subprocess.run(["shutdown", "/r", "/t", "0"])
case Commands.HIBERNATE:
subprocess.run(["shutdown", "/h"])
print(f"Connection with {client_ip} closed.")
writer.close()
async def main():
server = await asyncio.start_server(handle_client, HOST, PORT)
for socket in server.sockets:
addr = socket.getsockname()
server_ip = f"{addr[0]}:{addr[1]}"
print(f"Serving on {server_ip}.")
async with server:
await server.serve_forever()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Server closed.")
function New-Shortcut {
param (
[string]$TargetPath,
[string]$ShortcutLocation
)
$shell = New-Object -ComObject WScript.Shell
$shortcut = $shell.CreateShortcut($ShortcutLocation)
$shortcut.TargetPath = $TargetPath
$shortcut.Save()
}
$localAppData = [System.Environment]::GetFolderPath("LocalApplicationData")
$startupFolder = [System.Environment]::GetFolderPath("Startup")
$files = Resolve-Path -Path "*"
$targetDirectory = Join-Path -Path $localAppData -ChildPath "shutdown"
$targetPath = Join-Path -Path $targetDirectory -ChildPath "run.pyw"
$shortcutLocation = Join-Path -Path $startupFolder -ChildPath "run.pyw.lnk"
If (-Not (Test-Path -Path $targetDirectory)) {
New-Item -Path $targetDirectory -ItemType Directory
}
Copy-Item -Path $files -Destination $targetDirectory
New-Shortcut -TargetPath $targetPath -ShortcutLocation $shortcutLocation
import http.client
import json
import os
import tempfile
import urllib.parse
import zipfile
from dataclasses import dataclass
from typing import Optional
@dataclass
class Version:
version: Optional[int]
owner: Optional[str]
id: Optional[str]
@dataclass
class Response:
status: int
reason: str
content: bytes
def request_get(url: str) -> Response:
parsed_url = urllib.parse.urlparse(url)
host = parsed_url.netloc
path = parsed_url.path
conn = http.client.HTTPSConnection(host)
conn.request("GET", path)
response = conn.getresponse()
response_obj = Response(response.status, response.reason, response.read())
conn.close()
return response_obj
def get_local_version(version_path: str) -> Version:
with open(version_path, "r") as version_file:
data = version_file.read()
json_data: dict = json.loads(data)
return Version(
json_data.get("version"), json_data.get("owner"), json_data.get("id")
)
def get_remote_version(gist_id: str) -> Optional[Version]:
response = request_get(
f"https://gist.githubusercontent.com/{gist_id}/raw/version.json"
)
if response.status != 200:
print(f"Failed to fetch version: {response.reason}")
return None
json_data: dict = json.loads(response.content)
return Version(
json_data.get("version"), json_data.get("owner"), json_data.get("id")
)
def download_archive(gist_id: str) -> Optional[str]:
response = request_get(f"https://codeload.github.com/{gist_id}/zip/main")
if response.status != 200:
print(f"Failed to download update: {response.reason}")
return None
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(response.content)
return temp_file.name
def decompress_archive(archive_path: str, extract_to: str):
with zipfile.ZipFile(archive_path, "r") as zip_ref:
first_iteration = True
for member in zip_ref.namelist():
if first_iteration:
first_iteration = False
continue
dest_path = os.path.join(extract_to, member[38:])
with zip_ref.open(member) as source, open(dest_path, "wb") as target:
target.write(source.read())
def main():
this_directory = os.path.dirname(__file__)
version_path = os.path.join(this_directory, "version.json")
local_version = get_local_version(version_path)
if local_version.version is None:
print("Updates disabled.")
return
gist_id = f"{local_version.owner}/{local_version.id}"
remote_version = get_remote_version(gist_id)
if remote_version is None:
return
if remote_version.version == local_version.version:
print("No updates!")
return
archive_path = download_archive(gist_id)
if archive_path is None:
return
decompress_archive(archive_path, this_directory)
os.unlink(archive_path)
print("Updated!")
if __name__ == "__main__":
main()
{
"version": 4,
"owner": "agatemosu",
"id": "a23a82b0e4c677541d3ac5bc5f00b417"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment