Created
November 29, 2024 07:32
-
-
Save hiroshil/ec6d32a1d7534d1684659d54b04ccee2 to your computer and use it in GitHub Desktop.
Script to upload a file to a OneDrive folder using Office365-REST-Python-Client (with a tqdm progress bar and support for resumable uploads)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import json | |
from tqdm import tqdm | |
from pathlib import Path | |
from office365.graph_client import GraphClient | |
from office365.onedrive.internal.paths.url import UrlPath | |
from office365.onedrive.driveitems.driveItem import DriveItem | |
from office365.onedrive.driveitems.uploadable_properties import ( | |
DriveItemUploadableProperties, | |
) | |
from office365.runtime.queries.upload_session import UploadSessionQuery | |
from upload_session_request_resumable import UploadSessionRequestResumable | |
class SessionData:
    """JSON-backed record of one in-progress resumable upload.

    The state is a flat dict with the keys ``file_name``, ``uploaded_bytes``,
    ``item``, ``upload_id`` and ``tempauth``. It is persisted as plain JSON, so
    the ``tempauth`` value is written to disk unencrypted.

    :param str file_name: local path of the file being uploaded
    :param int uploaded_bytes: number of bytes already sent
    :param str item: item id taken from the upload-session URL
    :param str upload_id: upload-session guid taken from the URL
    :param str tempauth: temporary auth token taken from the URL
    :param str path: location of the JSON cache file (keyword-only)
    """

    def __init__(self, file_name='', uploaded_bytes=0, item='', upload_id='', tempauth='',
                 *, path="session.json"):
        self.path = path
        self.data = {
            "file_name": file_name,
            "uploaded_bytes": uploaded_bytes,
            "item": item,
            "upload_id": upload_id,
            "tempauth": tempauth
        }

    def get(self, key):
        """Return the stored value for *key*, or ``None`` when it is absent."""
        return self.data.get(key)

    def set(self, key, value):
        """Store *value* under *key* in memory (call ``save_to_json`` to persist)."""
        self.data[key] = value

    def save_to_json(self):
        """Write the current state to the cache file."""
        # add ur encrypt and protect file method here
        # Write to a sibling temp file and rename it over the target so that an
        # interruption mid-write cannot leave a truncated cache behind.
        tmp_path = f"{self.path}.tmp"
        with open(tmp_path, 'w') as f:
            json.dump(self.data, f, indent=4)
        os.replace(tmp_path, self.path)

    def load_from_json(self):
        """Load state from the cache file and return the resulting dict.

        A missing, unparsable or non-dict cache file is treated as "no saved
        session": the in-memory state is left untouched.
        """
        try:
            with open(self.path, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            return self.data
        except ValueError:
            # json.JSONDecodeError is a ValueError; a corrupt cache only means
            # the upload has to start over, so it must not abort the program.
            return self.data
        if isinstance(data, dict):
            self.data = data
        return self.data

    def has_session(self, fn):
        """Return True when the cache describes a started upload of file *fn*."""
        return bool(fn == self.get('file_name') and self.get('uploaded_bytes'))
def filter_url(url, t):
    """Extract one component of a SharePoint/OneDrive upload-session URL.

    :param str url: upload-session URL of the form
        ``https://<host>.sharepoint.com/personal/<user>/_api/v2.0/drive/items/<item>/uploadSession?guid='<id>'&...&tempauth=<token>``
    :param str t: which component to return: ``"item"``, ``"upload_id"`` or
        ``"tempauth"``
    :return: the requested component, or ``None`` when *t* is not one of the
        three known names
    :raises ValueError: when *t* is known but *url* does not have the expected
        shape
    """
    # Capture-group index of each component in the pattern below.
    group_by_name = {"item": 3, "upload_id": 4, "tempauth": 6}
    group_index = group_by_name.get(t)
    if group_index is None:
        return None
    # Both literals are raw so the backslash escapes reach the regex engine
    # without triggering invalid-escape warnings; the pattern text is unchanged.
    pattern = (
        r"https:\/\/([^']+)\.sharepoint\.com\/personal\/([^']+)\/_api\/v2\.0\/drive\/"
        r"items\/([^']+)\/uploadSession\?guid='([^']+)'&([^']+)&tempauth=([^']+)"
    )
    result = re.search(pattern, url)
    if result is None:
        # The URL is deliberately left out of the message: it carries tempauth.
        raise ValueError(
            "cannot extract {0!r}: not a recognised upload-session URL".format(t)
        )
    return result.group(group_index)
def resumable_upload_with_cache(self, source_path, chunk_size=2000000, chunk_uploaded=None, session_cache=None):
    # type: (str, int, Optional[Callable[[int], None]], Optional[SessionData]) -> DriveItem
    """
    Upload a local file into this folder through an upload session, resuming a
    previously interrupted upload when the session cache describes one.

    An upload session allows the file to be sent as sequential byte ranges, so the
    transfer can continue after a dropped connection. Two steps are involved:
        Create an upload session
        Upload bytes to the upload session

    When the cache already holds a session for ``source_path``, the item id, upload
    id and tempauth of the freshly created session URL are replaced with the cached
    ones so that the chunks go to the earlier session. Otherwise the new session's
    values are recorded in the cache and saved.

    :param str source_path: File path
    :param int chunk_size: chunk size
    :param chunk_uploaded: optional callback invoked with the current file offset
        after each chunk
    :param session_cache: SessionData instance; when omitted, the default on-disk
        cache is loaded
    :return: the DriveItem that will represent the uploaded file
    """
    if session_cache is None:
        # The callback below dereferences the cache unconditionally, so fall back
        # to the default on-disk cache instead of failing on None.
        session_cache = SessionData()
        session_cache.load_from_json()

    # The three URL components that identify a server-side upload session.
    url_parts = ("item", "upload_id", "tempauth")

    def _start_upload(result):
        # type: (ClientResult[UploadSession]) -> None
        session_url = qry.upload_session_url
        if session_cache.has_session(source_path):
            # Point the freshly created session URL at the cached session.
            for part in url_parts:
                session_url = session_url.replace(filter_url(session_url, part), session_cache.get(part))
            qry.return_type.value.uploadUrl = session_url
        else:
            session_cache.set('file_name', source_path)
            for part in url_parts:
                session_cache.set(part, filter_url(session_url, part))
            session_cache.save_to_json()
        # NOTE(review): nothing here clears the cache once the upload finishes;
        # confirm whether a completed session should be reset.
        with open(source_path, "rb") as local_file:
            session_request = UploadSessionRequestResumable(
                local_file, chunk_size, chunk_uploaded, session_cache
            )
            session_request.execute_query(qry)

    file_name = os.path.basename(source_path)
    return_type = DriveItem(self.context, UrlPath(file_name, self.resource_path))
    qry = UploadSessionQuery(
        return_type, {"item": DriveItemUploadableProperties(name=file_name)}
    )
    self.context.add_query(qry).after_query_execute(_start_upload)
    return return_type


# Expose the helper as a method on every DriveItem.
DriveItem.resumable_upload_with_cache = resumable_upload_with_cache
def update_p(bar, current, resume_pos):
    """Advance *bar* so that its counter equals the reported position.

    :param bar: progress bar exposing ``total``, ``n`` and ``update(delta)``
    :param current: position reported by the upload callback
    :param resume_pos: when truthy, ``resume_pos * bar.total`` is added to
        *current* before the bar is advanced
    """
    offset = resume_pos * bar.total if resume_pos else 0
    # update() takes an increment, so subtract what the bar already shows.
    bar.update(current + offset - bar.n)
# User.Read.All | Application | Read all users' full profile
# Placeholder credentials: replace with real values before running.
tenant_id = "ur_tenant_id"
client_id = "ur_client_id"  # get permission here: siteurl + /_layouts/15/appinv.aspx
client_secret = "ur_client_secret"
principal_name = "ur_email"
client = GraphClient.with_client_secret(tenant_id, client_id, client_secret)

chunk_size = 10 * 1024 * 1024  # 10Mb
local_path = "FILE_PATH"  # placeholder: local file to upload
remote_folder = "REMOTE_PLDER"  # placeholder: destination folder path in the drive
file_size = Path(local_path).stat().st_size

session_cache = SessionData()
session_cache.load_from_json()

# Only trust the cached byte count when the cache belongs to this very file;
# a leftover entry for another file must not influence the progress display.
resuming = session_cache.has_session(local_path)
already_uploaded = session_cache.get('uploaded_bytes') if resuming else 0
# The chunk callback reports absolute file offsets (the request object skips the
# cached bytes and then reports the file position), so update_p needs no extra
# offset; the bar is seeded with the resumed position through ``initial``.
resume_pos = 0
if resuming:
    print("found previous session: {0}".format(session_cache.get('upload_id')))

remote_folder = client.users.get_by_principal_name(principal_name).drive.root.get_by_path(remote_folder)
with tqdm(total=file_size, initial=already_uploaded, unit='B', unit_scale=True, unit_divisor=1024,
          desc=f"Uploading {local_path}") as pbar:
    remote_file = (
        remote_folder.resumable_upload_with_cache(
            local_path,
            chunk_size=chunk_size,
            chunk_uploaded=lambda offset: update_p(pbar, offset, resume_pos),
            session_cache=session_cache
        )
        .get()
        .execute_query()
    )
print("File {0} has been uploaded".format(remote_file.web_url))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import typing | |
from typing import Callable | |
from pathlib import Path | |
import requests | |
from typing_extensions import Self | |
from office365.runtime.client_request import ClientRequest | |
from office365.runtime.http.http_method import HttpMethod | |
from office365.runtime.http.request_options import RequestOptions | |
from office365.runtime.queries.upload_session import UploadSessionQuery | |
class UploadSessionRequestResumable(ClientRequest):
    """Upload a file to an upload-session URL chunk by chunk, with resume support.

    Each chunk is sent as a PUT with a ``Content-Range`` header. After a chunk
    has been sent, the number of bytes uploaded so far is written to the session
    cache so that a later run can continue from that position.
    """

    def __init__(self, file_object, chunk_size, chunk_uploaded=None, session_cache=None):
        # type: (typing.IO, int, typing.Optional[Callable[[int], None]], typing.Optional[typing.Any]) -> None
        """
        :param file_object: binary file object to upload; must expose ``name``,
            ``fileno()``, ``read()``, ``seek()`` and ``tell()``
        :param chunk_size: maximum number of bytes sent per request
        :param chunk_uploaded: optional callback invoked with the file offset
            reached after each chunk
        :param session_cache: optional cache object providing ``has_session``,
            ``get``, ``set`` and ``save_to_json``; without it nothing is
            resumed or persisted
        """
        super(UploadSessionRequestResumable, self).__init__()
        self._file_object = file_object
        self._chunk_size = chunk_size
        self._chunk_uploaded = chunk_uploaded
        self._range_data = None
        self._bytes_pos_uploaded = 0  # Track uploaded bytes position
        self.session_cache = session_cache
        if session_cache is not None and session_cache.has_session(self._file_object.name):
            self._bytes_pos_uploaded = self.session_cache.get('uploaded_bytes')
            # Jump straight to the resume offset; reading the already uploaded
            # prefix would pull it all into memory just to discard it.
            self._file_object.seek(self._bytes_pos_uploaded, os.SEEK_SET)

    def build_request(self, query):
        # type: (UploadSessionQuery) -> Self
        """Build the PUT request that carries the current chunk."""
        request = RequestOptions(query.upload_session_url)
        request.method = HttpMethod.Put
        # Content-Length is the size of the chunk being sent in this request
        request.set_header("Content-Length", str(len(self._range_data)))
        # Content-Range uses an inclusive end offset, hence range_end - 1.
        request.set_header(
            "Content-Range",
            "bytes {0}-{1}/{2}".format(
                self.range_start, self.range_end - 1, self.file_size
            ),
        )
        request.set_header("Accept", "*/*")
        request.data = self._range_data
        return request

    def process_response(self, response, query):
        # type: (requests.Response, UploadSessionQuery) -> None
        """Fail on an HTTP error status, then report the offset reached so far."""
        response.raise_for_status()
        if callable(self._chunk_uploaded):
            self._chunk_uploaded(self.range_end)

    def execute_query(self, query):
        # type: (UploadSessionQuery) -> None
        """Send every remaining chunk of the file, one request per chunk."""
        for self._range_data in self._read_next():
            super(UploadSessionRequestResumable, self).execute_query(query)

    def _read_next(self):
        # type: () -> typing.Iterator[bytes]
        """Yield successive chunks of the file, recording progress after each."""
        while True:
            content = self._file_object.read(self._chunk_size)
            if not content:
                break
            yield content
            # The generator resumes only after the consumer has finished with the
            # chunk, so progress is recorded once the request has gone through.
            self._bytes_pos_uploaded += len(content)
            if self.session_cache is not None:
                self.session_cache.set('uploaded_bytes', self._bytes_pos_uploaded)
                self.session_cache.save_to_json()

    @property
    def file_size(self):
        """Total size in bytes of the file being uploaded."""
        return os.fstat(self._file_object.fileno()).st_size

    @property
    def range_start(self):
        """Offset of the first byte of the current chunk."""
        if self.range_end == 0:
            return 0
        return self.range_end - len(self._range_data)

    @property
    def range_end(self):
        """Offset just past the last byte read so far (the file position)."""
        return self._file_object.tell()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment