Skip to content

Latest commit

 

History

History
371 lines (322 loc) · 14.2 KB

5.15-xu-lie-hua-qi-yu-xu-lie-hua-gong-chang-3.md

File metadata and controls

371 lines (322 loc) · 14.2 KB

5.14 序列化器与序列化工厂2

Codec 相关的接口都定义在 staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go 文件中,我们来分析一下。

首先我们看到的是:

// APIVersionInternal is the version string that may be used when registering
// a type that should not be considered stable or serialized. It is purely a
// naming convention: this package attaches no special behavior to it.
const APIVersionInternal = "__internal"

这个 APIVersionInternal 是用来标识内部版本的

然后我们看到:

// Encoder writes objects to a serialized form.
type Encoder interface {
    // Encode writes an object to a stream. Implementations may return errors if the versions are
    // incompatible, or if no conversion is defined.
    Encode(obj Object, w io.Writer) error
    // Identifier returns an identifier of the encoder.
    // Two different encoders should report equal identifiers if and only if, for every
    // input object, both of them encode it to the same representation.
    //
    // Identifier is intended for use with the CacheableObject#CacheEncode method, which
    // receives it as its first argument. In order to correctly handle CacheableObject,
    // an Encode() method should look similar to the sketch below, where doEncode() is
    // the encoding logic of the implemented encoder:
    //   func (e *MyEncoder) Encode(obj Object, w io.Writer) error {
    //     if co, ok := obj.(CacheableObject); ok {
    //       return co.CacheEncode(e.Identifier(), e.doEncode, w)
    //     }
    //     return e.doEncode(obj, w)
    //   }
    Identifier() Identifier
}

这个 Encoder 有 Encode 方法是正常的,那为什么会多出来一个 Identifier 方法呢?

我查看了 git history ,发现是 kubernetes/kubernetes#81914 这个 pr 加的,那这个pr 是做什么用的呢? Cache serializations across watchers !

之前在 apiserver 中,同一个事件有可能要发送给多个 watcher,如果为每个 watcher 都把同一个对象重新序列化一遍,是不是很浪费资源啊!此 PR 引入了序列化结果的缓存:当多个 watcher 期望相同的序列化格式时,直接重用已经缓存的结果。在 5k 测试中,节省了约 15% 的内存和 5% 的 CPU。

cache serializations across watchers

在 Encoder interface 中扩展一个 Identifier 方法,所有实现 Encoder 接口的对象都必须有自己的身份标识。

// Identifier is a string identity for an object. Identifiers of two different
// objects should be equal if and only if, for every input, the output they
// produce is exactly the same.
type Identifier string

// Encoder writes objects to a serialized form.
type Encoder interface {
	// Encode writes an object to a stream. Implementations may return errors if the versions are
	// incompatible, or if no conversion is defined.
	Encode(obj Object, w io.Writer) error
	// Identifier returns an identifier of the encoder.
	// Two different encoders should report equal identifiers if and only if, for every
	// input object, both of them encode it to the same representation.
	//
	// Identifier is intended for use with the CacheableObject#CacheEncode method, which
	// receives it as its first argument. In order to correctly handle CacheableObject,
	// an Encode() method should look similar to the sketch below, where doEncode() is
	// the encoding logic of the implemented encoder:
	//   func (e *MyEncoder) Encode(obj Object, w io.Writer) error {
	//     if co, ok := obj.(CacheableObject); ok {
	//       return co.CacheEncode(e.Identifier(), e.doEncode, w)
	//     }
	//     return e.doEncode(obj, w)
	//   }
	Identifier() Identifier
}

添加一个 CacheableObject interface

// CacheableObject allows an object to cache its different serializations
// to avoid performing the same serialization multiple times.
type CacheableObject interface {
	// CacheEncode writes an object to a stream. The <encode> function is
	// used only in case of a cache miss, and it takes ownership of the
	// object passed to it.
	// If CacheableObject is a wrapper, then a deep-copy of the wrapped object
	// should be passed to the <encode> function.
	// CacheEncode assumes that for two different calls with the same <id>,
	// the <encode> function will also be the same.
	CacheEncode(id Identifier, encode func(Object, io.Writer) error, w io.Writer) error
	// GetObject returns a deep-copy of an object to be encoded - the caller of
	// GetObject() is the owner of the returned object. The reason for making a
	// copy is to avoid bugs where a caller modifies the object and forgets to
	// copy it, thus modifying the object for everyone.
	// The object returned by GetObject should be the same as the one that is
	// supposed to be passed to the <encode> function in the CacheEncode method.
	// If CacheableObject is a wrapper, a copy of the wrapped object should be
	// returned.
	GetObject() Object
}

将 Identifier 方法添加到 GroupVersioner 接口

// GroupVersioner refines a set of possible conversion targets into a single option.
type GroupVersioner interface {
	// KindForGroupVersionKinds returns a desired target group version kind for the given input, or returns ok false if no
	// target is known. In general, if the return target is not in the input list, the caller is expected to invoke
	// Scheme.New(target) and then perform a conversion between the current Go type and the destination Go type.
	// Sophisticated implementations may use additional information about the input kinds to pick a destination kind.
	KindForGroupVersionKinds(kinds []schema.GroupVersionKind) (target schema.GroupVersionKind, ok bool)
	// Identifier returns a string representation of the GroupVersioner.
	// Identifiers of two different GroupVersioners should be equal only if,
	// for every input kinds, they return the same result.
	Identifier() string
}

实现Encoder.Identifier() 方法

// json 实现
// identifier derives the Identifier of the json Encoder from its options.
// The result is the marshaled form of a string map holding the fixed name
// "json" plus the Yaml and Pretty flags rendered as "true"/"false".
// A marshaling failure is reported through klog.Fatalf.
func identifier(options SerializerOptions) runtime.Identifier {
	fields := make(map[string]string, 3)
	fields["name"] = "json"
	fields["yaml"] = strconv.FormatBool(options.Yaml)
	fields["pretty"] = strconv.FormatBool(options.Pretty)

	encoded, err := json.Marshal(fields)
	if err != nil {
		klog.Fatalf("Failed marshaling identifier for json Serializer: %v", err)
	}
	return runtime.Identifier(encoded)
}

// protobuf 实现

// serializerIdentifier is the fixed identifier returned by Serializer.Identifier.
const serializerIdentifier runtime.Identifier = "protobuf"
// Identifier implements runtime.Encoder interface.
// It always returns the constant serializerIdentifier ("protobuf").
func (s *Serializer) Identifier() runtime.Identifier {
	return serializerIdentifier
}

// version 实现 
// identifier derives the Identifier of the versioning codec from its
// parameters. The result is the marshaled form of a string map that always
// contains "name": "versioning", plus "encodeGV" and "encoder" entries for
// whichever of the two arguments is non-nil.
// A marshaling failure is reported through klog.Fatalf.
func identifier(encodeGV runtime.GroupVersioner, encoder runtime.Encoder) runtime.Identifier {
	parts := map[string]string{"name": "versioning"}
	if encodeGV != nil {
		parts["encodeGV"] = encodeGV.Identifier()
	}
	if encoder != nil {
		parts["encoder"] = string(encoder.Identifier())
	}

	encoded, err := json.Marshal(parts)
	if err != nil {
		klog.Fatalf("Failed marshaling identifier for codec: %v", err)
	}
	return runtime.Identifier(encoded)
}

实现对 CacheableObject 的支持,如果这个对象实现了 CacheableObject 接口,在Encode 时候就调用 CacheEncode 方法

// Encode writes obj to stream. Objects that implement CacheableObject are
// encoded through their CacheEncode method, keyed by this serializer's
// Identifier and with doEncode as the encode function; every other object is
// passed straight to doEncode.
func (s base64Serializer) Encode(obj Object, stream io.Writer) error {
	cacheable, ok := obj.(CacheableObject)
	if !ok {
		return s.doEncode(obj, stream)
	}
	return cacheable.CacheEncode(s.Identifier(), s.doEncode, stream)
}

CachingObject 缓存对象

在 staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go 文件下实现 cachingObject 对象.

一个runtime.Object 对象通过调用 newCachingObject 方法就能变成 cachingObject 对象,也就拥有了能够被编码缓存的能力,这里有用到设计模式之装饰器哦。

// cachingObject is an object that is able to cache its serializations
// so that each of those is computed exactly once.
//
// cachingObject implements the metav1.Object interface (accessors for
// all metadata fields).
type cachingObject struct {
	// NOTE(review): presumably guards deepCopied and object - confirm
	// against the accessor methods, which are not shown here.
	lock sync.RWMutex

	// deepCopied defines whether the object below has already been
	// deep copied. The operation is performed lazily on the first
	// setXxx operation.
	//
	// The lazy deep-copy is useful, as effectively the only case in
	// which some field is set is ResourceVersion for DELETE events,
	// so in all other cases we can avoid performing any deep copies.
	deepCopied bool

	// Object for which serializations are cached.
	object metaRuntimeInterface

	// serializations is a cache containing the object's serializations.
	// The value stored in atomic.Value is of type serializationsCache.
	// The atomic.Value type is used to allow a fast path.
	serializations atomic.Value
}

// newCachingObject wraps the given object into a cachingObject that starts
// with an empty serializationsCache. The object is stored as-is: no copy is
// made here, and deepCopied starts out false.
// An error is returned if the object cannot be cast to
// metaRuntimeInterface.
func newCachingObject(object runtime.Object) (*cachingObject, error) {
	meta, ok := object.(metaRuntimeInterface)
	if !ok {
		return nil, fmt.Errorf("can't cast object to metav1.Object: %#v", object)
	}

	wrapped := &cachingObject{
		deepCopied: false,
		object:     meta,
	}
	wrapped.serializations.Store(make(serializationsCache))
	return wrapped, nil
}

核心代码逻辑如下:(此方法会在 Encode 阶段被调用)

// CacheEncode implements runtime.CacheableObject interface.
// It looks up the serialization result for the given identifier, runs the
// supplied encode function through that result's once guard to fill it in,
// and then writes the stored bytes to w. If the stored result carries an
// error, that error is returned and nothing is written.
// It assumes that for a given identifier, the encode function always encodes
// each input object into the same output format.
func (o *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
	entry := o.getSerializationResult(id)
	entry.once.Do(func() {
		var buf bytes.Buffer
		// TODO(wojtek-t): This is currently making a copy to avoid races
		//   in cases where encoding is making subtle object modifications,
		//   e.g. #82497
		//   Figure out if we can somehow avoid this under some conditions.
		entry.err = encode(o.GetObject(), &buf)
		entry.raw = buf.Bytes()
	})
	// Once the guarded function has run, the fields of entry do not change.
	if entry.err != nil {
		return entry.err
	}

	// Writers that implement runtime.Splice receive the bytes through
	// Splice; all other writers get a regular Write.
	splicer, ok := w.(runtime.Splice)
	if !ok {
		_, err := w.Write(entry.raw)
		return err
	}
	splicer.Splice(entry.raw)
	return nil
}

最后我们需要在 staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go 文件下,添加分发判断逻辑:

// dispatchEvent delivers the given event to every watcher in
// c.watchersBuffer, bracketed by startDispatching and finishDispatching.
//
// Bookmark events are only offered through nonblockingAdd. For all other
// events the objects are first wrapped by setCachingObjects on a shallow copy
// of the event; watchers for which nonblockingAdd returns false are then
// retried with add under a shared timer whose timeout is taken from
// c.dispatchTimeoutBudget, and the unused part of that timeout is returned to
// the budget afterwards.
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
	c.startDispatching(event)
	defer c.finishDispatching()
	// Watchers stopped after startDispatching will be delayed to finishDispatching.

	// Since add() can block, we explicitly add when cacher is unlocked.
	// Dispatching the event in a nonblocking way first, so that faster
	// watchers are not blocked by slower ones.
	if event.Type == watch.Bookmark {
		// Bookmarks are only offered without blocking; the result of
		// nonblockingAdd is ignored here.
		for _, watcher := range c.watchersBuffer {
			watcher.nonblockingAdd(event)
		}
	} else {
		// Set up caching of object serializations only for dispatching this event.
		//
		// Storing serializations in memory would result in increased memory usage,
		// but it would help for caching encodings for watches started from old
		// versions. However, we still don't have convincing data that the gain
		// from it justifies increased memory usage, so for now we drop the cached
		// serializations after dispatching this event.
		//
		// Given that CachingObject is just wrapping the object and not performing
		// deep-copying (until some field is explicitly being modified), we create
		// it unconditionally to ensure safety and reduce deep-copying.
		//
		// Make a shallow copy to allow overwriting Object and PrevObject.
		wcEvent := *event
		setCachingObjects(&wcEvent, c.versioner)
		event = &wcEvent

		// Reuse the backing array of c.blockedWatchers across calls.
		c.blockedWatchers = c.blockedWatchers[:0]
		for _, watcher := range c.watchersBuffer {
			if !watcher.nonblockingAdd(event) {
				c.blockedWatchers = append(c.blockedWatchers, watcher)
			}
		}

		if len(c.blockedWatchers) > 0 {
			// dispatchEvent is called very often, so arrange
			// to reuse timers instead of constantly allocating.
			startTime := time.Now()
			timeout := c.dispatchTimeoutBudget.takeAvailable()
			c.timer.Reset(timeout)

			// Send event to all blocked watchers. As long as timer is running,
			// `add` will wait for the watcher to unblock. After timeout,
			// `add` will not wait, but immediately close a still blocked watcher.
			// Hence, every watcher gets the chance to unblock itself while timer
			// is running, not only the first ones in the list.
			timer := c.timer
			for _, watcher := range c.blockedWatchers {
				if !watcher.add(event, timer) {
					// The timer fired: drop the local reference so that the
					// remaining add calls receive a nil timer and the
					// stop/drain step below is skipped.
					timer = nil
				}
			}

			// Stop the timer if it has not fired.
			if timer != nil && !timer.Stop() {
				// Consume triggered (but not yet received) timer event
				// so that future reuse does not get a spurious timeout.
				<-timer.C
			}

			// Give back whatever part of the timeout was not spent dispatching.
			c.dispatchTimeoutBudget.returnUnused(timeout - time.Since(startTime))
		}
	}
}


// setCachingObjects replaces the object of the event that gets serialized for
// watchers with a cachingObject wrapper, so that its serializations can be
// cached while the event is dispatched:
//   - for Added and Modified events, event.Object is wrapped;
//   - for Deleted events, event.PrevObject is wrapped, after its resource
//     version is updated via updateResourceVersionIfNeeded using
//     event.ResourceVersion.
//
// If wrapping fails, the event is left unchanged and the object that could
// not be wrapped is logged. Events of any other type are not modified.
func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
	switch event.Type {
	case watch.Added, watch.Modified:
		if object, err := newCachingObject(event.Object); err == nil {
			event.Object = object
		} else {
			klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
		}
		// Don't wrap PrevObject for update events (for create events it is nil).
		// PrevObject is only encoded to deliver DELETE watch events, so when
		// event.Object is not nil it can be used only for watchers whose
		// selector was satisfied by the previous version and is no longer
		// satisfied by the current version.
		// This is rare enough that it doesn't justify the cost of wrapping
		// PrevObject every time.
	case watch.Deleted:
		// Don't wrap Object for delete events - these are not to deliver any
		// events. Only wrap PrevObject.
		if object, err := newCachingObject(event.PrevObject); err == nil {
			// Update resource version of the underlying object.
			// event.PrevObject is used to deliver DELETE watch events and
			// for them, we set resourceVersion to <current> instead of
			// the resourceVersion of the last modification of the object.
			updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
			event.PrevObject = object
		} else {
			// Log PrevObject: it is the object that failed to be wrapped in
			// this branch, not event.Object.
			klog.Errorf("couldn't create cachingObject from: %#v", event.PrevObject)
		}
	}
}