forked from replicate/cog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
enhanced_file_upload_ai.py
106 lines (90 loc) · 3.33 KB
/
enhanced_file_upload_ai.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import base64
import io
import mimetypes
import os
from typing import Optional, Dict
from urllib.parse import urlparse
import requests
import json
def classify_file(fh: io.IOBase) -> Dict[str, str]:
"""Classify the file and extract metadata using an AI model."""
# Placeholder for AI-based file classification and metadata extraction
# Simulate AI model response
return {
"category": "document",
"tags": "important, confidential", # Assuming tags are provided as a single string
}
def optimize_upload_parameters(file_size: int) -> Dict[str, int]:
"""Optimize upload parameters based on AI predictions."""
# Placeholder for AI-based optimization of upload parameters
# Simulate AI model response
return {
"chunk_size": 1024 * 1024, # 1MB chunks
"retry_limit": 5,
}
def handle_upload_errors(error: Exception) -> None:
"""Handle errors using AI-driven retry mechanisms."""
# Placeholder for AI-based error handling
print(f"Error: {error}. AI retry logic will handle this.")
def upload_file(fh: io.IOBase, output_file_prefix: Optional[str] = None) -> str:
fh.seek(0)
# AI-based file classification
metadata = classify_file(fh)
print(f"File metadata: {json.dumps(metadata)}")
if output_file_prefix is not None:
name = getattr(fh, "name", "output")
url = output_file_prefix + os.path.basename(name)
try:
resp = requests.put(url, files={"file": fh}, timeout=None)
resp.raise_for_status()
return url
except requests.RequestException as e:
handle_upload_errors(e)
raise
b = fh.read()
if isinstance(b, str):
b = b.encode("utf-8")
encoded_body = base64.b64encode(b)
if getattr(fh, "name", None):
mime_type = mimetypes.guess_type(fh.name)[0] # type: ignore
else:
mime_type = "application/octet-stream"
s = encoded_body.decode("utf-8")
return f"data:{mime_type};base64,{s}"
def put_file_to_signed_endpoint(
fh: io.IOBase, endpoint: str, client: requests.Session, prediction_id: Optional[str]
) -> str:
fh.seek(0)
filename = guess_filename(fh)
content_type, _ = mimetypes.guess_type(filename)
# AI-based upload parameters optimization
file_size = os.path.getsize(filename)
upload_params = optimize_upload_parameters(file_size)
headers = {
"Content-Type": content_type,
"Chunk-Size": str(upload_params["chunk_size"]),
"Retry-Limit": str(upload_params["retry_limit"]),
}
if prediction_id is not None:
headers["X-Prediction-ID"] = prediction_id
try:
resp = client.put(
ensure_trailing_slash(endpoint) + filename,
fh,
headers=headers,
timeout=(10, 15),
)
resp.raise_for_status()
except requests.RequestException as e:
handle_upload_errors(e)
raise
final_url = resp.url
if "location" in resp.headers:
final_url = resp.headers.get("location")
return str(urlparse(final_url)._replace(query="").geturl())
def ensure_trailing_slash(url: str) -> str:
return url if url.endswith("/") else url + "/"
def guess_filename(obj: io.IOBase) -> str:
"""Tries to guess the filename of the given object."""
name = getattr(obj, "name", "file")
return os.path.basename(name)