Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,20 @@ public void submit(Broker.BrokerRequest request,
return;
}

/// The MV minion executor sets this request-metadata flag to force-disable materialized-view
/// rewrite for its materialization query. Apply it as an override AFTER parsing so it cannot be
/// defeated by an `enableMaterializedViewRewrite` option present in the (user-authored) query
/// text: the parser keeps the last value for a duplicated SET, and SQL options otherwise outrank
/// request options. The materialization query reads the base table and must never be rewritten
/// back onto an MV (its own or a sibling). Safe to force from request metadata — disabling MV
/// rewrite only forgoes an optimization and never changes results.
String mvRewriteOverride =
metadataMap.get(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_MATERIALIZED_VIEW_REWRITE);
if (mvRewriteOverride != null) {
sqlNodeAndOptions.getOptions()
.put(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_MATERIALIZED_VIEW_REWRITE, mvRewriteOverride);
}

RequesterIdentity requesterIdentify = GrpcRequesterIdentity.fromRequest(request);
RequestContext requestContext = new DefaultRequestContext();
BrokerResponse brokerResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,11 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn
/// response's `tablesQueried` field tags the user's base table — not the MV the broker
/// routed through. Equals `rawTableName` when no MV swap happens.
String userRawTableName = rawTableName;
if (_materializedViewHandler != null) {
/// Per-query opt-out (default true). A user — or the MV minion executor for its own
/// materialization query — sets `enableMaterializedViewRewrite=false` to force the base-table
/// path and avoid rewriting a materialization query back onto an MV.
if (_materializedViewHandler != null
&& QueryOptionsUtils.isMaterializedViewRewriteEnabled(serverPinotQuery.getQueryOptions())) {
MaterializedViewCompileOutcome outcome = applyMaterializedViewRewriteAtCompile(
requestId, serverPinotQuery, tableName, rawTableName, schema, _tableCache.isIgnoreCase());
materializedViewContext = outcome._materializedViewContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
Expand Down Expand Up @@ -214,6 +215,47 @@ public void testGrpcBytesReceivedMetric()
verify(_brokerMetrics).addMeteredGlobalValue(eq(BrokerMeter.GRPC_BYTES_SENT), anyLong());
}

@Test
public void testForceDisableMaterializedViewRewriteViaMetadataOverridesSql()
throws Exception {
/// The SQL explicitly tries to ENABLE MV rewrite; the request-metadata flag must force it OFF,
/// proving the metadata override beats an `enableMaterializedViewRewrite` option in the query
/// text (the exact case the MV minion executor relies on for its materialization query).
Broker.BrokerRequest request = Broker.BrokerRequest.newBuilder()
.setSql("SET enableMaterializedViewRewrite='true'; SELECT * FROM testTable")
.putMetadata(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_MATERIALIZED_VIEW_REWRITE, "false")
.build();

ArgumentCaptor<SqlNodeAndOptions> optionsCaptor = ArgumentCaptor.forClass(SqlNodeAndOptions.class);
when(_brokerRequestHandler.handleRequest(any(), optionsCaptor.capture(), any(), any(), any()))
.thenReturn(new BrokerResponseNative());

_brokerGrpcServer.submit(request, createMockStreamObserver(new ArrayList<>()));

assertEquals(optionsCaptor.getValue().getOptions()
.get(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_MATERIALIZED_VIEW_REWRITE),
"false", "Request metadata must force enableMaterializedViewRewrite=false, overriding the SQL SET");
}

@Test
public void testNoMaterializedViewRewriteMetadataLeavesSqlOptionUntouched()
throws Exception {
/// Without the metadata flag, no override is applied and the SQL SET is the source of truth.
Broker.BrokerRequest request = Broker.BrokerRequest.newBuilder()
.setSql("SET enableMaterializedViewRewrite='true'; SELECT * FROM testTable")
.build();

ArgumentCaptor<SqlNodeAndOptions> optionsCaptor = ArgumentCaptor.forClass(SqlNodeAndOptions.class);
when(_brokerRequestHandler.handleRequest(any(), optionsCaptor.capture(), any(), any(), any()))
.thenReturn(new BrokerResponseNative());

_brokerGrpcServer.submit(request, createMockStreamObserver(new ArrayList<>()));

assertEquals(optionsCaptor.getValue().getOptions()
.get(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_MATERIALIZED_VIEW_REWRITE),
"true", "Without the metadata flag, the SQL-set option must be preserved");
}

/**
* Helper method to create a mock StreamObserver that captures all responses.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


Expand Down Expand Up @@ -1197,6 +1199,125 @@ protected BrokerResponseNative processMaterializedViewSplitBrokerRequest(long re
"Response must report the MV name when the swap was committed");
}

/**
* Per-query opt-out: a query carrying {@code enableMaterializedViewRewrite=false} (the option the
* MV minion executor injects onto its materialization query) MUST bypass the rewrite path
* entirely. This is the regression guard for the circular-rewrite hazard: a materialization
* query reads the base table, and with broker-wide MV rewrite enabled it would otherwise be
* eligible to be rewritten back onto an MV over the same base table (its own MV, or a sibling).
* Using the identical setup to {@link #testMaterializedViewFullRewriteActuallySwapsServerQuery}
* (which DOES swap), this asserts the gate prevents the swap: the rewrite engine is never
* consulted, the server query targets the base table, no MV marker is stamped, and the response
* reports no MV.
*/
@Test
public void testMaterializedViewRewriteSkippedWhenDisabledByQueryOption()
throws Exception {
String baseOfflineTable = "baseTable_OFFLINE";
String materializedViewOfflineTable = "mv_baseTable_OFFLINE";
String baseRawTable = "baseTable";
String materializedViewRawTable = "mv_baseTable";

String userSql = "SET enableMaterializedViewRewrite='false';"
+ "SELECT ts, SUM(revenue) FROM baseTable GROUP BY ts LIMIT 100";
PinotQuery materializedViewQuery = CalciteSqlParser.compileToPinotQuery(
"SELECT ts, SUM(revenue) FROM mv_baseTable_OFFLINE GROUP BY ts LIMIT 100");

/// The engine WOULD return a committable FULL_REWRITE if consulted — but the gate must ensure
/// it is never consulted at all.
MaterializedViewRewritePlan plan = new MaterializedViewRewritePlan(
materializedViewOfflineTable, MatchType.EXACT, ExecutionMode.FULL_REWRITE, materializedViewQuery, 1.0);
MaterializedViewQueryRewriteEngine materializedViewEngine = mock(MaterializedViewQueryRewriteEngine.class);
when(materializedViewEngine.tryRewrite(any(PinotQuery.class), anyString())).thenReturn(plan);
MaterializedViewHandler materializedViewHandler = new DefaultMaterializedViewHandler(materializedViewEngine);

Schema baseSchema = new Schema.SchemaBuilder()
.setSchemaName(baseRawTable)
.addSingleValueDimension("ts", DataType.STRING)
.addMetric("revenue", DataType.DOUBLE)
.build();

TableCache tableCache = mock(TableCache.class);
when(tableCache.getActualTableName(baseRawTable)).thenReturn(baseRawTable);
when(tableCache.getSchema(baseRawTable)).thenReturn(baseSchema);
when(tableCache.getColumnNameMap(anyString())).thenReturn(Map.of("ts", "ts", "revenue", "revenue"));
TableConfig tableCfg = mock(TableConfig.class);
TenantConfig tenant = new TenantConfig("t_BROKER", "t_SERVER", null);
when(tableCfg.getTenantConfig()).thenReturn(tenant);
when(tableCache.getTableConfig(baseOfflineTable)).thenReturn(tableCfg);
when(tableCache.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(baseRawTable))).thenReturn(null);

BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class);
when(routingManager.routingExists(baseOfflineTable)).thenReturn(true);
when(routingManager.getQueryTimeoutMs(anyString())).thenReturn(10000L);
RoutingTable rt = mock(RoutingTable.class);
when(rt.getServerInstanceToSegmentsMap()).thenReturn(
Map.of(new ServerInstance(new InstanceConfig("server01_9000")),
new SegmentsToQuery(List.of("seg01"), List.of())));
when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt);

QueryQuotaManager quotaManager = mock(QueryQuotaManager.class);
when(quotaManager.acquireDatabase(anyString())).thenReturn(true);
when(quotaManager.acquireApplication(anyString())).thenReturn(true);
when(quotaManager.acquire(anyString())).thenReturn(true);

BrokerMetrics.register(mock(BrokerMetrics.class));
PinotConfiguration config = new PinotConfiguration();
BrokerQueryEventListenerFactory.init(config);

AtomicReference<BrokerRequest> capturedServerBrokerRequest = new AtomicReference<>();
BaseSingleStageBrokerRequestHandler handler =
new BaseSingleStageBrokerRequestHandler(config, "broker1", new BrokerRequestIdGenerator(),
routingManager, new AllowAllAccessControlFactory(), quotaManager, tableCache,
ThreadAccountantUtils.getNoOpAccountant(), null, materializedViewHandler) {
@Override
public void start() {
}

@Override
public void shutDown() {
}

@Override
protected BrokerResponseNative processBrokerRequest(long requestId,
BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest,
TableRouteInfo route, long timeoutMs, ServerStats serverStats,
RequestContext requestContext) {
capturedServerBrokerRequest.set(serverBrokerRequest);
return BrokerResponseNative.empty();
}

@Override
protected BrokerResponseNative processMaterializedViewSplitBrokerRequest(long requestId,
long materializedViewRequestId, BrokerRequest originalBrokerRequest, TableRouteInfo baseRoute,
TableRouteInfo materializedViewRoute, long timeoutMs, ServerStats serverStats,
RequestContext requestContext) {
Assert.fail("MV split path must not be reached when enableMaterializedViewRewrite=false");
return BrokerResponseNative.empty();
}
};

