Skip to content

Commit

Permalink
support migrate 7.x to 2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
medcl committed Sep 5, 2019
1 parent 12ed896 commit 7dc95a9
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 16 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ migrate 5.x to 6.x and unify all the types to `doc`
```

migrate 2.x to 7.x and rename `_type` to `type`
```
./esm -s http://localhost:9201 -x "source" -y "target" -d https://localhost:9200 --rename="_type:type,age:myage" -u"_doc"
```


## Download
https://github.com/medcl/elasticsearch-dump/releases
Expand Down
14 changes: 14 additions & 0 deletions domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Document struct {
Routing string `json:"_routing,omitempty"`
}


type Scroll struct {
Took int `json:"took,omitempty"`
ScrollId string `json:"_scroll_id,omitempty"`
Expand All @@ -40,6 +41,7 @@ type Scroll struct {
Shards struct {
Total int `json:"total,omitempty"`
Successful int `json:"successful,omitempty"`
Skipped int `json:"skipped,omitempty"`
Failed int `json:"failed,omitempty"`
Failures []struct {
Shard int `json:"shard,omitempty"`
Expand All @@ -50,6 +52,18 @@ type Scroll struct {
} `json:"_shards,omitempty"`
}

// ScrollV7 is a scroll response whose hits.total is a JSON object
// ({"value": ..., "relation": ...}) instead of a plain number.
//
// It embeds Scroll for the shared fields (scroll id, shard information, ...)
// and redeclares Hits; the Hits declared here shadows the embedded one, both
// for Go field selection and for encoding/json.
// NOTE(review): presumably this is what the 7.x API implementation decodes
// into — confirm against v7.go.
type ScrollV7 struct {
	Scroll
	Hits struct {
		// MaxScore is the "max_score" value of the response.
		MaxScore float32 `json:"max_score,omitempty"`
		// Total is the hit count object of the response.
		Total struct {
			// Value is the number of hits.
			Value int `json:"value,omitempty"`
			// Relation is the "relation" string that accompanies Value.
			Relation string `json:"relation,omitempty"`
		} `json:"total,omitempty"`
		// Docs holds the raw hits of the current page.
		Docs []interface{} `json:"hits,omitempty"`
	} `json:"hits"`
}

type ClusterVersion struct {
Name string `json:"name,omitempty"`
ClusterName string `json:"cluster_name,omitempty"`
Expand Down
4 changes: 2 additions & 2 deletions esapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ESAPI interface{
GetIndexMappings(copyAllIndexes bool,indexNames string)(string,int,*Indexes,error)
UpdateIndexSettings(indexName string,settings map[string]interface{})(error)
UpdateIndexMapping(indexName string,mappings map[string]interface{})(error)
NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(*Scroll, error)
NextScroll(scrollTime string,scrollId string)(*Scroll,error)
NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(interface{}, error)
NextScroll(scrollTime string,scrollId string)(interface{},error)
Refresh(name string) (err error)
}
32 changes: 25 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ func main() {
if errs != nil {
return
}
if strings.HasPrefix(srcESVersion.Version.Number, "6.") {
if strings.HasPrefix(srcESVersion.Version.Number, "7.") {
log.Debug("source es is V7,", srcESVersion.Version.Number)
api := new(ESAPIV7)
api.Host = c.SourceEs
api.Auth = migrator.SourceAuth
api.HttpProxy=migrator.Config.SourceProxy
migrator.SourceESAPI = api
}else if strings.HasPrefix(srcESVersion.Version.Number, "6.") {
log.Debug("source es is V6,", srcESVersion.Version.Number)
api := new(ESAPIV5)
api.Host = c.SourceEs
Expand Down Expand Up @@ -110,11 +117,14 @@ func main() {
log.Error(err)
return
}
totalSize+=scroll.Hits.Total

if scroll != nil && scroll.Hits.Docs != nil {
temp:=scroll.(ScrollAPI)

totalSize+=temp.GetHitsTotal()

if scroll != nil && temp.GetDocs() != nil {

if scroll.Hits.Total == 0 {
if temp.GetHitsTotal() == 0 {
log.Error("can't find documents from source.")
return
}
Expand All @@ -124,10 +134,10 @@ func main() {
wg.Add(1)
//process input
// start scroll
scroll.ProcessScrollResult(&migrator, fetchBar)
temp.ProcessScrollResult(&migrator, fetchBar)

// loop scrolling until done
for scroll.Next(&migrator, fetchBar) == false {
for temp.Next(&migrator, fetchBar) == false {
}
fetchBar.Finish()
// finished, close doc chan and wait for goroutines to be done
Expand Down Expand Up @@ -198,7 +208,14 @@ func main() {
return
}

if strings.HasPrefix(descESVersion.Version.Number, "6.") {
if strings.HasPrefix(descESVersion.Version.Number, "7.") {
log.Debug("target es is V7,", descESVersion.Version.Number)
api := new(ESAPIV7)
api.Host = c.TargetEs
api.Auth = migrator.TargetAuth
api.HttpProxy=migrator.Config.TargetProxy
migrator.TargetESAPI = api
}else if strings.HasPrefix(descESVersion.Version.Number, "6.") {
log.Debug("target es is V6,", descESVersion.Version.Number)
api := new(ESAPIV5)
api.Host = c.TargetEs
Expand Down Expand Up @@ -484,3 +501,4 @@ func (c *Migrator) ClusterReady(api ESAPI) (*ClusterHealth, bool) {

return health, false
}

97 changes: 94 additions & 3 deletions scroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,50 @@ import (
)


// ScrollAPI is the common view of a scroll response used by the migrator.
// Both *Scroll and *ScrollV7 implement it, so callers can work with the
// interface{} values returned by NewScroll/NextScroll through one type.
type ScrollAPI interface {
	// GetScrollId returns the scroll id carried by the response.
	GetScrollId() string
	// GetHitsTotal returns the total hit count reported by the response.
	GetHitsTotal() int
	// GetDocs returns the raw hits of the current page.
	GetDocs() []interface{}
	// ProcessScrollResult forwards the current page to the migrator and
	// updates the progress bar.
	ProcessScrollResult(c *Migrator, bar *pb.ProgressBar)
	// Next fetches and processes the following page; done is true once a
	// page without documents has been received.
	Next(c *Migrator, bar *pb.ProgressBar) (done bool)
}


// GetHitsTotal returns the total hit count stored in the response
// (Hits.Total).
func (s *Scroll) GetHitsTotal() int {
	total := s.Hits.Total
	return total
}

// GetScrollId returns the scroll id stored in the response.
func (s *Scroll) GetScrollId() string {
	id := s.ScrollId
	return id
}

// GetDocs returns the raw hits of the current page (Hits.Docs). The slice is
// returned as stored, without copying.
func (s *Scroll) GetDocs() []interface{} {
	docs := s.Hits.Docs
	return docs
}

// GetHitsTotal returns the hit count of the response, read from the "value"
// member of the total object (Hits.Total.Value).
func (s *ScrollV7) GetHitsTotal() int {
	total := s.Hits.Total
	return total.Value
}


// GetScrollId returns the scroll id, which lives in the embedded Scroll.
func (s *ScrollV7) GetScrollId() string {
	return s.Scroll.ScrollId
}

// GetDocs returns the raw hits of the current page. It reads the Hits field
// declared on ScrollV7 itself, not the one of the embedded Scroll.
func (s *ScrollV7) GetDocs() []interface{} {
	docs := s.Hits.Docs
	return docs
}


// ProcessScrollResult streams the documents of one scroll page from the
// source es instance into the migrator's document channel.
func (s *Scroll) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar){
Expand All @@ -38,6 +82,7 @@ func (s *Scroll) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar){

// write all the docs into a channel
for _, docI := range s.Hits.Docs {
//fmt.Println(docI)
c.DocChan <- docI.(map[string]interface{})
}
}
Expand All @@ -50,16 +95,62 @@ func (s *Scroll) Next(c *Migrator, bar *pb.ProgressBar) (done bool) {
return false
}

if scroll.Hits.Docs == nil || len(scroll.Hits.Docs) <= 0 {
docs:=scroll.(ScrollAPI).GetDocs()
if docs == nil || len(docs) <= 0 {
log.Debug("scroll result is empty")
return true
}

scroll.(ScrollAPI).ProcessScrollResult(c,bar)

//update scrollId
s.ScrollId=scroll.(ScrollAPI).GetScrollId()

return
}



// ProcessScrollResult forwards one page of a ScrollV7 response to the
// migrator: it advances the progress bar by the number of hits on the page,
// logs every shard failure reported with the page, and then sends each hit to
// c.DocChan.
//
// Every hit must have been decoded as a JSON object
// (map[string]interface{}); any other value makes the type assertion in the
// send loop panic.
func (s *ScrollV7) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar) {
	// update progress bar
	bar.Add(len(s.Hits.Docs))

	// show any failures
	for _, failure := range s.Shards.Failures {
		reason, err := json.Marshal(failure.Reason)
		if err != nil {
			// The reason could not be rendered; report that instead of
			// logging an empty line.
			log.Error(err)
			continue
		}
		// The reason is arbitrary JSON and may contain '%', so it must be
		// logged as a value, never used as a format string.
		log.Error(string(reason))
	}

	// write all the docs into a channel
	for _, docI := range s.Hits.Docs {
		c.DocChan <- docI.(map[string]interface{})
	}
}

// Next fetches the following scroll page from the source cluster using the
// current scroll id and, when the page contains documents, hands it to
// ProcessScrollResult and stores the scroll id returned with that page.
//
// It returns true once a page without documents has been received. It returns
// false otherwise — including when the request fails, in which case the error
// is logged and the stored scroll id is left unchanged.
func (s *ScrollV7) Next(c *Migrator, bar *pb.ProgressBar) (done bool) {
	page, err := c.SourceESAPI.NextScroll(c.Config.ScrollTime, s.ScrollId)
	if err != nil {
		log.Error(err)
		return false
	}

	// NextScroll returns interface{}; assert once and reuse the typed value.
	// A result that does not implement ScrollAPI panics here.
	next := page.(ScrollAPI)

	// len of a nil slice is 0, so this also covers a missing hits array.
	if len(next.GetDocs()) == 0 {
		log.Debug("scroll result is empty")
		return true
	}

	next.ProcessScrollResult(c, bar)

	// update scrollId
	s.ScrollId = next.GetScrollId()

	return false
}


5 changes: 3 additions & 2 deletions v0.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (s *ESAPIV0) ClusterHealth() *ClusterHealth {

func (s *ESAPIV0) Bulk(data *bytes.Buffer) {
if data == nil || data.Len() == 0 {
log.Error("data is empty, skip")
return
}
data.WriteRune('\n')
Expand Down Expand Up @@ -327,7 +328,7 @@ func (s *ESAPIV0) Refresh(name string) (err error) {
return nil
}

func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount int,query string, slicedId,maxSlicedCount int, fields string) (scroll *Scroll, err error) {
func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount int,query string, slicedId,maxSlicedCount int, fields string) (scroll interface{}, err error) {

// curl -XGET 'http://es-0.9:9200/_search?search_type=scan&scroll=10m&size=50'
url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount)
Expand Down Expand Up @@ -392,7 +393,7 @@ func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount
return scroll, err
}

func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (*Scroll, error) {
func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (interface{}, error) {
// curl -XGET 'http://es-0.9:9200/_search/scroll?scroll=5m'
id := bytes.NewBufferString(scrollId)
url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
Expand Down
4 changes: 2 additions & 2 deletions v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *ESAPIV5) Refresh(name string) (err error) {
return s.ESAPIV0.Refresh(name)
}

func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(scroll *Scroll, err error){
func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(scroll interface{}, err error){
url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime,docBufferCount)

jsonBody:=""
Expand Down Expand Up @@ -138,7 +138,7 @@ func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount i
return scroll,err
}

func (s *ESAPIV5) NextScroll(scrollTime string,scrollId string)(*Scroll,error) {
func (s *ESAPIV5) NextScroll(scrollTime string,scrollId string)(interface{},error) {
id := bytes.NewBufferString(scrollId)

url:=fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
Expand Down
Loading

0 comments on commit 7dc95a9

Please sign in to comment.