Last active
March 28, 2019 22:35
-
-
Save intelfx/7beead590668173f228086749862f31a to your computer and use it in GitHub Desktop.
Idempotent register/password reset for https://github.com/matrix-org/synapse
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse
import hashlib
import hmac
import json
import logging
import os
import os.path as p
import sys

import attr
import requests
import yaml
@attr.s(kw_only=True)
class MatrixRequests:
    """Small helper for talking to a Matrix homeserver over HTTP.

    Holds the credentials of one account and offers the handful of calls
    this script needs: password login, shared-secret registration and
    admin password reset. All attributes are keyword-only.
    """

    # Base URL that request paths are appended to; must not end with '/'.
    url = attr.ib()
    # Matrix ID used for login; overwritten with the server-reported
    # 'user_id' once authenticate() succeeds.
    mxid = attr.ib()
    # Password sent with the m.login.password login request.
    password = attr.ib()
    # Optional device ID sent with login; overwritten by the login response.
    device_id = attr.ib(default=None)
    # Set by authenticate(); may also be supplied up front to skip login.
    access_token = attr.ib(default=None)

    def homeserver(self):
        """Return the part of ``self.mxid`` after the first ':'.

        Raises:
            IndexError: if ``self.mxid`` contains no ':'.
        """
        return self.mxid.split(':', maxsplit=1)[1]

    def authenticate(self):
        """Log in with ``mxid``/``password`` and store the access token.

        Does nothing (apart from logging a warning) when an access token
        is already set. On success ``mxid``, ``device_id`` and
        ``access_token`` are replaced by the values from the login response.

        Raises:
            requests.HTTPError: if the login request is rejected.
        """
        if self.access_token:
            logging.warning('MatrixRequests.authenticate(): access_token already set')
            return

        req = {
            'type': 'm.login.password',
            'identifier': {
                'type': 'm.id.user',
                'user': self.mxid,
            },
            'password': self.password,
        }
        if self.device_id is not None:
            req['device_id'] = self.device_id

        login_resp = self.request('POST', '/r0/login', obj=req, auth=False)
        login_json = login_resp.json()

        self.mxid = login_json['user_id']
        self.device_id = login_json['device_id']
        self.access_token = login_json['access_token']

    def request(self, method, path, obj=None, auth=True, check=True):
        """Send one HTTP request to ``self.url + path`` and return the response.

        Args:
            method: HTTP method name passed to ``requests.request``.
            path: Path appended to ``self.url``; must start with '/'.
            obj: Optional object sent as a JSON-encoded request body.
            auth: When true, pass ``self.access_token`` as the
                ``access_token`` query parameter.
            check: When true, call ``raise_for_status()`` on the response.

        Raises:
            RuntimeError: on a malformed ``path``/``self.url``, or when
                ``auth`` is requested but no access token is set.
            requests.HTTPError: when ``check`` is true and the server
                answers with an error status.
        """
        if not path.startswith('/'):
            raise RuntimeError(f'MatrixRequests: request: bad path: {path}')
        if self.url.endswith('/'):
            raise RuntimeError(f'MatrixRequests: request: bad self.url: {self.url}')

        url = f'{self.url}{path}'

        params = {}
        if auth:
            if not self.access_token:
                raise RuntimeError(f'MatrixRequests: request: auth=True, but self.access_token = {self.access_token}')
            params['access_token'] = self.access_token

        # Only an absent body is omitted; an empty dict is still sent as "{}"
        # (a truthiness test would silently drop it).
        data = None if obj is None else json.dumps(obj)

        r = requests.request(method, url, params=params, data=data)
        if check:
            r.raise_for_status()
        return r

    @staticmethod
    def _admin_register_mac(secret, req):
        """Return the hex HMAC-SHA1 that authenticates a registration request.

        The MAC is keyed with ``secret`` and covers the request's nonce,
        username, password and the literal ``admin``/``notadmin``, each
        separated by a NUL byte.
        """
        mac = hmac.new(
            key=secret.encode('ascii'),
            digestmod=hashlib.sha1,
        )

        mac.update(req['nonce'].encode('utf8'))
        mac.update(b"\x00")
        mac.update(req['username'].encode('utf8'))
        mac.update(b"\x00")
        mac.update(req['password'].encode('utf8'))
        mac.update(b"\x00")
        mac.update(b'admin' if req['admin'] else b'notadmin')

        return mac.hexdigest()

    def admin_register(self, secret, localpart, password, admin):
        """Register ``localpart`` using the registration shared secret.

        Fetches a nonce with an unauthenticated GET, then POSTs the
        registration request signed with ``_admin_register_mac``.

        Returns:
            The response of the registration POST.

        Raises:
            requests.HTTPError: if either request fails.
        """
        nonce_resp = self.request('GET', '/r0/admin/register', auth=False)
        nonce = nonce_resp.json()['nonce']

        req = {
            'nonce': nonce,
            'username': localpart,
            'password': password,
            'admin': admin,
        }
        # The MAC is computed before 'mac' itself is added to the request.
        req['mac'] = MatrixRequests._admin_register_mac(secret=secret, req=req)

        register_resp = self.request('POST', '/r0/admin/register', obj=req, auth=False)
        return register_resp

    def admin_reset_password(self, mxid, password):
        """Set ``password`` on the account ``mxid`` via the admin reset endpoint.

        Requires an access token (see ``authenticate``).

        Returns:
            The response of the reset POST.

        Raises:
            requests.HTTPError: if the request fails.
        """
        req = {
            'new_password': password,
        }

        reset_resp = self.request('POST', f'/r0/admin/reset_password/{mxid}', obj=req)
        return reset_resp
def register_or_reset(*, hs, secret, localpart, password, admin):
    """Register ``localpart`` or, if it already exists, reset its password.

    Args:
        hs: Client object providing ``admin_register``, ``authenticate``,
            ``homeserver`` and ``admin_reset_password``.
        secret: Registration shared secret used to sign the registration.
        localpart: Local part of the Matrix ID to create or update.
        password: Password to set on the account.
        admin: Whether a newly registered account should be an admin.

    Returns:
        The decoded JSON body of whichever request succeeded.

    Raises:
        requests.HTTPError: for any failure other than a 400 response
            whose JSON body carries ``errcode == 'M_USER_IN_USE'``.
    """
    #
    # attempt to register a new user
    #
    try:
        resp = hs.admin_register(
            secret=secret,
            localpart=localpart,
            password=password,
            admin=admin,
        )
    except requests.HTTPError as e:
        # Inspect the error body defensively: a non-JSON body or one without
        # 'errcode' must surface as the original HTTPError, not as a
        # ValueError/KeyError raised from inside this handler.
        errcode = None
        # Compare against None explicitly; a Response is falsy for 4xx/5xx.
        if e.response is not None and e.response.status_code == 400:
            try:
                body = e.response.json()
            except ValueError:
                body = None
            if isinstance(body, dict):
                errcode = body.get('errcode')
        if errcode != 'M_USER_IN_USE':
            raise
        # M_USER_IN_USE: fall through and change the password instead
    else:
        return resp.json()

    #
    # if we got M_USER_IN_USE, reset the existing user's password
    #
    hs.authenticate()

    mxid = f'@{localpart}:{hs.homeserver()}'
    resp = hs.admin_reset_password(
        mxid=mxid,
        password=password,
    )
    return resp.json()
def read_registration_shared_secret(*, config):
    """Return ``registration_shared_secret`` from a YAML configuration file.

    Args:
        config: Path to the YAML file to read.

    Raises:
        OSError: if the file cannot be opened.
        KeyError: if the file has no ``registration_shared_secret`` key.
    """
    with open(config, 'r') as f:
        # safe_load builds only plain YAML types and never instantiates
        # arbitrary Python objects; calling yaml.load() without an explicit
        # Loader is deprecated and rejected by current PyYAML releases.
        config_obj = yaml.safe_load(f)
    return config_obj['registration_shared_secret']
def read_password(*, password, password_fd):
    """Return a password given either literally or via a file descriptor.

    Args:
        password: The password itself, or None.
        password_fd: An open file descriptor to read the password from,
            or None. Only consulted when ``password`` is None.

    Returns:
        ``password`` if given; otherwise everything read from
        ``password_fd``, verbatim (a trailing newline is NOT stripped).

    Raises:
        RuntimeError: if both arguments are None.
    """
    if password is not None:
        return password

    if password_fd is not None:
        # The descriptor is closed when the with-block exits, so it can be
        # read only once.
        with os.fdopen(password_fd, 'r') as f:
            return f.read()

    raise RuntimeError('read_password: neither password nor password_fd was provided')
def parse_args():
    """Parse the command line and return the resulting argparse namespace.

    All options are mandatory. Each of the two passwords (admin account,
    account being registered) is given either literally or as a file
    descriptor number, never both.
    """
    cli = argparse.ArgumentParser()

    def add_password_pair(password_flag, fd_flag, password_help, fd_help):
        # Exactly one of the two flags has to be present on the command line.
        pair = cli.add_mutually_exclusive_group(required=True)
        pair.add_argument(password_flag, type=str, help=password_help)
        pair.add_argument(fd_flag, type=int, help=fd_help)

    cli.add_argument(
        '--admin-mxid',
        type=str,
        required=True,
        help='Matrix ID of a server admin account (used to reset passwords)',
    )
    add_password_pair(
        '--admin-password',
        '--admin-password-fd',
        'Password to a server admin account (used to reset passwords)',
        'fd to read password to a server admin account (used to reset passwords)',
    )

    cli.add_argument(
        '-H', '--homeserver',
        type=str,
        required=True,
        help='Matrix homeserver address (/_matrix/client)',
    )
    cli.add_argument(
        '-c', '--config',
        type=str,
        required=True,
        help='Path to Matrix homeserver configuration (used for registration_shared_secret)',
    )

    cli.add_argument(
        '--register-localpart',
        type=str,
        required=True,
        help='"localpart" of a Matrix ID to register or reset password on',
    )
    add_password_pair(
        '--register-password',
        '--register-password-fd',
        'Password to set on specified Matrix ID',
        'fd to read password to set on specified Matrix ID',
    )

    return cli.parse_args()
def main():
    """Run the register-or-reset operation and report the outcome as JSON.

    Writes a JSON "verdict" object to stdout and exits with status 0 on
    success, or 1 when the homeserver answered with an HTTP error. Any
    other exception propagates unchanged.
    """
    args = parse_args()

    admin_password = read_password(
        password=args.admin_password,
        password_fd=args.admin_password_fd,
    )
    hs = MatrixRequests(
        url=args.homeserver,
        mxid=args.admin_mxid,
        device_id='REGIST~1',
        password=admin_password,
    )

    try:
        shared_secret = read_registration_shared_secret(config=args.config)
        new_password = read_password(
            password=args.register_password,
            password_fd=args.register_password_fd,
        )
        status = register_or_reset(
            hs=hs,
            secret=shared_secret,
            localpart=args.register_localpart,
            password=new_password,
            admin=False,
        )
    except requests.HTTPError as e:
        failed = e.response
        # Prefer the decoded JSON error body; fall back to the raw text.
        try:
            details = failed.json()
        except ValueError:
            details = failed.text
        verdict = {
            'verdict': 'error',
            'error': {
                'code': failed.status_code,
                'data': details,
            },
        }
        exitcode = 1
    else:
        verdict = {
            'verdict': 'ok',
            'status': status,
        }
        exitcode = 0

    json.dump(verdict, sys.stdout, indent=4)
    sys.exit(exitcode)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment