diff --git a/agent/agent_test.go b/agent/agent_test.go
index 8be15fa85423d687444b2783d8a3f8a3d0428230..d7f860d8a1795ac2fc77ccf270624485bc3d8aac 100644
--- a/agent/agent_test.go
+++ b/agent/agent_test.go
@@ -20,6 +20,8 @@ import (
 
 	"github.com/hashicorp/consul/testrpc"
 
+	"github.com/hashicorp/consul/agent/cache"
+	cachetype "github.com/hashicorp/consul/agent/cache-types"
 	"github.com/hashicorp/consul/agent/checks"
 	"github.com/hashicorp/consul/agent/config"
 	"github.com/hashicorp/consul/agent/connect"
@@ -3574,3 +3576,97 @@ func TestConfigSourceFromName(t *testing.T) {
 		})
 	}
 }
+
+func TestAgentCache_serviceInConfigFile_initialFetchErrors_Issue6521(t *testing.T) {
+	t.Parallel()
+
+	// Ensure that initial failures to fetch the discovery chain via the agent
+	// cache using the notify API for a service with no config entries
+	// correctly recovers when those RPCs resume working. The key here is that
+	// the lack of config entries guarantees that the RPC will come back with a
+	// synthetic index of 1.
+	//
+	// The bug in the Cache.notifyBlockingQuery used to incorrectly "fix" the
+	// index for the next query from 0 to 1 for all queries, when it should
+	// have not done so for queries that errored.
+
+	a1 := NewTestAgent(t, t.Name()+"-a1", "")
+	defer a1.Shutdown()
+	testrpc.WaitForLeader(t, a1.RPC, "dc1")
+
+	a2 := NewTestAgent(t, t.Name()+"-a2", `
+		server = false
+		bootstrap = false
+services {
+  name = "echo-client"
+  port = 8080
+  connect {
+    sidecar_service {
+      proxy {
+        upstreams {
+          destination_name = "echo"
+          local_bind_port  = 9191
+        }
+      }
+    }
+  }
+}
+
+services {
+  name = "echo"
+  port = 9090
+  connect {
+    sidecar_service {}
+  }
+}
+	`)
+	defer a2.Shutdown()
+
+	// Starting a client agent disconnected from a server with services.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ch := make(chan cache.UpdateEvent, 1)
+	require.NoError(t, a2.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
+		Datacenter:           "dc1",
+		Name:                 "echo",
+		EvaluateInDatacenter: "dc1",
+		EvaluateInNamespace:  "default",
+	}, "foo", ch))
+
+	{ // The first event is an error because we are not joined yet.
+		evt := <-ch
+		require.Equal(t, "foo", evt.CorrelationID)
+		require.Nil(t, evt.Result)
+		require.Error(t, evt.Err)
+		require.Equal(t, structs.ErrNoServers, evt.Err)
+	}
+
+	t.Logf("joining client to server")
+
+	// Now connect to server
+	_, err := a1.JoinLAN([]string{
+		fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN),
+	})
+	require.NoError(t, err)
+
+	t.Logf("joined client to server")
+
+	deadlineCh := time.After(10 * time.Second)
+	start := time.Now()
+AGAIN:
+	select {
+	case evt := <-ch:
+		// We may receive several notifications of an error until we get the
+		// first successful reply.
+		require.Equal(t, "foo", evt.CorrelationID)
+		if evt.Err != nil {
+			goto AGAIN
+		}
+		require.NoError(t, evt.Err)
+		require.NotNil(t, evt.Result)
+		t.Logf("took %s to get first success", time.Since(start))
+	case <-deadlineCh:
+		t.Fatal("did not get notified successfully")
+	}
+}
diff --git a/agent/cache/watch.go b/agent/cache/watch.go
index 47476c37b8c78ebb8e4e910389562b3ed16f162c..fca176fe08ecfa31f72c9837c78e233de4b2e8e5 100644
--- a/agent/cache/watch.go
+++ b/agent/cache/watch.go
@@ -126,7 +126,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co
 			}
 		}
 		// Sanity check we always request blocking on second pass
-		if index < 1 {
+		if err == nil && index < 1 {
 			index = 1
 		}
 	}
diff --git a/agent/cache/watch_test.go b/agent/cache/watch_test.go
index 38661374ecda685c2adff24ef1d825bbf7877f5a..cce8d355907dde0b369ae26964152ae356ceacd0 100644
--- a/agent/cache/watch_test.go
+++ b/agent/cache/watch_test.go
@@ -33,8 +33,9 @@ func TestCacheNotify(t *testing.T) {
 	// initially.
 	typ.Static(FetchResult{Value: nil, Index: 0}, errors.New("no servers available")).Once()
 
-	// Configure the type
-	typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) {
+	// Configure the type. The first time we use the fake index of "1" to verify we
+	// don't regress on https://github.com/hashicorp/consul/issues/6521 .
+	typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) {
 		// Assert the right request type - all real Fetch implementations do this so
 		// it keeps us honest that Watch doesn't require type mangling which will
 		// break in real life (hint: it did on the first attempt)
@@ -79,7 +80,7 @@ func TestCacheNotify(t *testing.T) {
 	TestCacheNotifyChResult(t, ch, UpdateEvent{
 		CorrelationID: "test",
 		Result:        1,
-		Meta:          ResultMeta{Hit: false, Index: 4},
+		Meta:          ResultMeta{Hit: false, Index: 1},
 		Err:           nil,
 	})
 
diff --git a/agent/config_endpoint.go b/agent/config_endpoint.go
index 2f290e66efbc13d1312eaaf36cb9413a45168a88..283f9f45e26f89b5b7d76353a9c4df357773568d 100644
--- a/agent/config_endpoint.go
+++ b/agent/config_endpoint.go
@@ -42,6 +42,7 @@ func (s *HTTPServer) configGet(resp http.ResponseWriter, req *http.Request) (int
 		if err := s.agent.RPC("ConfigEntry.Get", &args, &reply); err != nil {
 			return nil, err
 		}
+		setMeta(resp, &reply.QueryMeta)
 
 		if reply.Entry == nil {
 			return nil, NotFoundError{Reason: fmt.Sprintf("Config entry not found for %q / %q", pathArgs[0], pathArgs[1])}
@@ -56,6 +57,7 @@ func (s *HTTPServer) configGet(resp http.ResponseWriter, req *http.Request) (int
 		if err := s.agent.RPC("ConfigEntry.List", &args, &reply); err != nil {
 			return nil, err
 		}
+		setMeta(resp, &reply.QueryMeta)
 
 		return reply.Entries, nil
 	default:
diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index eea31d100ff4aed3eaa1900d4eb0ebe79bf69874..736ebcd22449a9a114c81f66bd21b90aaafbaae5 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -460,18 +460,22 @@ func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error {
 }
 
 func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapshot) error {
+	if u.Err != nil {
+		return fmt.Errorf("error filling agent cache: %v", u.Err)
+	}
+
 	switch {
 	case u.CorrelationID == rootsWatchID:
 		roots, ok := u.Result.(*structs.IndexedCARoots)
 		if !ok {
-			return fmt.Errorf("invalid type for roots response: %T", u.Result)
+			return fmt.Errorf("invalid type for response: %T", u.Result)
 		}
 		snap.Roots = roots
 
 	case u.CorrelationID == leafWatchID:
 		leaf, ok := u.Result.(*structs.IssuedCert)
 		if !ok {
-			return fmt.Errorf("invalid type for leaf response: %T", u.Result)
+			return fmt.Errorf("invalid type for response: %T", u.Result)
 		}
 		snap.ConnectProxy.Leaf = leaf
 
@@ -481,7 +485,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
 	case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
 		resp, ok := u.Result.(*structs.DiscoveryChainResponse)
 		if !ok {
-			return fmt.Errorf("invalid type for service response: %T", u.Result)
+			return fmt.Errorf("invalid type for response: %T", u.Result)
 		}
 		svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
 		snap.ConnectProxy.DiscoveryChain[svc] = resp.Chain
@@ -493,7 +497,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
 	case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
 		resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
 		if !ok {
-			return fmt.Errorf("invalid type for service response: %T", u.Result)
+			return fmt.Errorf("invalid type for response: %T", u.Result)
 		}
 		correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
 		targetID, svc, ok := removeColonPrefix(correlationID)
@@ -511,7 +515,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
 	case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
 		resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
 		if !ok {
-			return fmt.Errorf("invalid type for service response: %T", u.Result)
+			return fmt.Errorf("invalid type for response: %T", u.Result)
 		}
 		correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
 		dc, svc, ok := removeColonPrefix(correlationID)
@@ -528,7 +532,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
 	case strings.HasPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix):
 		resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
 		if !ok {
-			return fmt.Errorf("invalid type for service response: %T", u.Result)
+			return fmt.Errorf("invalid type for response: %T", u.Result)
 		}
 		svc := strings.TrimPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix)
 		snap.ConnectProxy.UpstreamEndpoints[svc] = resp.Nodes
@@ -536,13 +540,13 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
 	case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
 		resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
 		if !ok {
-			return fmt.Errorf("invalid type for prepared query response: %T", u.Result)
+			return fmt.Errorf("invalid type for response: %T", u.Result)
 		}
 		pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
 		snap.ConnectProxy.UpstreamEndpoints[pq] = resp.Nodes
 
 	default:
-		return errors.New("unknown correlation ID")
+		return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
 	}
 	return nil
 }
@@ -650,17 +654,21 @@ func (s *state) resetWatchesFromChain(
 }
 
 func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error {
+	if u.Err != nil {
+		return fmt.Errorf("error filling agent cache: %v", u.Err)
+	}
+
 	switch u.CorrelationID {
 	case rootsWatchID:
 		roots, ok := u.Result.(*structs.IndexedCARoots)
 		if !ok {
-			return fmt.Errorf("invalid type for roots response: %T", u.Result)
+			return fmt.Errorf("invalid type for response: %T", u.Result)
 		}
 		snap.Roots = roots
 	case serviceListWatchID:
 		services, ok := u.Result.(*structs.IndexedServices)
 		if !ok {
-			return fmt.Errorf("invalid type for services response: %T", u.Result)
+			return fmt.Errorf("invalid type for response: %T", u.Result)
 		}
 
 		for svcName := range services.Services {
@@ -703,7 +711,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
 	case datacentersWatchID:
 		datacentersRaw, ok := u.Result.(*[]string)
 		if !ok {
-			return fmt.Errorf("invalid type for datacenters response: %T", u.Result)
+			return fmt.Errorf("invalid type for response: %T", u.Result)
 		}
 		if datacentersRaw == nil {
 			return fmt.Errorf("invalid response with a nil datacenter list")
@@ -753,7 +761,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
 	case serviceResolversWatchID:
 		configEntries, ok := u.Result.(*structs.IndexedConfigEntries)
 		if !ok {
-			return fmt.Errorf("invalid type for services response: %T", u.Result)
+			return fmt.Errorf("invalid type for response: %T", u.Result)
 		}
 
 		resolvers := make(map[string]*structs.ServiceResolverConfigEntry)
@@ -768,7 +776,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
 		case strings.HasPrefix(u.CorrelationID, "connect-service:"):
 			resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
 			if !ok {
-				return fmt.Errorf("invalid type for service response: %T", u.Result)
+				return fmt.Errorf("invalid type for response: %T", u.Result)
 			}
 
 			svc := strings.TrimPrefix(u.CorrelationID, "connect-service:")
@@ -781,7 +789,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
 		case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
 			resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
 			if !ok {
-				return fmt.Errorf("invalid type for service response: %T", u.Result)
+				return fmt.Errorf("invalid type for response: %T", u.Result)
 			}
 
 			dc := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")