Tuyên bố sự cố - Sử dụng thư viện Boto3 trong Python để tải một đối tượng lên S3. Ví dụ:cách tải test.zip lên Bucket_1 của S3.
Phương pháp tiếp cận / Thuật toán để giải quyết vấn đề này
Bước 1 - Nhập các ngoại lệ boto3 và botocore để xử lý các ngoại lệ.
Bước 2 - Từ pathlib , nhập PurePosixPath để lấy lại tên tệp từ đường dẫn
Bước 3 - s3_path và đường dẫn tệp là hai tham số trong hàm upload_object_into_s3
Bước 4 - Xác thực s3_path được chuyển ở định dạng AWS là s3:// bucket_name / key và đường dẫn tệp dưới dạng đường dẫn cục bộ C:// users / filename
Bước 5 - Tạo phiên AWS bằng thư viện boto3.
Bước 6 - Tạo tài nguyên AWS cho S3.
Bước 7 - Tách đường dẫn S3 và thực hiện các thao tác để tách tên nhóm gốc và đường dẫn khóa
Bước 8 - Lấy tên tệp cho đường dẫn tệp hoàn chỉnh và thêm vào đường dẫn khóa S3.
Bước 9 - Bây giờ sử dụng chức năng upload_fileobj để tải tệp cục bộ lên S3.
Bước 10 - Sử dụng chức năng wait_until_exists để đợi cho đến khi hoạt động kết thúc.
Bước 11 - Xử lý ngoại lệ dựa trên mã phản hồi để xác thực xem tệp có được tải lên hay không.
Bước 12 - Xử lý ngoại lệ chung nếu có sự cố khi tải tệp lên
Ví dụ
Sử dụng mã sau để tải tệp lên AWS S3 -
import boto3
from botocore.exceptions import ClientError
from pathlib import PurePosixPath
def upload_object_into_s3(s3_path, filepath):
if 's3://' in filepath:
print('SourcePath is not a valid path.' + filepath)
raise Exception('SourcePath is not a valid path.')
elif s3_path.find('s3://') == -1:
print('DestinationPath is not a s3 path.' + s3_path)
raise Exception('DestinationPath is not a valid path.')
session = boto3.session.Session()
s3_resource = session.resource('s3')
tokens = s3_path.split('/')
target_key = ""
if len(tokens) > 3:
for tokn in range(3, len(tokens)):
if tokn == 3:
target_key += tokens[tokn]
else:
target_key += "/" + tokens[tokn]
target_bucket_name = tokens[2]
file_name = PurePosixPath(filepath).name
if target_key != '':
target_key.strip()
key_path = target_key + "/" + file_name
else:
key_path = file_name
print(("key_path: " + key_path, 'target_bucket: ' + target_bucket_name))
try:
# uploading Entity from local path
with open(filepath, "rb") as file:
s3_resource.meta.client.upload_fileobj(file, target_bucket_name, key_path)
try:
s3_resource.Object(target_bucket_name, key_path).wait_until_exists()
file.close()
except ClientError as error:
error_code = int(error.response['Error']['Code'])
if error_code == 412 or error_code == 304:
print("Object didn't Upload Successfully ", target_bucket_name)
raise error
return "Object Uploaded Successfully"
except Exception as error:
print("Error in upload object function of s3 helper: " + error.__str__())
raise error
print(upload_object_into_s3('s3://Bucket_1/testfolder', 'c://test.zip')) Đầu ra
key_path:/testfolder/test.zip, target_bucket: Bucket_1 Object Uploaded Successfully