GeoMesa SQL Connector

Glink 对 Flink 的 SQL Connector 进行了扩展,实现了 GeoMesa SQL Connector。本文档介绍了如何设置 GeoMesa SQL Connector,以便通过SQL语句读写 GeoMesa。

依赖

为了使用 GeoMesa SQL Connector, 需要添加如下依赖.

GeoMesa version

Maven dependency

GeoMesa 3.1.x

glink-connector-geomesa-x.x.x.jar

GeoMesa 3.1.x

glink-sql-x.x.x.jar

流式导入案例

创建CSV source table。

CREATE TABLE CSV_TDrive (
    `pid` STRING,
    `time` TIMESTAMP(0),
    `lng` DOUBLE,
    `lat` DOUBLE
) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/csv',
    'format' = 'csv'
);

创建 GeoMesa sink table。

CREATE TABLE Geomesa_TDrive (
    `pid` STRING,
    `time` TIMESTAMP(0),
    `point2` STRING,
    PRIMARY KEY (pid) NOT ENFORCED
) WITH (
    'connector' = 'geomesa',
    'geomesa.data.store' = 'hbase',
    'geomesa.schema.name' = 'geomesa-test',
    'geomesa.spatial.fields' = 'point2:Point',
    'hbase.catalog' = 'test-sql'
);

执行导入作业。

INSERT INTO Geomesa_TDrive 
    SELECT `pid`, `time`, ST_AsText(ST_Point(lng, lat)) FROM CSV_TDrive;

数据类型映射

Flink SQL 的数据类型并不与 GeoMesa 完全兼容。对于基础数据类型而言 GeoMesa SQL Connector 进行了最大程度的适配;对于空间数据类型,由于 Flink 目前尚未支持可注册的结构化类型,因此在 Flink SQL 中所有空间数据类型均由 WKT 格式的 SRING 类型表示,且必须使用 geomesa.spatial.fields 这一 WITH 参数指定具体类型,Geomesa SQL Connector 在写入时会使用 org.locationtech.jts.io.WKTReader 进行转换。详细的数据类型对应关系如下表。Flink SQL 数据类型参见 Flink 文档。GeoMesa 数据类型参见 GeoMesa 文档

基础数据类型

Flink SQL Type

GeoMesa Type

Java Type

Indexable

CHAR / VARCHAR / STRING

String

java.lang.String

Yes

BOOLEAN

Boolean

java.lang.Boolean

Yes

BINARY / VARBINARY / BYTES

Bytes

byte[]

No

TINYINT / SMALLINT / INT

Integer

java.lang.Integer

Yes

BIGINT

Long

java.lang.Long

Yes

FLOAT

Float

java.lang.Float

Yes

DOUBLE

Double

java.lang.Double

Yes

DATE / TIME

Date

java.util.Date

Yes

TIMESTAMP

Timestamp

java.sql.Timestamp

Yes

DECIMAL

Not supported

ARRAY

Not supported

MAP / MULTISET

Not supported

Row

Not supported

空间数据类型

所有空间数据类型在 Flink SQL 中均由 WKT 格式的 STRING 类型表示.

GeoMesa Type

Java Type

Indexable

Point

org.locationtech.jts.geom.Point

Yes

LineString

org.locationtech.jts.geom.LineString

Yes

Polygon

org.locationtech.jts.geom.Polygon

Yes

MultiPoint

org.locationtech.jts.geom.MultiPoint

Yes

MultiLineString

org.locationtech.jts.geom.MultiLineString

Yes

MultiPolygon

org.locationtech.jts.geom.MultiPolygon

Yes

GeometryCollection

org.locationtech.jts.geom.GeometryCollection

Yes

Geometry

org.locationtech.jts.geom.Geometry

Yes

Connector 参数

Geomesa SQL Connector 支持通过 WITH 参数的方式对 GeoMesa 客户端的相关参数进行配置。

GeoMesa

参数名称

是否必须

参数含义

connector

连接器类型,对于 GeoMesa SQL Connector 而言固定为geomesa

geomesa.data.store

GeoMesa Data Store类型,目前支持hbase

geomesa.schema.name

GeoMesa Schema名称

geomesa.spatial.fields

空间类型字段,当包含空间字段时必须指定,否则空间类型将被解析为字符串。格式:<field name>:<field type>,多个字段间由","分隔

geomesa.temporal.join.predict

指定temporal table join的空间关系谓词,符合关系的记录将被join:

R:<distance>表示维表中空间对象与流表中空间对象距离小于distance米;

I表示流表中空间对象与维表中空间对象相交;

+C表示流表中空间对象包含维表中空间对象;

-C表示维表中空间对象包含流表中空间对象。

HBase Data Store

GeoMesa SQL Connector 支持 GeoMesa HBase Data Store 的所有配置参数,关于各个参数的具体含义,参见 Geomesa文档

参数名称

是否必须

hbase.catalog

hbase.zookeepers

hbase.coprocessor.url

hbase.config.paths

hbase.config.xml

hbase.connections.reuse

hbase.remote.filtering

hbase.security.enabled

hbase.coprocessor.threads

hbase.ranges.max-per-extended-scan

hbase.ranges.max-per-coprocessor-scan

hbase.coprocessor.arrow.enable

hbase.coprocessor.bin.enable

hbase.coprocessor.density.enable

hbase.coprocessor.stats.enable

hbase.coprocessor.yield.partial.results

hbase.coprocessor.scan.parallel

注意: 当geomesa.data.store为hbase时必须指定hbase.catalog

Last updated

Was this helpful?