split clone and pull in their own worker

This commit is contained in:
Massaki Archambault 2020-12-30 21:18:18 -05:00
parent 66fbf713d1
commit 87bc3976b1
7 changed files with 283 additions and 174 deletions

View File

@ -34,6 +34,7 @@ git:
# Must be set to either "init", "init-pull", "no-checkout" or "checkout".
# If set to "init", the local clone will be initialized with `git init` and set to track the default branch. (fastest)
# If set to "init-pull", the local clone will be initialized with `git init` and will then be followed up by an asynchronous `git pull`. (faster)
# If set to "no-checkout", the local clone will be initialized with `git clone --no-checkout`. (slower)
# If set to "checkout", the local clone will be initialized with `git clone`. (slowest)
# NOTE: If set to "init" or "no-checkout", the local clone will appear empty. Running `git pull` will download the files from the git server.
@ -46,4 +47,16 @@ git:
auto_pull: false
# The depth of the git history to pull. Set to 0 to pull the full history.
depth: 0
depth: 1
# The number of `git clone` operations that can be queued up
clone_queue_size: 200
# The number of parallel `git clone` operations that are allowed to run at once
clone_worker_count: 5
# The number of `git pull` operations that can be queued up
pull_queue_size: 500
# The number of parallel `git pull` operations that are allowed to run at once
pull_worker_count: 5

View File

@ -1,45 +1,86 @@
package git
import (
"errors"
"net/url"
"os"
"path/filepath"
"strconv"
)
// GitClonerPuller is the interface for requesting that a remote repository be
// made available locally, by cloning it or by pulling an existing clone.
type GitClonerPuller interface {
// CloneOrPull takes the remote url of a repository, a numeric id (pid) and the
// name of its default branch. It returns the local path of the repository
// (localRepoLoc) and an error if the request could not be accepted.
CloneOrPull(url string, pid int, defaultBranch string) (localRepoLoc string, err error)
}
type GitClientParam struct {
CloneLocation string
RemoteName string
RemoteURL *url.URL
Fetch bool
PullAfterClone bool
Clone bool
Checkout bool
PullDepth int
AutoPull bool
ChanBuffSize int
ChanWorkerCount int
CloneBuffSize int
CloneWorkerCount int
PullBuffSize int
PullWorkerCount int
}
type gitClient struct {
GitClientParam
clonePullChan chan *gitClonePullParam
cloneChan chan *gitCloneParam
pullChan chan *gitPullParam
}
func NewClient(p GitClientParam) (*gitClient, error) {
if p.ChanBuffSize == 0 {
p.ChanBuffSize = 500
}
if p.ChanWorkerCount == 0 {
p.ChanWorkerCount = 5
}
// Create the client
c := &gitClient{
GitClientParam: p,
clonePullChan: make(chan *gitClonePullParam, p.ChanBuffSize),
cloneChan: make(chan *gitCloneParam, p.CloneBuffSize),
pullChan: make(chan *gitPullParam, p.PullBuffSize),
}
// Start worker goroutines
for i := 0; i < p.ChanWorkerCount; i++ {
go c.clonePullWorker()
for i := 0; i < p.CloneWorkerCount; i++ {
go c.cloneWorker()
}
for i := 0; i < p.PullWorkerCount; i++ {
go c.pullWorker()
}
return c, nil
}
// getLocalRepoLoc returns the on-disk location used for the repository with
// the given pid: <CloneLocation>/<hostname of RemoteURL>/<pid>.
func (c *gitClient) getLocalRepoLoc(pid int) string {
	base := c.CloneLocation
	host := c.RemoteURL.Hostname()
	dirName := strconv.Itoa(pid)
	return filepath.Join(base, host, dirName)
}
// CloneOrPull queues the work needed to make the repository identified by pid
// available locally and returns its local path without waiting for that work.
//
// If the local path does not exist, a clone request is queued on cloneChan.
// Otherwise, when AutoPull is enabled, a pull request is queued on pullChan.
// Queuing never blocks: if the relevant queue is full an error is returned
// together with the local path.
func (c *gitClient) CloneOrPull(url string, pid int, defaultBranch string) (localRepoLoc string, err error) {
	localRepoLoc = c.getLocalRepoLoc(pid)

	// TODO: Better manage concurrency, filter out duplicate requests
	_, statErr := os.Stat(localRepoLoc)
	switch {
	case os.IsNotExist(statErr):
		// Nothing on disk yet: hand the request to a clone worker.
		cloneReq := &gitCloneParam{
			url:           url,
			defaultBranch: defaultBranch,
			dst:           localRepoLoc,
		}
		select {
		case c.cloneChan <- cloneReq:
		default:
			return localRepoLoc, errors.New("failed to clone local repo")
		}
	case c.AutoPull:
		// The path is already there: hand the request to a pull worker.
		pullReq := &gitPullParam{
			repoPath:      localRepoLoc,
			defaultBranch: defaultBranch,
		}
		select {
		case c.pullChan <- pullReq:
		default:
			return localRepoLoc, errors.New("failed to pull local repo")
		}
	}

	return localRepoLoc, nil
}

101
git/clone.go Normal file
View File

@ -0,0 +1,101 @@
package git
import (
"errors"
"fmt"
"os"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/config"
"github.com/go-git/go-git/v5/plumbing"
)
// gitCloneParam describes one clone request. Values of this type are sent on
// cloneChan and processed by clone.
type gitCloneParam struct {
// url is the remote URL the repository is cloned from (or, when the repo is
// only initialized, the URL configured for its remote).
url string
// defaultBranch is the short name of the branch to clone/track.
defaultBranch string
// dst is the local directory in which the repository is created.
dst string
}
// cloneWorker consumes clone requests from cloneChan until the channel is
// closed. A request is only executed if its destination still does not exist;
// clone errors are printed and do not stop the worker.
func (c *gitClient) cloneWorker() {
	fmt.Println("Started git cloner worker routine")

	for req := range c.cloneChan {
		// Re-check the destination at processing time: anything other than
		// "does not exist" means there is nothing to clone for this request.
		_, statErr := os.Stat(req.dst)
		if !os.IsNotExist(statErr) {
			continue
		}

		cloneErr := c.clone(req)
		if cloneErr != nil {
			fmt.Println(cloneErr)
		}
	}
}
// clone creates the local repository described by gcp at gcp.dst.
//
// When c.Clone is set the repository is cloned from gcp.url (restricted to the
// default branch, honouring c.Checkout and c.PullDepth). Otherwise the
// repository is only initialized locally with a remote and a tracking branch
// configured, without contacting the git server.
//
// When c.PullAfterClone is set, a pull request for the new repository is then
// queued on pullChan without blocking.
//
// It returns an error if any clone/init step fails, or if the pull queue is
// full when a follow-up pull has to be queued.
func (c *gitClient) clone(gcp *gitCloneParam) error {
	branchRef := plumbing.NewBranchReferenceName(gcp.defaultBranch)

	if c.Clone {
		// Clone the repo
		// TODO: figure out why this operation is so memory intensive...
		fmt.Printf("Cloning %v into %v\n", gcp.url, gcp.dst)
		// fs := osfs.New(gcp.dst)
		// storer := filesystem.NewStorage(fs, cache.NewObjectLRU(0))
		_, err := git.PlainClone(gcp.dst, false, &git.CloneOptions{
			URL:           gcp.url,
			RemoteName:    c.RemoteName,
			ReferenceName: branchRef,
			NoCheckout:    !c.Checkout,
			Depth:         c.PullDepth,
		})
		if err != nil {
			return fmt.Errorf("failed to clone git repo %v to %v: %v", gcp.url, gcp.dst, err)
		}
	} else {
		// "Fake" cloning the repo by never actually talking to the git server.
		// This skips the fetch operation that a proper clone would perform.
		// We can save a lot of time and network i/o doing it this way, at the
		// cost of ending up with a very barebone local copy.
		fmt.Printf("Initializing %v into %v\n", gcp.url, gcp.dst)
		r, err := git.PlainInit(gcp.dst, false)
		if err != nil {
			// Report this as an init failure; no clone was attempted here.
			return fmt.Errorf("failed to initialize git repo %v in %v: %v", gcp.url, gcp.dst, err)
		}

		// Configure the remote
		_, err = r.CreateRemote(&config.RemoteConfig{
			Name: c.RemoteName,
			URLs: []string{gcp.url},
		})
		if err != nil {
			return fmt.Errorf("failed to setup remote %v in git repo %v: %v", gcp.url, gcp.dst, err)
		}

		// Configure a local branch to track the remote branch
		err = r.CreateBranch(&config.Branch{
			Name:   gcp.defaultBranch,
			Remote: c.RemoteName,
			Merge:  branchRef,
		})
		if err != nil {
			return fmt.Errorf("failed to create branch %v of git repo %v: %v", gcp.defaultBranch, gcp.dst, err)
		}

		// Checkout the default branch
		w, err := r.Worktree()
		if err != nil {
			return fmt.Errorf("failed to retrieve worktree of git repo %v: %v", gcp.dst, err)
		}
		// Best effort: the result is deliberately discarded.
		// NOTE(review): on a freshly initialized repo the branch reference
		// presumably does not exist yet, so this is expected to fail - confirm
		// before turning it into a hard error.
		_ = w.Checkout(&git.CheckoutOptions{
			Branch: branchRef,
		})
	}

	if c.PullAfterClone {
		// Dispatch to pull worker; never block the clone worker on a full queue.
		select {
		case c.pullChan <- &gitPullParam{
			repoPath:      gcp.dst,
			defaultBranch: gcp.defaultBranch,
		}:
		default:
			return errors.New("failed to pull local repo after clone")
		}
	}

	return nil
}

60
git/pull.go Normal file
View File

@ -0,0 +1,60 @@
package git
import (
"fmt"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
)
// gitPullParam describes one pull request. Values of this type are sent on
// pullChan and processed by pull.
type gitPullParam struct {
// repoPath is the local path of the repository to pull.
repoPath string
// defaultBranch is the short name of the branch that must be checked out for
// the pull to take place.
defaultBranch string
}
// pullWorker consumes pull requests from pullChan until the channel is closed.
// Pull errors are printed and do not stop the worker.
func (c *gitClient) pullWorker() {
	fmt.Println("Started git puller worker routine")

	for req := range c.pullChan {
		pullErr := c.pull(req)
		if pullErr == nil {
			continue
		}
		fmt.Println(pullErr)
	}
}
// pull updates the local repository at gpp.repoPath from the remote named
// c.RemoteName, using c.PullDepth as the fetch depth.
//
// The pull is skipped (and nil returned) when HEAD points to a branch other
// than gpp.defaultBranch. A HEAD that cannot be resolved because its reference
// does not exist yet is tolerated and the pull proceeds. A repository that is
// already up to date is not an error.
//
// It returns an error if the repository or its worktree cannot be opened, if
// HEAD cannot be read for another reason, or if the pull itself fails.
func (c *gitClient) pull(gpp *gitPullParam) error {
	r, err := git.PlainOpen(gpp.repoPath)
	if err != nil {
		return fmt.Errorf("failed to open git repo %v: %v", gpp.repoPath, err)
	}
	w, err := r.Worktree()
	if err != nil {
		return fmt.Errorf("failed to retrieve worktree of git repo %v: %v", gpp.repoPath, err)
	}

	// Check if the local repo is on default branch
	headRef, err := r.Head()
	switch {
	case err == plumbing.ErrReferenceNotFound:
		// We ignore "reference not found" as this occurs when the local branch
		// has never been checked out when we are in init-pull mode
	case err != nil:
		return fmt.Errorf("failed to retrieve HEAD of git repo %v: %v", gpp.repoPath, err)
	default:
		branchRef := plumbing.NewBranchReferenceName(gpp.defaultBranch)
		if headRef.Name() != branchRef {
			// default branch is not checked out, nothing to do
			fmt.Printf("Repo %v is not on default branch (%v != %v), skipping pull\n", gpp.repoPath, branchRef, headRef.Name())
			return nil
		}
	}

	// Pull the remote
	// TODO: Just like clone, this is very memory intensive for some reasons...
	fmt.Printf("Pulling %v\n", gpp.repoPath)
	err = w.Pull(&git.PullOptions{RemoteName: c.RemoteName, Depth: c.PullDepth})
	// NoErrAlreadyUpToDate only signals that there was nothing new to fetch;
	// it must not be reported as a failed pull.
	if err != nil && err != git.NoErrAlreadyUpToDate {
		return fmt.Errorf("failed to pull git repo %v: %v", gpp.repoPath, err)
	}

	return nil
}

View File

@ -1,129 +0,0 @@
package git
import (
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"github.com/go-git/go-billy/v5/osfs"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/config"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/cache"
"github.com/go-git/go-git/v5/storage/filesystem"
)
// GitClonerPuller is the interface for requesting that a remote repository be
// made available locally, by cloning it or by pulling an existing clone.
type GitClonerPuller interface {
// CloneOrPull takes the remote url of a repository, a numeric id (pid) and the
// name of its default branch. It returns the local path of the repository
// (localRepoLoc) and an error if the request could not be accepted.
CloneOrPull(url string, pid int, defaultBranch string) (localRepoLoc string, err error)
}
// gitClonePullParam describes one clone-or-pull request. Values of this type
// are sent on clonePullChan and processed by clonePullWorker.
type gitClonePullParam struct {
// url is the remote URL of the repository.
url string
// defaultBranch is the short name of the repository's default branch.
defaultBranch string
// dst is the local directory of the repository.
dst string
}
// getLocalRepoLoc returns the on-disk location used for the repository with
// the given pid: <CloneLocation>/<hostname of RemoteURL>/<pid>.
func (c *gitClient) getLocalRepoLoc(pid int) string {
	base := c.CloneLocation
	host := c.RemoteURL.Hostname()
	dirName := strconv.Itoa(pid)
	return filepath.Join(base, host, dirName)
}
// CloneOrPull queues a clone-or-pull request for the repository identified by
// pid and returns its local path without waiting for the work to be done.
// Queuing never blocks: if clonePullChan is full an error is returned together
// with the local path.
func (c *gitClient) CloneOrPull(url string, pid int, defaultBranch string) (localRepoLoc string, err error) {
	localRepoLoc = c.getLocalRepoLoc(pid)

	request := &gitClonePullParam{
		url:           url,
		defaultBranch: defaultBranch,
		dst:           localRepoLoc,
	}

	select {
	case c.clonePullChan <- request:
		return localRepoLoc, nil
	default:
	}

	return localRepoLoc, errors.New("failed to clone/pull local repo")
}
// clonePullWorker consumes requests from clonePullChan until the channel is
// closed. A request whose destination does not exist is cloned; otherwise it
// is pulled when AutoPull is enabled. Errors are printed and do not stop the
// worker.
func (c *gitClient) clonePullWorker() {
	fmt.Println("Started git cloner/puller worker routine")

	for req := range c.clonePullChan {
		var opErr error

		_, statErr := os.Stat(req.dst)
		switch {
		case os.IsNotExist(statErr):
			opErr = c.clone(req)
		case c.AutoPull:
			opErr = c.pull(req)
		default:
			// Destination is present and auto-pull is off: nothing to do.
			continue
		}

		if opErr != nil {
			fmt.Println(opErr)
		}
	}
}
// clone creates the local repository described by gpp at gpp.dst.
//
// When c.Fetch is set the repository is cloned from gpp.url (restricted to the
// default branch, honouring c.Checkout and c.PullDepth). Otherwise the
// repository is only initialized locally with a remote and a tracking branch
// configured, without contacting the git server.
//
// It returns an error if any clone/init step fails.
func (c *gitClient) clone(gpp *gitClonePullParam) error {
	branchRef := plumbing.NewBranchReferenceName(gpp.defaultBranch)

	if c.Fetch {
		// Clone the repo
		// TODO: figure out why this operation is so memory intensive...
		fmt.Printf("Cloning %v into %v\n", gpp.url, gpp.dst)
		fs := osfs.New(gpp.dst)
		storer := filesystem.NewStorage(fs, cache.NewObjectLRU(0))
		_, err := git.Clone(storer, fs, &git.CloneOptions{
			URL:           gpp.url,
			RemoteName:    c.RemoteName,
			ReferenceName: branchRef,
			NoCheckout:    !c.Checkout,
			Depth:         c.PullDepth,
		})
		if err != nil {
			return fmt.Errorf("failed to clone git repo %v to %v: %v", gpp.url, gpp.dst, err)
		}
		return nil
	}

	// "Fake" cloning the repo by never actually talking to the git server.
	// This skips the fetch operation that a proper clone would perform.
	// We can save a lot of time and network i/o doing it this way, at the
	// cost of ending up with a very barebone local copy.
	fmt.Printf("Initializing %v into %v\n", gpp.url, gpp.dst)
	r, err := git.PlainInit(gpp.dst, false)
	if err != nil {
		// Report this as an init failure; no clone was attempted here.
		return fmt.Errorf("failed to initialize git repo %v in %v: %v", gpp.url, gpp.dst, err)
	}

	// Configure the remote
	_, err = r.CreateRemote(&config.RemoteConfig{
		Name: c.RemoteName,
		URLs: []string{gpp.url},
	})
	if err != nil {
		return fmt.Errorf("failed to setup remote %v in git repo %v: %v", gpp.url, gpp.dst, err)
	}

	// Configure a local branch to track the remote branch
	err = r.CreateBranch(&config.Branch{
		Name:   gpp.defaultBranch,
		Remote: c.RemoteName,
		Merge:  branchRef,
	})
	if err != nil {
		return fmt.Errorf("failed to create branch %v of git repo %v: %v", gpp.defaultBranch, gpp.dst, err)
	}

	// Checkout the default branch
	w, err := r.Worktree()
	if err != nil {
		return fmt.Errorf("failed to retrieve worktree of git repo %v: %v", gpp.dst, err)
	}
	// Best effort: the result is deliberately discarded.
	// NOTE(review): on a freshly initialized repo the branch reference
	// presumably does not exist yet, so this is expected to fail - confirm
	// before turning it into a hard error.
	_ = w.Checkout(&git.CheckoutOptions{
		Branch: branchRef,
	})

	return nil
}
// pull is a placeholder: it performs no work and always returns nil. The
// comments in its body list the steps that remain to be implemented.
func (c *gitClient) pull(gpp *gitClonePullParam) error {
// Check if the local repo is on default branch
// Check if the local repo is dirty
// Checkout the remote default branch
// TODO
return nil
}

1
go.mod
View File

@ -9,6 +9,7 @@ require (
github.com/hanwen/go-fuse v1.0.0
github.com/hanwen/go-fuse/v2 v2.0.3
github.com/xanzy/go-gitlab v0.40.2
gopkg.in/src-d/go-billy.v4 v4.3.2
gopkg.in/src-d/go-git.v4 v4.13.1 // indirect
gopkg.in/yaml.v2 v2.2.4
)

32
main.go
View File

@ -15,6 +15,7 @@ import (
// Accepted values of the on_clone configuration setting.
const (
// OnCloneInit initializes the local clone with `git init` only.
OnCloneInit = "init"
// OnCloneInitPull initializes the local clone with `git init`, followed by an
// asynchronous `git pull`.
OnCloneInitPull = "init-pull"
// OnCloneNoCheckout creates the local clone with `git clone --no-checkout`.
OnCloneNoCheckout = "no-checkout"
// OnCloneCheckout creates the local clone with a regular `git clone`.
OnCloneCheckout = "checkout"
)
@ -42,6 +43,10 @@ type (
OnClone string `yaml:"on_clone,omitempty"`
AutoPull bool `yaml:"auto_pull,omitempty"`
Depth int `yaml:"depth,omitempty"`
CloneQueueSize int `yaml:"clone_queue_size,omitempty"`
CloneWorkerCount int `yaml:"clone_worker_count,omitempty"`
PullQueueSize int `yaml:"pull_queue_size,omitempty"`
PullWorkerCount int `yaml:"pull_worker_count,omitempty"`
}
)
@ -71,6 +76,10 @@ func loadConfig(configPath string) (*Config, error) {
OnClone: "init",
AutoPull: false,
Depth: 0,
CloneQueueSize: 200,
CloneWorkerCount: 5,
PullQueueSize: 500,
PullWorkerCount: 5,
},
}
@ -109,16 +118,24 @@ func makeGitConfig(config *Config) (*git.GitClientParam, error) {
}
// parse on_clone
fetch := false
pullAfterClone := false
clone := false
checkout := false
if config.Git.OnClone == OnCloneInit {
fetch = false
pullAfterClone = false
clone = false
checkout = false
} else if config.Git.OnClone == OnCloneInitPull {
pullAfterClone = true
clone = false
checkout = false
} else if config.Git.OnClone == OnCloneNoCheckout {
fetch = true
pullAfterClone = false
clone = true
checkout = false
} else if config.Git.OnClone == OnCloneCheckout {
fetch = true
pullAfterClone = false
clone = true
checkout = true
} else {
return nil, fmt.Errorf("on_clone must be either \"%v\", \"%v\" or \"%V\"", OnCloneInit, OnCloneNoCheckout, OnCloneCheckout)
@ -128,10 +145,15 @@ func makeGitConfig(config *Config) (*git.GitClientParam, error) {
CloneLocation: config.Git.CloneLocation,
RemoteName: config.Git.Remote,
RemoteURL: parsedGitlabURL,
Fetch: fetch,
PullAfterClone: pullAfterClone,
Clone: clone,
Checkout: checkout,
AutoPull: config.Git.AutoPull,
PullDepth: config.Git.Depth,
CloneBuffSize: config.Git.CloneQueueSize,
CloneWorkerCount: config.Git.CloneWorkerCount,
PullBuffSize: config.Git.PullQueueSize,
PullWorkerCount: config.Git.PullWorkerCount,
}, nil
}