Add documentation for `get_onedrive_token.py` script in README files and update `run_ps_download.py` to include access token handling.
This commit is contained in:
parent
7bd1082f9c
commit
32b24c3a11
15
README.md
15
README.md
|
|
@ -14,6 +14,21 @@ This repository contains a set of Python and PowerShell scripts that help with:
|
|||
|
||||
### Python Scripts
|
||||
|
||||
#### `get_onedrive_token.py`
|
||||
A Python script to obtain an OAuth2 access token for OneDrive using the official OneDrive client ID. This is required for accessing OneDrive's API.
|
||||
|
||||
Usage:
|
||||
```bash
|
||||
python get_onedrive_token.py
|
||||
```
|
||||
|
||||
The script will:
|
||||
1. Open your browser for authentication
|
||||
2. Handle the OAuth2 flow
|
||||
3. Save the token data (including refresh token) to `onedrive_token.json`
|
||||
|
||||
Note: The script uses the official OneDrive client ID and doesn't require any additional setup.
|
||||
|
||||
#### `run_ps_download.py`
|
||||
A Python wrapper for the PowerShell Live Photo downloader. It handles the execution of the PowerShell script with proper year/month path organization.
|
||||
|
||||
|
|
|
|||
15
README_CN.md
15
README_CN.md
|
|
@ -14,6 +14,21 @@
|
|||
|
||||
### Python 脚本
|
||||
|
||||
#### `get_onedrive_token.py`
|
||||
使用官方 OneDrive 客户端 ID 获取 OAuth2 访问令牌的 Python 脚本。这是访问 OneDrive API 所必需的。
|
||||
|
||||
使用方法:
|
||||
```bash
|
||||
python get_onedrive_token.py
|
||||
```
|
||||
|
||||
该脚本将:
|
||||
1. 打开浏览器进行身份验证
|
||||
2. 处理 OAuth2 流程
|
||||
3. 将令牌数据(包括刷新令牌)保存到 `onedrive_token.json`
|
||||
|
||||
注意:该脚本使用官方 OneDrive 客户端 ID,无需额外设置。
|
||||
|
||||
#### `run_ps_download.py`
|
||||
PowerShell Live Photo 下载器的 Python 封装。它处理 PowerShell 脚本的执行,并按照年/月路径组织文件。
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,83 @@
|
|||
import msal
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import webbrowser
|
||||
import time
|
||||
|
||||
# OneDrive API configuration
|
||||
CLIENT_ID = "4765445b-32c6-49b0-83e6-1d93765276ca" # Official OneDrive client ID
|
||||
AUTHORITY = "https://login.microsoftonline.com/consumers"
|
||||
SCOPE = [
|
||||
"Files.ReadWrite",
|
||||
"User.Read",
|
||||
"Photos.ReadWrite"
|
||||
]
|
||||
|
||||
def get_access_token():
|
||||
"""
|
||||
Get an access token for OneDrive using device code flow
|
||||
Returns:
|
||||
dict: Token data if successful, None otherwise
|
||||
"""
|
||||
# Create MSAL app
|
||||
app = msal.PublicClientApplication(
|
||||
CLIENT_ID,
|
||||
authority=AUTHORITY
|
||||
)
|
||||
|
||||
try:
|
||||
# Get device code
|
||||
flow = app.initiate_device_flow(scopes=SCOPE)
|
||||
|
||||
if "user_code" not in flow:
|
||||
print("Failed to create device flow")
|
||||
print(json.dumps(flow, indent=2))
|
||||
return None
|
||||
|
||||
print(flow["message"])
|
||||
|
||||
# Open browser for authentication
|
||||
auth_url = flow["verification_uri"]
|
||||
print(f"Opening browser at {auth_url}...")
|
||||
webbrowser.open(auth_url)
|
||||
|
||||
# Wait for user to complete the flow
|
||||
result = app.acquire_token_by_device_flow(flow)
|
||||
|
||||
if "access_token" in result:
|
||||
print("Successfully obtained access token!")
|
||||
# Save both access token and refresh token
|
||||
return {
|
||||
"access_token": result["access_token"],
|
||||
"refresh_token": result.get("refresh_token"),
|
||||
"expires_in": result.get("expires_in"),
|
||||
"token_type": result.get("token_type")
|
||||
}
|
||||
else:
|
||||
print("Failed to obtain access token:")
|
||||
print(json.dumps(result, indent=2))
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error during authentication: {str(e)}")
|
||||
return None
|
||||
|
||||
def save_token(token_data):
|
||||
"""Save the token data to a file"""
|
||||
if token_data:
|
||||
with open('onedrive_token.json', 'w') as f:
|
||||
json.dump(token_data, f, indent=2)
|
||||
print("Token data saved to onedrive_token.json")
|
||||
|
||||
def main():
|
||||
"""Main function"""
|
||||
print("Getting OneDrive access token...")
|
||||
token_data = get_access_token()
|
||||
if token_data:
|
||||
save_token(token_data)
|
||||
return 0
|
||||
return 1
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
msal>=1.20.0
|
||||
requests>=2.28.0
|
||||
|
|
@ -4,7 +4,8 @@ import argparse
|
|||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def get_access_token():
|
||||
return "bearer EwAIBK1DBAAUnIIyw3fu//VwydnOZRl0mzVDtSEAAa3wFdyLrgrC3j2sOkOMd3hNpH8n3dYiP1vgycIPwjjzXom9dpAiplwB/4doYFX%2bvSJpKhKGpe9WwptRrjYG1uRxD0smkGkpLj/GWDqUDPfxyPr0RJwbdfSaLGW8kxEUSOVCzx4431HKY7lzJD5fb3UD7TONoUpwdLxzn/2p4j335Wqa4vmzWKmE6CspARSh3rb/9iNomEcm44rea7G%2bTewk2fuQTvcAPPv%2bRJ76sdxjfArcxXuDOEHzUq9vw72wIBLGN8o2T5HAk24dkPFFossGtUz4O47sObcnOufmJ8NZr8avVbQts6Wl6lgNSaScyEwgBbPg72l965eyoyAV6g8QZgAAEKFGOGv3PMlPVLjh66AKYurQAoyHnVIe%2bKx2pyhT3Fi4CCwuiJbOE4AEfWdYmNfzMzcsfpv90s5m7w9g3mIRLVUCKPuEwhDOWJFEsiWmi9RD%2bfhJfWeXa5BzRIVjXoz/oAvwKyGJc6VTMmPvLLh1MpvmWgqF9tpqBx6d28sXcqoXGya29MrQutOyd%2bUanqOggcgXl4Lhh3hvq0AdaEz1NgT0BHBMekcoLXbffUmMFF68rMa63f/4uCeH7S2SWd1IjflRMg9SnBX1PA0By0Z5MRNJLuYv44KgCazLbFXs6HtozaDy8TAY3m8QRMfPpu2oz1juaD4lPmmClZ47SyXoRqoy%2b4tfY/XWNmcl8T4o3uh9rE33b5e9gn4WMiQDE/n4OPlWa451y01IBSuuueBlUByDF6JQ7WtCAwbbaSANqA5uN35qI45sDlhWezqnBDeCbcAmpgcOutTao%2bNOtSe44icp1tj69TujEtzxq5W00TJe4EBz9WuYenZ0iJFgvT9WM2UMEHa%2bZEOFKjMWRutOU%2bv96eTjMiLPGpbkq9Wgq09B60kLE4wWq4/nbGmbItvxITQmMz5QCJi42kQaKSq3VUjKZudBpEeEBag0hRBZfO4E1S5x5FNeurqSXfbD4lrv6oZet5ysWUYILg8CeRycRJ8nXfpdWM0gwhLK30Fwz4KyR0oviUpne04QbDo87Fg20XKSuSzj4b8Pm1hvmPhtT1jdcaSNmExveIsAzpmxYal49JoVXS1jcQcFZ488gbWKMQ0jJ034DRl/0Bm9ETYrfoERbnBi1Gc8%2bm22%2bn67CPdq6hf%2bz9O8eJEEZZ7qoPSXE%2bUrGzu44gu%2bpp/17BRm/shxB%2b81le/8BPosyl2zew6Pa1amsIT2Flma7AYr7E33lsHj9yniWrbCNXUMuwQkvNp%2blkrHpHH9aKNHBNdWSU98xSrRf5ysz2GfbKC6K2aceujz44m94rnQxZkEFQQwMtJOb/EC"
|
||||
|
||||
def run_download_script(base_save_to: str, base_path_to_scan: str, target_date: datetime) -> bool:
|
||||
"""
|
||||
|
|
@ -22,6 +23,8 @@ def run_download_script(base_save_to: str, base_path_to_scan: str, target_date:
|
|||
# Construct full paths
|
||||
save_to = str(Path(base_save_to) / year / month)
|
||||
path_to_scan = str(Path(base_path_to_scan) / year / month)
|
||||
|
||||
access_token = get_access_token()
|
||||
|
||||
try:
|
||||
# Get the script path (assuming it's in the same directory)
|
||||
|
|
@ -34,7 +37,8 @@ def run_download_script(base_save_to: str, base_path_to_scan: str, target_date:
|
|||
'-ExecutionPolicy', 'Bypass',
|
||||
'-File', str(script_path),
|
||||
'-SaveTo', save_to,
|
||||
'-PathToScan', path_to_scan
|
||||
'-PathToScan', path_to_scan,
|
||||
'-AccessToken', access_token
|
||||
]
|
||||
|
||||
print(f"Processing {year}/{month}")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,68 @@
|
|||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
def sync_files(src_path: str, dest_path: str) -> None:
|
||||
"""
|
||||
Synchronize files from source to destination directory, skipping existing files.
|
||||
|
||||
Args:
|
||||
src_path: Source directory path
|
||||
dest_path: Destination directory path
|
||||
"""
|
||||
# Convert to Path objects for easier handling
|
||||
src = Path(src_path)
|
||||
dest = Path(dest_path)
|
||||
|
||||
# Ensure source directory exists
|
||||
if not src.exists():
|
||||
raise FileNotFoundError(f"Source directory {src_path} does not exist")
|
||||
|
||||
# Create destination directory if it doesn't exist
|
||||
dest.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Walk through source directory
|
||||
for root, _, files in os.walk(src):
|
||||
# Convert current root to Path object
|
||||
root_path = Path(root)
|
||||
|
||||
# Calculate relative path from source
|
||||
rel_path = root_path.relative_to(src)
|
||||
|
||||
# Create corresponding destination directory
|
||||
dest_dir = dest / rel_path
|
||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Copy each file
|
||||
for file in files:
|
||||
src_file = root_path / file
|
||||
dest_file = dest_dir / file
|
||||
|
||||
# Skip if destination file exists
|
||||
if dest_file.exists():
|
||||
print(f"Skipping existing file: {dest_file}")
|
||||
continue
|
||||
|
||||
# Copy the file
|
||||
print(f"Copying: {src_file} -> {dest_file}")
|
||||
shutil.copy2(src_file, dest_file)
|
||||
|
||||
# Usage
|
||||
# python sync.py "C:\Users\petr\OneDrive\Photos\Auto-saved\2024\11" "D:\Photos\OneDrive-Photo\2024\11"
|
||||
# python sync.py /path/to/source /path/to/destination
|
||||
if __name__ == "__main__":
|
||||
# Set up argument parser
|
||||
parser = argparse.ArgumentParser(description='Synchronize files from source to destination directory')
|
||||
parser.add_argument('src', help='Source directory path')
|
||||
parser.add_argument('dest', help='Destination directory path')
|
||||
|
||||
# Parse arguments
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
sync_files(args.src, args.dest)
|
||||
except Exception as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
Loading…
Reference in New Issue