简介
Apache Spark 目前越来越流行,在实际开发使用过程中,会需要使用到一些 C/C++ 的类库,需要使用 JNI 来调用,本文通过一个简单的例子来对 Spark 使用 JNI 进行讲解。
本文的测试环境是 Linux,其他环境类似,但是不保证 C++ 等代码可以在其他系统下编译使用,Spark 版本使用的是 2.1。
JNI 封装 Base64
本文通过一个简单的示例,来封装一下 Base64(随便在网上找了一个Base64 C++ 代码,为避免文件名冲突,文件重命名为 base64_impl.h)。
定义 Java 类
首先先定义一个 Java 类,当然会有 native 方法。
// filename: Base64.java
import java.io.Serializable;
public class Base64 implements Serializable {
static {
System.loadLibrary("base64");
}
public Base64() {}
public native static String encode(String data);
public native static String decode(String data);
public static void main(String[] args) {
System.out.println(Base64.encode("test_string"));
System.out.println(Base64.decode("dGVzdF9zdHJpbmc="));
}
}
注意:需要实现 Serializable,否则可能在 Spark 程序中出现序列化错误。
注意:类必须是 public,否则可能会报错:
error: class Base64 cannot be accessed in package <empty>
编译和生成头文件
使用 javac 编译上面的 Java 类,然后使用 javah 来生成一个头文件,后续我们需要实现头文件的方法。
# 编译,生成 .class 文件
javac Base64.java
# 生成头文件
javah Base64
需要注意:本例子中为了简单,没有使用 package,如果有 package 的话,请注意写 package 全名;如果有额外的依赖,可以通过 -cp 来指定依赖的 jar 包路径或 class 文件路径。
javac -cp $class_path:. Base64.java
javah -cp $class_path:. Base64
生成的头文件大概是这个样子的
// filename: Base64.h
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class Base64 */
#ifndef _Included_Base64
#define _Included_Base64
#ifdef __cplusplus
extern "C" {
#endif
/*
* Class: Base64
* Method: encode
* Signature: (Ljava/lang/String;)Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_Base64_encode
(JNIEnv *, jclass, jstring);
/*
* Class: Base64
* Method: decode
* Signature: (Ljava/lang/String;)Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_Base64_decode
(JNIEnv *, jclass, jstring);
#ifdef __cplusplus
}
#endif
#endif
实现 .h 头文件
简单的封装 base64 代码
// filename: Base64.cpp
#include "Base64.h"
#include "base64_impl.h"
JNIEXPORT jstring JNICALL Java_Base64_encode
(JNIEnv *env, jclass clazz, jstring data) {
int data_size = env->GetStringUTFLength(data);
const char* data_ptr = env->GetStringUTFChars(data, 0);
int output_size = Base64::EncodedLength(data_size);
char* buf = new char[output_size+1];
if (!Base64::Encode(data_ptr, data_size, buf, output_size)) {
printf("Failed to encode input string\n");
}
buf[output_size] = '\0';
jstring ret = env->NewStringUTF(buf);
delete[] buf;
return ret;
}
JNIEXPORT jstring JNICALL Java_Base64_decode
(JNIEnv *env, jclass clazz, jstring data) {
int data_size = env->GetStringUTFLength(data);
const char* data_ptr = env->GetStringUTFChars(data, 0);
int output_size = Base64::DecodedLength(data_ptr, data_size);
char* buf = new char[output_size+1];
if (!Base64::Decode(data_ptr, data_size, buf, output_size)) {
printf("Failed to decode input string\n");
}
buf[output_size] = '\0';
jstring ret = env->NewStringUTF(buf);
delete[] buf;
return ret;
}
编译测试
本文采用了一个 makefile 的编译方法,makefile 的逻辑是写明 .o 和 .so 的生成逻辑即可。
需要依赖环境变量 JAVA_HOME。
CC=g++
CFLAGS=-Wall
OBJS=Base64.o
all: libbase64.so
Base64.o: Base64.cpp
$(CC) $(CFLAGS) -I${JAVA_HOME}/include \
-I${JAVA_HOME}/include/linux \
-I. \
-fpic -c $< -o $@
libbase64.so: $(OBJS)
g++ -shared -o $@ $(OBJS)
rm -f $(OBJS)
.PHONY : clean
clean:
rm -f $(OBJS) libbase64.so
注意:makefile 有个规定,命令前面必须是 tab,否则会报错。
makefile:7: *** missing separator. Stop.
直接执行编译命令 make 即可,然后会生成 so 动态库,接下来就可以直接测试了。
$ make
g++ -Wall -I/path/to/your-jdk/include \
-I/path/to/your-jdk/include/linux \
-I. \
-fpic -c Base64.cpp -o Base64.o
g++ -shared -o libbase64.so Base64.o
rm -f Base64.o
$ java Base64
dGVzdF9zdHJpbmc=
test_string
至此,jni 的库写好了。
Spark 调用 jni
基本方法是先打包 class 文件为 jar,然后 so 文件和 jar 文件一起提交,并且指定 java.library.path,这样就可以找到 jni 的 so 库,保证代码可以执行。
打包 jar
这里直接使用了 jar 命令来进行打包,其使用方法与 tar 类似,可以配置 maven 等编译工具来进行打包。
jar cf base64.jar Base64.class
Spark-shell 测试
启动 spark-shell
$ spark-shell --files libbase64.so --jars base64.jar \
--conf spark.executor.extraJavaOptions='-Djava.library.path=./' \
--conf spark.driver.extraJavaOptions='-Djava.library.path=./' \
--master yarn \
--queue <queue_name> \
--num-executors 10 \
--executor-cores 4 \
--executor-memory 5G \
--driver-memory 5G
参数注意事项:–files 传入 so 动态库,–jars 传入 jar 包,需要额外配置 java.library.path。
当然配置环境变量 LD_LIBRARY_PATH 也是可以的。
测试代码
val rows = spark.read.format("text").load("/path/to/base64_encoded_string_files")
import org.apache.spark.sql.types._
val schema = StructType(
StructField("value",StringType,true) ::
Nil
)
val encoder = org.apache.spark.sql.catalyst.encoders.RowEncoder(schema)
val df = rows.map(row => {
// implements java.io.Serializable 就不会报序列化错误
val base64 = new Base64()
val buffer = row.toSeq.toBuffer
val line = buffer(0).asInstanceOf[String]
org.apache.spark.sql.Row.fromSeq(Array(Base64.decode(line.stripLineEnd)))
})(encoder)
df.show()
结束语
至此,完成了一个完整的 spark 调用 jni 的例子,例子还相对比较简单,完整示例参考 github。
本文的示例是一个静态方法的示例,类方法也是类似的,只是参数略有不同,后续会单独对 JNI 部分进行一个小的总结。