DDL — Schema Management

The SQL Gateway provides a full Data Definition Language (DDL) layer on top of Elasticsearch, allowing you to define tables, schemas, pipelines, watchers, and enrich policies using relational syntax.

Table Model

A SQL table maps to Elasticsearch structures:

| SQL Definition | Elasticsearch Structure |
| --- | --- |
| CREATE TABLE without PARTITIONED BY | Concrete index |
| CREATE TABLE with PARTITIONED BY | Index template (legacy ES6 or composable ES7+) |

Column Types

| SQL Type | Elasticsearch Mapping |
| --- | --- |
| NULL | null |
| TINYINT | byte |
| SMALLINT | short |
| INT | integer |
| BIGINT | long |
| DOUBLE | double |
| REAL | float |
| BOOLEAN | boolean |
| VARCHAR / TEXT | text + optional keyword subfield |
| KEYWORD | keyword |
| DATE | date |
| TIMESTAMP | date |
| STRUCT | object with nested properties |
| ARRAY<STRUCT> | nested |
| GEO_POINT | geo_point |
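The type table is essentially a one-to-one lookup; a minimal Python sketch (the `es_type_for` helper is illustrative, not part of the gateway):

```python
# Illustrative mapping of SQL column types to Elasticsearch field types,
# following the table above. VARCHAR/TEXT map to "text"; the optional
# keyword subfield is declared separately via FIELDS.
SQL_TO_ES = {
    "NULL": "null",
    "TINYINT": "byte",
    "SMALLINT": "short",
    "INT": "integer",
    "BIGINT": "long",
    "DOUBLE": "double",
    "REAL": "float",
    "BOOLEAN": "boolean",
    "VARCHAR": "text",
    "TEXT": "text",
    "KEYWORD": "keyword",
    "DATE": "date",
    "TIMESTAMP": "date",
    "STRUCT": "object",
    "ARRAY<STRUCT>": "nested",
    "GEO_POINT": "geo_point",
}

def es_type_for(sql_type: str) -> str:
    """Return the Elasticsearch field type for a SQL column type."""
    return SQL_TO_ES[sql_type.upper()]
```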

CREATE TABLE

Basic Example

CREATE TABLE users (
  id INT,
  name VARCHAR DEFAULT 'anonymous',
  birthdate DATE,
  age INT SCRIPT AS (DATE_DIFF(birthdate, CURRENT_DATE, YEAR)),
  PRIMARY KEY (id)
);

With STRUCT and Multi-fields

CREATE TABLE users (
  id INT NOT NULL,
  profile STRUCT FIELDS (
    first_name VARCHAR NOT NULL,
    last_name VARCHAR NOT NULL,
    address STRUCT FIELDS (
      street VARCHAR,
      city VARCHAR,
      zip VARCHAR
    ),
    join_date DATE,
    seniority INT SCRIPT AS (DATEDIFF(profile.join_date, CURRENT_DATE, DAY))
  ),
  content VARCHAR FIELDS (
    keyword VARCHAR OPTIONS (analyzer = 'keyword'),
    english VARCHAR OPTIONS (analyzer = 'english')
  )
);

With ARRAY<STRUCT> (Nested)

CREATE TABLE store (
  id INT NOT NULL,
  products ARRAY<STRUCT> FIELDS (
    name VARCHAR NOT NULL,
    description VARCHAR NOT NULL,
    price BIGINT NOT NULL
  )
);

FIELDS Behavior

  • On VARCHAR — defines multi-fields (different analyzers)
  • On STRUCT — defines object fields
  • On ARRAY<STRUCT> — defines nested fields
  • Sub-fields support: nested FIELDS, DEFAULT, NOT NULL, COMMENT, OPTIONS, SCRIPT AS

Constraints

Primary Key

CREATE TABLE users (
  id INT,
  PRIMARY KEY (id)
);

Used for document ID generation, upsert semantics, and COPY INTO conflict resolution.

Composite Primary Keys

CREATE TABLE users (
  id INT NOT NULL,
  birthdate DATE NOT NULL,
  name VARCHAR,
  PRIMARY KEY (id, birthdate)
);

Generates _id = "{{id}}||{{birthdate}}" via an ingest pipeline processor.
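The composite key is simple string concatenation of the key values with a || separator; a minimal sketch mirroring that template (the `composite_id` helper name is hypothetical):

```python
def composite_id(doc: dict, key_columns: list[str], sep: str = "||") -> str:
    # Join the primary-key column values in declaration order,
    # mirroring the "{{id}}||{{birthdate}}" template above.
    return sep.join(str(doc[col]) for col in key_columns)

doc = {"id": 7, "birthdate": "1990-04-01", "name": "Ada"}
composite_id(doc, ["id", "birthdate"])  # "7||1990-04-01"
```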

Partitioning

Partitioning routes documents to time-based indices using the date_index_name ingest processor.

CREATE TABLE users (
  id INT,
  birthdate DATE,
  PRIMARY KEY (id)
)
PARTITIONED BY (birthdate MONTH);

Supported Granularities

| SQL Granularity | ES date_rounding | Example Index Name |
| --- | --- | --- |
| YEAR | "y" | users-2025 |
| MONTH | "M" | users-2025-12 |
| DAY (default) | "d" | users-2025-12-10 |
| HOUR | "h" | users-2025-12-10-09 |
| MINUTE | "m" | users-2025-12-10-09-46 |
| SECOND | "s" | users-2025-12-10-09-46-30 |
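The example index names follow from formatting the partition column per granularity; a Python sketch (the format patterns are inferred from the example names above, not taken from gateway internals):

```python
from datetime import datetime

# Date-format pattern per granularity, matching the example index names above.
FORMATS = {
    "YEAR": "%Y",
    "MONTH": "%Y-%m",
    "DAY": "%Y-%m-%d",
    "HOUR": "%Y-%m-%d-%H",
    "MINUTE": "%Y-%m-%d-%H-%M",
    "SECOND": "%Y-%m-%d-%H-%M-%S",
}

def index_name(prefix: str, ts: datetime, granularity: str = "DAY") -> str:
    """Compute the time-based index a document would be routed to."""
    return prefix + ts.strftime(FORMATS[granularity])

ts = datetime(2025, 12, 10, 9, 46, 30)
index_name("users-", ts, "MONTH")   # "users-2025-12"
index_name("users-", ts, "SECOND")  # "users-2025-12-10-09-46-30"
```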

CREATE TABLE AS SELECT

CREATE TABLE new_users AS
SELECT id, name FROM users;

The gateway infers the schema, generates mappings, creates the index/template, and populates data using the Bulk API.

ALTER TABLE

Supported operations:

-- Column operations
ALTER TABLE users ADD COLUMN [IF NOT EXISTS] last_login TIMESTAMP;
ALTER TABLE users DROP COLUMN [IF EXISTS] old_field;
ALTER TABLE users RENAME COLUMN old_name TO new_name;
ALTER TABLE users ALTER COLUMN name SET OPTIONS (analyzer = 'french');
ALTER TABLE users ALTER COLUMN age SET SCRIPT AS (YEAR(CURRENT_DATE) - YEAR(birthdate));
ALTER TABLE users ALTER COLUMN age DROP SCRIPT;
ALTER TABLE users ALTER COLUMN name SET DEFAULT 'unknown';
ALTER TABLE users ALTER COLUMN name DROP DEFAULT;
ALTER TABLE users ALTER COLUMN name SET NOT NULL;
ALTER TABLE users ALTER COLUMN name DROP NOT NULL;
ALTER TABLE users ALTER COLUMN name SET DATA TYPE KEYWORD;
ALTER TABLE users ALTER COLUMN name SET COMMENT 'Full name';
ALTER TABLE users ALTER COLUMN name DROP COMMENT;
ALTER TABLE users ALTER COLUMN profile ADD FIELD followers INT;
ALTER TABLE users ALTER COLUMN profile DROP FIELD old_field;
-- Index-level operations
ALTER TABLE users SET MAPPING (dynamic = false);
ALTER TABLE users DROP MAPPING dynamic;
ALTER TABLE users SET SETTING (number_of_replicas = 2);
ALTER TABLE users DROP SETTING number_of_replicas;

Type Change Safety

  • Convertible types — allowed, but trigger an automatic reindex
  • Incompatible types — rejected with an error

DROP TABLE

DROP TABLE IF EXISTS users;

TRUNCATE TABLE

TRUNCATE TABLE users;

Deletes all documents while keeping mapping, settings, pipeline, and template.
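In Elasticsearch terms, deleting every document while preserving the index maps naturally onto Delete By Query with a match_all query; a sketch of that request body (the implementation detail is an assumption, not documented gateway behavior):

```python
import json

# TRUNCATE TABLE users plausibly corresponds to:
#   POST /users/_delete_by_query
# with a body that matches every document, leaving the index,
# mapping, settings, pipeline, and template untouched.
truncate_request = {"query": {"match_all": {}}}

payload = json.dumps(truncate_request)
```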


Pipelines

CREATE PIPELINE

CREATE OR REPLACE PIPELINE user_pipeline
WITH PROCESSORS (
  SET (
    field = "name",
    if = "ctx.name == null",
    description = "DEFAULT 'anonymous'",
    ignore_failure = true,
    value = "anonymous"
  ),
  SCRIPT (
    description = "age INT SCRIPT AS (...)",
    lang = "painless",
    source = "...",
    ignore_failure = true
  ),
  DATE_INDEX_NAME (
    field = "birthdate",
    index_name_prefix = "users-",
    date_formats = ["yyyy-MM"],
    date_rounding = "M",
    separator = "-",
    ignore_failure = true
  )
);
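Each processor clause corresponds to one entry in a native Elasticsearch ingest pipeline definition (PUT _ingest/pipeline/user_pipeline). An illustrative rendering of the pipeline above as that JSON (the elided script source stays elided):

```python
# The SQL pipeline above, expressed as the Elasticsearch ingest-pipeline
# document it plausibly compiles to. Field values are copied from the example.
user_pipeline = {
    "processors": [
        {"set": {
            "field": "name",
            "if": "ctx.name == null",
            "description": "DEFAULT 'anonymous'",
            "ignore_failure": True,
            "value": "anonymous",
        }},
        {"script": {
            "description": "age INT SCRIPT AS (...)",
            "lang": "painless",
            "source": "...",
            "ignore_failure": True,
        }},
        {"date_index_name": {
            "field": "birthdate",
            "index_name_prefix": "users-",
            "date_formats": ["yyyy-MM"],
            "date_rounding": "M",
            "separator": "-",
            "ignore_failure": True,
        }},
    ]
}
```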

ALTER PIPELINE

ALTER PIPELINE IF EXISTS user_pipeline (
  ADD PROCESSOR SET (
    field = "status",
    if = "ctx.status == null",
    value = "active"
  ),
  DROP PROCESSOR SET (_id)
);

DROP PIPELINE

DROP PIPELINE IF EXISTS user_pipeline;

Watchers

Watchers provide scheduled monitoring and alerting. A watcher consists of a trigger, input, condition, and actions.

CREATE WATCHER

CREATE OR REPLACE WATCHER high_error_rate AS
EVERY 5 MINUTES
FROM logs-* WHERE level = 'ERROR' WITHIN 5 MINUTES
WHEN ctx.payload.hits.total > 100 DO
notify LOG "High error rate: {{ctx.payload.hits.total}} errors in the last 5 minutes" AT ERROR
END
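For orientation, the statement above lines up with the native Watcher API structure (trigger, input, condition, actions). A skeleton of what such a watch could look like; the timestamp field name and exact query shape are assumptions, not the gateway's actual output:

```python
# Illustrative skeleton of a native Elasticsearch watch corresponding to the
# SQL above (PUT _watcher/watch/high_error_rate). Details are assumptions.
high_error_rate = {
    # EVERY 5 MINUTES -> interval schedule trigger
    "trigger": {"schedule": {"interval": "5m"}},
    # FROM logs-* WHERE level = 'ERROR' WITHIN 5 MINUTES -> search input
    "input": {"search": {"request": {
        "indices": ["logs-*"],
        "body": {"query": {"bool": {"filter": [
            {"term": {"level": "ERROR"}},
            {"range": {"@timestamp": {"gte": "now-5m"}}},  # field name assumed
        ]}}},
    }}},
    # WHEN ctx.payload.hits.total > 100 -> compare condition
    "condition": {"compare": {"ctx.payload.hits.total": {"gt": 100}}},
    # notify LOG "..." AT ERROR -> logging action
    "actions": {"notify": {"logging": {
        "level": "error",
        "text": "High error rate: {{ctx.payload.hits.total}} errors in the last 5 minutes",
    }}},
}
```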

Triggers

| Trigger Type | Syntax | Example |
| --- | --- | --- |
| Interval | EVERY n unit | EVERY 5 MINUTES |
| Cron | AT SCHEDULE 'expression' | AT SCHEDULE '0 0 9 * * ?' |

Inputs

| Input Type | Syntax |
| --- | --- |
| No input | WITH NO INPUT |
| Simple | WITH INPUT (key = value, ...) |
| Search | FROM index [WHERE criteria] [WITHIN n unit] |
| HTTP | WITH INPUT GET "url" [HEADERS (...)] [BODY "..."] [TIMEOUT (...)] |
| Chain | WITH INPUTS name1 input1, name2 input2, ... |

Conditions

| Condition | Syntax |
| --- | --- |
| Always | ALWAYS DO |
| Never | NEVER DO |
| Compare | WHEN path operator value DO |
| Script | WHEN SCRIPT '...' USING LANG '...' WITH PARAMS (...) RETURNS TRUE DO |

Actions

Logging:

action_name LOG "message" AT ERROR FOREACH "ctx.payload.hits.hits" LIMIT 500

Webhook:

action_name WEBHOOK POST "https://hooks.example.com/webhook"
HEADERS ("Content-Type" = "application/json")
BODY "{\"text\": \"Alert triggered\"}"

Complete Example

CREATE OR REPLACE WATCHER enriched_alert AS
AT SCHEDULE '0 */15 * * * ?'
WITH INPUTS
  alerts FROM alerts-* WHERE severity = 'critical' WITHIN 15 MINUTES,
  context GET PROTOCOL https HOST "api.internal.com" PATH "/context"
    HEADERS ("X-API-Key" = "secret123")
WHEN ctx.payload.alerts.hits.total > 0 DO
  log_alert LOG "Critical alerts: {{ctx.payload.alerts.hits.total}}" AT ERROR,
  notify_ops WEBHOOK POST "https://alerting.example.com/alert"
    HEADERS ("Content-Type" = "application/json")
    BODY "{\"alerts\": {{ctx.payload.alerts.hits.total}}}"
END

DROP WATCHER

DROP WATCHER IF EXISTS high_error_rate;

Enrich Policies

Enrich policies add data from existing indices to incoming documents during ingest.

CREATE ENRICH POLICY

CREATE ENRICH POLICY user_enrichment
FROM users
ON user_id
ENRICH name, email, department;
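The statement corresponds to a native enrich policy definition (PUT _enrich/policy/user_enrichment) of the default match type; an illustrative rendering:

```python
# Native match-type enrich policy corresponding to the statement above:
# FROM users -> indices, ON user_id -> match_field, ENRICH ... -> enrich_fields.
user_enrichment = {
    "match": {
        "indices": "users",
        "match_field": "user_id",
        "enrich_fields": ["name", "email", "department"],
    }
}
```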

Policy Types

| Type | Description | Use Case |
| --- | --- | --- |
| MATCH (default) | Exact value matching | User IDs, product codes |
| GEO_MATCH | Geo-shape matching | Location-based enrichment |
| RANGE | Range-based matching | IP ranges, numeric ranges |

With WHERE Clause

CREATE OR REPLACE ENRICH POLICY active_user_enrichment
FROM users
ON user_id
ENRICH name, email, department
WHERE account_status = 'active' AND email_verified = true;

EXECUTE ENRICH POLICY

EXECUTE ENRICH POLICY user_enrichment;

Using in Pipelines

CREATE PIPELINE events_enriched
WITH PROCESSORS (
  ENRICH (
    policy_name = "user_enrichment",
    field = "user_id",
    target_field = "user_info",
    max_matches = 1,
    ignore_missing = true
  )
);

DROP ENRICH POLICY

DROP ENRICH POLICY IF EXISTS user_enrichment;

Version Compatibility

| Feature | ES6 | ES7 | ES8 | ES9 |
| --- | --- | --- | --- | --- |
| Legacy templates | Yes | Yes | No | No |
| Composable templates | No | Yes | Yes | Yes |
| date_index_name | Yes | Yes | Yes | Yes |
| Generated scripts | Yes | Yes | Yes | Yes |
| STRUCT | Yes | Yes | Yes | Yes |
| ARRAY<STRUCT> | Yes | Yes | Yes | Yes |
| Watchers | Yes | Yes | Yes | Yes |
| Enrich Policies | No | Yes* | Yes | Yes |
| Materialized Views | No | Yes* | Yes | Yes |

* Requires ES 7.5+