BrokerResponseNative response = (BrokerResponseNative) handler.handleRequest(userSql);
BrokerRequest serverBrokerRequest = capturedServerBrokerRequest.get();
Assert.assertNotNull(serverBrokerRequest,
"processBrokerRequest must have been reached on the base-table path; exceptions: "
+ response.getExceptions());
/// The rewrite engine must never be consulted when the per-query opt-out is set.
verify(materializedViewEngine, never()).tryRewrite(any(PinotQuery.class), anyString());
/// The server query MUST target the base table — no swap to the MV.
String serverTableName = serverBrokerRequest.getPinotQuery().getDataSource().getTableName();
Assert.assertEquals(serverTableName, baseOfflineTable,
"Query with enableMaterializedViewRewrite=false must target the base table but got: " + serverTableName);
/// No MV-rewrite marker may be stamped.
Map<String, String> serverOptions = serverBrokerRequest.getPinotQuery().getQueryOptions();
Assert.assertTrue(serverOptions == null
|| !serverOptions.containsKey(CommonConstants.Broker.Request.QueryOptionKey.MATERIALIZED_VIEW_REWRITE),
"MV-rewrite marker must not be stamped when rewrite is disabled; options: " + serverOptions);
/// And the response must NOT report a materializedViewQueried.
Assert.assertNull(response.getMaterializedViewQueried(),
"Response must not report a materializedViewQueried when rewrite is disabled by the query option");
}

