Remove download_od_livephotos.py and sync.py.bak scripts
This commit is contained in:
parent
cfdb93a91e
commit
b90bcb566f
|
|
@ -1,172 +0,0 @@
|
||||||
import os
|
|
||||||
import requests
|
|
||||||
import webbrowser
|
|
||||||
import http.server
|
|
||||||
import socketserver
|
|
||||||
import threading
|
|
||||||
import urllib.parse
|
|
||||||
from datetime import datetime
|
|
||||||
from typing import Optional, Dict
|
|
||||||
import uuid
|
|
||||||
import tkinter as tk
|
|
||||||
from tkinter import ttk
|
|
||||||
from webview import create_window, start
|
|
||||||
|
|
||||||
class OAuthCallbackHandler(http.server.SimpleHTTPRequestHandler):
|
|
||||||
def do_POST(self):
|
|
||||||
content_length = int(self.headers['Content-Length'])
|
|
||||||
post_data = self.rfile.read(content_length).decode('utf-8')
|
|
||||||
self.server.oauth_response = post_data
|
|
||||||
|
|
||||||
self.send_response(200)
|
|
||||||
self.send_header('Content-type', 'text/html')
|
|
||||||
self.end_headers()
|
|
||||||
self.wfile.write(b"Authentication complete. You can close this window.")
|
|
||||||
|
|
||||||
def log_message(self, format, *args):
|
|
||||||
# Suppress logging
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_onedrive_token() -> Dict[str, str]:
|
|
||||||
"""
|
|
||||||
Authenticate with OneDrive using the Photos web client ID.
|
|
||||||
Returns authentication tokens in a dictionary.
|
|
||||||
"""
|
|
||||||
client_id = "073204aa-c1e0-4e66-a200-e5815a0aa93d"
|
|
||||||
redirect_uri = "https://photos.onedrive.com/auth/login"
|
|
||||||
scope = "OneDrive.ReadWrite,offline_access,openid,profile"
|
|
||||||
|
|
||||||
# 创建一个简单的 Web 浏览器窗口
|
|
||||||
import tkinter as tk
|
|
||||||
from tkinter import ttk
|
|
||||||
from webview import create_window, start
|
|
||||||
|
|
||||||
def on_loaded():
|
|
||||||
# 监听 cookies 变化
|
|
||||||
cookies = webview.get_cookies()
|
|
||||||
for cookie in cookies:
|
|
||||||
if cookie['name'] == 'AccessToken-OneDrive.ReadWrite':
|
|
||||||
window.destroy()
|
|
||||||
return cookie['value']
|
|
||||||
|
|
||||||
auth_url = (
|
|
||||||
"https://login.microsoftonline.com/consumers/oauth2/v2.0/authorize"
|
|
||||||
f"?client_id={client_id}"
|
|
||||||
f"&nonce=uv.{str(uuid.uuid4())}"
|
|
||||||
"&response_mode=form_post"
|
|
||||||
f"&scope={scope}"
|
|
||||||
"&response_type=code"
|
|
||||||
f"&redirect_uri={urllib.parse.quote(redirect_uri)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# 创建浏览器窗口
|
|
||||||
window = create_window('OneDrive Authentication', auth_url,
|
|
||||||
width=660, height=775)
|
|
||||||
|
|
||||||
# 启动窗口并等待认证完成
|
|
||||||
start(func=on_loaded)
|
|
||||||
|
|
||||||
return window.get_cookies()
|
|
||||||
|
|
||||||
def download_live_photos(access_token: str, save_to: str, path_to_scan: str = "/Pictures/Camera Roll"):
|
|
||||||
"""
|
|
||||||
Download all Live Photos from the specified OneDrive path.
|
|
||||||
"""
|
|
||||||
if not path_to_scan.startswith('/'):
|
|
||||||
path_to_scan = '/' + path_to_scan
|
|
||||||
if not path_to_scan.endswith('/'):
|
|
||||||
path_to_scan += '/'
|
|
||||||
|
|
||||||
def process_folder(folder_id: str = 'root', current_path: str = '/'):
|
|
||||||
url = f"https://api.onedrive.com/v1.0/drive/items/{folder_id}/children"
|
|
||||||
params = {
|
|
||||||
"$filter": "photo/livePhoto ne null or folder ne null or remoteItem ne null",
|
|
||||||
"select": "fileSystemInfo,photo,id,name,size,folder,remoteItem"
|
|
||||||
}
|
|
||||||
headers = {"Authorization": f"Bearer {access_token}"}
|
|
||||||
|
|
||||||
while url:
|
|
||||||
response = requests.get(url, params=params, headers=headers).json()
|
|
||||||
|
|
||||||
for item in response.get('value', []):
|
|
||||||
folder_path = current_path + item['name'] + '/'
|
|
||||||
|
|
||||||
# Handle folders
|
|
||||||
if 'folder' in item:
|
|
||||||
if folder_path.startswith(path_to_scan) or path_to_scan.startswith(folder_path):
|
|
||||||
print(f"Checking folder {item['id']} - {folder_path}")
|
|
||||||
process_folder(item['id'], folder_path)
|
|
||||||
|
|
||||||
# Handle live photos
|
|
||||||
if 'photo' in item and 'livePhoto' in item['photo']:
|
|
||||||
if current_path.startswith(path_to_scan):
|
|
||||||
target_path = os.path.join(save_to, current_path[len(path_to_scan):])
|
|
||||||
download_single_live_photo(
|
|
||||||
access_token,
|
|
||||||
item['id'],
|
|
||||||
target_path,
|
|
||||||
item['size'],
|
|
||||||
item['fileSystemInfo']['lastModifiedDateTime']
|
|
||||||
)
|
|
||||||
|
|
||||||
url = response.get('@odata.nextLink')
|
|
||||||
params = {} # Clear params as nextLink includes them
|
|
||||||
|
|
||||||
process_folder()
|
|
||||||
|
|
||||||
def download_single_live_photo(access_token: str, element_id: str, save_to: str,
|
|
||||||
expected_size: int, last_modified: str):
|
|
||||||
"""
|
|
||||||
Download a single Live Photo (both video and image components).
|
|
||||||
"""
|
|
||||||
os.makedirs(save_to, exist_ok=True)
|
|
||||||
headers = {"Authorization": f"Bearer {access_token}"}
|
|
||||||
actual_size = 0
|
|
||||||
|
|
||||||
# Download video component
|
|
||||||
video_url = f"https://api.onedrive.com/v1.0/drive/items/{element_id}/content?format=video"
|
|
||||||
video_response = requests.get(video_url, headers=headers)
|
|
||||||
if video_response.ok:
|
|
||||||
filename = video_response.headers.get('Content-Disposition', '').split('filename=')[-1].strip('"')
|
|
||||||
if filename:
|
|
||||||
video_path = os.path.join(save_to, filename)
|
|
||||||
with open(video_path, 'wb') as f:
|
|
||||||
f.write(video_response.content)
|
|
||||||
actual_size += len(video_response.content)
|
|
||||||
os.utime(video_path, (datetime.fromisoformat(last_modified.replace('Z', '+00:00')).timestamp(),) * 2)
|
|
||||||
|
|
||||||
# Download image component
|
|
||||||
image_url = f"https://api.onedrive.com/v1.0/drive/items/{element_id}/content"
|
|
||||||
image_response = requests.get(image_url, headers=headers)
|
|
||||||
if image_response.ok:
|
|
||||||
filename = image_response.headers.get('Content-Disposition', '').split('filename=')[-1].strip('"')
|
|
||||||
if filename:
|
|
||||||
image_path = os.path.join(save_to, filename)
|
|
||||||
with open(image_path, 'wb') as f:
|
|
||||||
f.write(image_response.content)
|
|
||||||
actual_size += len(image_response.content)
|
|
||||||
os.utime(image_path, (datetime.fromisoformat(last_modified.replace('Z', '+00:00')).timestamp(),) * 2)
|
|
||||||
|
|
||||||
if actual_size != expected_size:
|
|
||||||
print(f"Warning: Size mismatch for {element_id}. Expected {expected_size}, got {actual_size}")
|
|
||||||
|
|
||||||
def main():
|
|
||||||
print("Live Photo downloader - Downloads Live Photos from OneDrive camera roll")
|
|
||||||
print("(C) 2024 Petr Vyskocil. Licensed under MIT license.")
|
|
||||||
print()
|
|
||||||
|
|
||||||
save_to = input("Enter the path to save Live Photos: ")
|
|
||||||
path_to_scan = input("Enter OneDrive path to scan [/Photos/Auto-saved]: ") or "/Photos/Auto-saved"
|
|
||||||
|
|
||||||
print("Getting OneDrive Authentication token...")
|
|
||||||
auth = get_onedrive_token()
|
|
||||||
|
|
||||||
if 'access_token' not in auth:
|
|
||||||
print("Failed to get authentication token")
|
|
||||||
return
|
|
||||||
|
|
||||||
print("Downloading Live Photos...")
|
|
||||||
download_live_photos(auth['access_token'], save_to, path_to_scan)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
36
sync.py.bak
36
sync.py.bak
|
|
@ -1,36 +0,0 @@
|
||||||
import os
|
|
||||||
import shutil
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
def sync_heic_mov(src_folder, dest_folder):
|
|
||||||
# Convert to Path objects for easier handling
|
|
||||||
src_path = Path(src_folder)
|
|
||||||
dest_path = Path(dest_folder)
|
|
||||||
|
|
||||||
# Walk through the source directory
|
|
||||||
for root, dirs, files in os.walk(src_path):
|
|
||||||
for file in files:
|
|
||||||
if file.lower().endswith('.mov'):
|
|
||||||
# Get the relative path from the source folder
|
|
||||||
rel_path = Path(root).relative_to(src_path)
|
|
||||||
|
|
||||||
# Construct the full paths for source and destination
|
|
||||||
src_file = Path(root) / file
|
|
||||||
dest_file = dest_path / rel_path / file
|
|
||||||
|
|
||||||
# Create the destination directory if it doesn't exist
|
|
||||||
dest_file.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
# Check if the destination file already exists
|
|
||||||
if not dest_file.exists():
|
|
||||||
print(f"Copying: {src_file} -> {dest_file}")
|
|
||||||
shutil.copy2(src_file, dest_file)
|
|
||||||
else:
|
|
||||||
print(f"Skipping (already exists): {dest_file}")
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
source_folder = input("Enter the source folder path: ")
|
|
||||||
destination_folder = input("Enter the destination folder path: ")
|
|
||||||
|
|
||||||
sync_heic_mov(source_folder, destination_folder)
|
|
||||||
print("Sync completed.")
|
|
||||||
Loading…
Reference in New Issue