# 基本概念

# 1.Ignite是什么?

Ignite是:

一个支持水平扩展和容错的分布式内存计算平台,可以在TB级数据上以内存级的速度构建实时应用。

# 1.1.固化内存

Ignite的固化内存组件不仅仅将内存作为一个缓存层,还视为一个全功能的存储层。这意味着可以按需将持久化打开或者关闭。如果持久化关闭,那么Ignite就可以作为一个分布式的内存数据库或者内存数据网格,这完全取决于使用SQL和键-值API的喜好。如果持久化打开,那么Ignite就成为一个分布式的,可水平扩展的数据库,它会保证完整的数据一致性以及集群故障的可恢复能力。

# 1.2.Ignite持久化

Ignite的原生持久化是一个分布式的、支持ACID以及兼容SQL的磁盘存储,它可以作为一个可选的磁盘层与Ignite的固化内存透明地集成,然后将数据和索引存储在SSD、闪存、3D XPoint以及其它类型的非易失性存储中。

打开Ignite的持久化之后,就不需要将所有的数据和索引保存在内存中,或者在节点或者集群重启后对数据进行预热,因为固化内存和持久化紧密耦合之后,会将其视为一个二级存储层,这意味着在内存中数据和索引的一个子集如果丢失了,固化内存会从磁盘上进行获取。

# 1.3.ACID兼容

存储在Ignite中的数据,在内存和磁盘上是同时支持ACID的,使Ignite成为一个强一致的系统,Ignite可以在整个拓扑的多台服务器上保持事务。

# 1.4.完整的SQL支持

Ignite提供了完整的SQL、DDL和DML的支持,可以使用纯SQL而不用写代码与Ignite进行交互,这意味着只使用SQL就可以创建表和索引,以及插入、更新和查询数据。有这个完整的SQL支持,Ignite就可以作为一种分布式SQL数据库

# 1.5.键-值

Ignite的内存数据网格组件是一个完整的事务型分布式键值存储,它可以在有几百台服务器的集群上进行水平扩展。在打开持久化时,Ignite可以存储比内存容量更大的数据,并且在整个集群重启之后仍然可用。

# 1.6.并置处理

大多数传统数据库是以客户机-服务器的模式运行的,这意味着数据必须发给客户端进行处理,这个方式需要在客户端和服务端之间进行大量的数据移动,通常来说不可扩展。而Ignite使用了另外一种方式,可以将轻量级的计算发给数据,即数据的并置计算,从结果上来说,Ignite扩展性更好,并且使数据移动最小化。

# 1.7.可扩展性和持久性

Ignite是一个弹性的、可水平扩展的分布式系统,它支持按需地添加和删除节点,Ignite还可以存储数据的多个副本,这样可以使集群从部分故障中恢复。如果打开了持久化,那么Ignite中存储的数据可以在集群的完全故障中恢复。Ignite集群重启会非常快,因为数据从磁盘上获取,瞬间就具有了可操作性。从结果上来说,数据不需要在处理之前预加载到内存中,而Ignite会缓慢地恢复内存级的性能。

# 2.Ignite定位

Ignite是不是持久化或者纯内存存储?

都是,Ignite的原生持久化可以打开,也可以关闭。这使得Ignite可以存储比可用内存容量更大的数据集。也就是说,可以只在内存中存储较少的操作性数据集,然后将不适合存储在内存中的较大数据集存储在磁盘上,即为了提高性能将内存作为一个缓存层。

Ignite是不是内存数据库(IMDB)?

,虽然Ignite的固化内存在内存和磁盘中都工作得很好,但是磁盘持久化是可以关闭的,使Ignite成为一个支持SQL以及分布式关联的内存数据库

Ignite是不是内存数据网格(IMDG)?

,Ignite是一个全功能的数据网格,它既可以用于纯内存模式,也可以带有Ignite的原生持久化,它也可以与任何第三方数据库集成,包括RDBMS和NoSQL。

Ignite是不是一个分布式缓存?

,如果关闭原生持久化,Ignite就会成为一个分布式缓存,Ignite实现了JCache规范(JSR107),并且提供了比规范要求更多的功能,包括分区和复制模式、分布式ACID事务、SQL查询以及原生持久化等。

Ignite是不是分布式数据库?

,在整个集群的多个节点中,Ignite中的数据要么是分区模式的,要么是复制模式的,这给系统带来了伸缩性,增加了系统的弹性。Ignite可以自动控制数据如何分区,同时,开发者也可以插入自定义的分布(关联)函数,以及为了提高效率将部分数据并置在一起。

Ignite是不是SQL数据库?

不完整,尽管Ignite的目标是和其它的关系型SQL数据库具有类似的行为,但是在处理约束和索引方面还是有不同的。Ignite支持一级二级索引,但是只有一级索引支持唯一性,Ignite还不支持外键约束。

总体来说,Ignite作为约束不支持任何会导致集群广播消息的更新以及显著降低系统性能和可伸缩性的操作。

Ignite是不是一个NoSQL数据库?

不完全,和其它NoSQL数据库一样,Ignite支持高可用和水平扩展,但是,和其它的NoSQL数据库不同,Ignite支持SQL和ACID。

Ignite是不是事务型数据库?

Ignite在键-值API级别支持ACID事务,Ignite还支持跨分区的事务,这意味着事务可以跨越不同服务器不同分区。

Ignite在2.7版本中,通过MVCC技术,实现了包括SQL事务在内的全事务支持,不过目前还处于测试阶段。

Ignite是不是一个多模式数据库?

,Ignite的数据建模和访问,同时支持键值和SQL,另外,Ignite还为在分布式数据上的计算处理,提供了强大的API。

Ignite是不是键-值存储?

,Ignite提供了丰富的键-值API,兼容于JCache (JSR-107),并且支持Java,C++和.NET。

固化内存是什么?

Ignite的固化内存架构使得Ignite可以将内存计算延伸至磁盘,它基于一个页面化的堆外内存分配器,它通过预写日志(WAL)的持久化来对数据进行固化,当持久化禁用之后,固化内存就会变成一个纯粹的内存存储。

并置处理是什么?

Ignite是一个分布式系统,因此,有能力将数据和数据以及数据和计算进行并置就变得非常重要,这会避免分布式数据噪声。当执行分布式SQL关联时数据的并置就变得非常的重要。Ignite还支持将用户的逻辑(函数,lambda等)直接发到数据所在的节点然后在本地进行数据的运算。

# 3.入门

# 3.1.环境要求

Apache Ignite官方在如下环境中进行了测试:

  • JDK:Oracle JDK8及以上,Open JDK8及以上,IBM JDK8及以上,如果使用了JDK9或之后的版本,具体可以看下面的在JDK9及以后版本中运行Ignite章节;
  • OS:Linux(任何版本),Mac OS X(10.6及以上),Windows(XP及以上),Windows Server(2008及以上),Oracle Solaris;
  • 网络:没有限制(建议10G);
  • 架构:x86,x64,SPARC,PowerPC

# 3.2.在JDK9/10/11中运行Ignite

要在Java 9/10/11环境下运行Ignite,需要按照如下步骤操作:

  1. 配置JAVA_HOME环境变量或者Windows的PATH,指向Java的安装目录;
  2. Ignite使用了专有的SDK API,默认不可用,需要给JVM传递特定的标志,以使这些API可用。如果使用了启动脚本ignite.sh(或者Windows中的ignite.bat),那么什么都不需要做,因为脚本中已经配置好了,否则需要在应用的JVM中提供下面的参数;
  3. Java 11中已经可以使用TLSv1.3,目前还不支持,如果节点间使用了SSL,可以考虑添加-Djdk.tls.client.protocols=TLSv1.2
  4. 给应用的JVM添加如下的参数,如果使用的是Java瘦客户端或者Ignite JDBC,是不需要这些的:
--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED
--add-exports=java.management/com.sun.jmx.mbeanserver=ALL-UNNAMED
--add-exports=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED
--add-exports=java.base/sun.reflect.generics.reflectiveObjects=ALL-UNNAMED
--illegal-access=permit
1
2
3
4
5
6

# 3.3.启动第一个Ignite集群

使用二进制包

可以从下面的步骤开始:

  • 官网下载zip格式的Ignite二进制包;
  • 解压到系统中的一个安装文件夹;
  • (可选)配置IGNITE_HOME环境变量/PATH,指向安装文件夹,确保路径不以/结尾。

其它的安装选项

除了二进制包,Ignite还支持源代码安装、docker、云镜像以及RPM格式,具体可以看下面的说明。 在应用中,建议使用Maven,后面会介绍。

现在就可以使用命令行接口启动第一个Ignite集群,如下所示,可以使用默认的或者也可以传入自定义的配置文件,可以启动任意多个节点,它们之间会自动发现。

使用默认的配置

使用默认的配置启动集群,打开命令行,转到IGNITE_HOME(Ignite安装文件夹),然后输入:

Linux:

$ bin/ignite.sh
1

Windows:

$ bin\ignite.bat
1

输出大致如下:

[02:49:12] Ignite node started OK (id=ab5d18a6)
[02:49:12] Topology snapshot [ver=1, nodes=1, CPUs=8, heap=1.0GB]
1
2

ignite.sh/ignite.bat默认会使用config/default-config.xml配置文件启动节点。

传入配置文件

要使用一个自定义配置文件,可以将其作为参数传给ignite.sh/bat,如下:

Linux:

$ bin/ignite.sh examples/config/example-ignite.xml
1

Windows:

$ bin\ignite.bat examples\config\example-ignite.xml
1

配置文件的路径,可以是绝对路径,也可以是相对于IGNITE_HOME(Ignite安装文件夹)的相对路径,也可以是类路径中的META-INF文件夹。

交互模式

如果要使用交互模式选择一个配置文件,传入-i参数即可,就是ignite.sh -i

好,这样就成功了!

# 3.4.使用Maven

下一步是将Ignite嵌入自己的应用,Java中的最简单的入门方式是使用Maven依赖系统。

Ignite中只有ignite-core模块是必须的,一般来说,要使用基于Spring的xml配置,还需要ignite-spring模块,要使用SQL查询,还需要ignite-indexing模块。

下面中的${ignite-version}需要替换为实际使用的版本。

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>${ignite.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring</artifactId>
    <version>${ignite.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-indexing</artifactId>
    <version>${ignite.version}</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

Maven配置

关于如何包含个别的ignite maven模块的更多信息,可以参考Maven设置章节。

每个二进制包中,都会有一个示例工程,在开发环境中打开这个工程,然后转到{ignite_version}/examples文件夹找到pom.xml文件,依赖引入之后,各种示例就可以演示Ignite的各种功能了。

# 3.5.第一个SQL应用

下面会创建两张表及其索引,分别为City表和Person表,分别表示居住在城市中的人,并且城市中会有很多的人,通过WITH子句然后指定affinityKey=city_id,可以将人对象和其居住的城市对象并置在一起。

通过命令行或者嵌入式模式启动Ignite集群节点后,可以通过下面的语句创建SQL模式:

SQL:

CREATE TABLE City (
  id LONG PRIMARY KEY, name VARCHAR)
  WITH "template=replicated";

CREATE TABLE Person (
  id LONG, name VARCHAR, city_id LONG, PRIMARY KEY (id, city_id))
  WITH "backups=1, affinityKey=city_id";

CREATE INDEX idx_city_name ON City (name);

CREATE INDEX idx_person_name ON Person (name);
1
2
3
4
5
6
7
8
9
10
11

JDBC:

// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

// Open JDBC connection.
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");

// Create database tables.
try (Statement stmt = conn.createStatement()) {

    // Create table based on REPLICATED template.
    stmt.executeUpdate("CREATE TABLE City (" +
    " id LONG PRIMARY KEY, name VARCHAR) " +
    " WITH \"template=replicated\"");

    // Create table based on PARTITIONED template with one backup.
    stmt.executeUpdate("CREATE TABLE Person (" +
    " id LONG, name VARCHAR, city_id LONG, " +
    " PRIMARY KEY (id, city_id)) " +
    " WITH \"backups=1, affinityKey=city_id\"");

    // Create an index on the City table.
    stmt.executeUpdate("CREATE INDEX idx_city_name ON City (name)");

    // Create an index on the Person table.
    stmt.executeUpdate("CREATE INDEX idx_person_name ON Person (name)");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

ODBC:

SQLHSTMT stmt;

// Allocate a statement handle.
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

// Create table based on REPLICATED template.
SQLCHAR query1[] = "CREATE TABLE City ("
  "id LONG PRIMARY KEY, name VARCHAR) "
  "WITH \"template=replicated\"";
SQLSMALLINT queryLen1 = static_cast<SQLSMALLINT>(sizeof(query1));

SQLExecDirect(stmt, query, queryLen);

// Create table based on PARTITIONED template with one backup.
SQLCHAR query2[] = "CREATE TABLE Person ( "
    "id LONG, name VARCHAR, city_id LONG "
    "PRIMARY KEY (id, city_id)) "
    "WITH \"backups=1, affinityKey=city_id\"";
SQLSMALLINT queryLen2 = static_cast<SQLSMALLINT>(sizeof(query2));

SQLExecDirect(stmt, query, queryLen);

// Create an index on the City table.
SQLCHAR query3[] = "CREATE INDEX idx_city_name ON City (name)";

SQLSMALLINT queryLen3 = static_cast<SQLSMALLINT>(sizeof(query3));

SQLRETURN ret = SQLExecDirect(stmt, query3, queryLen3);

// Create an index on the Person table.
SQLCHAR query4[] = "CREATE INDEX idx_person_name ON Person (name)";

SQLSMALLINT queryLen4 = static_cast<SQLSMALLINT>(sizeof(query4));

ret = SQLExecDirect(stmt, query4, queryLen4);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

下一步,需要往两个表中注入一些数据,比如:

SQL:

INSERT INTO City (id, name) VALUES (1, 'Forest Hill');
INSERT INTO City (id, name) VALUES (2, 'Denver');
INSERT INTO City (id, name) VALUES (3, 'St. Petersburg');

INSERT INTO Person (id, name, city_id) VALUES (1, 'John Doe', 3);
INSERT INTO Person (id, name, city_id) VALUES (2, 'Jane Roe', 2);
INSERT INTO Person (id, name, city_id) VALUES (3, 'Mary Major', 1);
INSERT INTO Person (id, name, city_id) VALUES (4, 'Richard Miles', 2);
1
2
3
4
5
6
7
8

JDBC:

// Register JDBC driver
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

// Open JDBC connection
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");

// Populate City table
try (PreparedStatement stmt =
conn.prepareStatement("INSERT INTO City (id, name) VALUES (?, ?)")) {

    stmt.setLong(1, 1L);
    stmt.setString(2, "Forest Hill");
    stmt.executeUpdate();

    stmt.setLong(1, 2L);
    stmt.setString(2, "Denver");
    stmt.executeUpdate();

    stmt.setLong(1, 3L);
    stmt.setString(2, "St. Petersburg");
    stmt.executeUpdate();
}

// Populate Person table
try (PreparedStatement stmt =
conn.prepareStatement("INSERT INTO Person (id, name, city_id) VALUES (?, ?, ?)")) {

    stmt.setLong(1, 1L);
    stmt.setString(2, "John Doe");
    stmt.setLong(3, 3L);
    stmt.executeUpdate();

    stmt.setLong(1, 2L);
    stmt.setString(2, "Jane Roe");
    stmt.setLong(3, 2L);
    stmt.executeUpdate();

    stmt.setLong(1, 3L);
    stmt.setString(2, "Mary Major");
    stmt.setLong(3, 1L);
    stmt.executeUpdate();

    stmt.setLong(1, 4L);
    stmt.setString(2, "Richard Miles");
    stmt.setLong(3, 2L);
    stmt.executeUpdate();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

ODBC:

SQLHSTMT stmt;

// Allocate a statement handle.
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

// Populate City table.
SQLCHAR query1[] = "INSERT INTO City (id, name) VALUES (?, ?)";

SQLRETURN ret = SQLPrepare(stmt, query1, static_cast<SQLSMALLINT>(sizeof(query1)));

char name[1024];

int32_t key = 1;
strncpy(name, "Forest Hill", sizeof(name));
ret = SQLExecute(stmt);

key = 2;
strncpy(name, "Denver", sizeof(name));
ret = SQLExecute(stmt);

key = 3;
strncpy(name, "Denver", sizeof(name));
ret = SQLExecute(stmt);

// Populate Person table
SQLCHAR query2[] = "INSERT INTO Person (id, name, city_id) VALUES (?, ?, ?)";

ret = SQLPrepare(stmt, query2, static_cast<SQLSMALLINT>(sizeof(query2)));

key = 1;
strncpy(name, "John Doe", sizeof(name));
int32_t city_id = 3;
ret = SQLExecute(stmt);

key = 2;
strncpy(name, "Jane Roe", sizeof(name));
city_id = 2;
ret = SQLExecute(stmt);

key = 3;
strncpy(name, "Mary Major", sizeof(name));
city_id = 1;
ret = SQLExecute(stmt);

key = 4;
strncpy(name, "Richard Miles", sizeof(name));
city_id = 2;
ret = SQLExecute(stmt);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

Java API:

// Connecting to the cluster.
Ignite ignite = Ignition.start();

// Getting a reference to an underlying cache created for City table above.
IgniteCache<Long, City> cityCache = ignite.cache("SQL_PUBLIC_CITY");

// Getting a reference to an underlying cache created for Person table above.
IgniteCache<PersonKey, Person> personCache = ignite.cache("SQL_PUBLIC_PERSON");

// Inserting entries into City.
SqlFieldsQuery query = new SqlFieldsQuery(
    "INSERT INTO City (id, name) VALUES (?, ?)");

cityCache.query(query.setArgs(1, "Forest Hill")).getAll();
cityCache.query(query.setArgs(2, "Denver")).getAll();
cityCache.query(query.setArgs(3, "St. Petersburg")).getAll();

// Inserting entries into Person.
query = new SqlFieldsQuery(
    "INSERT INTO Person (id, name, city_id) VALUES (?, ?, ?)");

personCache.query(query.setArgs(1, "John Doe", 3)).getAll();
personCache.query(query.setArgs(2, "Jane Roe", 2)).getAll();
personCache.query(query.setArgs(3, "Mary Major", 1)).getAll();
personCache.query(query.setArgs(4, "Richard Miles", 2)).getAll();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

下面就可以查询数据了,可以查询人及其居住的城市,这会进行两个表的关联:

SQL:

SELECT p.name, c.name
FROM Person p, City c
WHERE p.city_id = c.id;
1
2
3

JDBC:

// Register JDBC driver
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

// Open JDBC connection
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");

// Get data
try (Statement stmt = conn.createStatement()) {
    try (ResultSet rs =
    stmt.executeQuery("SELECT p.name, c.name " +
    " FROM Person p, City c " +
    " WHERE p.city_id = c.id")) {

      while (rs.next())
         System.out.println(rs.getString(1) + ", " + rs.getString(2));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

ODBC:

SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

// Get data using an SQL join sample.
SQLCHAR query[] = "SELECT p.name, c.name "
  "FROM Person p, City c "
  "WHERE p.city_id = c.id";

SQLSMALLINT queryLen = static_cast<SQLSMALLINT>(sizeof(query));

SQLRETURN ret = SQLExecDirect(stmt, query, queryLen);
1
2
3
4
5
6
7
8
9
10
11
12
13

Java API:

// Connecting to the cluster.
Ignite ignite = Ignition.start();

// Getting a reference to an underlying cache created for City table above.
IgniteCache<Long, City> cityCache = ignite.cache("SQL_PUBLIC_CITY");

// Querying data from the cluster using a distributed JOIN.
SqlFieldsQuery query = new SqlFieldsQuery("SELECT p.name, c.name " +
    " FROM Person p, City c WHERE p.city_id = c.id");

FieldsQueryCursor<List<?>> cursor = cityCache.query(query);

Iterator<List<?>> iterator = cursor.iterator();


while (iterator.hasNext()) {
    List<?> row = iterator.next();

    System.out.println(row.get(0) + ", " + row.get(1));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

这会产生如下的输出:

Mary Major, Forest Hill
Jane Roe, Denver
Richard Miles, Denver
John Doe, St. Petersburg
1
2
3
4

# 3.6.第一个计算应用

作为第一个计算应用,它会计算一句话中非空白字符的字符数量。作为一个示例,首先将一句话分割为多个单词,然后通过计算作业来计算每一个独立单词中的字符数量。最后,我们将从每个作业获得的结果简单相加来获得整个的数量。

闭包计算:

try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
  Collection<IgniteCallable<Integer>> calls = new ArrayList<>();

  // Iterate through all the words in the sentence and create Callable jobs.
  for (final String word : "Count characters using callable".split(" "))
    calls.add(word::length);

  // Execute collection of Callables on the grid.
  Collection<Integer> res = ignite.compute().call(calls);

  // Add up all the results.
  int sum = res.stream().mapToInt(Integer::intValue).sum();

  System.out.println("Total number of characters is '" + sum + "'.");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

非闭包计算:

try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
    Collection<IgniteCallable<Integer>> calls = new ArrayList<>();

    // Iterate through all the words in the sentence and create Callable jobs.
    for (final String word : "Count characters using callable".split(" ")) {
        calls.add(new IgniteCallable<Integer>() {
            @Override public Integer call() throws Exception {
                return word.length();
            }
        });
    }

    // Execute collection of Callables on the grid.
    Collection<Integer> res = ignite.compute().call(calls);

    int sum = 0;

    // Add up individual word lengths received from remote nodes.
    for (int len : res)
        sum += len;

    System.out.println(">>> Total number of characters in the phrase is '" + sum + "'.");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

零部署

注意,由于Ignite的零部署特性,当从IDE运行上面的程序时,远程节点没有经过显式地部署,就获得了计算作业。

另一个例子,创建一个应用,它会读取第一个SQL应用中保存的数据,然后在这些对象上进行一些额外的处理。

下面会创建一个天气警报应用,假定丹佛有一个天气警报,然后需要提醒丹佛的居民为恶劣天气做好准备。

下面是代码片段:

Ignite ignite = Ignition.start();

long cityId = 2; // Id for Denver

// Sending the logic to a cluster node that stores Denver and its residents.
ignite.compute().affinityRun("SQL_PUBLIC_CITY", cityId, new IgniteRunnable() {

  @IgniteInstanceResource
  Ignite ignite;

  @Override
  public void run() {
    // Getting an access to Persons cache.
    IgniteCache<BinaryObject, BinaryObject> people = ignite.cache(
        "Person").withKeepBinary();

    ScanQuery<BinaryObject, BinaryObject> query =
        new ScanQuery <BinaryObject, BinaryObject>();

    try (QueryCursor<Cache.Entry<BinaryObject, BinaryObject>> cursor =
           people.query(query)) {

      // Iteration over the local cluster node data using the scan query.
      for (Cache.Entry<BinaryObject, BinaryObject> entry : cursor) {
        BinaryObject personKey = entry.getKey();

        // Picking Denver residents only only.
        if (personKey.<Long>field("CITY_ID") == cityId) {
            person = entry.getValue();

            // Sending the warning message to the person.
        }
      }
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

在上例中使用了affinityRun()方法,并且指定了SQL_PUBLIC_CITY缓存,cityId以及一个新创建的IgniteRunnable(),这样确保了计算被发送到丹佛及其居民所在的节点,使得可以直接在数据所在的地方执行业务逻辑,避免了昂贵的序列化和网络开销。

# 3.7.第一个数据网格应用

我们再来一个小例子,它从/往分布式缓存中获取/添加数据,并且执行基本的事务。

因为在应用中使用了缓存,要确保它是经过配置的,我们可以用Ignite自带的示例配置,它已经做了一些缓存的配置。

$ bin/ignite.sh examples/config/example-cache.xml
1

Put和Get:

try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
    IgniteCache<Integer, String> cache = ignite.getOrCreateCache("myCacheName");

    // Store keys in cache (values will end up on different cache nodes).
    for (int i = 0; i < 10; i++)
        cache.put(i, Integer.toString(i));

    for (int i = 0; i < 10; i++)
        System.out.println("Got [key=" + i + ", val=" + cache.get(i) + ']');
}
1
2
3
4
5
6
7
8
9
10

原子化操作:

// Put-if-absent which returns previous value.
Integer oldVal = cache.getAndPutIfAbsent("Hello", 11);

// Put-if-absent which returns boolean success flag.
boolean success = cache.putIfAbsent("World", 22);

// Replace-if-exists operation (opposite of getAndPutIfAbsent), returns previous value.
oldVal = cache.getAndReplace("Hello", 11);

// Replace-if-exists operation (opposite of putIfAbsent), returns boolean success flag.
success = cache.replace("World", 22);

// Replace-if-matches operation.
success = cache.replace("World", 2, 22);

// Remove-if-matches operation.
success = cache.remove("Hello", 1);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

事务:

try (Transaction tx = ignite.transactions().txStart()) {
    Integer hello = cache.get("Hello");

    if (hello == 1)
        cache.put("Hello", 11);

    cache.put("World", 22);

    tx.commit();
}
1
2
3
4
5
6
7
8
9
10

分布式锁:

// Lock cache key "Hello".
Lock lock = cache.lock("Hello");

lock.lock();

try {
    cache.put("Hello", 11);
    cache.put("World", 22);
}
finally {
    lock.unlock();
}
1
2
3
4
5
6
7
8
9
10
11
12

# 3.8.第一个服务网格应用

Ignite的服务网格对于在集群中部署微服务非常有用,Ignite会处理和部署的服务有关的任务的生命周期,并且提供了在应用中调用服务的简单方式。

作为一个示例,下面会开发一个服务,它会返回一个特定城市当前的天气预报。首先,它会创建一个只有一个方法的服务接口,这个接口扩展自org.apache.ignite.services.Service

服务接口:

import org.apache.ignite.services.Service;

public interface WeatherService extends Service {
    /**
     * Get a current temperature for a specific city in the world.
     *
     * @param countryCode Country code (ISO 3166 country codes).
     * @param cityName City name.
     * @return Current temperature in the city in JSON format.
     * @throws Exception if an exception happened.
     */
    String getCurrentTemperature(String countryCode, String cityName)
        throws Exception;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

服务的实现会接入天气频道然后获取天气数据,代码如下:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.ignite.services.ServiceContext;


public class WeatherServiceImpl implements WeatherService {
    /** Weather service URL. */
    private static final String WEATHER_URL = "http://api.openweathermap.org/data/2.5/weather?";

    /** Sample app ID. */
    private static final String appId = "ca7345b4a1ef8c037f7749c09fcbf808";

    /** {@inheritDoc}. */
    @Override public void init(ServiceContext ctx) throws Exception {
        System.out.println("Weather Service is initialized!");
    }

    /** {@inheritDoc}. */
    @Override public void execute(ServiceContext ctx) throws Exception {
        System.out.println("Weather Service is started!");
    }

    /** {@inheritDoc}. */
    @Override public void cancel(ServiceContext ctx) {
        System.out.println("Weather Service is stopped!");
    }

    /** {@inheritDoc}. */
    @Override public String getCurrentTemperature(String cityName,
        String countryCode) throws Exception {

        System.out.println(">>> Requested weather forecast [city="
            + cityName + ", countryCode=" + countryCode + "]");

        String connStr = WEATHER_URL + "q=" + cityName + ","
            + countryCode + "&appid=" + appId;

        URL url = new URL(connStr);

        HttpURLConnection conn = null;

        try {
            // Connecting to the weather service.
            conn = (HttpURLConnection) url.openConnection();

            conn.setRequestMethod("GET");

            conn.connect();

            // Read data from the weather server.
            try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {

                String line;
                StringBuilder builder = new StringBuilder();

                while ((line = reader.readLine()) != null)
                    builder.append(line);

                return builder.toString();
            }
        } finally {
            if (conn != null)
                conn.disconnect();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

最后,服务需要在集群中进行部署,然后就可以在应用端进行调用,为了简化,服务在同一个应用中进行部署和调用,如下:

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

public class ServiceGridExample {

    public static void main(String[] args) throws Exception {
        try (Ignite ignite = Ignition.start()) {

            // Deploying a single instance of the Weather Service
            // in the whole cluster.
            ignite.services().deployClusterSingleton("WeatherService",
               new WeatherServiceImpl());

            // Requesting current weather for London.
            WeatherService service = ignite.services().service("WeatherService");

            String forecast = service.getCurrentTemperature("London", "UK");

            System.out.println("Weather forecast in London:" + forecast);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

零部署和服务网格

零部署是不支持服务网格的,如果希望将上面的服务部署在通过ignite.sh或者ignite.bat文件启动的节点上,那么就需要将服务的实现打成jar包然后放在{apache_ignite_version}/libs文件夹中。

# 3.9.集群管理和监控

查看数据网格的数据、以及执行其它的管理和监控操作的最简单方式是使用Ignite Web控制台,还有就是使用Ignite的Visor命令行工具。

# 3.10.Docker和云镜像安装

最新的Ignite Docker镜像以及AWS和Google计算引擎的云镜像,可以通过Ignite的下载页面获得。

# 3.11.RPM|DEB包安装

Ignite可以通过官方的RPM和DEB仓库进行安装。

# 3.12.通过源代码构建

如果下载了源代码,可以使用下面的命令构建二进制包:

# Unpack the source package
$ unzip -q apache-ignite-{version}-src.zip
$ cd apache-ignite-{version}-src

# Build In-Memory Data Fabric release (without LGPL dependencies)
$ mvn clean package -DskipTests

# Build In-Memory Data Fabric release (with LGPL dependencies)
$ mvn clean package -DskipTests -Prelease,lgpl

# Build In-Memory Hadoop Accelerator release
# (optionally specify version of hadoop to use)
$ mvn clean package -DskipTests -Dignite.edition=hadoop [-Dhadoop.version=X.X.X]
1
2
3
4
5
6
7
8
9
10
11
12
13

具体细节请参见源码包中的DEVNOTES.txt文件。

# 4.Ignite生命周期

# 4.1.概述

Ignite是基于JVM的,一个JVM可以运行一个或者多个逻辑Ignite节点(大多数情况下,一个JVM仅运行一个Ignite节点)。在整个Ignite文档中,会交替地使用术语Ignite运行时以及Ignite节点,比如说可以该主机运行5个节点,技术上通常意味着主机上启动5个JVM,每个JVM运行一个节点,Ignite也支持一个JVM运行多个节点,事实上,通常作为Ignite内部测试用。

提示

Ignite运行时 == JVM进程 == Ignite节点(多数情况下)

# 4.2.Ignition类

Ignition类在网络中启动各个Ignite节点,注意一台物理服务器(网络中的一台计算机)可以运行多个Ignite节点。 下面的代码是在全默认配置下在本地启动网格节点;

Ignite ignite = Ignition.start();
1

或者传入一个配置文件:

Ignite ignite = Ignition.start("examples/config/example-cache.xml");
1

配置文件的路径既可以是绝对路径,也可以是相对于IGNITE_HOME的相对路径,也可以是相对于类路径的META-INF文件夹。

# 4.3.LifecycleBean

有时可能希望在Ignite节点启动和停止的之前和之后执行特定的操作,这个可以通过实现LifecycleBean接口实现,然后在Spring的配置文件中通过指定IgniteConfigurationlifecycleBeans属性实现。

<bean class="org.apache.ignite.IgniteConfiguration">
    ...
    <property name="lifecycleBeans">
        <list>
            <bean class="com.mycompany.MyLifecycleBean"/>
        </list>
    </property>
    ...
</bean>
1
2
3
4
5
6
7
8
9

LifecycleBean也可以像下面这样通过编程的方式实现:

// Create new configuration.
IgniteConfiguration cfg = new IgniteConfiguration();

// Provide lifecycle bean to configuration.
cfg.setLifecycleBeans(new MyLifecycleBean());

// Start Ignite node with given configuration.
Ignite ignite = Ignition.start(cfg)
1
2
3
4
5
6
7
8

一个LifecycleBean的实现可能如下所示:

public class MyLifecycleBean implements LifecycleBean {
    @Override public void onLifecycleEvent(LifecycleEventType evt) {
        if (evt == LifecycleEventType.BEFORE_NODE_START) {
            // Do something.
            ...
        }
    }
}
1
2
3
4
5
6
7
8

也可以将Ignite实例以及其它有用的资源注入LifecycleBean实现,查看资源注入章节可以了解更多的信息。

# 4.4.生命周期事件类型

当前支持如下生命周期事件类型:

  • BEFORE_NODE_START:Ignite节点的启动程序初始化之前调用
  • AFTER_NODE_START:Ignite节点启动之后调用
  • BEFORE_NODE_STOP:Ignite节点的停止程序初始化之前调用
  • AFTER_NODE_STOP:Ignite节点停止之后调用

# 5.异步支持

# 5.1.概述

Ignite的多数API即可以支持同步模式,也可以支持异步模式,异步方法需要有一个Async后缀。

// Synchronous get
V get(K key);

// Asynchronous get
IgniteFuture<V> getAsync(K key);
1
2
3
4
5

异步操作返回的是一个IgniteFuture或其子类的实例,通过如下方式可以获得异步操作的结果,或者调用阻塞的IgniteFuture.get()方法,或者通过IgniteFuture.listen()方法或者IgniteFuture.chain()方法注册一个闭包,然后等待当操作完成后调用闭包。

# 5.2.支持的接口

下面列出的接口可以用于同步或者异步模式:

  • IgniteCompute
  • IgniteCache
  • Transaction
  • IgniteServices
  • IgniteMessaging
  • IgniteEvents

# 5.3.监听器和Future链

要在非阻塞模式下等待异步操作的结果(IgniteFuture.get()),可以使用IgniteFuture.listen()方法或者IgniteFuture.chain()方法注册一个闭包,当操作完成后,闭包会被调用,比如:

IgniteCompute compute = ignite.compute();

// Execute a closure asynchronously.
IgniteFuture<String> fut = compute.callAsync(() -> {
    return "Hello World";
});

// Listen for completion and print out the result.
fut.listen(f -> System.out.println("Job result: " + f.get()));
1
2
3
4
5
6
7
8
9

闭包执行和线程池

异步操作完成后,如果通过IgniteFuture.listen()或者IgniteFuture.chain()方法传递了闭包,那么闭包就会被调用线程以同步的方式执行,否则,闭包就会随着操作的完成异步地执行。 根据操作的类型,闭包可能被系统线程池中的线程调用(异步缓存操作),或者被公共线程池中的线程调用(异步计算操作)。因此需要避免在闭包实现中调用同步的缓存和计算操作,否则可能导致死锁。 要实现Ignite计算操作异步嵌套执行,可以使用自定义线程池,相关内容可以查看自定义线程池中的相关内容。

# 6.资源注入

# 6.1.概述

Ignite中,预定义的资源都是可以进行依赖注入的,同时支持基于属性和基于方法的注入。任何加注正确注解的资源都会在初始化之前注入相对应的任务、作业、闭包或者SPI。

# 6.2.基于属性和基于方法

可以通过在一个属性或者方法上加注注解来注入资源。当加注在属性上时,Ignite只是在注入阶段简单地设置属性的值(不会理会该属性的访问修饰符)。如果在一个方法上加注了资源注解,它会访问一个与注入资源相对应的输入参数的类型,如果匹配,那么在注入阶段,就会将适当的资源作为输入参数,然后调用该方法。

基于属性:

Ignite ignite = Ignition.ignite();

Collection<String> res = ignite.compute().broadcast(new IgniteCallable<String>() {
  // Inject Ignite instance.
  @IgniteInstanceResource
  private Ignite ignite;

  @Override
  public String call() throws Exception {
    IgniteCache<Object, Object> cache = ignite.getOrCreateCache(CACHE_NAME);

    // Do some stuff with cache.
     ...
  }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

基于方法:

public class MyClusterJob implements ComputeJob {
    ...
    private Ignite ignite;
    ...
    // Inject Ignite instance.
    @IgniteInstanceResource
    public void setIgnite(Ignite ignite) {
        this.ignite = ignite;
    }
    ...
}
1
2
3
4
5
6
7
8
9
10
11

# 6.3.预定义的资源

有很多的预定义资源可供注入:

资源 描述
CacheNameResource CacheConfiguration.getName()提供,注入网格缓存名
CacheStoreSessionResource 注入当前的CacheStoreSession实例
IgniteInstanceResource 注入当前的Ignite实例
JobContextResource 注入ComputeJobContext的实例。作业的上下文持有关于一个作业执行的有用的信息。比如,可以获得包含与作业并置的条目的缓存的名字。
LoadBalancerResource 注入ComputeLoadBalancer的实例,注入后可以用于任务的负载平衡。
LoggerResource 注入IgniteLogger的实例,它可以用于向本地节点的日志写消息。
ServiceResource 通过指定服务名注入Ignite的服务。
SpringApplicationContextResource 注入Spring的ApplicationContext资源。
SpringResource 从Spring的ApplicationContext注入资源,当希望访问在Spring的ApplicationContext XML配置中指定的一个Bean时,可以用它。
TaskContinuousMapperResource 注入一个ComputeTaskContinuousMapper的实例,持续映射可以在任何时点从任务中发布作业,即使过了map的初始化阶段。
TaskSessionResource 注入ComputeTaskSession资源的实例,它为一个特定的任务执行定义了一个分布式的会话。

# 7.线程池

# 7.1.概述

Ignite创建并且维护着一组线程池,根据使用的API不同分别用于不同的目的。本章节中会列出一些众所周知的内部线程池,然后会展示如何自定义线程池。在IgniteConfiguration的javadoc中,可以看到Ignite中可用的完整线程池列表。

# 7.2.系统线程池

系统线程池处理所有与缓存相关的操作,除了SQL以及其它的查询类型,它们会使用查询线程池,同时这个线程池也负责处理Ignite计算任务的取消操作。

默认的线程池数量为max(8,CPU总核数),使用IgniteConfiguration.setSystemThreadPoolSize(...)可以进行调整。

# 7.3.公共线程池

公共线程池负责Ignite的计算网格,所有的计算任务都由这个线程池接收然后处理。

默认的线程池数量为max(8,CPU总核数),使用IgniteConfiguration.setPublicThreadPoolSize(...)可以进行调整。

# 7.4.查询线程池

查询线程池处理集群内所有的SQL、扫描和SPI查询。

默认的线程池数量为max(8,CPU总核数),使用IgniteConfiguration.setQueryThreadPoolSize(...)可以进行调整。

# 7.5.服务线程池

Ignite的服务网格调用使用的是服务线程池,Ignite的服务和计算网格组件都有专用的线程池,可以避免当一个服务实现希望调用一个计算(或者反之)时的线程争用和死锁。

默认的线程池数量为max(8,CPU总核数),使用IgniteConfiguration.setServiceThreadPoolSize(...)可以进行调整。

# 7.6.平行线程池

平行线程池通过将操作展开为多个平行的执行,有助于显著加速基本的缓存操作以及事务,因为可以避免相互竞争。

默认的线程池数量为max(8,CPU总核数),使用IgniteConfiguration.setStripedPoolSize(...)可以进行调整。

# 7.7.数据流处理器线程池

数据流处理器线程池用于处理来自IgniteDataStreamer的所有消息和请求,各种内置的使用IgniteDataStreamer的流适配器也可以。

默认的线程池数量为max(8,CPU总核数),使用IgniteConfiguration.setDataStreamerThreadPoolSize(...)可以进行调整。

# 7.8.自定义线程池

对于Ignite的计算任务,也可以配置自定义的线程池,当希望同步地从一个计算任务调用另一个的时候很有用,因为可以避免死锁。要保证这一点,需要确保执行嵌套任务的线程池不同于上级任务的线程池。

自定义线程池需要在IgniteConfiguration中进行定义,并且需要有一个唯一的名字:

Java:

IgniteConfiguration cfg = ...;

cfg.setExecutorConfiguration(new ExecutorConfiguration("myPool").setSize(16));
1
2
3

XML:

<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
  <property name="executorConfiguration">
    <list>
      <bean class="org.apache.ignite.configuration.ExecutorConfiguration">
        <property name="name" value="myPool"/>
        <property name="size" value="16"/>
      </bean>
    </list>
  </property>
  ...
</bean>
1
2
3
4
5
6
7
8
9
10
11
12

这样,假定下面的计算任务由上面定义的myPool线程池中的线程执行:

public class InnerRunnable implements IgniteRunnable {
    @Override public void run() {
        System.out.println("Hello from inner runnable!");
    }
}
1
2
3
4
5

怎么做呢,需要使用IgniteCompute.withExecutor(),它会被上级任务的实现马上执行,像下面这样:

public class OuterRunnable implements IgniteRunnable {
    @IgniteInstanceResource
    private Ignite ignite;

    @Override public void run() {
        // Synchronously execute InnerRunnable in custom executor.
        ignite.compute().withExecutor("myPool").run(new InnerRunnable());
    }
}
1
2
3
4
5
6
7
8
9

上级任务的执行可通过如下方式触发,对于这个场景,它会由公共线程池执行:

ignite.compute().run(new OuterRunnable());
1

未定义线程池

如果应用请求在自定义线程池执行计算任务,而该线程池在Ignite节点中未定义,那么一个特定的警告消息就会在节点的日志中输出,然后任务就会被公共线程池接管执行。

# 8.二进制编组器

# 8.1.基本概念

从1.6版本开始,Ignite引入了一个在缓存中存储数据的新概念,名为二进制对象,这个新的序列化格式提供了若干个优势:

  • 它可以从一个对象的序列化形式中读取一个任意的属性,而不需要将该对象完整地反序列化,这个功能完全删除了将缓存的键和值类部署到服务端节点类路径的必要性;
  • 它可以为同一个类型的对象增加和删除属性,给定的服务端节点不需要有模型类的定义,这个功能允许动态改变对象的结构,甚至允许多个客户端持有类定义的不同版本,它们是共存的;
  • 它可以根据类型名构造一个新的对象,根本不需要类定义,因此允许动态类型创建;

二进制对象只可以用于使用默认的二进制编组器时(即没有在配置中显式地设置其它的编组器)

限制

BinaryObject格式实现也带来了若干个限制:

  1. 在内部Ignite不会写属性以及类型的名字,但是使用一个小写的名字哈希来标示一个属性或者类型,这意味着属性或者类型不能有同样的名字哈希。即使序列化不会在哈希冲突的情况下工作,但Ignite在配置级别提供了一种方法来解决此冲突;
  2. 同样的原因,BinaryObject格式在类的不同层次上也不允许有同样的属性名;
  3. 如果类实现了Externalizable接口,Ignite会使用OptimizedMarshallerOptimizedMarshaller会使用writeExternal()readExternal()来进行类对象的序列化和反序列化,这需要将实现Externalizable的类加入服务端节点的类路径中。

IgniteBinary入口,可以从Ignite的实例获得,包含了操作二进制对象的所有必要的方法。

自动化哈希值计算和Equals实现

如果一个对象可以被序列化到二进制形式,那么Ignite会在序列化期间计算它的哈希值并且将其写入最终的二进制数组。另外,Ignite还为二进制对象的比较需求提供了equals方法的自定义实现。这意味着,不需要为在Ignite中使用的自定义键和值覆写GetHashCodeEquals方法,除非它们无法序列化成二进制形式。

比如,Externalizable类型的对象无法被序列化成二进制形式,这时就需要自行实现hashCodeequals方法,具体可以看上面的限制章节。

# 8.2.配置二进制对象

在绝大多数情况下不需要额外地配置二进制对象。 但是,如果需要覆写默认的类型和属性ID计算或者加入BinarySerializer,可以为IgniteConfiguration定义一个BinaryConfiguration对象,这个对象除了为每个类型指定映射以及序列化器之外还可以指定一个全局的Name映射、一个全局ID映射以及一个全局的二进制序列化器。对于每个类型的配置,通配符也是支持的,这时提供的配置会适用于匹配类型名称模板的所有类型。 配置二进制类型:

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">

  <property name="binaryConfiguration">
    <bean class="org.apache.ignite.configuration.BinaryConfiguration">

      <property name="nameMapper" ref="globalNameMapper"/>
      <property name="idMapper" ref="globalIdMapper"/>

      <property name="typeConfigurations">
        <list>
          <bean class="org.apache.ignite.binary.BinaryTypeConfiguration">
            <property name="typeName" value="org.apache.ignite.examples.*"/>
            <property name="serializer" ref="exampleSerializer"/>
          </bean>
        </list>
      </property>
    </bean>
  </property>
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 8.3.BinaryObject缓存API

Ignite默认使用反序列化值作为最常见的使用场景,要启用BinaryObject处理,需要获得一个IgniteCache的实例然后使用withKeepBinary()方法。启用之后,如果可能,这个标志会确保从缓存返回的对象都是BinaryObject格式的。将值传递给EntryProcessorCacheInterceptor也是同样的处理。

平台类型

注意当通过withKeepBinary()方法启用BinaryObject处理时并不是所有的对象都会表示为BinaryObject,会有一系列的平台类型,包括基本类型、String、UUID、Date、Timestamp、BigDecimal、Collections、Maps和这些类型的数组,它们不会被表示为BinaryObject

注意在下面的示例中,键类型为Integer,它是不会被修改,因为它是平台类型。

获取BinaryObject:

// Create a regular Person object and put it to the cache.
Person person = buildPerson(personId);
ignite.cache("myCache").put(personId, person);

// Get an instance of binary-enabled cache.
IgniteCache<Integer, BinaryObject> binaryCache = ignite.cache("myCache").withKeepBinary();

// Get the above person object in the BinaryObject format.
BinaryObject binaryPerson = binaryCache.get(personId);
1
2
3
4
5
6
7
8
9

# 8.4.使用BinaryObjectBuilder修改二进制对象

BinaryObject实例是不能修改的,要更新属性或者创建新的BinaryObject,必须使用BinaryObjectBuilder的实例。

BinaryObjectBuilder的实例可以通过IgniteBinary入口获得。它可以使用类型名创建,这时返回的对象不包含任何属性,或者它也可以通过一个已有的BinaryObject创建,这时返回的对象会包含从给定的BinaryObject中拷贝的所有属性。

获取BinaryObjectBuilder实例的另外一个方式是调用已有BinaryObject实例的toBuilder()方法,这种方式创建的对象也会从BinaryObject中拷贝所有的数据。

限制

  • 无法修改已有字段的类型;
  • 无法变更枚举值的顺序,也无法在枚举值列表的开始或者中部添加新的常量,但是可以在列表的末尾添加新的常量。

下面是一个使用BinaryObjectAPI来处理服务端节点的数据而不需要将程序部署到服务端以及不需要实际的数据反序列化的示例:

EntryProcessor内的BinaryObject:

// The EntryProcessor is to be executed for this key.
int key = 101;

cache.<Integer, BinaryObject>withKeepBinary().invoke(
  key, new CacheEntryProcessor<Integer, BinaryObject, Object>() {
  	public Object process(MutableEntry<Integer, BinaryObject> entry,
                          Object... objects) throws EntryProcessorException {
		    // Create builder from the old value.
        BinaryObjectBuilder bldr = entry.getValue().toBuilder();

        //Update the field in the builder.
        bldr.setField("name", "Ignite");

        // Set new value to the entry.
        entry.setValue(bldr.build());

        return null;
     }
  });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 8.5.BinaryObject类型元数据

像上面描述的那样,二进制对象结构可以在运行时进行修改,因此获取一个存储在缓存中的一个特定类型的信息也可能是有用的,比如属性名、属性类型名,关联属性名,Ignite通过BinaryType接口满足这样的需求。

这个接口还引入了一个属性getter的更快的版本,叫做BinaryField。这个概念类似于Java的反射,可以缓存BinaryField实例中读取的属性的特定信息,它有助于从一个很大的二进制对象集合中读取同一个属性。

Collection<BinaryObject> persons = getPersons();

BinaryField salary = null;

double total = 0;
int cnt = 0;

for (BinaryObject person : persons) {
    if (salary == null)
        salary = person.type().field("salary");

    total += salary.value(person);
    cnt++;
}

double avg = total / cnt;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 8.6.BinaryObject和CacheStore

在缓存API上调用withKeepBinary()方法对于将用户对象传入CacheStore的方式不起作用,这么做是故意的,因为大多数情况下单个CacheStore实现要么使用反序列化类,要么使用BinaryObject表示。要控制对象传入Store的方式,需要使用CacheConfigurationstoreKeepBinary标志,当该标志设置为false时,会将反序列化值传入Store,否则会使用BinaryObject表示。

下面是一个使用BinaryObject的Store的伪代码实现的示例:

public class CacheExampleBinaryStore extends CacheStoreAdapter<Integer, BinaryObject> {
    @IgniteInstanceResource
    private Ignite ignite;

    /** {@inheritDoc} */
    @Override public BinaryObject load(Integer key) {
        IgniteBinary binary = ignite.binary();

        List<?> rs = loadRow(key);

        BinaryObjectBuilder bldr = binary.builder("Person");

        for (int i = 0; i < rs.size(); i++)
            bldr.setField(name(i), rs.get(i));

        return bldr.build();
    }

    /** {@inheritDoc} */
    @Override public void write(Cache.Entry<? extends Integer, ? extends BinaryObject> entry) {
        BinaryObject obj = entry.getValue();

        BinaryType type = obj.type();

        Collection<String> fields = type.fieldNames();

        List<Object> row = new ArrayList<>(fields.size());

        for (String fieldName : fields)
            row.add(obj.field(fieldName));

        saveRow(entry.getKey(), row);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 8.7.二进制Name映射器和二进制ID映射器

在内部,Ignite不会写属性或者类型名字的完整字符串,而是为了性能,为类型和属性名写一个整型哈希值。经过测试,在类型相同时,属性名或者类型名的哈希值冲突实际上是不存在的,为了性能使用哈希值是安全的。对于当不同的类型或者属性确实冲突的场合,BinaryNameMapperBinaryIdMapper可以为该类型或者属性名覆写自动生成的哈希值。

BinaryNameMapper:映射类型/类和属性名到不同的名字;

BinaryIdMapper:映射从BinaryNameMapper来的类型和属性名到ID,以便于Ignite内部使用。

Ignite直接支持如下的映射器实现:

  • BinaryBasicNameMapperBinaryNameMapper的一个基本实现,对于一个给定的类,根据使用的setSimpleName(boolean useSimpleName)属性值,会返回一个完整或者简单的名字;
  • BinaryBasicIdMapperBinaryIdMapper的一个基本实现,它有一个lowerCase配置属性,如果属性设置为false,那么会返回一个给定类型或者属性名的哈希值,如果设置为true,会返回一个给定类型或者属性名的小写形式的哈希值。

如果仅仅使用Java或者.NET客户端并且在BinaryConfiguration中没有指定映射器,那么Ignite会使用BinaryBasicNameMapper并且simpleName属性会被设置为false,使用BinaryBasicIdMapper并且lowerCase属性会被设置为true

如果使用了C++客户端并且在BinaryConfiguration中没有指定映射器,那么Ignite会使用BinaryBasicNameMapper并且simpleName属性会被设置为true,使用BinaryBasicIdMapper并且lowerCase属性会被设置为true

如果使用Java、.Net或者C++,默认是不需要任何配置的,只有当需要平台协同、名字转换复杂的情况下,才需要配置映射器。

# 9.日志

Ignite支持各种日志库和框架,可以直接使用Log4jLog4j2JCLSLF4J,本文会描述如何使用它们。

# 9.1.通用配置

Ignite节点启动之后,会在控制台中输出启动信息,包括了配置的日志库信息。每个日志库都有自己的配置参数,需要分别进行配置。除了库特有的配置,还有一些系统属性可以对日志进行调整,如下表所示:

系统属性 描述 默认值
IGNITE_LOG_INSTANCE_NAME 如果该属性存在,Ignite会在日志消息中包含实例名 未配置
IGNITE_QUIET 配置为false可以禁用静默模式,启用详细模式,其会输出更多的信息 true
IGNITE_LOG_DIR 该属性会指定Ignite日志的输出目录 $IGNITE_HOME/work/log
IGNITE_DUMP_THREADS_ON_FAILURE 如果配置为true,在捕获严重错误时会在日志中输出线程堆栈信息 true

# 9.2.默认日志

Ignite默认会使用java.util.logging.Logger(JUL),通过$IGNITE_HOME/config/java.util.logging.properties配置文件进行配置,然后将日志写入$IGNITE_HOME/work/log文件夹,要修改这个日志目录,需要使用IGNITE_LOG_DIR环境变量。

另外,Ignite启动于静默模式,会阻止INFODEBUG日志的输出。要关闭静默模式,可以使用-DIGNITE_QUIET=false系统属性。注意静默模式的所有信息都是输出到标准输出(STDOUT)的。

默认日志目录

如果是在Java应用内启动Ignite,日志目录为$IGNITE_HOME/work,默认为/tmp/ignite/work/log/,不过要注意将日志目录配置为一个更可靠的位置。

如果使用jul-to-slf4j桥,要确保配置正确

如果使用了jul-to-slf4j桥,需要特别关注下Ignite中的JUL日志级别。如果在org.apache上配置了DEBUG级别,那么最终的日志级别会为INFO。这意味着在生成日志时会产生十倍的负载,然后在通过桥时被丢弃。JUL默认级别为INFO,在org.apache.ignite.logger.java.JavaLogger#isDebugEnabled中设置一个断点,会显示JUL子系统是否在生成调试级别日志。

注意

通过LoggingMXBean,可以在运行时对默认的日志记录器进行重新配置。

基本日志配置:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">

  <!-- uncomment the following section to set, e.g., log4j as the logging library to be used-->
  <!--property name="gridLogger">
    <bean class="org.apache.ignite.logger.log4j.Log4JLogger">
      <constructor-arg type="java.lang.String" value="log4j.xml"/>
    </bean>
  </property-->

   <!-- how frequently Ignite will output basic node metrics into the log-->
  <property name="metricsLogFrequency" value="#{60 * 10 * 1000}"/>

</bean>
1
2
3
4
5
6
7
8
9
10
11
12
13

# 9.3.Log4j

如果在启动独立集群节点时要使用Log4j模块,需要在执行ignite.{sh|bat}脚本前,将optional/ignite-log4j文件夹移动到Ignite二进制包的lib目录下,这时这个模块目录中的内容会被添加到类路径。

如果项目中使用maven进行依赖管理,那么需要添加如下的依赖:

<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-log4j</artifactId>
  <version>${ignite.version}</version>
</dependency>
1
2
3
4
5

${ignite.version}替换为实际使用的Ignite版本。

要使用Log4j进行日志记录,需要配置IgniteConfigurationgridLogger属性,如下所示:

XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
  <property name="gridLogger">
    <bean class="org.apache.ignite.logger.log4j.Log4JLogger">
      <constructor-arg type="java.lang.String" value="log4j.xml"/>
    </bean>
  </property>
  <!-- Other Ignite configurations -->
  ...
</bean>
1
2
3
4
5
6
7
8
9

Java:

IgniteConfiguration cfg = new IgniteConfiguration();

IgniteLogger log = new Log4JLogger("log4j.xml");

cfg.setGridLogger(log);

// Start Ignite node.
Ignite ignite = Ignition.start(cfg);

ignite.log().info("Info Message Logged!");
1
2
3
4
5
6
7
8
9
10

在上面的配置中,log4j.xml的路径要么是绝对路径,要么是相对路径,相对路径可以相对于META-INF,也可以相对于IGNITE_HOME

注意

Log4j支持运行时配置,即配置文件的修改无需应用重启即可生效。

# 9.4.Log4j2

如果在启动独立集群节点时要使用Log4j2模块,需要在执行ignite.{sh|bat}脚本前,将optional/ignite-log4j2文件夹移动到Ignite二进制包的lib目录下,这时这个模块目录中的内容会被添加到类路径。

如果项目中使用maven进行依赖管理,那么需要添加如下的依赖:

<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-log4j2</artifactId>
  <version>${ignite.version}</version>
</dependency>
1
2
3
4
5

${ignite.version}替换为实际使用的Ignite版本。

要使用Log4j2进行日志记录,需要配置IgniteConfigurationgridLogger属性,如下所示:

XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
  <property name="gridLogger">
    <bean class="org.apache.ignite.logger.log4j2.Log4J2Logger">
      <constructor-arg type="java.lang.String" value="log4j2.xml"/>
    </bean>
  </property>
  <!-- Other Ignite configurations -->
  ...
</bean>
1
2
3
4
5
6
7
8
9

Java:

IgniteConfiguration cfg = new IgniteConfiguration();

IgniteLogger log = new Log4J2Logger("log4j2.xml");

cfg.setGridLogger(log);

// Start Ignite node.
Ignite ignite = Ignition.start(cfg);

ignite.log().info("Info Message Logged!");
1
2
3
4
5
6
7
8
9
10

在上面的配置中,log4j2.xml的路径要么是绝对路径,要么是相对路径,相对路径可以相对于META-INF,也可以相对于IGNITE_HOME

注意

Log4j2支持运行时配置,即配置文件的修改无需应用重启即可生效。

# 9.5.JCL

如果在启动独立集群节点时要使用JCL模块,需要在执行ignite.{sh|bat}脚本前,将optional/ignite-jcl文件夹移动到Ignite二进制包的lib目录下,这时这个模块目录中的内容会被添加到类路径。

如果项目中使用maven进行依赖管理,那么需要添加如下的依赖:

<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-jcl</artifactId>
  <version>${ignite.version}</version>
</dependency>
1
2
3
4
5

${ignite.version}替换为实际使用的Ignite版本。

要使用JCL进行日志记录,需要配置IgniteConfigurationgridLogger属性,如下所示:

XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
  <property name="gridLogger">
    <bean class="org.apache.ignite.logger.jcl.JclLogger">
    </bean>
  </property>
  <!-- Other Ignite configurations -->
  ...
</bean>
1
2
3
4
5
6
7
8

Java:

IgniteConfiguration cfg = new IgniteConfiguration();

IgniteLogger log = new JclLogger();

cfg.setGridLogger(log);

// Start Ignite node.
Ignite ignite = Ignition.start(cfg);

ignite.log().info("Info Message Logged!");
1
2
3
4
5
6
7
8
9
10

注意

注意JCL只是简单地将日志消息转发给底层的日志系统,这需要正确的配置,具体请参见JCL官方文档。比如要使用Log4j,类路径中需要添加必要的库文件。

# 9.6.SLF4J

如果在启动独立集群节点时要使用SLF4J模块,需要在执行ignite.{sh|bat}脚本前,将optional/ignite-slf4j文件夹移动到Ignite二进制包的lib目录下,这时这个模块目录中的内容会被添加到类路径。

如果项目中使用maven进行依赖管理,那么需要添加如下的依赖:

<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-slf4j</artifactId>
  <version>${ignite.version}</version>
</dependency>
1
2
3
4
5

${ignite.version}替换为实际使用的Ignite版本。

要使用JCL进行日志记录,需要配置IgniteConfigurationgridLogger属性,如下所示:

XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
  <property name="gridLogger">
    <bean class="org.apache.ignite.logger.slf4j.Slf4jLogger"/>
  </property>

  <!-- Other Ignite configurations -->

</bean>
1
2
3
4
5
6
7
8

Java:

IgniteConfiguration cfg = new IgniteConfiguration();

IgniteLogger log = new Slf4jLogger();

cfg.setGridLogger(log);

// Start Ignite node.
Ignite ignite = Ignition.start(cfg);

ignite.log().info("Info Message Logged!");
1
2
3
4
5
6
7
8
9
10

要了解更多的信息,可以看SLF4J手册

# 9.7.日志配置示例

下面的步骤可以引导开发者进行日志的配置,这可以覆盖大多数的场景。

  1. 使用Log4j或者Log4j2作为日志框架,具体可以看上面章节的介绍;
  2. 如果使用了默认的配置文件(ignite-log4j.xmlignite-log4j2.xml),需要取消CONSOLEAppender的注释;
  3. 在日志配置文件中,指定日志文件的路径,默认值为${IGNITE_HOME}/work/log/ignite.log
  4. Ignite以详细日志模式启动:
  • 如果使用的是ignite.sh,指定-v选项;
  • 如果通过Java代码启动,使用IGNITE_QUIET=false系统变量。

# 9.8.日志管理提示

日志在故障排除和查找错误方面起着重要作用。

以下是一些关于如何管理日志文件的一般提示:

  • 不要将日志文件存储在/tmp文件夹中,每次重启服务器时都会清除此文件夹;
  • 确保存储日志文件的磁盘上有足够的可用空间;
  • 定期存档旧日志文件以节省存储空间。

# 10.异常处理

下表描述了Ignite API支持的异常以及可以用来处理这些异常的操作。可以查看javadoc中的throws子句,查看是否存在已检查异常。

异常 描述 要采取的动作 运行时异常
IgniteException 此异常表示网格中存在错误。 操作失败,从方法退出。
IgniteClientDisconnectedException 当客户端节点与集群断开连接时,Ignite API(缓存操作、计算API和数据结构操作)会抛出此异常。 Future中等待并重试。
IgniteAuthenticationException 当节点身份验证失败或安全身份验证失败时,会抛出此异常。 操作失败,从方法退出。
IgniteClientException 缓存操作会抛出此异常。 根据异常消息确定下一步的动作。
IgniteDeploymentException 当Ignite API(计算网格相关)未能在节点上部署作业或任务时,会抛出此异常。 操作失败,从方法退出。
IgniteInterruptedException 此异常用于将标准InterruptedException包装为IgniteException 清除中断标志后重试。
IgniteSpiException SPI引发的异常,如CollisionSpiLoadBalancingSpi
TcpDiscoveryIpFinderFailoverSpiUriDeploymentSpi等。
操作失败,从方法退出。
IgniteSQLException SQL查询处理失败会抛出此异常,该异常会包含相关规范定义的错误代码 操作失败,从方法退出。
IgniteAccessControlException 认证/授权失败时会抛出此异常。 操作失败,从方法退出。
IgniteCacheRestartingException 如果缓存正在重启,Ignite的缓存API会抛出此异常。 Future中等待并重试。
IgniteFutureTimeoutException Future的计算超时时,会抛出此异常。 要么增加超时限制要么方法退出。
IgniteFutureCancelledException Future的计算因为被取消而无法获得结果时,会抛出此异常。 可进行重试。
IgniteIllegalStateException 此异常表示Ignite实例对于请求的操作处于无效状态。 操作失败,从方法退出。
IgniteNeedReconnectException 此异常显示节点应尝试重新连接到集群。 可进行重试。
IgniteDataIntegrityViolationException 如果发现数据完整性冲突,会抛出此异常。 操作失败,从方法退出。
IgniteOutOfMemoryException 系统没有足够内存处理Ignite操作(缓存操作)时,会抛出此异常。 操作失败,从方法退出。
IgniteTxOptimisticCheckedException 当事务以乐观方式失败时,会抛出此异常。 可进行重试
IgniteTxRollbackCheckedException 当事务自动回滚时,会抛出此异常。 可进行重试。
IgniteTxTimeoutCheckedException 当事务超时时,会抛出此异常。 可进行重试。
ClusterTopologyException 当集群拓扑发生错误(比如节点故障)时会抛出此异常(针对计算和事件API)。 Future中等待并重试。

# 11.FAQ

1.堆内和堆外内存存储有何不同?

当处理很大的堆时,通过在Java主堆空间外部缓存数据,可以使缓存克服漫长的JVM垃圾收集(GC)导致的暂停,但是数据仍然在内存中。 更多信息

2.Apache Ignite是一个键值存储么?

Apache Ignite是一个具有计算能力的、有弹性的内存中的分布式对象存储。在其最简单的形式中,是的,Apache Ignite可以作为一个键/值存储(缓存),但是也暴露了更丰富的API来与数据交互,比如完整的ANSI99兼容的SQL查询、文本检索、事务等等。 更多信息

3.Apache Ignite是否支持JSON文档?

当前,Apache Ignite并不完整支持JSON文档,但是当前处于beta阶段的Node.js客户端会支持JSON文档。

4.Apache Ignite是否可以用于Apache Hive?

是,Apache Ignite的Hadoop加速器提供了一系列的组件,支持在任何的Hadoop发行版中执行内存中的Hadoop作业执行和文件系统操作,包括Apache Hive。 在Ignite化的Hadoop中运行Apache Hive

5.在事务隔离的悲观模式中,是否锁定键的读和写?

是的,主要的问题在于,在悲观模式中,访问是会获得锁,而在乐观模式中,锁是在提交阶段获得的。 更多信息

6.是否可以用Hibernate访问Apache Ignite?

是的,Apache Ignite可以用作Hibernate的二级缓存(或者L2缓存),它可以显著地提升应用的持久化层的速度。 更多信息

7.Apache Ignite是否支持JDBC?

是的,Apache Ignite提供了JDBC驱动,可以在缓存中使用标准SQL查询和JDBC API获得分布式的数据。 更多信息

8.Apache Ignite是否保证消息的顺序?

是的,如果希望收到消息的顺序与发送消息的顺序一致,可以使用sendOrdered(...)方法。可以传递一个超时时间来指定一条消息在队列中的等待时间,它会等待本来应在其之前发送的消息。如果超时时间过期,所有的还没有到达该节点中一个给定主题的消息都会被忽略。 更多信息

9.是否可以运行Java和.Net闭包?它是如何工作的?

.Net节点可以同时执行Java和.Net闭包,而标准Java节点只能执行Java闭包。当启动ApacheIgnite.exe时,它会使用位于IGNITE_HOME/platforms/dotnet/bin的一个脚本在同一个进程下同时启动JVM和CLR,.Net闭包会被CLR处理执行。

10.Java和.Net之间的转换成本是什么?

仅有的最小可能的开销是一个额外的数组复制+JNI调用,在本地测试时这个开销可能降低性能,但在真正的分布式负载环境下可以忽略不计。

11.闭包是如何传输的?

每个闭包都是一个特定类的对象。当它要被发送时会序列化成二进制的形式,通过网络发送到一个远程节点然后在那里反序列化。该远程节点在类路径中应该有该闭包类,或者开启对等类加载以从发送端加载该类。

12.SQL查询是否被负载平衡?

SQL查询总是被广播到保存有要查询的数据的每个节点,例外就是本地SQL查询(query.setLocal(true)),它只是在一个本地节点执行,还有就是可以精确标识节点的部分查询。

13.用户是否可以控制资源分配?即,是否可以限制用户A为50个节点,但是用户B可以在所有的100个节点上执行任务?

多租户只在缓存中存在,它们可以在创建在一个节点的子集上(可以看CacheConfiguration.setNodeFilter)以及在每个缓存基础上安全地赋予权限。

IGFS的未来如何?

开发IGFS的初衷是将其做为Hadoop加速的解决方案。然而在实践中,IGFS提供了不一致的性能表现,它所提供的任何性能增加对于生产来说都微不足道,此外它的整合成本也比较高。要获得数量级的性能提升,基于内存的存储必须与应用使用的API紧密耦合,对于IGFS,存储是Ignite,而API是用Hive、Impala、Pig、MapReduce等分别开发的。

对于废弃Hadoop的场景以及实时分析来说,最好使用Ignite的标准配置:即打开Ignite的原生持久化,然后使用Ignite SQL、计算网格或ML处理Ignite中的数据,并使用Hadoop框架处理HDFS数据集。也可以考虑将Spark作为一个通用API,用于合并存储在两个数据集中的数据。

最后更新时间:: 5/10/2020, 12:40:02 PM