From c9cb7408753347a0a943fbe403dbf98c47116869 Mon Sep 17 00:00:00 2001
From: John Cai <jcai@gitlab.com>
Date: Wed, 17 Jul 2024 20:48:26 -0400
Subject: [PATCH] smarthttp: Generate bundle in background

On each clone, check whether more than 5 clones are already in progress
for this repository. If so, and no bundle exists yet, generate one in the
background because this is likely a busy repository.
---
 internal/cli/gitaly/serve.go                  |  13 +-
 .../featureflag/ff_autogenerate_bundles.go    |   9 +
 internal/gitaly/config/config.go              |   2 +
 internal/gitaly/service/smarthttp/server.go   |  14 +-
 .../gitaly/service/smarthttp/upload_pack.go   |  92 ++++++++
 .../service/smarthttp/upload_pack_test.go     | 210 ++++++++++++++++++
 6 files changed, 334 insertions(+), 6 deletions(-)
 create mode 100644 internal/featureflag/ff_autogenerate_bundles.go

diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index fb2bc9422f..41ced77966 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -494,7 +494,17 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error {
 
 	var bundleURISink *bundleuri.Sink
 	if cfg.BundleURI.GoCloudURL != "" {
-		bundleURISink, err = bundleuri.NewSink(ctx, cfg.BundleURI.GoCloudURL)
+		bundleURISink, err = bundleuri.NewSink(
+			ctx,
+			cfg.BundleURI.GoCloudURL,
+			bundleuri.WithBundleGenerationNotifier(func(bundlePath string, err error) {
+				if err != nil {
+					logger.WithField("bundle_path", bundlePath).
+						WithError(err).
+						Warn("bundle generation failed")
+				}
+			}),
+		)
 		if err != nil {
 			return fmt.Errorf("create bundle-URI sink: %w", err)
 		}
@@ -543,6 +553,7 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error {
 			BackupSink:          backupSink,
 			BackupLocator:       backupLocator,
 			BundleURISink:       bundleURISink,
+			InProgressTracker:   service.NewInProgressTracker(),
 		})
 		b.RegisterStarter(starter.New(c, srv, logger))
 	}
diff --git a/internal/featureflag/ff_autogenerate_bundles.go b/internal/featureflag/ff_autogenerate_bundles.go
new file mode 100644
index 0000000000..61692778f3
--- /dev/null
+++ b/internal/featureflag/ff_autogenerate_bundles.go
@@ -0,0 +1,9 @@
+package featureflag
+
+// AutogenerateBundlesForBundleURI enables automatic background generation of bundles for git's bundle URI feature
+var AutogenerateBundlesForBundleURI = NewFeatureFlag(
+	"autogenerate_bundles_for_bundleuri",
+	"v17.3.0",
+	"https://gitlab.com/gitlab-org/gitaly/-/issues/6204",
+	false,
+)
diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go
index ae7e1f282c..03b313ff86 100644
--- a/internal/gitaly/config/config.go
+++ b/internal/gitaly/config/config.go
@@ -623,6 +623,8 @@ type BundleURIConfig struct {
 	// GoCloudURL is the blob storage GoCloud URL that will be used to store
 	// Git bundles for Bundle-URI use.
 	GoCloudURL string `toml:"go_cloud_url,omitempty" json:"go_cloud_url,omitempty"`
+	// Autogeneration controls whether bundles for Bundle-URI use are generated automatically.
+	Autogeneration bool `toml:"autogeneration,omitempty" json:"autogeneration"`
 }
 
 // Validate runs validation on all fields and returns any errors found.
