Skip to content

Instantly share code, notes, and snippets.

@hiroshil
Created November 29, 2024 07:32
Show Gist options
  • Save hiroshil/ec6d32a1d7534d1684659d54b04ccee2 to your computer and use it in GitHub Desktop.
Save hiroshil/ec6d32a1d7534d1684659d54b04ccee2 to your computer and use it in GitHub Desktop.
Script to upload a file to a OneDrive folder using Office365-REST-Python-Client (with a tqdm progress bar and resumable-upload support)
import os
import re
import json
from tqdm import tqdm
from pathlib import Path
from office365.graph_client import GraphClient
from office365.onedrive.internal.paths.url import UrlPath
from office365.onedrive.driveitems.driveItem import DriveItem
from office365.onedrive.driveitems.uploadable_properties import (
DriveItemUploadableProperties,
)
from office365.runtime.queries.upload_session import UploadSessionQuery
from upload_session_request_resumable import UploadSessionRequestResumable
class SessionData:
    """Persistence helper for resumable-upload state.

    Keeps the upload bookkeeping (file name, bytes already uploaded, and the
    identifiers extracted from the upload-session URL) in a dict that can be
    round-tripped through a JSON file, so an interrupted upload can resume.
    """

    def __init__(self, file_name='', uploaded_bytes=0, item='', upload_id='',
                 tempauth='', session_file="session.json"):
        """
        :param str file_name: local path of the file being uploaded
        :param int uploaded_bytes: bytes confirmed uploaded so far
        :param str item: drive item id parsed from the session URL
        :param str upload_id: upload-session GUID parsed from the session URL
        :param str tempauth: temporary auth token parsed from the session URL
        :param str session_file: where to persist the state (new parameter,
            defaulted so existing callers are unaffected)
        """
        self.session_file = session_file
        self.data = {
            "file_name": file_name,
            "uploaded_bytes": uploaded_bytes,
            "item": item,
            "upload_id": upload_id,
            "tempauth": tempauth,
        }

    def get(self, key):
        """Return the stored value for *key*, or None when absent."""
        return self.data.get(key)

    def set(self, key, value):
        """Store *value* under *key* (not persisted until save_to_json)."""
        self.data[key] = value

    def save_to_json(self):
        """Persist the session state to the JSON file.

        NOTE: written in plain text — add your encrypt-and-protect step here
        if the tempauth token must not be stored in the clear.
        """
        with open(self.session_file, 'w') as f:
            json.dump(self.data, f, indent=4)

    def load_from_json(self):
        """Load previously saved state; a missing file keeps the defaults."""
        try:
            with open(self.session_file, 'r') as f:
                self.data = json.load(f)
        except FileNotFoundError:
            pass  # first run: nothing to resume
        return self.data

    def has_session(self, fn):
        """True when a resumable session exists for file *fn* (bytes > 0)."""
        return fn == self.get('file_name') and bool(self.get('uploaded_bytes'))
def filter_url(url, t):
result = re.search(r"https:\/\/([^']+)\.sharepoint\.com\/personal\/([^']+)\/_api\/v2\.0\/drive\/"
"items\/([^']+)\/uploadSession\?guid='([^']+)'&([^']+)&tempauth=([^']+)", url)
match t:
case "item":
return result.group(3)
case "upload_id":
return result.group(4)
case "tempauth":
return result.group(6)
case _:
return None
def resumable_upload_with_cache(self, source_path, chunk_size=2000000, chunk_uploaded=None, session_cache=None):
    # type: (str, int, Optional[Callable[[int], None]], Optional[SessionData]) -> "DriveItem"
    """
    Create an upload session to allow your app to upload files up to the maximum file size.
    An upload session allows your app to upload ranges of the file in sequential API requests,
    which allows the transfer to be resumed if a connection is dropped while the upload is in progress.
    To upload a file using an upload session, there are two steps:
        1. Create an upload session
        2. Upload bytes to the upload session

    :param str source_path: File path
    :param int chunk_size: chunk size (bytes per PUT request)
    :param chunk_uploaded: optional callback invoked with the absolute byte
        offset after each uploaded chunk
    :param session_cache: SessionData holding persisted upload state.
        NOTE(review): the body below dereferences it unconditionally —
        presumably callers always pass one; confirm, since the default is None.
    """
    def _start_upload(result):
        # type: (ClientResult[UploadSession]) -> None
        # Fires after the uploadSession-create query executes. When resuming,
        # splice the cached item/upload_id/tempauth into the freshly issued
        # session URL so the previous server-side session is reused; otherwise
        # cache the identifiers of the brand-new session for a future resume.
        session_url = qry.upload_session_url
        if session_cache.has_session(source_path):
            session_url = session_url.replace(filter_url(session_url, "item"), session_cache.get('item'))
            session_url = session_url.replace(filter_url(session_url, "upload_id"), session_cache.get('upload_id'))
            session_url = session_url.replace(filter_url(session_url, "tempauth"), session_cache.get('tempauth'))
            # Point the query's return value at the reconstructed session URL.
            qry.return_type.value.uploadUrl = session_url
        else:
            session_cache.set('file_name', source_path)
            session_cache.set('item', filter_url(session_url, "item"))
            session_cache.set('upload_id', filter_url(session_url, "upload_id"))
            session_cache.set('tempauth', filter_url(session_url, "tempauth"))
            session_cache.save_to_json()
        # Stream the file chunk-by-chunk through the resumable request.
        with open(source_path, "rb") as local_file:
            session_request = UploadSessionRequestResumable(
                local_file, chunk_size, chunk_uploaded, session_cache
            )
            session_request.execute_query(qry)

    file_name = os.path.basename(source_path)
    return_type = DriveItem(self.context, UrlPath(file_name, self.resource_path))
    qry = UploadSessionQuery(
        return_type, {"item": DriveItemUploadableProperties(name=file_name)}
    )
    # _start_upload runs once the session-create query has been executed.
    self.context.add_query(qry).after_query_execute(_start_upload)
    return return_type


# Monkey-patch the method onto DriveItem so it can be called like a built-in
# resumable upload on any drive item / folder.
DriveItem.resumable_upload_with_cache = resumable_upload_with_cache
def update_p(bar, current, resume_pos):
    """Advance a tqdm-style progress bar to the absolute position *current*.

    When *resume_pos* is non-zero the reported position is shifted forward by
    ``resume_pos * bar.total`` to account for previously uploaded data; the
    bar is then bumped by the delta from its current count.
    """
    absolute = current + resume_pos * bar.total if resume_pos else current
    bar.update(absolute - bar.n)
# --- Configuration ----------------------------------------------------------
# Required app permission: User.Read.All | Application | Read all users' full profile
tenant_id = "ur_tenant_id"
client_id = "ur_client_id"  # get permission here: siteurl + /_layouts/15/appinv.aspx
client_secret = "ur_client_secret"
principal_name = "ur_email"

client = GraphClient.with_client_secret(tenant_id, client_id, client_secret)
chunk_size = 10 * 1024 * 1024  # 10Mb
local_path = "FILE_PATH"      # placeholder: local file to upload
remote_folder = "REMOTE_PLDER"  # placeholder: target OneDrive folder path

file_size = Path(local_path).stat().st_size

# Restore any previous (interrupted) upload session from session.json.
session_cache = SessionData()
session_cache.load_from_json()
# NOTE(review): int() truncates this ratio to 0 for any partially uploaded
# file (it is 1 only when uploaded_bytes == file_size), so the shift inside
# update_p() is effectively a no-op on resume — presumably a fractional value
# was intended; confirm against update_p before changing.
resume_pos = int(session_cache.get('uploaded_bytes') / file_size)
if session_cache.has_session(local_path):
    print("found previous session: {0}".format(session_cache.get('upload_id')))

# Resolve the destination folder, then run the (monkey-patched) resumable
# upload with a tqdm progress bar driven by the chunk_uploaded callback.
remote_folder = client.users.get_by_principal_name(principal_name).drive.root.get_by_path(remote_folder)
with tqdm(total=file_size, unit='B', unit_scale=True, unit_divisor=1024, desc=f"Uploading {local_path}") as pbar:
    remote_file = (remote_folder.resumable_upload_with_cache(
        local_path,
        chunk_size=chunk_size,
        chunk_uploaded=lambda offset: update_p(pbar, offset, resume_pos),
        session_cache=session_cache
        )
        .get()
        .execute_query()
    )
