internal

package
v1.50.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2026 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// READ_ROW executes Point Reads on standard tables.
	READ_ROW = &VRpcDescriptorImpl{
		MethodName: "ReadRow",
		EncodeFn: createTableEncoder(func(req interface{}, env *btpb.TableRequest) {
			args := req.(ReadRowArgs)
			env.Payload = &btpb.TableRequest_ReadRow{
				ReadRow: encodeReadRow(args),
			}
		}),
		DecodeFn: createTableDecoder(func(env *btpb.TableResponse) interface{} {
			return decodeReadRow(env.GetReadRow())
		}),
	}

	// MUTATE_ROW executes Point Mutations on standard tables.
	MUTATE_ROW = &VRpcDescriptorImpl{
		MethodName: "MutateRow",
		EncodeFn: createTableEncoder(func(req interface{}, env *btpb.TableRequest) {
			args := req.(MutateRowArgs)
			env.Payload = &btpb.TableRequest_MutateRow{
				MutateRow: encodeMutateRow(args),
			}
		}),
		DecodeFn: createTableDecoder(func(env *btpb.TableResponse) interface{} {
			return decodeMutateRow(env.GetMutateRow())
		}),
	}

	// READ_ROW_AUTH_VIEW executes Point Reads on Authorized Views.
	READ_ROW_AUTH_VIEW = &VRpcDescriptorImpl{
		MethodName: "ReadRow",
		EncodeFn: createAuthViewEncoder(func(req interface{}, env *btpb.AuthorizedViewRequest) {
			args := req.(ReadRowArgs)
			env.Payload = &btpb.AuthorizedViewRequest_ReadRow{
				ReadRow: encodeReadRow(args),
			}
		}),
		DecodeFn: createAuthViewDecoder(func(env *btpb.AuthorizedViewResponse) interface{} {
			return decodeReadRow(env.GetReadRow())
		}),
	}

	// MUTATE_ROW_AUTH_VIEW executes Point Mutations on Authorized Views.
	MUTATE_ROW_AUTH_VIEW = &VRpcDescriptorImpl{
		MethodName: "MutateRow",
		EncodeFn: createAuthViewEncoder(func(req interface{}, env *btpb.AuthorizedViewRequest) {
			args := req.(MutateRowArgs)
			env.Payload = &btpb.AuthorizedViewRequest_MutateRow{
				MutateRow: encodeMutateRow(args),
			}
		}),
		DecodeFn: createAuthViewDecoder(func(env *btpb.AuthorizedViewResponse) interface{} {
			return decodeMutateRow(env.GetMutateRow())
		}),
	}

	// READ_ROW_MAT_VIEW executes Point Reads on Materialized Views.
	READ_ROW_MAT_VIEW = &VRpcDescriptorImpl{
		MethodName: "ReadRow",
		EncodeFn: createMatViewEncoder(func(req interface{}, env *btpb.MaterializedViewRequest) {
			args := req.(ReadRowArgs)
			env.Payload = &btpb.MaterializedViewRequest_ReadRow{
				ReadRow: encodeReadRow(args),
			}
		}),
		DecodeFn: createMatViewDecoder(func(env *btpb.MaterializedViewResponse) interface{} {
			return decodeReadRow(env.GetReadRow())
		}),
	}
)
View Source
var (
	// TABLE_SESSION manages standard table scoped Session streams.
	TABLE_SESSION = &SessionDescriptor{
		Type:       SessionTypeTable,
		MethodName: "OpenTable",
		HeaderKeys: []string{"table_name", "app_profile_id", "permission"},
		LogNameFn: func(req proto.Message) string {
			r, ok := req.(*spb.OpenTableRequest)
			if !ok || r == nil {
				return "TableSession(nil)"
			}
			return fmt.Sprintf("TableSession(table=%s, app_profile=%s, perm=%s)", r.TableName, r.AppProfileId, r.Permission.String())
		},
		MetadataFn: func(req proto.Message) map[string]string {
			r, ok := req.(*spb.OpenTableRequest)
			if !ok || r == nil {
				return nil
			}
			return map[string]string{
				"open_session.payload.table_name":     r.TableName,
				"open_session.payload.app_profile_id": r.AppProfileId,
				"open_session.payload.permission":     r.Permission.String(),
			}
		},
	}

	// AUTHORIZED_VIEW_SESSION manages authorized view scoped Session streams.
	AUTHORIZED_VIEW_SESSION = &SessionDescriptor{
		Type:       SessionTypeAuthorizedView,
		MethodName: "OpenAuthorizedView",
		HeaderKeys: []string{"authorized_view_name", "app_profile_id", "permission"},
		LogNameFn: func(req proto.Message) string {
			r, ok := req.(*spb.OpenAuthorizedViewRequest)
			if !ok || r == nil {
				return "AuthorizedViewSession(nil)"
			}
			return fmt.Sprintf("AuthorizedViewSession(view=%s, app_profile=%s, perm=%s)", r.AuthorizedViewName, r.AppProfileId, r.Permission.String())
		},
		MetadataFn: func(req proto.Message) map[string]string {
			r, ok := req.(*spb.OpenAuthorizedViewRequest)
			if !ok || r == nil {
				return nil
			}
			return map[string]string{
				"open_session.payload.authorized_view_name": r.AuthorizedViewName,
				"open_session.payload.app_profile_id":       r.AppProfileId,
				"open_session.payload.permission":           r.Permission.String(),
			}
		},
	}

	// MATERIALIZED_VIEW_SESSION manages materialized view scoped Session streams (Read-Only).
	MATERIALIZED_VIEW_SESSION = &SessionDescriptor{
		Type:       SessionTypeMaterializedView,
		MethodName: "OpenMaterializedView",
		HeaderKeys: []string{"materialized_view_name", "app_profile_id", "permission"},
		LogNameFn: func(req proto.Message) string {
			r, ok := req.(*spb.OpenMaterializedViewRequest)
			if !ok || r == nil {
				return "MaterializedViewSession(nil)"
			}
			return fmt.Sprintf("MaterializedViewSession(view=%s, app_profile=%s, perm=%s)", r.MaterializedViewName, r.AppProfileId, r.Permission.String())
		},
		MetadataFn: func(req proto.Message) map[string]string {
			r, ok := req.(*spb.OpenMaterializedViewRequest)
			if !ok || r == nil {
				return nil
			}
			return map[string]string{
				"open_session.payload.materialized_view_name": r.MaterializedViewName,
				"open_session.payload.app_profile_id":         r.AppProfileId,
				"open_session.payload.permission":             r.Permission.String(),
			}
		},
	}
)

Functions

func ValidateDynamicConfig added in v1.42.0

func ValidateDynamicConfig(config btopt.DynamicChannelPoolConfig, connPoolSize int) error

ValidateDynamicConfig is a helper to centralize validation logic.

Types

type BigtableChannelPool

type BigtableChannelPool struct {
	// contains filtered or unexported fields
}

BigtableChannelPool implements ConnPool and routes requests to the connection pool according to load balancing strategy.

func CreateBigtableChannelPool added in v1.50.0

func CreateBigtableChannelPool(
	ctx context.Context,
	project, instance string,
	config ChannelPoolConfig,
	otelMeterProvider metric.MeterProvider,
	o []option.ClientOption,
	directAccessOptions []option.ClientOption,
	directAccessMD metadata.MD,
	clientCreationTimestamp time.Time,
) (*BigtableChannelPool, error)

CreateBigtableChannelPool is a helper function to initialize a separate BigtableChannelPool instance.

See CreateAndStartManagedChannelPool for the contract on `o` vs. `directAccessOptions`.

func NewBigtableChannelPool

func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), clientCreationTimestamp time.Time, opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error)

NewBigtableChannelPool creates a pool of connPoolSize and takes the dial func() NewBigtableChannelPool primes the new connection in a non-blocking goroutine to warm it up. We keep it consistent with the current channelpool behavior which is lazily initialized.

func (*BigtableChannelPool) Close

func (p *BigtableChannelPool) Close() error

Close closes all connections in the pool.

func (*BigtableChannelPool) Conn

func (p *BigtableChannelPool) Conn() *grpc.ClientConn

Conn provides connbased on selectfunc()

func (*BigtableChannelPool) Invoke

func (p *BigtableChannelPool) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error

Invoke selects the least loaded connection and calls Invoke on it. This method provides automatic load tracking. Load is tracked as a unary call.

func (*BigtableChannelPool) NewStream

func (p *BigtableChannelPool) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error)

NewStream selects a connection by the configured load-balancing strategy and opens a stream on it. grpc.OnFinish fires exactly once for any stream that was successfully created (normal completion, context cancellation, transport teardown), so it is the single source of truth for both load accounting and per-stream error attribution — no need to wrap the returned ClientStream.

func (*BigtableChannelPool) Num

func (p *BigtableChannelPool) Num() int

Num returns the number of connections in the pool.

type BigtableChannelPoolOption added in v1.42.0

type BigtableChannelPoolOption func(*BigtableChannelPool)

BigtableChannelPoolOption options for configurable

func WithAppProfile added in v1.42.0

func WithAppProfile(appProfile string) BigtableChannelPoolOption

WithAppProfile provides the appProfile

func WithDirectAccessDialer added in v1.43.0

func WithDirectAccessDialer(directAccessDialer func() (*BigtableConn, error)) BigtableChannelPoolOption

WithDirectAccessDialer provides the dialer for direct access

func WithDirectAccessFeatureFlagsMetadata added in v1.43.0

func WithDirectAccessFeatureFlagsMetadata(directAccessFeatureFlagsMD metadata.MD) BigtableChannelPoolOption

WithDirectAccessFeatureFlagsMetadata provides the feature flags required for DirectAccess

func WithFeatureFlagsMetadata added in v1.42.0

func WithFeatureFlagsMetadata(featureFlagsMd metadata.MD) BigtableChannelPoolOption

WithFeatureFlagsMetadata provides the feature flags metadata

func WithInstanceName added in v1.42.0

func WithInstanceName(instanceName string) BigtableChannelPoolOption

WithInstanceName provides the full instance Name

func WithLogger added in v1.42.0

func WithLogger(logger *log.Logger) BigtableChannelPoolOption

WithLogger provides the logger for logging events

func WithMeterProvider added in v1.42.0

func WithMeterProvider(mp metric.MeterProvider) BigtableChannelPoolOption

WithMeterProvider provides the meter provider for writing metrics

func WithMetricsReporterConfig added in v1.42.0

func WithMetricsReporterConfig(config btopt.MetricsReporterConfig) BigtableChannelPoolOption

WithMetricsReporterConfig attaches the relevant config for exporting the metrics

type BigtableConn added in v1.42.0

type BigtableConn struct {
	*grpc.ClientConn
	// contains filtered or unexported fields
}

BigtableConn wraps grpc.ClientConn to add Bigtable specific methods.

func NewBigtableConn added in v1.42.0

func NewBigtableConn(conn *grpc.ClientConn) *BigtableConn

NewBigtableConn creates a wrapped grpc Client Conn

func (*BigtableConn) Prime added in v1.42.0

func (bc *BigtableConn) Prime(ctx context.Context, fullInstanceName, appProfileID string, featureFlagsMd metadata.MD) error

Prime sends a PingAndWarm request to warm up the connection.

type ChannelPoolConfig added in v1.50.0

type ChannelPoolConfig struct {
	AppProfile                string
	DisableDynamicChannelPool bool
	DisableConnectionRecycler bool
	DisableDirectAccess       bool
}

ChannelPoolConfig has configurations for the channel pool.

type ConnectionRecycler added in v1.43.0

type ConnectionRecycler struct {
	// contains filtered or unexported fields
}

ConnectionRecycler monitors connection age and recycles them to prevent long-lived connections.

func NewConnectionRecycler added in v1.43.0

func NewConnectionRecycler(config btopt.ConnectionRecycleConfig, pool *BigtableChannelPool) *ConnectionRecycler

NewConnectionRecycler creates a new recycler with the provided configuration.

func (*ConnectionRecycler) Start added in v1.43.0

func (cr *ConnectionRecycler) Start(ctx context.Context)

Start begins the periodic monitoring.

func (*ConnectionRecycler) Stop added in v1.43.0

func (cr *ConnectionRecycler) Stop()

Stop terminates the ConnectionRecycler.

type Diverter added in v1.49.0

type Diverter struct {
	// contains filtered or unexported fields
}

Diverter decides whether to use the session-based protocol or classic protocol.

func NewDiverter added in v1.49.0

func NewDiverter(sessionLoad float64) *Diverter

NewDiverter creates a new Diverter with the given session load ratio (0.0 to 1.0).

func (*Diverter) SetSessionLoad added in v1.49.0

func (d *Diverter) SetSessionLoad(load float64)

SetSessionLoad updates the session load ratio.

func (*Diverter) UseSession added in v1.49.0

func (d *Diverter) UseSession() bool

UseSession returns true if the next call should use the session protocol.

type DynamicScaleMonitor added in v1.42.0

type DynamicScaleMonitor struct {
	// contains filtered or unexported fields
}

DynamicScaleMonitor manages upscale and downscale of the connection pool. Owner: It is owned by BigtableClient

func NewDynamicScaleMonitor added in v1.42.0

func NewDynamicScaleMonitor(config btopt.DynamicChannelPoolConfig, pool *BigtableChannelPool) *DynamicScaleMonitor

NewDynamicScaleMonitor creates a new DynamicScaleMonitor.

func (*DynamicScaleMonitor) Start added in v1.42.0

func (dsm *DynamicScaleMonitor) Start(ctx context.Context)

Start logic

func (*DynamicScaleMonitor) Stop added in v1.42.0

func (dsm *DynamicScaleMonitor) Stop()

Stop terminates the scaling check loop.

type ManagedChannelPool added in v1.50.0

type ManagedChannelPool struct {
	Pool         gtransport.ConnPool
	Dsm          *DynamicScaleMonitor
	ConnRecycler *ConnectionRecycler
}

ManagedChannelPool encapsulates a connection pool along with its lifecycle monitors.

func CreateAndStartManagedChannelPool added in v1.50.0

func CreateAndStartManagedChannelPool(
	ctx context.Context,
	project, instance string,
	config ChannelPoolConfig,
	otelMeterProvider metric.MeterProvider,
	o []option.ClientOption,
	directAccessOptions []option.ClientOption,
	directAccessMD metadata.MD,
	clientCreationTimestamp time.Time,
	enableBigtableConnPool bool,
) (ManagedChannelPool, error)

CreateAndStartManagedChannelPool initializes and starts the lifecycle monitors for a classic or session connection pool.

`o` is the full set of base ClientOptions used to dial both the classic pool and each per-connection dial inside the Bigtable channel pool. `directAccessOptions` is the *separate* set of options that opt a single dial in to direct access (DirectPath / DirectPathXds / AllowHardBoundTokens). They are kept apart because direct access is per-connection and conditional: it is layered on top of `o` only when isDirectAccessEnabled(config) is true, and only on the dedicated direct-access dialer — never on the fallback dialer used when direct access is disabled or unavailable.

func (ManagedChannelPool) Close added in v1.50.0

func (m ManagedChannelPool) Close() error

Close stops all associated monitors/recyclers and closes the underlying pool.

type MetricsReporter added in v1.42.0

type MetricsReporter struct {
	// contains filtered or unexported fields
}

MetricsReporter periodically collects and reports metrics for the connection pool.

func NewMetricsReporter added in v1.42.0

func NewMetricsReporter(config btopt.MetricsReporterConfig, connPoolStatsSupplier connPoolStatsSupplier, logger *log.Logger, mp metric.MeterProvider) (*MetricsReporter, error)

NewMetricsReporter starts a monitor to export periodic metrics

func (*MetricsReporter) Start added in v1.42.0

func (mr *MetricsReporter) Start(ctx context.Context)

Start starts the reporter.

func (*MetricsReporter) Stop added in v1.42.0

func (mr *MetricsReporter) Stop()

Stop stops the reporter gracefully

type Monitor added in v1.42.0

type Monitor interface {
	// Start begins the monitor's background processing loop.
	Start(ctx context.Context)
	// Stop gracefully terminates the monitor's background processing.
	Stop()
}

Monitor defines the interface for background tasks like health checking, dynamic scaling, and metrics reporting.

type MutateRowArgs added in v1.49.0

type MutateRowArgs struct {
	RowKey    string
	Mutations []*btpb.Mutation
}

MutateRowArgs contains arguments required for a virtual RPC MutateRow call.

type MutateRowResult added in v1.49.0

type MutateRowResult struct{}

MutateRowResult holds the result of a MutateRow virtual RPC.

type Pacemaker added in v1.43.0

type Pacemaker struct {
	// contains filtered or unexported fields
}

Pacemaker monitors the runtime scheduling delay It measures the time difference between when a ticker was scheduled to fire and when the ticker actually fires.

func NewPacemaker added in v1.43.0

func NewPacemaker(mp metric.MeterProvider, logger *log.Logger) *Pacemaker

NewPacemaker creates a new Pacemaker and initializes its metrics.

func (*Pacemaker) Start added in v1.43.0

func (p *Pacemaker) Start(ctx context.Context)

Start begins the pacemaker ticker.

func (*Pacemaker) Stop added in v1.43.0

func (p *Pacemaker) Stop()

Stop acts as a cleanup method. no-op

type ReadRowArgs added in v1.49.0

type ReadRowArgs struct {
	RowKey string
	Filter *btpb.RowFilter
}

ReadRowArgs contains arguments required for a virtual RPC ReadRow call.

type ReadRowResult added in v1.49.0

type ReadRowResult struct {
	Row *btpb.Row
}

ReadRowResult holds the result returned from a virtual RPC ReadRow call.

type SessionDescriptor added in v1.49.0

type SessionDescriptor struct {
	Type       SessionType
	MethodName string
	HeaderKeys []string
	LogNameFn  func(req proto.Message) string
	MetadataFn func(req proto.Message) map[string]string // Populates session metadata headers required for OpenSession{} RPC
}

SessionDescriptor models a dynamic envelope handshake parameters compiler.

type SessionType added in v1.49.0

type SessionType int

SessionType represents the protocol target session type.

const (
	// SessionTypeTable indicates standard table session type.
	SessionTypeTable SessionType = iota
	// SessionTypeAuthorizedView indicates authorized view session type.
	SessionTypeAuthorizedView
	// SessionTypeMaterializedView indicates materialized view session type.
	SessionTypeMaterializedView
)

func (SessionType) String added in v1.49.0

func (t SessionType) String() string

type VRpcDescriptor added in v1.49.0

type VRpcDescriptor interface {
	Method() string
	Encode(req interface{}) ([]byte, error)
	Decode(buf []byte) (interface{}, error)
}

VRpcDescriptor defines the interface for virtual RPC encoding and decoding.

type VRpcDescriptorImpl added in v1.49.0

type VRpcDescriptorImpl struct {
	MethodName string
	EncodeFn   func(req interface{}) ([]byte, error)
	DecodeFn   func(buf []byte) (interface{}, error)
}

VRpcDescriptorImpl implements VRpcDescriptor using custom encoder/decoder closures.

func (*VRpcDescriptorImpl) Decode added in v1.49.0

func (d *VRpcDescriptorImpl) Decode(buf []byte) (interface{}, error)

Decode de-serializes virtual RPC bytes back into standard Bigtable response message.

func (*VRpcDescriptorImpl) Encode added in v1.49.0

func (d *VRpcDescriptorImpl) Encode(req interface{}) ([]byte, error)

Encode serializes standard Bigtable request payload into virtual RPC bytes.

func (*VRpcDescriptorImpl) Method added in v1.49.0

func (d *VRpcDescriptorImpl) Method() string

Method returns the virtual RPC method identifier name.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL