Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/query/frontend/ast/ast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ constexpr utils::TypeInfo query::Query::kType{utils::TypeId::AST_QUERY, "Query",

constexpr utils::TypeInfo query::IndexHint::kType{utils::TypeId::AST_INDEX_HINT, "IndexHint", &query::Tree::kType};

constexpr utils::TypeInfo query::UsingStatement::kType{utils::TypeId::AST_USING_STATEMENT, "UsingStatement",
&query::Tree::kType};
constexpr utils::TypeInfo query::PreQueryDirectives::kType{utils::TypeId::AST_PRE_QUERY_DIRECTIVES,
"PreQueryDirectives", &query::Tree::kType};

constexpr utils::TypeInfo query::CypherQuery::kType{utils::TypeId::AST_CYPHER_QUERY, "CypherQuery",
&query::Query::kType};
Expand Down
13 changes: 8 additions & 5 deletions src/query/frontend/ast/ast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2162,22 +2162,25 @@ struct IndexHint {
}
};

struct UsingStatement {
struct PreQueryDirectives {
static const utils::TypeInfo kType;
const utils::TypeInfo &GetTypeInfo() const { return kType; }

/// Index hints
std::vector<memgraph::query::IndexHint> index_hints_;
/// Hops limit
memgraph::query::Expression *hops_limit_{nullptr};
/// Commit frequency
memgraph::query::Expression *commit_frequency_{nullptr};

UsingStatement Clone(AstStorage *storage) const {
UsingStatement object;
PreQueryDirectives Clone(AstStorage *storage) const {
PreQueryDirectives object;
object.index_hints_.resize(index_hints_.size());
for (auto i = 0; i < index_hints_.size(); ++i) {
object.index_hints_[i] = index_hints_[i].Clone(storage);
}
object.hops_limit_ = hops_limit_ ? hops_limit_->Clone(storage) : nullptr;
object.commit_frequency_ = commit_frequency_ ? commit_frequency_->Clone(storage) : nullptr;
return object;
}
};
Expand Down Expand Up @@ -2210,7 +2213,7 @@ class CypherQuery : public memgraph::query::Query, public utils::Visitable<Hiera
memgraph::query::Expression *memory_limit_{nullptr};
size_t memory_scale_{1024U};
/// Using statement
memgraph::query::UsingStatement using_statement_;
memgraph::query::PreQueryDirectives pre_query_directives_;

CypherQuery *Clone(AstStorage *storage) const override {
CypherQuery *object = storage->Create<CypherQuery>();
Expand All @@ -2221,7 +2224,7 @@ class CypherQuery : public memgraph::query::Query, public utils::Visitable<Hiera
}
object->memory_limit_ = memory_limit_ ? memory_limit_->Clone(storage) : nullptr;
object->memory_scale_ = memory_scale_;
object->using_statement_ = using_statement_.Clone(storage);
object->pre_query_directives_ = pre_query_directives_.Clone(storage);
return object;
}

Expand Down
55 changes: 42 additions & 13 deletions src/query/frontend/ast/cypher_main_visitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ antlrcpp::Any CypherMainVisitor::visitCypherQuery(MemgraphCypher::CypherQueryCon
cypher_query->cypher_unions_.push_back(std::any_cast<CypherUnion *>(child->accept(this)));
}

if (auto *using_statement_ctx = ctx->usingStatement()) {
cypher_query->using_statement_ = std::any_cast<UsingStatement>(using_statement_ctx->accept(this));
if (auto *pre_query_directives_ctx = ctx->preQueryDirectives()) {
cypher_query->pre_query_directives_ = std::any_cast<PreQueryDirectives>(pre_query_directives_ctx->accept(this));
}

if (auto *memory_limit_ctx = ctx->queryMemoryLimit()) {
Expand All @@ -230,31 +230,46 @@ antlrcpp::Any CypherMainVisitor::visitCypherQuery(MemgraphCypher::CypherQueryCon
return cypher_query;
}

antlrcpp::Any CypherMainVisitor::visitUsingStatement(MemgraphCypher::UsingStatementContext *ctx) {
UsingStatement using_statement;
for (auto *using_statement_item : ctx->usingStatementItem()) {
if (auto *index_hints_ctx = using_statement_item->indexHints()) {
antlrcpp::Any CypherMainVisitor::visitPreQueryDirectives(MemgraphCypher::PreQueryDirectivesContext *ctx) {
PreQueryDirectives pre_query_directives;
for (auto *pre_query_directive : ctx->preQueryDirective()) {
if (auto *index_hints_ctx = pre_query_directive->indexHints()) {
for (auto *index_hint_ctx : index_hints_ctx->indexHint()) {
auto label = AddLabel(std::any_cast<std::string>(index_hint_ctx->labelName()->accept(this)));
if (!index_hint_ctx->propertyKeyName()) {
using_statement.index_hints_.emplace_back(
pre_query_directives.index_hints_.emplace_back(
IndexHint{.index_type_ = IndexHint::IndexType::LABEL, .label_ = label});
continue;
}
using_statement.index_hints_.emplace_back(
pre_query_directives.index_hints_.emplace_back(
IndexHint{.index_type_ = IndexHint::IndexType::LABEL_PROPERTY,
.label_ = label,
.property_ = std::any_cast<PropertyIx>(index_hint_ctx->propertyKeyName()->accept(this))});
}
} else {
if (using_statement.hops_limit_) {
throw SemanticException("Hops limit can be set only once in the USING statement.");
} else if (auto *periodic_commit = pre_query_directive->periodicCommit()) {
if (pre_query_directives.commit_frequency_) {
throw SyntaxException("Commit frequency can be set only once in the USING statement.");
}
auto *periodic_commit_number = periodic_commit->periodicCommitNumber;
if (!periodic_commit_number->numberLiteral()) {
throw SyntaxException("Periodic commit should be a number variable.");
}
if (!periodic_commit_number->numberLiteral()->integerLiteral()) {
throw SyntaxException("Periodic commit should be an integer.");
}
using_statement.hops_limit_ = std::any_cast<Expression *>(using_statement_item->hopsLimit()->accept(this));

pre_query_directives.commit_frequency_ = std::any_cast<Expression *>(periodic_commit_number->accept(this));
} else if (pre_query_directive->hopsLimit()) {
if (pre_query_directives.hops_limit_) {
throw SyntaxException("Hops limit can be set only once in the USING statement.");
}
pre_query_directives.hops_limit_ = std::any_cast<Expression *>(pre_query_directive->hopsLimit()->accept(this));
} else {
throw SyntaxException("Unknown pre query directive!");
}
}

return using_statement;
return pre_query_directives;
}

antlrcpp::Any CypherMainVisitor::visitIndexQuery(MemgraphCypher::IndexQueryContext *ctx) {
Expand Down Expand Up @@ -3112,6 +3127,20 @@ antlrcpp::Any CypherMainVisitor::visitCallSubquery(MemgraphCypher::CallSubqueryC

call_subquery->cypher_query_ = std::any_cast<CypherQuery *>(ctx->cypherQuery()->accept(this));

PreQueryDirectives pre_query_directives;
if (auto const *periodic_commit = ctx->periodicSubquery()) {
auto *const periodic_commit_number = periodic_commit->periodicCommitNumber;
if (!periodic_commit_number->numberLiteral()) {
throw SyntaxException("Periodic commit should be a number variable.");
}
if (!periodic_commit_number->numberLiteral()->integerLiteral()) {
throw SyntaxException("Periodic commit should be an integer.");
}
pre_query_directives.commit_frequency_ = std::any_cast<Expression *>(periodic_commit_number->accept(this));

call_subquery->cypher_query_->pre_query_directives_ = pre_query_directives;
}

return call_subquery;
}

Expand Down
4 changes: 2 additions & 2 deletions src/query/frontend/ast/cypher_main_visitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
antlrcpp::Any visitCypherQuery(MemgraphCypher::CypherQueryContext *ctx) override;

/**
* @return UsingStatement*
* @return PreQueryDirectives*
*/
antlrcpp::Any visitUsingStatement(MemgraphCypher::UsingStatementContext *ctx) override;
antlrcpp::Any visitPreQueryDirectives(MemgraphCypher::PreQueryDirectivesContext *ctx) override;

/**
* @return IndexQuery*
Expand Down
17 changes: 13 additions & 4 deletions src/query/frontend/opencypher/grammar/MemgraphCypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,12 @@ memgraphCypherKeyword : cypherKeyword
| NO
| NODE_LABELS
| NOTHING
| OF_TOKEN
| ON_DISK_TRANSACTIONAL
| NULLIF
| ON_DISK_TRANSACTIONAL
| PASSWORD
| PERIODIC
| PORT
| PRIVILEGES
| PULSAR
Expand All @@ -120,6 +123,8 @@ memgraphCypherKeyword : cypherKeyword
| REVOKE
| ROLE
| ROLES
| ROWS
| QUOTE
| SCHEMA
| SERVER
| SERVICE_URL
Expand Down Expand Up @@ -205,7 +210,7 @@ query : cypherQuery
| showSchemaInfoQuery
;

cypherQuery : ( usingStatement )? singleQuery ( cypherUnion )* ( queryMemoryLimit )? ;
cypherQuery : ( preQueryDirectives )? singleQuery ( cypherUnion )* ( queryMemoryLimit )? ;

authQuery : createRole
| dropRole
Expand Down Expand Up @@ -277,17 +282,21 @@ updateClause : set

foreach : FOREACH '(' variable IN expression '|' updateClause+ ')' ;

usingStatement: USING usingStatementItem ( ',' usingStatementItem )* ;
preQueryDirectives: USING preQueryDirective ( ',' preQueryDirective )* ;

usingStatementItem: hopsLimit | indexHints ;
preQueryDirective: hopsLimit | indexHints | periodicCommit ;

hopsLimit: HOPS LIMIT literal ;

indexHints: INDEX indexHint ( ',' indexHint )* ;

indexHint: ':' labelName ( '(' propertyKeyName ')' )? ;

callSubquery : CALL '{' cypherQuery '}' ;
periodicCommit : PERIODIC COMMIT periodicCommitNumber=literal ;

periodicSubquery : IN TRANSACTIONS OF_TOKEN periodicCommitNumber=literal ROWS ;

callSubquery : CALL '{' cypherQuery '}' ( periodicSubquery )? ;

streamQuery : checkStream
| createStream
Expand Down
3 changes: 3 additions & 0 deletions src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,11 @@ NEXT : N E X T ;
NO : N O ;
NODE_LABELS : N O D E UNDERSCORE L A B E L S ;
NOTHING : N O T H I N G ;
OF_TOKEN : O F ;
ON_DISK_TRANSACTIONAL : O N UNDERSCORE D I S K UNDERSCORE T R A N S A C T I O N A L ;
NULLIF : N U L L I F ;
PASSWORD : P A S S W O R D ;
PERIODIC : P E R I O D I C ;
PORT : P O R T ;
PRIVILEGES : P R I V I L E G E S ;
PULSAR : P U L S A R ;
Expand All @@ -122,6 +124,7 @@ RESET : R E S E T;
REVOKE : R E V O K E ;
ROLE : R O L E ;
ROLES : R O L E S ;
ROWS : R O W S ;
QUOTE : Q U O T E ;
SCHEMA : S C H E M A ;
SERVER : S E R V E R ;
Expand Down
10 changes: 10 additions & 0 deletions src/query/frontend/semantic/symbol_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ void SymbolGenerator::VisitReturnBody(ReturnBody &body, Where *where) {
scopes_.back().has_aggregation = false;
}

// CypherQuery

bool SymbolGenerator::PreVisit(CypherQuery &cypher_query) {
if (cypher_query.pre_query_directives_.commit_frequency_) {
throw utils::NotYetImplemented("periodic commit");
}

return true;
}

// Query

bool SymbolGenerator::PreVisit(SingleQuery &) {
Expand Down
3 changes: 3 additions & 0 deletions src/query/frontend/semantic/symbol_generator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class SymbolGenerator : public HierarchicalTreeVisitor {
using HierarchicalTreeVisitor::Visit;
using typename HierarchicalTreeVisitor::ReturnType;

// Cypher Query
bool PreVisit(CypherQuery &) override;

// Query
bool PreVisit(SingleQuery &) override;

Expand Down
8 changes: 8 additions & 0 deletions src/query/interpret/eval.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,12 @@ std::optional<size_t> EvaluateMemoryLimit(ExpressionVisitor<TypedValue> &eval, E
return limit * memory_scale;
}

std::optional<size_t> EvaluateCommitFrequency(ExpressionVisitor<TypedValue> &eval, Expression *commit_frequency) {
if (!commit_frequency) return std::nullopt;
auto const frequency_value = commit_frequency->Accept(eval);
if (!frequency_value.IsInt() || frequency_value.ValueInt() <= 0)
throw QueryRuntimeException("Commit frequency must be a non-negative integer.");
return frequency_value.ValueInt();
}

} // namespace memgraph::query
2 changes: 2 additions & 0 deletions src/query/interpret/eval.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1280,4 +1280,6 @@ std::optional<int64_t> EvaluateHopsLimit(ExpressionVisitor<TypedValue> &eval, Ex
std::optional<size_t> EvaluateMemoryLimit(ExpressionVisitor<TypedValue> &eval, Expression *memory_limit,
size_t memory_scale);

std::optional<size_t> EvaluateCommitFrequency(ExpressionVisitor<TypedValue> &eval, Expression *commit_frequency);

} // namespace memgraph::query
10 changes: 8 additions & 2 deletions src/query/interpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2276,11 +2276,17 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
spdlog::info("Running query with memory limit of {}", utils::GetReadableSize(*memory_limit));
}

const auto hops_limit = EvaluateHopsLimit(evaluator, cypher_query->using_statement_.hops_limit_);
const auto hops_limit = EvaluateHopsLimit(evaluator, cypher_query->pre_query_directives_.hops_limit_);
if (hops_limit) {
spdlog::debug("Running query with hops limit of {}", *hops_limit);
}

std::optional<size_t> const commit_frequency =
EvaluateCommitFrequency(evaluator, cypher_query->pre_query_directives_.commit_frequency_);
if (commit_frequency) {
throw utils::NotYetImplemented("periodic commit");
}

auto clauses = cypher_query->single_query_->clauses_;
if (std::any_of(clauses.begin(), clauses.end(),
[](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) {
Expand Down Expand Up @@ -2450,7 +2456,7 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context};
const auto memory_limit = EvaluateMemoryLimit(evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);

const auto hops_limit = EvaluateHopsLimit(evaluator, cypher_query->using_statement_.hops_limit_);
const auto hops_limit = EvaluateHopsLimit(evaluator, cypher_query->pre_query_directives_.hops_limit_);

MG_ASSERT(current_db.execution_db_accessor_, "Profile query expects a current DB transaction");
auto *dba = &*current_db.execution_db_accessor_;
Expand Down
2 changes: 1 addition & 1 deletion src/query/plan/planner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ auto MakeLogicalPlan(TPlanningContext *context, TPlanPostProcess *post_process,

template <class TPlanningContext>
auto MakeLogicalPlan(TPlanningContext *context, const Parameters &parameters, bool use_variable_planner) {
PostProcessor post_processor(parameters, context->query->using_statement_.index_hints_, context->db);
PostProcessor post_processor(parameters, context->query->pre_query_directives_.index_hints_, context->db);
return MakeLogicalPlan(context, &post_processor, use_variable_planner);
}

Expand Down
3 changes: 2 additions & 1 deletion src/utils/typeinfo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ enum class TypeId : uint64_t {
INDEXED_JOIN,
HASH_JOIN,
ROLLUP_APPLY,
PERIODIC_COMMIT,

// Replication
// NOTE: these NEED to be stable in the 2000+ range (see rpc version)
Expand Down Expand Up @@ -189,7 +190,7 @@ enum class TypeId : uint64_t {
AST_CYPHER_UNION,
AST_QUERY,
AST_INDEX_HINT,
AST_USING_STATEMENT,
AST_PRE_QUERY_DIRECTIVES,
AST_CYPHER_QUERY,
AST_EXPLAIN_QUERY,
AST_PROFILE_QUERY,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
Feature: Periodic commit

Scenario: Not implemented periodic commit:
Given an empty graph
When executing query:
"""
USING PERIODIC COMMIT 10 CREATE (a:A)
"""
Then an error should be raised

Scenario: Not implemented nested periodic commit:
Given an empty graph
When executing query:
"""
UNWIND range(1, 100) as x CALL { CREATE (:N) } IN TRANSACTIONS OF 10 ROWS;
"""
Then an error should be raised
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
Feature: Periodic commit

Scenario: Not implemented periodic commit:
Given an empty graph
When executing query:
"""
USING PERIODIC COMMIT 10 CREATE (a:A)
"""
Then an error should be raised

Scenario: Not implemented nested periodic commit:
Given an empty graph
When executing query:
"""
UNWIND range(1, 100) as x CALL { CREATE (:N) } IN TRANSACTIONS OF 10 ROWS;
"""
Then an error should be raised
Loading