summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/v7/pkg/replication
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/pkg/replication')
-rw-r--r--vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go696
1 files changed, 696 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go b/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go
new file mode 100644
index 00000000..beacc71f
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go
@@ -0,0 +1,696 @@
+/*
+ * MinIO Client (C) 2020 MinIO, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package replication
+
+import (
+ "bytes"
+ "encoding/xml"
+ "fmt"
+ "strconv"
+ "strings"
+ "unicode/utf8"
+
+ "github.com/rs/xid"
+)
+
+var errInvalidFilter = fmt.Errorf("invalid filter")
+
+// OptionType specifies operation to be performed on config
+type OptionType string
+
+const (
+ // AddOption specifies addition of rule to config
+ AddOption OptionType = "Add"
+ // SetOption specifies modification of existing rule to config
+ SetOption OptionType = "Set"
+
+ // RemoveOption specifies rule options are for removing a rule
+ RemoveOption OptionType = "Remove"
+ // ImportOption is for getting current config
+ ImportOption OptionType = "Import"
+)
+
+// Options represents options to set a replication configuration rule
+type Options struct {
+ Op OptionType
+ ID string
+ Prefix string
+ RuleStatus string
+ Priority string
+ TagString string
+ StorageClass string
+ RoleArn string
+ DestBucket string
+ IsTagSet bool
+ IsSCSet bool
+ ReplicateDeletes string // replicate versioned deletes
+ ReplicateDeleteMarkers string // replicate soft deletes
+ ReplicaSync string // replicate replica metadata modifications
+ ExistingObjectReplicate string
+}
+
+// Tags returns a slice of tags for a rule
+func (opts Options) Tags() ([]Tag, error) {
+ var tagList []Tag
+ tagTokens := strings.Split(opts.TagString, "&")
+ for _, tok := range tagTokens {
+ if tok == "" {
+ break
+ }
+ kv := strings.SplitN(tok, "=", 2)
+ if len(kv) != 2 {
+ return []Tag{}, fmt.Errorf("tags should be entered as comma separated k=v pairs")
+ }
+ tagList = append(tagList, Tag{
+ Key: kv[0],
+ Value: kv[1],
+ })
+ }
+ return tagList, nil
+}
+
+// Config - replication configuration specified in
+// https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html
+type Config struct {
+ XMLName xml.Name `xml:"ReplicationConfiguration" json:"-"`
+ Rules []Rule `xml:"Rule" json:"Rules"`
+ Role string `xml:"Role" json:"Role"`
+}
+
+// Empty returns true if config is not set
+func (c *Config) Empty() bool {
+ return len(c.Rules) == 0
+}
+
+// AddRule adds a new rule to existing replication config. If a rule exists with the
+// same ID, then the rule is replaced.
+func (c *Config) AddRule(opts Options) error {
+ priority, err := strconv.Atoi(opts.Priority)
+ if err != nil {
+ return err
+ }
+ if opts.RoleArn != c.Role && c.Role != "" {
+ return fmt.Errorf("role ARN does not match existing configuration")
+ }
+ var status Status
+ // toggle rule status for edit option
+ switch opts.RuleStatus {
+ case "enable":
+ status = Enabled
+ case "disable":
+ status = Disabled
+ default:
+ return fmt.Errorf("rule state should be either [enable|disable]")
+ }
+
+ tags, err := opts.Tags()
+ if err != nil {
+ return err
+ }
+ andVal := And{
+ Tags: tags,
+ }
+ filter := Filter{Prefix: opts.Prefix}
+ // only a single tag is set.
+ if opts.Prefix == "" && len(tags) == 1 {
+ filter.Tag = tags[0]
+ }
+ // both prefix and tag are present
+ if len(andVal.Tags) > 1 || opts.Prefix != "" {
+ filter.And = andVal
+ filter.And.Prefix = opts.Prefix
+ filter.Prefix = ""
+ filter.Tag = Tag{}
+ }
+ if opts.ID == "" {
+ opts.ID = xid.New().String()
+ }
+ arnStr := opts.RoleArn
+ if opts.RoleArn == "" {
+ arnStr = c.Role
+ }
+ if arnStr == "" {
+ return fmt.Errorf("role ARN required")
+ }
+ tokens := strings.Split(arnStr, ":")
+ if len(tokens) != 6 {
+ return fmt.Errorf("invalid format for replication Arn")
+ }
+ if c.Role == "" {
+ c.Role = arnStr
+ }
+ destBucket := opts.DestBucket
+ // ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html
+ if btokens := strings.Split(destBucket, ":"); len(btokens) != 6 {
+ if len(btokens) == 1 {
+ destBucket = fmt.Sprintf("arn:aws:s3:::%s", destBucket)
+ } else {
+ return fmt.Errorf("destination bucket needs to be in Arn format")
+ }
+ }
+ dmStatus := Disabled
+ if opts.ReplicateDeleteMarkers != "" {
+ switch opts.ReplicateDeleteMarkers {
+ case "enable":
+ dmStatus = Enabled
+ case "disable":
+ dmStatus = Disabled
+ default:
+ return fmt.Errorf("ReplicateDeleteMarkers should be either enable|disable")
+ }
+ }
+
+ vDeleteStatus := Disabled
+ if opts.ReplicateDeletes != "" {
+ switch opts.ReplicateDeletes {
+ case "enable":
+ vDeleteStatus = Enabled
+ case "disable":
+ vDeleteStatus = Disabled
+ default:
+ return fmt.Errorf("ReplicateDeletes should be either enable|disable")
+ }
+ }
+ var replicaSync Status
+ // replica sync is by default Enabled, unless specified.
+ switch opts.ReplicaSync {
+ case "enable", "":
+ replicaSync = Enabled
+ case "disable":
+ replicaSync = Disabled
+ default:
+ return fmt.Errorf("replica metadata sync should be either [enable|disable]")
+ }
+
+ var existingStatus Status
+ if opts.ExistingObjectReplicate != "" {
+ switch opts.ExistingObjectReplicate {
+ case "enable":
+ existingStatus = Enabled
+ case "disable", "":
+ existingStatus = Disabled
+ default:
+ return fmt.Errorf("existingObjectReplicate should be either enable|disable")
+ }
+ }
+ newRule := Rule{
+ ID: opts.ID,
+ Priority: priority,
+ Status: status,
+ Filter: filter,
+ Destination: Destination{
+ Bucket: destBucket,
+ StorageClass: opts.StorageClass,
+ },
+ DeleteMarkerReplication: DeleteMarkerReplication{Status: dmStatus},
+ DeleteReplication: DeleteReplication{Status: vDeleteStatus},
+ // MinIO enables replica metadata syncing by default in the case of bi-directional replication to allow
+ // automatic failover as the expectation in this case is that replica and source should be identical.
+ // However AWS leaves this configurable https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-for-metadata-changes.html
+ SourceSelectionCriteria: SourceSelectionCriteria{
+ ReplicaModifications: ReplicaModifications{
+ Status: replicaSync,
+ },
+ },
+ // By default disable existing object replication unless selected
+ ExistingObjectReplication: ExistingObjectReplication{
+ Status: existingStatus,
+ },
+ }
+
+ // validate rule after overlaying priority for pre-existing rule being disabled.
+ if err := newRule.Validate(); err != nil {
+ return err
+ }
+ for _, rule := range c.Rules {
+ if rule.Priority == newRule.Priority {
+ return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority")
+ }
+ if rule.Destination.Bucket != newRule.Destination.Bucket {
+ return fmt.Errorf("the destination bucket must be same for all rules")
+ }
+ if rule.ID == newRule.ID {
+ return fmt.Errorf("a rule exists with this ID")
+ }
+ }
+
+ c.Rules = append(c.Rules, newRule)
+ return nil
+}
+
+// EditRule modifies an existing rule in replication config
+func (c *Config) EditRule(opts Options) error {
+ if opts.ID == "" {
+ return fmt.Errorf("rule ID missing")
+ }
+ rIdx := -1
+ var newRule Rule
+ for i, rule := range c.Rules {
+ if rule.ID == opts.ID {
+ rIdx = i
+ newRule = rule
+ break
+ }
+ }
+ if rIdx < 0 {
+ return fmt.Errorf("rule with ID %s not found in replication configuration", opts.ID)
+ }
+ prefixChg := opts.Prefix != newRule.Prefix()
+ if opts.IsTagSet || prefixChg {
+ prefix := newRule.Prefix()
+ if prefix != opts.Prefix {
+ prefix = opts.Prefix
+ }
+ tags := []Tag{newRule.Filter.Tag}
+ if len(newRule.Filter.And.Tags) != 0 {
+ tags = newRule.Filter.And.Tags
+ }
+ var err error
+ if opts.IsTagSet {
+ tags, err = opts.Tags()
+ if err != nil {
+ return err
+ }
+ }
+ andVal := And{
+ Tags: tags,
+ }
+
+ filter := Filter{Prefix: prefix}
+ // only a single tag is set.
+ if prefix == "" && len(tags) == 1 {
+ filter.Tag = tags[0]
+ }
+ // both prefix and tag are present
+ if len(andVal.Tags) > 1 || prefix != "" {
+ filter.And = andVal
+ filter.And.Prefix = prefix
+ filter.Prefix = ""
+ filter.Tag = Tag{}
+ }
+ newRule.Filter = filter
+ }
+
+ // toggle rule status for edit option
+ if opts.RuleStatus != "" {
+ switch opts.RuleStatus {
+ case "enable":
+ newRule.Status = Enabled
+ case "disable":
+ newRule.Status = Disabled
+ default:
+ return fmt.Errorf("rule state should be either [enable|disable]")
+ }
+ }
+ // set DeleteMarkerReplication rule status for edit option
+ if opts.ReplicateDeleteMarkers != "" {
+ switch opts.ReplicateDeleteMarkers {
+ case "enable":
+ newRule.DeleteMarkerReplication.Status = Enabled
+ case "disable":
+ newRule.DeleteMarkerReplication.Status = Disabled
+ default:
+ return fmt.Errorf("ReplicateDeleteMarkers state should be either [enable|disable]")
+ }
+ }
+
+ // set DeleteReplication rule status for edit option. This is a MinIO specific
+ // option to replicate versioned deletes
+ if opts.ReplicateDeletes != "" {
+ switch opts.ReplicateDeletes {
+ case "enable":
+ newRule.DeleteReplication.Status = Enabled
+ case "disable":
+ newRule.DeleteReplication.Status = Disabled
+ default:
+ return fmt.Errorf("ReplicateDeletes state should be either [enable|disable]")
+ }
+ }
+
+ if opts.ReplicaSync != "" {
+ switch opts.ReplicaSync {
+ case "enable", "":
+ newRule.SourceSelectionCriteria.ReplicaModifications.Status = Enabled
+ case "disable":
+ newRule.SourceSelectionCriteria.ReplicaModifications.Status = Disabled
+ default:
+ return fmt.Errorf("replica metadata sync should be either [enable|disable]")
+ }
+ }
+ fmt.Println("opts.ExistingObjectReplicate>", opts.ExistingObjectReplicate)
+ if opts.ExistingObjectReplicate != "" {
+ switch opts.ExistingObjectReplicate {
+ case "enable":
+ newRule.ExistingObjectReplication.Status = Enabled
+ case "disable":
+ newRule.ExistingObjectReplication.Status = Disabled
+ default:
+ return fmt.Errorf("existingObjectsReplication state should be either [enable|disable]")
+ }
+ }
+ if opts.IsSCSet {
+ newRule.Destination.StorageClass = opts.StorageClass
+ }
+ if opts.Priority != "" {
+ priority, err := strconv.Atoi(opts.Priority)
+ if err != nil {
+ return err
+ }
+ newRule.Priority = priority
+ }
+ if opts.DestBucket != "" {
+ destBucket := opts.DestBucket
+ // ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html
+ if btokens := strings.Split(opts.DestBucket, ":"); len(btokens) != 6 {
+ if len(btokens) == 1 {
+ destBucket = fmt.Sprintf("arn:aws:s3:::%s", destBucket)
+ } else {
+ return fmt.Errorf("destination bucket needs to be in Arn format")
+ }
+ }
+ newRule.Destination.Bucket = destBucket
+ }
+ // validate rule
+ if err := newRule.Validate(); err != nil {
+ return err
+ }
+ // ensure priority and destination bucket restrictions are not violated
+ for idx, rule := range c.Rules {
+ if rule.Priority == newRule.Priority && rIdx != idx {
+ return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority")
+ }
+ if rule.Destination.Bucket != newRule.Destination.Bucket {
+ return fmt.Errorf("the destination bucket must be same for all rules")
+ }
+ }
+
+ c.Rules[rIdx] = newRule
+ return nil
+}
+
+// RemoveRule removes a rule from replication config.
+func (c *Config) RemoveRule(opts Options) error {
+ var newRules []Rule
+ ruleFound := false
+ for _, rule := range c.Rules {
+ if rule.ID != opts.ID {
+ newRules = append(newRules, rule)
+ continue
+ }
+ ruleFound = true
+ }
+ if !ruleFound {
+ return fmt.Errorf("Rule with ID %s not found", opts.ID)
+ }
+ if len(newRules) == 0 {
+ return fmt.Errorf("replication configuration should have at least one rule")
+ }
+ c.Rules = newRules
+ return nil
+
+}
+
+// Rule - a rule for replication configuration.
+type Rule struct {
+ XMLName xml.Name `xml:"Rule" json:"-"`
+ ID string `xml:"ID,omitempty"`
+ Status Status `xml:"Status"`
+ Priority int `xml:"Priority"`
+ DeleteMarkerReplication DeleteMarkerReplication `xml:"DeleteMarkerReplication"`
+ DeleteReplication DeleteReplication `xml:"DeleteReplication"`
+ Destination Destination `xml:"Destination"`
+ Filter Filter `xml:"Filter" json:"Filter"`
+ SourceSelectionCriteria SourceSelectionCriteria `xml:"SourceSelectionCriteria" json:"SourceSelectionCriteria"`
+ ExistingObjectReplication ExistingObjectReplication `xml:"ExistingObjectReplication,omitempty" json:"ExistingObjectReplication,omitempty"`
+}
+
+// Validate validates the rule for correctness
+func (r Rule) Validate() error {
+ if err := r.validateID(); err != nil {
+ return err
+ }
+ if err := r.validateStatus(); err != nil {
+ return err
+ }
+ if err := r.validateFilter(); err != nil {
+ return err
+ }
+
+ if r.Priority < 0 && r.Status == Enabled {
+ return fmt.Errorf("priority must be set for the rule")
+ }
+
+ if err := r.validateStatus(); err != nil {
+ return err
+ }
+ return r.ExistingObjectReplication.Validate()
+}
+
+// validateID - checks if ID is valid or not.
+func (r Rule) validateID() error {
+ // cannot be longer than 255 characters
+ if len(r.ID) > 255 {
+ return fmt.Errorf("ID must be less than 255 characters")
+ }
+ return nil
+}
+
+// validateStatus - checks if status is valid or not.
+func (r Rule) validateStatus() error {
+ // Status can't be empty
+ if len(r.Status) == 0 {
+ return fmt.Errorf("status cannot be empty")
+ }
+
+ // Status must be one of Enabled or Disabled
+ if r.Status != Enabled && r.Status != Disabled {
+ return fmt.Errorf("status must be set to either Enabled or Disabled")
+ }
+ return nil
+}
+
+func (r Rule) validateFilter() error {
+ if err := r.Filter.Validate(); err != nil {
+ return err
+ }
+ return nil
+}
+
+// Prefix - a rule can either have prefix under <filter></filter> or under
+// <filter><and></and></filter>. This method returns the prefix from the
+// location where it is available
+func (r Rule) Prefix() string {
+ if r.Filter.Prefix != "" {
+ return r.Filter.Prefix
+ }
+ return r.Filter.And.Prefix
+}
+
+// Tags - a rule can either have tag under <filter></filter> or under
+// <filter><and></and></filter>. This method returns all the tags from the
+// rule in the format tag1=value1&tag2=value2
+func (r Rule) Tags() string {
+ ts := []Tag{r.Filter.Tag}
+ if len(r.Filter.And.Tags) != 0 {
+ ts = r.Filter.And.Tags
+ }
+
+ var buf bytes.Buffer
+ for _, t := range ts {
+ if buf.Len() > 0 {
+ buf.WriteString("&")
+ }
+ buf.WriteString(t.String())
+ }
+ return buf.String()
+}
+
+// Filter - a filter for a replication configuration Rule.
+type Filter struct {
+ XMLName xml.Name `xml:"Filter" json:"-"`
+ Prefix string `json:"Prefix,omitempty"`
+ And And `xml:"And,omitempty" json:"And,omitempty"`
+ Tag Tag `xml:"Tag,omitempty" json:"Tag,omitempty"`
+}
+
+// Validate - validates the filter element
+func (f Filter) Validate() error {
+ // A Filter must have exactly one of Prefix, Tag, or And specified.
+ if !f.And.isEmpty() {
+ if f.Prefix != "" {
+ return errInvalidFilter
+ }
+ if !f.Tag.IsEmpty() {
+ return errInvalidFilter
+ }
+ }
+ if f.Prefix != "" {
+ if !f.Tag.IsEmpty() {
+ return errInvalidFilter
+ }
+ }
+ if !f.Tag.IsEmpty() {
+ if err := f.Tag.Validate(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Tag - a tag for a replication configuration Rule filter.
+type Tag struct {
+ XMLName xml.Name `json:"-"`
+ Key string `xml:"Key,omitempty" json:"Key,omitempty"`
+ Value string `xml:"Value,omitempty" json:"Value,omitempty"`
+}
+
+func (tag Tag) String() string {
+ if tag.IsEmpty() {
+ return ""
+ }
+ return tag.Key + "=" + tag.Value
+}
+
+// IsEmpty returns whether this tag is empty or not.
+func (tag Tag) IsEmpty() bool {
+ return tag.Key == ""
+}
+
+// Validate checks this tag.
+func (tag Tag) Validate() error {
+ if len(tag.Key) == 0 || utf8.RuneCountInString(tag.Key) > 128 {
+ return fmt.Errorf("invalid Tag Key")
+ }
+
+ if utf8.RuneCountInString(tag.Value) > 256 {
+ return fmt.Errorf("invalid Tag Value")
+ }
+ return nil
+}
+
+// Destination - destination in ReplicationConfiguration.
+type Destination struct {
+ XMLName xml.Name `xml:"Destination" json:"-"`
+ Bucket string `xml:"Bucket" json:"Bucket"`
+ StorageClass string `xml:"StorageClass,omitempty" json:"StorageClass,omitempty"`
+}
+
+// And - a tag to combine a prefix and multiple tags for replication configuration rule.
+type And struct {
+ XMLName xml.Name `xml:"And,omitempty" json:"-"`
+ Prefix string `xml:"Prefix,omitempty" json:"Prefix,omitempty"`
+ Tags []Tag `xml:"Tag,omitempty" json:"Tag,omitempty"`
+}
+
+// isEmpty returns true if Tags field is null
+func (a And) isEmpty() bool {
+ return len(a.Tags) == 0 && a.Prefix == ""
+}
+
+// Status represents Enabled/Disabled status
+type Status string
+
+// Supported status types
+const (
+ Enabled Status = "Enabled"
+ Disabled Status = "Disabled"
+)
+
+// DeleteMarkerReplication - whether delete markers are replicated - https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html
+type DeleteMarkerReplication struct {
+ Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default
+}
+
+// IsEmpty returns true if DeleteMarkerReplication is not set
+func (d DeleteMarkerReplication) IsEmpty() bool {
+ return len(d.Status) == 0
+}
+
+// DeleteReplication - whether versioned deletes are replicated - this
+// is a MinIO specific extension
+type DeleteReplication struct {
+ Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default
+}
+
+// IsEmpty returns true if DeleteReplication is not set
+func (d DeleteReplication) IsEmpty() bool {
+ return len(d.Status) == 0
+}
+
+// ReplicaModifications specifies if replica modification sync is enabled
+type ReplicaModifications struct {
+ Status Status `xml:"Status" json:"Status"` // should be set to "Enabled" by default
+}
+
+// SourceSelectionCriteria - specifies additional source selection criteria in ReplicationConfiguration.
+type SourceSelectionCriteria struct {
+ ReplicaModifications ReplicaModifications `xml:"ReplicaModifications" json:"ReplicaModifications"`
+}
+
+// IsValid - checks whether SourceSelectionCriteria is valid or not.
+func (s SourceSelectionCriteria) IsValid() bool {
+ return s.ReplicaModifications.Status == Enabled || s.ReplicaModifications.Status == Disabled
+}
+
+// Validate source selection criteria
+func (s SourceSelectionCriteria) Validate() error {
+ if (s == SourceSelectionCriteria{}) {
+ return nil
+ }
+ if !s.IsValid() {
+ return fmt.Errorf("invalid ReplicaModification status")
+ }
+ return nil
+}
+
+// ExistingObjectReplication - whether existing object replication is enabled
+type ExistingObjectReplication struct {
+ Status Status `xml:"Status"` // should be set to "Disabled" by default
+}
+
+// IsEmpty returns true if DeleteMarkerReplication is not set
+func (e ExistingObjectReplication) IsEmpty() bool {
+ return len(e.Status) == 0
+}
+
+// Validate validates whether the status is disabled.
+func (e ExistingObjectReplication) Validate() error {
+ if e.IsEmpty() {
+ return nil
+ }
+ if e.Status != Disabled && e.Status != Enabled {
+ return fmt.Errorf("invalid ExistingObjectReplication status")
+ }
+ return nil
+}
+
+// Metrics represents inline replication metrics
+// such as pending, failed and completed bytes in total for a bucket
+type Metrics struct {
+ // Pending size in bytes
+ PendingSize uint64 `json:"pendingReplicationSize"`
+ // Completed size in bytes
+ ReplicatedSize uint64 `json:"completedReplicationSize"`
+ // Total Replica size in bytes
+ ReplicaSize uint64 `json:"replicaSize"`
+ // Failed size in bytes
+ FailedSize uint64 `json:"failedReplicationSize"`
+ // Total number of pending operations including metadata updates
+ PendingCount uint64 `json:"pendingReplicationCount"`
+ // Total number of failed operations including metadata updates
+ FailedCount uint64 `json:"failedReplicationCount"`
+}