使用hazelcast 实现 arrow 流高效共享
依赖
xml
<dependencies>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-spring</artifactId>
</dependency>
<dependency>
<groupId>org.duckdb</groupId>
<artifactId>duckdb_jdbc</artifactId>
<version>1.3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
<version>16.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>16.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>16.0.0</version>
</dependency>
</dependencies>
code
JDK11 以上需要在启动参数中开启 --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
- 运行
testLocalCache5
作为arrow
流的生产者, 对应的 byte[] 放到 hazelcast即可 - 运行
testLocalCache6
作为arrow
流的消费者, 根据 hazelcast 的 byte[] 初始化ArrowStreamReader
java
@Test
@SneakyThrows
void testLocalCache5() {
Config config = new Config();
config.addCacheConfig(new CacheSimpleConfig().setName("A"));
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);
BufferAllocator rootAllocator = new RootAllocator();
VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema(), rootAllocator);
BigIntVector idVector = (BigIntVector) vectorSchemaRoot.getVector("id");
BigIntVector tmsVector = (BigIntVector) vectorSchemaRoot.getVector("tms");
VarCharVector waveVector = (VarCharVector) vectorSchemaRoot.getVector("wave");
VarCharVector argVector = (VarCharVector) vectorSchemaRoot.getVector("arg");
waveVector.allocateNew(1);
idVector.allocateNew(1);
argVector.allocateNew(1);
tmsVector.allocateNew(1);
idVector.set(0, 0);
tmsVector.set(0, 1);
waveVector.setSafe(0, "demo".getBytes());
argVector.setSafe(0, "demo".getBytes());
vectorSchemaRoot.setRowCount(4000);
ByteArrayOutputStream out = new ByteArrayOutputStream();
ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, null, Channels.newChannel(out));
writer.start();
writer.writeBatch();
ArrowArrayStream stream = ArrowArrayStream.allocateNew(rootAllocator);
ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), rootAllocator);
Data.exportArrayStream(rootAllocator, reader, stream);
hazelcastInstance.getMap("A").set("key", out.toByteArray());
Thread.sleep(1000000000);
}
@Test
@SneakyThrows
void testLocalCache6() {
Config config = new Config();
config.addCacheConfig(new CacheSimpleConfig().setName("A"));
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);
DuckDBConnection conn = (DuckDBConnection) DriverManager.getConnection("jdbc:duckdb:");
BufferAllocator rootAllocator = new RootAllocator();
Object o = hazelcastInstance.getMap("A").get("key");
ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream((byte[]) o), rootAllocator);
ArrowArrayStream stream = ArrowArrayStream.allocateNew(rootAllocator);
Data.exportArrayStream(rootAllocator, reader, stream);
conn.registerArrowStream("testStream", stream);
conn.createStatement().execute(" COPY ( select id,tms,wave,arg from testStream) to " +
"'D:\\1\\aaa.parquet' ( FORMAT 'parquet', CODEC 'snappy') ");
}
private Schema schema() {
Field wave = new Field("wave", FieldType.nullable(new ArrowType.Utf8()), null);
Field arg = new Field("arg", FieldType.nullable(new ArrowType.Utf8()), null);
Field id = new Field("id", FieldType.nullable(new ArrowType.Int(64, true)), null);
Field tms = new Field("tms", FieldType.nullable(new ArrowType.Int(64, true)), null);
return new Schema(asList(id, tms, wave, arg));
}
使用duckdb表作为 arrow 生产者
消费者逻辑不变
java
@Test
@SneakyThrows
void testLocalCache7() {
Config config = new Config();
config.addCacheConfig(new CacheSimpleConfig().setName("A"));
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);
try (var conn = DriverManager.getConnection("jdbc:duckdb:");
var stmt = conn.prepareStatement("SELECT * FROM generate_series(2000)");
var resultset = (DuckDBResultSet) stmt.executeQuery();
var allocator = new RootAllocator()) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (var reader = (ArrowReader) resultset.arrowExportStream(allocator, 256)) {
try (ArrowStreamWriter writer = new ArrowStreamWriter(reader.getVectorSchemaRoot(), null, baos)) {
writer.start();
while (reader.loadNextBatch()) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
writer.writeBatch();
root.clear(); // 清除当前批次的数据
}
writer.end();
}
}
hazelcastInstance.getMap("A").set("key", baos.toByteArray());
}
Thread.sleep(1000000000);
}