/**
* Pins the cascade-prevention guard: when the user's query already targets an MV table
* directly ({@code TableConfig.isMaterializedView()} is {@code true}), the broker must skip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,18 @@ public static boolean isSkipUpsert(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UPSERT));
}

/// Returns whether materialized-view rewrite is allowed for this query. Defaults to `true`
/// (absence ⇒ rewrite allowed) for backward compatibility with the pre-option behavior; the
/// MV minion executor sets it to `false` so a materialization query is never rewritten back
/// onto an MV.
public static boolean isMaterializedViewRewriteEnabled(Map<String, String> queryOptions) {
if (queryOptions == null) {
return true;
}
String value = queryOptions.get(QueryOptionKey.ENABLE_MATERIALIZED_VIEW_REWRITE);
return value == null || Boolean.parseBoolean(value);
}

public static boolean isSkipUpsertView(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UPSERT_VIEW));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@

import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;


Expand Down Expand Up @@ -69,6 +71,24 @@ public void shouldResolveSamplerOptionCaseInsensitively() {
assertEquals(resolved.get(TABLE_SAMPLER), "firstOnly");
}

@Test
public void materializedViewRewriteDefaultsToEnabled() {
// Absent option and null map both default to enabled (back-compat with pre-option behavior).
assertTrue(QueryOptionsUtils.isMaterializedViewRewriteEnabled(null));
assertTrue(QueryOptionsUtils.isMaterializedViewRewriteEnabled(new HashMap<>()));
assertTrue(QueryOptionsUtils.isMaterializedViewRewriteEnabled(
Map.of(ENABLE_MATERIALIZED_VIEW_REWRITE, "true")));
assertTrue(QueryOptionsUtils.isMaterializedViewRewriteEnabled(
Map.of(ENABLE_MATERIALIZED_VIEW_REWRITE, "TRUE")));
// Anything that is not "true" disables (explicit false, case variants, and any non-true value).
assertFalse(QueryOptionsUtils.isMaterializedViewRewriteEnabled(
Map.of(ENABLE_MATERIALIZED_VIEW_REWRITE, "false")));
assertFalse(QueryOptionsUtils.isMaterializedViewRewriteEnabled(
Map.of(ENABLE_MATERIALIZED_VIEW_REWRITE, "FALSE")));
assertFalse(QueryOptionsUtils.isMaterializedViewRewriteEnabled(
Map.of(ENABLE_MATERIALIZED_VIEW_REWRITE, "1")));
}

@Test
public void shouldExtractTableSamplerOption() {
assertEquals(QueryOptionsUtils.getTableSampler(Map.of(TABLE_SAMPLER, "firstOnly")), "firstOnly");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ public QueryHandle executeQuery(String sql, Map<String, String> authHeaders)

BrokerGrpcQueryClient client = getOrCreateClient(broker.getLeft(), broker.getRight());

Broker.BrokerRequest brokerRequest = Broker.BrokerRequest.newBuilder()
.setSql(sql)
.putAllMetadata(authHeaders)
.build();
Broker.BrokerRequest brokerRequest = buildMaterializationRequest(sql, authHeaders);
Iterator<Broker.BrokerResponse> responseIterator = client.submit(brokerRequest);

Preconditions.checkState(responseIterator.hasNext(),
Expand Down Expand Up @@ -219,6 +216,30 @@ private void decodeNextFrame() {
}
}

/// Builds the gRPC [Broker.BrokerRequest] for a materialization query, forcing materialized-view
/// rewrite OFF via request **metadata** rather than a SQL `SET`.
///
/// The materialization query reads the base table; with broker-wide MV rewrite enabled it would
/// otherwise be eligible to be rewritten back onto an MV over the same base table (its own MV or
/// a sibling), building the MV from MV data instead of the base table.
///
/// The disable is carried as request metadata so the broker can apply it as an override *after*
/// parsing (see `BrokerGrpcServer#submit`). A SQL `SET` prefix would be unreliable: the
/// (user-authored) `definedSQL` may already contain an `enableMaterializedViewRewrite` option,
/// and the parser keeps the LAST value for a duplicated key — so a prepended `SET ... = 'false'`
/// would silently lose to a later `SET ... = 'true'` in the query text. Forcing it from metadata
/// at the broker side is unconditional. Safe to force: disabling MV rewrite only forgoes an
/// optimization, never changes results.
@VisibleForTesting
static Broker.BrokerRequest buildMaterializationRequest(String sql, Map<String, String> authHeaders) {
return Broker.BrokerRequest.newBuilder()
.setSql(sql)
.putAllMetadata(authHeaders)
/// Set AFTER auth headers so the forced flag wins even if an auth header reused the key.
.putMetadata(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_MATERIALIZED_VIEW_REWRITE, "false")
.build();
}

/// Discovers all gRPC-enabled brokers from Helix and selects one using round-robin.
/// Also evicts cached clients for brokers that are no longer present.
@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.proto.Broker;
import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -179,6 +181,28 @@ public void testCloseEvictsAllClients() {
assertEquals(_queryExecutor.getCachedClientCount(), 0);
}

@Test
public void testBuildMaterializationRequestForcesRewriteOffViaMetadata() {
String baseSql = "SELECT a, SUM(b) FROM t WHERE ts >= 1 AND ts < 2 GROUP BY a LIMIT 10";
Map<String, String> authHeaders = Map.of("Authorization", "Bearer token");

Broker.BrokerRequest request =
GrpcMaterializedViewQueryExecutor.buildMaterializationRequest(baseSql, authHeaders);

/// The materialization SQL is sent verbatim — no SET injection that an option already present in
/// the user-authored definedSQL could override.
assertEquals(request.getSql(), baseSql, "Materialization SQL must be sent verbatim");

/// The disable travels as request metadata; the broker force-applies it after parsing, so it
/// cannot be defeated by an `enableMaterializedViewRewrite` option present in the query text.
assertEquals(
request.getMetadataMap().get(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_MATERIALIZED_VIEW_REWRITE),
"false", "Materialization request must force enableMaterializedViewRewrite=false via metadata");

/// Auth headers are preserved alongside the forced flag.
assertEquals(request.getMetadataMap().get("Authorization"), "Bearer token", "Auth headers must be preserved");
}

private InstanceConfig buildBrokerConfig(String instanceName, String hostname, int grpcPort) {
InstanceConfig config = new InstanceConfig(instanceName);
config.setHostName(hostname);
Expand Down
Loading
Loading