print("File {0} has been uploaded".format(remote_file.web_url))
import os
import typing
from typing import Callable
from pathlib import Path
import requests
from typing_extensions import Self
from office365.runtime.client_request import ClientRequest
from office365.runtime.http.http_method import HttpMethod
from office365.runtime.http.request_options import RequestOptions
from office365.runtime.queries.upload_session import UploadSessionQuery
class UploadSessionRequestResumable(ClientRequest):
    """ClientRequest that uploads a file to a Graph upload session in chunks,
    checkpointing progress through a SessionData cache so an interrupted
    upload can be resumed from the last fully uploaded chunk.
    """

    def __init__(self, file_object, chunk_size, chunk_uploaded=None, session_cache=None):
        # type: (typing.IO, int, Callable[[int], None], SessionData) -> None
        """
        :param file_object: binary file opened for reading
        :param chunk_size: bytes sent per PUT request
        :param chunk_uploaded: optional callback invoked with the absolute
            byte offset after each successfully uploaded chunk
        :param session_cache: persisted upload state (file name, bytes done)
        """
        super(UploadSessionRequestResumable, self).__init__()
        self._file_object = file_object
        self._chunk_size = chunk_size
        self._chunk_uploaded = chunk_uploaded
        self._range_data = None
        self._bytes_pos_uploaded = 0  # absolute offset of the next byte to send
        self.session_cache = session_cache
        if session_cache.has_session(self._file_object.name):
            # Resume: skip past the bytes a previous run already uploaded.
            # seek() replaces the original read()-and-discard, which pulled
            # the entire uploaded prefix into memory just to move the
            # file position forward.
            self._bytes_pos_uploaded = self.session_cache.get('uploaded_bytes')
            self._file_object.seek(self._bytes_pos_uploaded)

    def build_request(self, query):
        # type: (UploadSessionQuery) -> Self
        """Build the PUT request for the current chunk."""
        request = RequestOptions(query.upload_session_url)
        request.method = HttpMethod.Put
        request.set_header("Content-Length", str(len(self._range_data)))
        # Content-Range's end offset is inclusive, hence range_end - 1.
        request.set_header(
            "Content-Range",
            "bytes {0}-{1}/{2}".format(
                self.range_start, self.range_end - 1, self.file_size
            ),
        )
        request.set_header("Accept", "*/*")
        request.data = self._range_data
        return request

    def process_response(self, response, query):
        # type: (requests.Response, UploadSessionQuery) -> None
        """Raise on HTTP error, then report progress for the finished chunk."""
        response.raise_for_status()
        if callable(self._chunk_uploaded):
            self._chunk_uploaded(self.range_end)

    def execute_query(self, query):
        # type: (UploadSessionQuery) -> None
        """Upload the remainder of the file, one request per chunk."""
        for self._range_data in self._read_next():
            super(UploadSessionRequestResumable, self).execute_query(query)

    def _read_next(self):
        # type: () -> Iterator[bytes]
        """Yield successive chunks, checkpointing progress between yields.

        The position update runs when the generator resumes, i.e. after the
        caller has finished uploading the yielded chunk, so a crash mid-chunk
        never records unsent bytes.
        """
        while True:
            content = self._file_object.read(self._chunk_size)
            if not content:
                break
            yield content
            self._bytes_pos_uploaded += len(content)
            self.session_cache.set('uploaded_bytes', self._bytes_pos_uploaded)
            self.session_cache.save_to_json()

    @property
    def file_size(self):
        """Total size in bytes of the underlying file."""
        return os.fstat(self._file_object.fileno()).st_size

    @property
    def range_start(self):
        """Absolute offset of the first byte of the current chunk."""
        if self.range_end == 0:
            return 0
        return self.range_end - len(self._range_data)

    @property
    def range_end(self):
        """Absolute offset one past the last byte read from the file."""
        return self._file_object.tell()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment