在 Apache Spark 中使用 JNI 调用 C/C++ 代码


简介

Apache Spark 目前越来越流行,在实际开发使用过程中,会需要使用到一些 C/C++ 的类库,需要使用 JNI 来调用,本文通过一个简单的例子来对 Spark 使用 JNI 进行讲解。

本文的测试环境是 Linux,其他环境类似,但是不保证 C++ 等代码可以在其他系统下编译使用,Spark 版本使用的是 2.1。

JNI 封装 Base64

本文通过一个简单的示例,来封装一下 Base64(随便在网上找了一个Base64 C++ 代码,为避免文件名冲突,文件重命名为 base64_impl.h)。

定义 Java 类

首先先定义一个 Java 类,当然会有 native 方法。

// filename: Base64.java
import java.io.Serializable;

public class Base64 implements Serializable {
    static {
        System.loadLibrary("base64");
    }

    public Base64() {}

    public native static String encode(String data);
    public native static String decode(String data);

    public static void main(String[] args) {
        System.out.println(Base64.encode("test_string"));
        System.out.println(Base64.decode("dGVzdF9zdHJpbmc="));
    }
}

注意:需要实现 Serializable,否则可能在 Spark 程序中出现序列化错误。

注意:类必须是 public,否则可能会报错:

error: class Base64 cannot be accessed in package <empty>

编译和生成头文件

使用 javac 编译上面的 Java 类,然后使用 javah 来生成一个头文件,后续我们需要实现头文件的方法。

# 编译,生成 .class 文件
javac Base64.java
# 生成头文件
javah Base64

需要注意:本例子中为了简单,没有使用 package,如果有 package 的话,请注意写 package 全名;如果有额外的依赖,可以通过 -cp 来指定依赖的 jar 包路径或 class 文件路径。

javac -cp $class_path:. Base64.java

javah -cp $class_path:. Base64

生成的头文件大概是这个样子的

// filename: Base64.h
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class Base64 */

#ifndef _Included_Base64
#define _Included_Base64
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Class:     Base64
 * Method:    encode
 * Signature: (Ljava/lang/String;)Ljava/lang/String;
 */
JNIEXPORT jstring JNICALL Java_Base64_encode
  (JNIEnv *, jclass, jstring);

/*
 * Class:     Base64
 * Method:    decode
 * Signature: (Ljava/lang/String;)Ljava/lang/String;
 */
JNIEXPORT jstring JNICALL Java_Base64_decode
  (JNIEnv *, jclass, jstring);

#ifdef __cplusplus
}
#endif
#endif

实现 .h 头文件

简单的封装 base64 代码

// filename: Base64.cpp
#include "Base64.h"
#include "base64_impl.h"

JNIEXPORT jstring JNICALL Java_Base64_encode
  (JNIEnv *env, jclass clazz, jstring data) {
    int data_size = env->GetStringUTFLength(data);
    const char* data_ptr = env->GetStringUTFChars(data, 0);
    int output_size = Base64::EncodedLength(data_size);

    char* buf = new char[output_size+1];
    if (!Base64::Encode(data_ptr, data_size, buf, output_size)) {
        printf("Failed to encode input string\n");
    }
    buf[output_size] = '\0';

    jstring ret = env->NewStringUTF(buf);
    delete[] buf;
    return ret;
}

JNIEXPORT jstring JNICALL Java_Base64_decode
  (JNIEnv *env, jclass clazz, jstring data) {
    int data_size = env->GetStringUTFLength(data);
    const char* data_ptr = env->GetStringUTFChars(data, 0);
    int output_size = Base64::DecodedLength(data_ptr, data_size);

    char* buf = new char[output_size+1];
    if (!Base64::Decode(data_ptr, data_size, buf, output_size)) {
        printf("Failed to decode input string\n");
    }
    buf[output_size] = '\0';

    jstring ret = env->NewStringUTF(buf);
    delete[] buf;
    return ret;
}

编译测试

本文采用了一个 makefile 的编译方法,makefile 的逻辑是写明 .o 和 .so 的生成逻辑即可。

需要依赖环境变量 JAVA_HOME。

CC=g++
CFLAGS=-Wall
OBJS=Base64.o
all: libbase64.so

Base64.o: Base64.cpp
	$(CC) $(CFLAGS) -I${JAVA_HOME}/include \
	        -I${JAVA_HOME}/include/linux \
	        -I. \
	  -fpic -c $< -o $@

libbase64.so: $(OBJS)
	g++ -shared -o $@ $(OBJS)
	rm -f $(OBJS)
.PHONY : clean
clean:
	rm -f $(OBJS) libbase64.so

注意:makefile 有个规定,命令前面必须是 tab,否则会报错。

makefile:7: *** missing separator.  Stop.

直接执行编译命令 make 即可,然后会生成 so 动态库,接下来就可以直接测试了。

$ make
g++ -Wall -I/path/to/your-jdk/include \
        -I/path/to/your-jdk/include/linux \
        -I. \
  -fpic -c Base64.cpp -o Base64.o
g++ -shared -o libbase64.so Base64.o
rm -f Base64.o

$ java Base64
dGVzdF9zdHJpbmc=
test_string

至此,jni 的库写好了。

Spark 调用 jni

基本方法是先打包 class 文件为 jar,然后 so 文件和 jar 文件一起提交,并且指定 java.library.path,这样就可以找到 jni 的 so 库,保证代码可以执行。

打包 jar

这里直接使用了 jar 命令来进行打包,其使用方法与 tar 类似,可以配置 maven 等编译工具来进行打包。

jar cf base64.jar Base64.class

Spark-shell 测试

启动 spark-shell

$ spark-shell --files libbase64.so --jars base64.jar \
    --conf spark.executor.extraJavaOptions='-Djava.library.path=./' \
    --conf spark.driver.extraJavaOptions='-Djava.library.path=./' \
    --master yarn \
    --queue <queue_name> \
    --num-executors 10 \
    --executor-cores 4 \
    --executor-memory 5G \
    --driver-memory 5G

参数注意事项:–files 传入 so 动态库,–jars 传入 jar 包,需要额外配置 java.library.path。

当然配置环境变量 LD_LIBRARY_PATH 也是可以的。

测试代码

val rows = spark.read.format("text").load("/path/to/base64_encoded_string_files")

import org.apache.spark.sql.types._

val schema = StructType(
  StructField("value",StringType,true) ::
  Nil
)

val encoder = org.apache.spark.sql.catalyst.encoders.RowEncoder(schema)

val df = rows.map(row => {
  // implements java.io.Serializable 就不会报序列化错误
  val base64 = new Base64()
  val buffer = row.toSeq.toBuffer
  val line = buffer(0).asInstanceOf[String]
  org.apache.spark.sql.Row.fromSeq(Array(Base64.decode(line.stripLineEnd)))
})(encoder)

df.show()

结束语

至此,完成了一个完整的 spark 调用 jni 的例子,例子还相对比较简单,完整示例参考 github

本文的示例是一个静态方法的示例,类方法也是类似的,只是参数略有不同,后续会单独对 JNI 部分进行一个小的总结。

参考资料

  1. A Simple Java Native Interface (JNI) example in Java and Scala
  2. 使用JNI进行Java与C/C++语言混合编程(1)–在Java中调用C/C++本地库

如果觉得文章对您有帮助,用微信请作者喝杯咖啡吧!这样他会更有动力,分享更多更好的知识!

wechat赞赏