diff --git a/internal/gitaly/service/smarthttp/server.go b/internal/gitaly/service/smarthttp/server.go
index 5de6942f1d..ebc0e96873 100644
--- a/internal/gitaly/service/smarthttp/server.go
+++ b/internal/gitaly/service/smarthttp/server.go
@@ -34,7 +34,7 @@ type server struct {
 	backupLocator              backup.Locator
 	backupSink                 backup.Sink
 	bundleURISink              *bundleuri.Sink
-	inflightTracker            service.InProgressTracker
+	inProgressTracker          service.InProgressTracker
 	generateBundles            bool
 	partitionMgr               *storagemgr.PartitionManager
 	transactionRegistry        *storagemgr.TransactionRegistry
@@ -56,10 +56,14 @@ func NewServer(deps *service.Dependencies, serverOpts ...ServerOpt) gitalypb.Sma
 			prometheus.CounterOpts{},
 			[]string{"git_negotiation_feature"},
 		),
-		infoRefCache:  newInfoRefCache(deps.GetLogger(), deps.GetDiskCache()),
-		backupLocator: deps.GetBackupLocator(),
-		backupSink:    deps.GetBackupSink(),
-		bundleURISink: deps.GetBundleURISink(),
+		infoRefCache:        newInfoRefCache(deps.GetLogger(), deps.GetDiskCache()),
+		backupLocator:       deps.GetBackupLocator(),
+		backupSink:          deps.GetBackupSink(),
+		bundleURISink:       deps.GetBundleURISink(),
+		inProgressTracker:   deps.GetInProgressTracker(),
+		generateBundles:     deps.GetCfg().BundleURI.Autogeneration,
+		partitionMgr:        deps.GetPartitionManager(),
+		transactionRegistry: deps.GetTransactionRegistry(),
 	}
 
 	for _, serverOpt := range serverOpts {
diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go
index 6486436f66..f6a943ca67 100644
--- a/internal/gitaly/service/smarthttp/upload_pack.go
+++ b/internal/gitaly/service/smarthttp/upload_pack.go
@@ -1,21 +1,34 @@
 package smarthttp
 
 import (
+	"bytes"
 	"context"
 	"crypto/sha1"
 	"errors"
 	"fmt"
 	"io"
+	"time"
 
+	"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/command"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/git"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
 	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
 )
 
+const (
+	concurrentUploadPackThreshold = 5              // in-flight upload-packs per repository that must be exceeded before a bundle is generated
+	bundleGenerationTimeout       = 24 * time.Hour // upper bound on one background bundle generation
+)
+
 func (s *server) PostUploadPackWithSidechannel(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) {
 	repoPath, gitConfig, err := s.validateUploadPackRequest(ctx, req)
 	if err != nil {
@@ -116,20 +129,98 @@ func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPack
 
 	gitConfig = append(gitConfig, bundleuri.CapabilitiesGitConfig(ctx)...)
 
+	originalRepo := req.GetRepository()
+
+	storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) {
+		originalRepo = tx.OriginalRepository(req.GetRepository())
+	})
+
+	key := originalRepo.GetStorageName() + ":" + originalRepo.GetRelativePath()
+
 	uploadPackConfig, err := bundleuri.UploadPackGitConfig(ctx, s.bundleURISink, req.GetRepository())
 	if err != nil {
+		if errors.Is(err, bundleuri.ErrBundleNotFound) &&
+			featureflag.AutogenerateBundlesForBundleURI.IsEnabled(ctx) &&
+			s.generateBundles &&
+			s.inProgressTracker.GetInProgress(key) > concurrentUploadPackThreshold {
+
+			// Capture the request-scoped logger before detaching: the goroutine below runs on
+			// a context derived from context.Background(), for which ctxlogrus.Extract returns
+			// a no-op entry, so errors logged through it would be silently dropped.
+			logger := ctxlogrus.Extract(ctx)
+
+			go func() {
+				ctx, cancel := context.WithTimeout(context.Background(), bundleGenerationTimeout)
+				defer cancel()
+
+				repo := localrepo.New(
+					s.logger,
+					s.locator,
+					s.gitCmdFactory,
+					s.catfileCache,
+					originalRepo,
+				)
+
+				// Without a partition manager no transaction is opened, so there is nothing
+				// to commit or roll back: generate the bundle and report a failure, if any.
+				if s.partitionMgr == nil {
+					if err := s.bundleURISink.GenerateOneAtATime(ctx, repo); err != nil {
+						logger.WithError(err).Error("generate bundle")
+					}
+
+					return
+				}
+
+				tx, err := s.partitionMgr.Begin(
+					ctx,
+					originalRepo.GetStorageName(),
+					0,
+					storagemgr.TransactionOptions{
+						ReadOnly:     true,
+						RelativePath: originalRepo.GetRelativePath(),
+					},
+				)
+				if err != nil {
+					logger.WithError(err).Error("failed starting transaction")
+					return
+				}
+
+				ctx = storagectx.ContextWithTransaction(ctx, tx)
+
+				if err := s.bundleURISink.GenerateOneAtATime(ctx, repo); err != nil {
+					logger.WithError(err).Error("generate bundle")
+					if err := tx.Rollback(); err != nil {
+						logger.WithError(err).Error("failed rolling back transaction")
+					}
+
+					return
+				}
+
+				if err := tx.Commit(ctx); err != nil {
+					logger.WithError(err).Error("committing transaction")
+				}
+			}()
+		} else if !errors.Is(err, bundleuri.ErrSinkMissing) {
+			log.AddFields(ctx, log.Fields{"bundle_uri_error": err})
+		}
 	} else {
 		gitConfig = append(gitConfig, uploadPackConfig...)
 	}
 
+	var stderr bytes.Buffer
+
 	commandOpts := []git.CmdOpt{
 		git.WithStdin(stdin),
+		git.WithStderr(&stderr),
 		git.WithSetupStdout(),
 		git.WithGitProtocol(s.logger, req),
 		git.WithConfig(gitConfig...),
 		git.WithPackObjectsHookEnv(req.GetRepository(), "http"),
 	}
 
+	s.inProgressTracker.IncrementInProgress(key)
+	defer s.inProgressTracker.DecrementInProgress(key)
+
 	cmd, err := s.gitCmdFactory.New(ctx, req.GetRepository(), git.Command{
 		Name:  "upload-pack",
 		Flags: []git.Option{git.Flag{Name: "--stateless-rpc"}},
@@ -158,5 +249,6 @@ func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPack
 	}
 
 	s.logger.WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).InfoContext(ctx, "request details")
+
 	return nil, nil
 }
diff --git a/internal/gitaly/service/smarthttp/upload_pack_test.go b/internal/gitaly/service/smarthttp/upload_pack_test.go
index 944268aee5..fc53b79924 100644
--- a/internal/gitaly/service/smarthttp/upload_pack_test.go
+++ b/internal/gitaly/service/smarthttp/upload_pack_test.go
@@ -21,11 +21,13 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/git"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
@@ -380,6 +382,7 @@ func TestServer_PostUploadPackWithBundleURI(t *testing.T) {
 
 	ctx := testhelper.Context(t)
 	ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.BundleURI, true)
+	ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.AutogenerateBundlesForBundleURI, false)
 
 	tempDir := testhelper.TempDir(t)
 	keyFile, err := os.Create(filepath.Join(tempDir, "secret.key"))
@@ -500,6 +503,213 @@ func TestServer_PostUploadPackWithBundleURI(t *testing.T) {
 	}
 }
 
+type testInProgressTracker struct {
+	thresholdReached bool // when set, GetInProgress reports a count above concurrentUploadPackThreshold
+}
+
+func (t *testInProgressTracker) GetInProgress(key string) int {
+	if t.thresholdReached {
+		return concurrentUploadPackThreshold + 1
+	}
+
+	return 0
+}
+
+func (t *testInProgressTracker) IncrementInProgress(key string) {} // no-op: the reported count depends on thresholdReached only
+
+func (t *testInProgressTracker) DecrementInProgress(key string) {} // no-op: the reported count depends on thresholdReached only
+
+func TestServer_PostUploadPackAutogenerateBundles(t *testing.T) {
+	t.Parallel()
+	// Covers the background bundle generation that upload-pack triggers once the tracker reports enough in-flight calls.
+	ctx := testhelper.Context(t)
+	ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.AutogenerateBundlesForBundleURI, true)
+	ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.BundleURI, true)
+
+	tempDir := testhelper.TempDir(t)
+	keyFile, err := os.Create(filepath.Join(tempDir, "secret.key"))
+	require.NoError(t, err)
+	_, err = keyFile.WriteString("super-secret-key")
+	require.NoError(t, err)
+	require.NoError(t, keyFile.Close())
+
+	testCases := []struct {
+		desc    string
+		sinkDir string
+		setup   func(
+			t *testing.T,
+			ctx context.Context,
+			cfg config.Cfg,
+			sink *bundleuri.Sink,
+			tracker *testInProgressTracker,
+			repoProto *gitalypb.Repository,
+			repoPath string,
+		)
+		expectBundleGenerated bool
+		verifyBundle          func(*testing.T, config.Cfg, string, git.ObjectID)
+	}{
+		{
+			desc: "autogeneration successful",
+			setup: func(
+				t *testing.T,
+				ctx context.Context,
+				cfg config.Cfg,
+				sink *bundleuri.Sink,
+				tracker *testInProgressTracker,
+				repoProto *gitalypb.Repository,
+				repoPath string,
+			) {
+				gittest.WriteCommit(t, cfg, repoPath,
+					gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}),
+					gittest.WithBranch("main"))
+
+				tracker.thresholdReached = true
+			},
+			expectBundleGenerated: true,
+			verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) {
+				tempDir := t.TempDir()
+				objectFormat := gittest.DefaultObjectHash.Format
+				gittest.Exec(t, cfg, "init", "--object-format="+objectFormat, tempDir)
+				gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath)
+				// A new bundle is expected to be created containing the new commit
+				gittest.RequireObjectExists(t, cfg, tempDir, commit)
+			},
+		},
+		{
+			desc: "bundle already exists",
+			setup: func(
+				t *testing.T,
+				ctx context.Context,
+				cfg config.Cfg,
+				sink *bundleuri.Sink,
+				tracker *testInProgressTracker,
+				repoProto *gitalypb.Repository,
+				repoPath string,
+			) {
+				gittest.WriteCommit(t, cfg, repoPath,
+					gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}),
+					gittest.WithBranch("main"))
+				tracker.thresholdReached = true
+
+				repo := localrepo.NewTestRepo(t, cfg, repoProto)
+				require.NoError(t, sink.Generate(ctx, repo))
+			},
+			expectBundleGenerated: false,
+			verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) {
+				tempDir := t.TempDir()
+				objectFormat := gittest.DefaultObjectHash.Format
+				gittest.Exec(t, cfg, "init", "--object-format="+objectFormat, tempDir)
+				gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath)
+				// No new bundle is expected to be created since one already existed.
+				gittest.RequireObjectNotExists(t, cfg, tempDir, commit)
+			},
+		},
+		{
+			desc: "no concurrent upload packs in flight",
+			setup: func(
+				t *testing.T,
+				ctx context.Context,
+				cfg config.Cfg,
+				sink *bundleuri.Sink,
+				tracker *testInProgressTracker,
+				repoProto *gitalypb.Repository,
+				repoPath string,
+			) {
+				tracker.thresholdReached = false
+				gittest.WriteCommit(t, cfg, repoPath,
+					gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}),
+					gittest.WithBranch("main"))
+			},
+			expectBundleGenerated: false,
+			verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) {
+				tempDir := t.TempDir()
+				gittest.Exec(t, cfg, "init", "--object-format="+gittest.DefaultObjectHash.Format, tempDir)
+				gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath)
+				// No new bundle is expected to have been created because there are no
+				// inflight upload pack calls.
+				gittest.RequireObjectNotExists(t, cfg, tempDir, commit)
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			doneChan := make(chan struct{})
+			errChan := make(chan error, 1) // buffered: the notifier must not block when nobody is receiving
+
+			var bundlePath string
+
+			bundleGeneratedNotifier := func(path string, err error) {
+				bundlePath = path
+
+				close(doneChan)
+				errChan <- err
+			}
+
+			tracker := &testInProgressTracker{}
+
+			sinkDir := t.TempDir()
+			sink, err := bundleuri.NewSink(ctx, "file://"+sinkDir, bundleuri.WithBundleGenerationNotifier(bundleGeneratedNotifier))
+			require.NoError(t, err)
+
+			cfg := testcfg.Build(t)
+			logger := testhelper.NewLogger(t)
+
+			cfg.BundleURI.Autogeneration = true
+
+			gitCmdFactory := gittest.NewCommandFactory(t, cfg)
+			catfileCache := catfile.NewCache(cfg)
+			t.Cleanup(catfileCache.Stop)
+
+			server := startSmartHTTPServerWithOptions(t, cfg, nil, []testserver.GitalyServerOpt{
+				testserver.WithBundleURISink(sink),
+				testserver.WithLogger(logger),
+				testserver.WithInProgressTracker(tracker),
+				testserver.WithTransactionRegistry(storagemgr.NewTransactionRegistry()),
+				testserver.WithGitCommandFactory(gitCmdFactory),
+			})
+
+			cfg.SocketPath = server.Address()
+
+			repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg)
+			oldCommit := gittest.WriteCommit(t, cfg, repoPath)
+			newCommit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("master"), gittest.WithParents(oldCommit))
+
+			if tc.setup != nil {
+				tc.setup(t, ctx, cfg, sink, tracker, repoProto, repoPath)
+			}
+
+			commitInUpdatedBundle := gittest.WriteCommit(t, cfg, repoPath,
+				gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "CHANGELOG", Content: "nothing changed"}),
+				gittest.WithBranch("main"))
+
+			// UploadPack request is a "want" packet line followed by a packet flush, then many "have" packets followed by a packet flush.
+			// This is explained a bit in https://git-scm.com/book/en/v2/Git-Internals-Transfer-Protocols#_downloading_data
+			requestBuffer := &bytes.Buffer{}
+			gittest.WritePktlineString(t, requestBuffer, fmt.Sprintf("want %s %s\n", newCommit, clientCapabilities))
+			gittest.WritePktlineFlush(t, requestBuffer)
+			gittest.WritePktlineString(t, requestBuffer, fmt.Sprintf("have %s\n", oldCommit))
+			gittest.WritePktlineFlush(t, requestBuffer)
+
+			req := &gitalypb.PostUploadPackWithSidechannelRequest{Repository: repoProto}
+			responseBuffer, err := makePostUploadPackWithSidechannelRequest(t, ctx, cfg.SocketPath, cfg.Auth.Token, req, requestBuffer)
+			require.NoError(t, err)
+
+			pack, _, _ := extractPackDataFromResponse(t, responseBuffer)
+			require.NotEmpty(t, pack, "Expected to find a pack file in response, found none")
+
+			if tc.expectBundleGenerated {
+				<-doneChan
+				err := <-errChan
+				require.NoError(t, err)
+				tc.verifyBundle(t, cfg, filepath.Join(sinkDir, bundlePath), commitInUpdatedBundle)
+			} else {
+				require.Empty(t, bundlePath)
+			}
+		})
+	}
+}
+
 func testServerPostUploadPackWithSideChannelValidation(t *testing.T, ctx context.Context, makeRequest requestMaker, opts ...testcfg.Option) {
 	cfg := testcfg.Build(t, opts...)
 	serverSocketPath := runSmartHTTPServer(t, cfg)
-- 
GitLab