summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go')
-rw-r--r--vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go393
1 files changed, 393 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go b/vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go
new file mode 100644
index 00000000..a70d7054
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go
@@ -0,0 +1,393 @@
+/*
+ * MinIO Go Library for Amazon S3 Compatible Cloud Storage
+ * Copyright 2015-2017 MinIO, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package minio
+
+import (
+ "bytes"
+ "context"
+ "encoding/base64"
+ "encoding/hex"
+ "encoding/xml"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "sort"
+ "strconv"
+ "strings"
+
+ "github.com/google/uuid"
+ "github.com/minio/minio-go/v7/pkg/encrypt"
+ "github.com/minio/minio-go/v7/pkg/s3utils"
+)
+
+func (c Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
+ opts PutObjectOptions) (info UploadInfo, err error) {
+ info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
+ if err != nil {
+ errResp := ToErrorResponse(err)
+ // Verify if multipart functionality is not available, if not
+ // fall back to single PutObject operation.
+ if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
+ // Verify if size of reader is greater than '5GiB'.
+ if size > maxSinglePutObjectSize {
+ return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
+ }
+ // Fall back to uploading as single PutObject operation.
+ return c.putObject(ctx, bucketName, objectName, reader, size, opts)
+ }
+ }
+ return info, err
+}
+
+func (c Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
+ // Input validation.
+ if err = s3utils.CheckValidBucketName(bucketName); err != nil {
+ return UploadInfo{}, err
+ }
+ if err = s3utils.CheckValidObjectName(objectName); err != nil {
+ return UploadInfo{}, err
+ }
+
+ // Total data read and written to server. should be equal to
+ // 'size' at the end of the call.
+ var totalUploadedSize int64
+
+ // Complete multipart upload.
+ var complMultipartUpload completeMultipartUpload
+
+ // Calculate the optimal parts info for a given size.
+ totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
+ if err != nil {
+ return UploadInfo{}, err
+ }
+
+ // Initiate a new multipart upload.
+ uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
+ if err != nil {
+ return UploadInfo{}, err
+ }
+
+ defer func() {
+ if err != nil {
+ c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
+ }
+ }()
+
+ // Part number always starts with '1'.
+ partNumber := 1
+
+ // Initialize parts uploaded map.
+ partsInfo := make(map[int]ObjectPart)
+
+ // Create a buffer.
+ buf := make([]byte, partSize)
+
+ for partNumber <= totalPartsCount {
+ // Choose hash algorithms to be calculated by hashCopyN,
+ // avoid sha256 with non-v4 signature request or
+ // HTTPS connection.
+ hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5)
+
+ length, rErr := readFull(reader, buf)
+ if rErr == io.EOF && partNumber > 1 {
+ break
+ }
+
+ if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF {
+ return UploadInfo{}, rErr
+ }
+
+ // Calculates hash sums while copying partSize bytes into cw.
+ for k, v := range hashAlgos {
+ v.Write(buf[:length])
+ hashSums[k] = v.Sum(nil)
+ v.Close()
+ }
+
+ // Update progress reader appropriately to the latest offset
+ // as we read from the source.
+ rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
+
+ // Checksums..
+ var (
+ md5Base64 string
+ sha256Hex string
+ )
+ if hashSums["md5"] != nil {
+ md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"])
+ }
+ if hashSums["sha256"] != nil {
+ sha256Hex = hex.EncodeToString(hashSums["sha256"])
+ }
+
+ // Proceed to upload the part.
+ objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
+ md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption)
+ if uerr != nil {
+ return UploadInfo{}, uerr
+ }
+
+ // Save successfully uploaded part metadata.
+ partsInfo[partNumber] = objPart
+
+ // Save successfully uploaded size.
+ totalUploadedSize += int64(length)
+
+ // Increment part number.
+ partNumber++
+
+ // For unknown size, Read EOF we break away.
+ // We do not have to upload till totalPartsCount.
+ if rErr == io.EOF {
+ break
+ }
+ }
+
+ // Loop over total uploaded parts to save them in
+ // Parts array before completing the multipart request.
+ for i := 1; i < partNumber; i++ {
+ part, ok := partsInfo[i]
+ if !ok {
+ return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
+ }
+ complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
+ ETag: part.ETag,
+ PartNumber: part.PartNumber,
+ })
+ }
+
+ // Sort all completed parts.
+ sort.Sort(completedParts(complMultipartUpload.Parts))
+
+ uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload)
+ if err != nil {
+ return UploadInfo{}, err
+ }
+
+ uploadInfo.Size = totalUploadedSize
+ return uploadInfo, nil
+}
+
+// initiateMultipartUpload - Initiates a multipart upload and returns an upload ID.
+func (c Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) {
+ // Input validation.
+ if err := s3utils.CheckValidBucketName(bucketName); err != nil {
+ return initiateMultipartUploadResult{}, err
+ }
+ if err := s3utils.CheckValidObjectName(objectName); err != nil {
+ return initiateMultipartUploadResult{}, err
+ }
+
+ // Initialize url queries.
+ urlValues := make(url.Values)
+ urlValues.Set("uploads", "")
+
+ if opts.Internal.SourceVersionID != "" {
+ if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
+ return initiateMultipartUploadResult{}, errInvalidArgument(err.Error())
+ }
+ urlValues.Set("versionId", opts.Internal.SourceVersionID)
+ }
+
+ // Set ContentType header.
+ customHeader := opts.Header()
+
+ reqMetadata := requestMetadata{
+ bucketName: bucketName,
+ objectName: objectName,
+ queryValues: urlValues,
+ customHeader: customHeader,
+ }
+
+ // Execute POST on an objectName to initiate multipart upload.
+ resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
+ defer closeResponse(resp)
+ if err != nil {
+ return initiateMultipartUploadResult{}, err
+ }
+ if resp != nil {
+ if resp.StatusCode != http.StatusOK {
+ return initiateMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName)
+ }
+ }
+ // Decode xml for new multipart upload.
+ initiateMultipartUploadResult := initiateMultipartUploadResult{}
+ err = xmlDecoder(resp.Body, &initiateMultipartUploadResult)
+ if err != nil {
+ return initiateMultipartUploadResult, err
+ }
+ return initiateMultipartUploadResult, nil
+}
+
+// uploadPart - Uploads a part in a multipart upload.
+func (c Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,
+ partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide) (ObjectPart, error) {
+ // Input validation.
+ if err := s3utils.CheckValidBucketName(bucketName); err != nil {
+ return ObjectPart{}, err
+ }
+ if err := s3utils.CheckValidObjectName(objectName); err != nil {
+ return ObjectPart{}, err
+ }
+ if size > maxPartSize {
+ return ObjectPart{}, errEntityTooLarge(size, maxPartSize, bucketName, objectName)
+ }
+ if size <= -1 {
+ return ObjectPart{}, errEntityTooSmall(size, bucketName, objectName)
+ }
+ if partNumber <= 0 {
+ return ObjectPart{}, errInvalidArgument("Part number cannot be negative or equal to zero.")
+ }
+ if uploadID == "" {
+ return ObjectPart{}, errInvalidArgument("UploadID cannot be empty.")
+ }
+
+ // Get resources properly escaped and lined up before using them in http request.
+ urlValues := make(url.Values)
+ // Set part number.
+ urlValues.Set("partNumber", strconv.Itoa(partNumber))
+ // Set upload id.
+ urlValues.Set("uploadId", uploadID)
+
+ // Set encryption headers, if any.
+ customHeader := make(http.Header)
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
+ // Server-side encryption is supported by the S3 Multipart Upload actions.
+ // Unless you are using a customer-provided encryption key, you don't need
+ // to specify the encryption parameters in each UploadPart request.
+ if sse != nil && sse.Type() == encrypt.SSEC {
+ sse.Marshal(customHeader)
+ }
+
+ reqMetadata := requestMetadata{
+ bucketName: bucketName,
+ objectName: objectName,
+ queryValues: urlValues,
+ customHeader: customHeader,
+ contentBody: reader,
+ contentLength: size,
+ contentMD5Base64: md5Base64,
+ contentSHA256Hex: sha256Hex,
+ }
+
+ // Execute PUT on each part.
+ resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
+ defer closeResponse(resp)
+ if err != nil {
+ return ObjectPart{}, err
+ }
+ if resp != nil {
+ if resp.StatusCode != http.StatusOK {
+ return ObjectPart{}, httpRespToErrorResponse(resp, bucketName, objectName)
+ }
+ }
+ // Once successfully uploaded, return completed part.
+ objPart := ObjectPart{}
+ objPart.Size = size
+ objPart.PartNumber = partNumber
+ // Trim off the odd double quotes from ETag in the beginning and end.
+ objPart.ETag = trimEtag(resp.Header.Get("ETag"))
+ return objPart, nil
+}
+
+// completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts.
+func (c Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
+ complete completeMultipartUpload) (UploadInfo, error) {
+ // Input validation.
+ if err := s3utils.CheckValidBucketName(bucketName); err != nil {
+ return UploadInfo{}, err
+ }
+ if err := s3utils.CheckValidObjectName(objectName); err != nil {
+ return UploadInfo{}, err
+ }
+
+ // Initialize url queries.
+ urlValues := make(url.Values)
+ urlValues.Set("uploadId", uploadID)
+ // Marshal complete multipart body.
+ completeMultipartUploadBytes, err := xml.Marshal(complete)
+ if err != nil {
+ return UploadInfo{}, err
+ }
+
+ // Instantiate all the complete multipart buffer.
+ completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes)
+ reqMetadata := requestMetadata{
+ bucketName: bucketName,
+ objectName: objectName,
+ queryValues: urlValues,
+ contentBody: completeMultipartUploadBuffer,
+ contentLength: int64(len(completeMultipartUploadBytes)),
+ contentSHA256Hex: sum256Hex(completeMultipartUploadBytes),
+ }
+
+ // Execute POST to complete multipart upload for an objectName.
+ resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
+ defer closeResponse(resp)
+ if err != nil {
+ return UploadInfo{}, err
+ }
+ if resp != nil {
+ if resp.StatusCode != http.StatusOK {
+ return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
+ }
+ }
+
+ // Read resp.Body into a []bytes to parse for Error response inside the body
+ var b []byte
+ b, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return UploadInfo{}, err
+ }
+ // Decode completed multipart upload response on success.
+ completeMultipartUploadResult := completeMultipartUploadResult{}
+ err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadResult)
+ if err != nil {
+ // xml parsing failure due to presence an ill-formed xml fragment
+ return UploadInfo{}, err
+ } else if completeMultipartUploadResult.Bucket == "" {
+ // xml's Decode method ignores well-formed xml that don't apply to the type of value supplied.
+ // In this case, it would leave completeMultipartUploadResult with the corresponding zero-values
+ // of the members.
+
+ // Decode completed multipart upload response on failure
+ completeMultipartUploadErr := ErrorResponse{}
+ err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadErr)
+ if err != nil {
+ // xml parsing failure due to presence an ill-formed xml fragment
+ return UploadInfo{}, err
+ }
+ return UploadInfo{}, completeMultipartUploadErr
+ }
+
+ // extract lifecycle expiry date and rule ID
+ expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))
+
+ return UploadInfo{
+ Bucket: completeMultipartUploadResult.Bucket,
+ Key: completeMultipartUploadResult.Key,
+ ETag: trimEtag(completeMultipartUploadResult.ETag),
+ VersionID: resp.Header.Get(amzVersionID),
+ Location: completeMultipartUploadResult.Location,
+ Expiration: expTime,
+ ExpirationRuleID: ruleID,
+ }, nil
+
+}