圖解kubernetes中etcd增刪改查的工業實現

kubernetes中基於etcd實現集中的數據存儲,今天來學習下基於etcd如何實現數據讀取一致性、更新一致性、事務的具體實現

1. 數據的存儲與版本

1.1 數據存儲的轉換

圖解kubernetes中etcd增刪改查的工業實現

在k8s中有部分數據的存儲是需要經過處理之後才能存儲的,比如secret這種加密的數據,既然要存儲就至少包含兩個操作,加密存儲,解密讀取,transformer就是為了完成該操作而實現的,其在進行etcd數據存儲的時候回對數據進行加密,而在讀取的時候,則會進行解密

1.2 資源版本revision

圖解kubernetes中etcd增刪改查的工業實現

在etcd中進行修改(增刪改)操作的時候,都會遞增revision,而在k8s中也通過該值來作為k8s資源的ResourceVersion,該機制也是實現watch的關鍵機制,在操作etcd解碼從etcd獲取的數據的時候,會通過versioner組件來為資源動態的修改該值

1.3 數據模型的映射

圖解kubernetes中etcd增刪改查的工業實現

將數據從etcd中讀取後,數據本身就是一個字節數組,如何將對應的數據轉換成我們真正的運行時對象呢?還記得我們之前的scheme與codec麼,在這裡我們知道對應的數據編碼格式,也知道資源對象的類型,則通過codec、字節數組、目標類型,我們就可以完成對應數據的反射

2. 查詢接口一致性

圖解kubernetes中etcd增刪改查的工業實現

etcd中的數據寫入是基於leader單點寫入和集群quorum機制實現的,並不是一個強一致性的數據寫入,則如果如果我們訪問的節點不存在quorum的半數節點內,則可能造成短暫的數據不一致,針對一些強一致的場景,我們可以通過其revision機制來進行數據的讀取, 保證我們讀取到更新之後的數據

<code>// 省略非核心代碼
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {

\t// 獲取key
\tgetResp, err := s.client.KV.Get(ctx, key, s.getOps...)

// 檢測當前版本,是否達到最小版本的
\tif err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
\t\treturn err
\t}

\t// 執行數據轉換
\tdata, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
\tif err != nil {
\t\treturn storage.NewInternalError(err.Error())
\t}
\t// 解碼數據
\treturn decode(s.codec, s.versioner, data, out, kv.ModRevision)
}

/<code>

3. 創建接口實現

創建一個接口數據則會首先進行資源對象的檢查,避免重複創建對象,此時會先通過資源對象的version字段來進行初步檢查,然後在利用etcd的事務機制來保證資源創建的原子性操作

<code>// 省略非核心代碼
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
\tif version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
\t\treturn errors.New("resourceVersion should not be set on objects to be created")
\t}
\tif err := s.versioner.PrepareObjectForStorage(obj); err != nil {
\t\treturn fmt.Errorf("PrepareObjectForStorage failed: %v", err)
\t}
\t// 將數據編碼
\tdata, err := runtime.Encode(s.codec, obj)
\tif err != nil {
\t\treturn err
\t}
\t
\t// 轉換數據

\tnewData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
\tif err != nil {
\t\treturn storage.NewInternalError(err.Error())
\t}

\tstartTime := time.Now()
// 事務操作
\ttxnResp, err := s.client.KV.Txn(ctx).If(
\t\tnotFound(key), // 如果之前不存在 這裡是利用的etcd的ModRevision即修改版本為0, 寓意著對應的key不存在
\t).Then(
\t\tclientv3.OpPut(key, string(newData), opts...), // put修改數據
\t).Commit()
\tmetrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
\tif err != nil {
\t\treturn err
\t}
\tif !txnResp.Succeeded {
\t\treturn storage.NewKeyExistsError(key, 0)
\t}

\tif out != nil {
// 獲取對應的Revision
\t\tputResp := txnResp.Responses[0].GetResponsePut()
\t\treturn decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
\t}
\treturn nil
}

func notFound(key string) clientv3.Cmp {
\treturn clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}
/<code>

4. 刪除接口的實現

圖解kubernetes中etcd增刪改查的工業實現

刪除接口主要是通過CAS和事務機制來共同實現,確保在etcd不發生異常的情況,即使併發對同個資源來進行刪除操作也能保證至少有一個節點成功

<code>// 省略非核心代碼
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
\tstartTime := time.Now()
\t// 獲取當前的key的數據
\tgetResp, err := s.client.KV.Get(ctx, key)
\tfor {
\t\t// 獲取當前的狀態
\t\torigState, err := s.getState(getResp, key, v, false)
\t\tif err != nil {
\t\t\treturn err
\t\t}
\t\ttxnResp, err := s.client.KV.Txn(ctx).If(
\t\t\tclientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), // 如果修改版本等於當前狀態,就嘗試刪除
\t\t).Then(
\t\t\tclientv3.OpDelete(key), // 刪除
\t\t).Else(
\t\t\tclientv3.OpGet(key),\t// 獲取
\t\t).Commit()
\t\tif !txnResp.Succeeded {
\t\t\t// 獲取最新的數據重試事務操作
\t\t\tgetResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
\t\t\tklog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
\t\t\tcontinue
\t\t}
\t\t// 將最後一個版本的數據解碼到out裡面,然後返回
\t\treturn decode(s.codec, s.versioner, origState.data, out, origState.rev)
\t}
}
/<code>

5. 更新接口的實現

圖解kubernetes中etcd增刪改查的工業實現

更新接口實現上與刪除接口並無本質上的差別,但是如果多個節點同時進行更新,CAS併發操作必然會有一個節點成功,當發現已經有節點操作成功,則當前節點其實並不需要再做過多的操作,直接返回即可

<code>// 省略非核心代碼
func (s *store) GuaranteedUpdate(
\tctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
\tpreconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
\t// 獲取當前key的最新數據
\tgetCurrentState := func() (*objState, error) {
\t\tstartTime := time.Now()
\t\tgetResp, err := s.client.KV.Get(ctx, key, s.getOps...)

\t\tmetrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
\t\tif err != nil {
\t\t\treturn nil, err
\t\t}
\t\treturn s.getState(getResp, key, v, ignoreNotFound)
\t}

\t// 獲取當前數據
\tvar origState *objState
\tvar mustCheckData bool
\tif len(suggestion) == 1 && suggestion[0] != nil {
\t\t// 如果提供了建議的數據,則會使用,
\t\torigState, err = s.getStateFromObject(suggestion[0])
\t\tif err != nil {
\t\t\treturn err
\t\t}
\t\t//但是需要檢測數據
\t\tmustCheckData = true
\t} else {
\t\t// 嘗試重新獲取數據
\t\torigState, err = getCurrentState()
\t\tif err != nil {
\t\t\treturn err
\t\t}
\t}

\ttransformContext := authenticatedDataString(key)
\tfor {
\t\t// 檢查對象是否已經更新, 主要是通過檢測uuid/revision來實現
\t\tif err := preconditions.Check(key, origState.obj); err != nil {
\t\t\t// If our data is already up to date, return the error
\t\t\tif !mustCheckData {
\t\t\t\treturn err
\t\t\t}
\t\t\t// 如果檢查數據一致性錯誤,則需要重新獲取
\t\t\torigState, err = getCurrentState()
\t\t\tif err != nil {
\t\t\t\treturn err
\t\t\t}
\t\t\tmustCheckData = false
\t\t\t// Retry
\t\t\tcontinue
\t\t}


\t\t// 刪除當前的版本數據revision
\t\tret, ttl, err := s.updateState(origState, tryUpdate)
\t\tif err != nil {
\t\t\t// If our data is already up to date, return the error
\t\t\tif !mustCheckData {
\t\t\t\treturn err
\t\t\t}

\t\t\t// It's possible we were working with stale data
\t\t\t// Actually fetch
\t\t\torigState, err = getCurrentState()
\t\t\tif err != nil {
\t\t\t\treturn err
\t\t\t}
\t\t\tmustCheckData = false
\t\t\t// Retry
\t\t\tcontinue
\t\t}

\t\t// 編碼數據
\t\tdata, err := runtime.Encode(s.codec, ret)
\t\tif err != nil {
\t\t\treturn err
\t\t}
\t\tif !origState.stale && bytes.Equal(data, origState.data) {
\t\t\t// 如果我們發現我們當前的數據與獲取到的數據一致,則會直接跳過
\t\t\tif mustCheckData {
\t\t\t\torigState, err = getCurrentState()
\t\t\t\tif err != nil {
\t\t\t\t\treturn err
\t\t\t\t}
\t\t\t\tmustCheckData = false
\t\t\t\tif !bytes.Equal(data, origState.data) {
\t\t\t\t\t// original data changed, restart loop
\t\t\t\t\tcontinue
\t\t\t\t}
\t\t\t}
\t\t\tif !origState.stale {
// 直接返回數據
\t\t\t\treturn decode(s.codec, s.versioner, origState.data, out, origState.rev)
\t\t\t}
\t\t}

\t\t// 磚漢數據

\t\tnewData, err := s.transformer.TransformToStorage(data, transformContext)
\t\tif err != nil {
\t\t\treturn storage.NewInternalError(err.Error())
\t\t}

\t\topts, err := s.ttlOpts(ctx, int64(ttl))
\t\tif err != nil {
\t\t\treturn err
\t\t}
\t\ttrace.Step("Transaction prepared")

\t\tstartTime := time.Now()
\t\t// 事務更新數據
\t\ttxnResp, err := s.client.KV.Txn(ctx).If(
\t\t\tclientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
\t\t).Then(
\t\t\tclientv3.OpPut(key, string(newData), opts...),
\t\t).Else(
\t\t\tclientv3.OpGet(key),
\t\t).Commit()
\t\tmetrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
\t\tif err != nil {
\t\t\treturn err
\t\t}
\t\ttrace.Step("Transaction committed")
\t\tif !txnResp.Succeeded {
\t\t\t// 重新獲取數據
\t\t\tgetResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
\t\t\tklog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
\t\t\torigState, err = s.getState(getResp, key, v, ignoreNotFound)
\t\t\tif err != nil {
\t\t\t\treturn err
\t\t\t}
\t\t\ttrace.Step("Retry value restored")
\t\t\tmustCheckData = false
\t\t\tcontinue
\t\t}
\t\t// 獲取put響應
\t\tputResp := txnResp.Responses[0].GetResponsePut()

\t\treturn decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
\t}
}
/<code>

6. 未曾講到的地方

transformer的實現和註冊地方我並沒有找到,只看到了幾個覆蓋資源類型的地方,還有list/watch接口,後續再繼續學習,今天就先到這裡,下次再見


分享到:


相關文章: