如何在Clickhouse中使用C集成Rust UDF?

slmsl1lt  于 8个月前  发布在  ClickHouse
关注(0)|答案(1)|浏览(90)

在C main.c中有一个自定义项

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

extern char *u_shaped_processor(char* camps_list_c);

void replacechar(char *s,char c1,char c2) {
    int i=0;

    for(i=0;s[i];i++) {
        if(s[i]==c1) {
            s[i]=c2;
        }
        if (s[i]=='\n'){
            s[i] = '\0';
        }

    }

}

void str_replace(
    char *target,
    const char *needle,
    const char *replacement
) {
    char buffer[1024] = { 0 };
    char *insert_point = &buffer[0];
    const char *tmp = target;
    size_t needle_len = strlen(needle);
    size_t repl_len = strlen(replacement);

    while (1) {
        const char *p = strstr(tmp, needle);

        // walked past last occurrence of needle; copy remaining part
        if (p == NULL) {
            strcpy(insert_point, tmp);
            break;
        }

        // copy part before needle
        memcpy(insert_point, tmp, p - tmp);
        insert_point += p - tmp;

        // copy replacement string
        memcpy(insert_point, replacement, repl_len);
        insert_point += repl_len;

        // adjust pointers, move on
        tmp = p + needle_len;
    }

    // write altered string back to target
    strcpy(target, buffer);
} 

int main(){
    char *line = NULL;
    size_t len = 0;
    ssize_t read;
    int index = 0;
    int range = 0;
    line = (char*) malloc(len);

    while ((read = getline(&line, &len, stdin)) != -1){
        range = atoi(line);
        index = 0;
        while (index < range){
            line = (char*) malloc(len);
            read = getline(&line, &len, stdin);
            char *final_string = u_shaped_processor(line);
            str_replace(final_string, "\"", "\\\"");
            //replacechar(line, '"', "'");
            printf("{\"result\": \"%s\"} \n", final_string);
            ++index;
        }
        fflush(stdout);
        line = (char*) malloc(len);
        index = 0;
    }
    //fclose(fp);
    free(line);
    return 0;
}

rust func(src/lib.rs)

use std::collections::HashMap;

use libc::c_char;
use std::ffi::CStr;
use std::str;
use std::ffi::CString;

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct InputFormat {
    ts_list: Vec<(String, String)>,
}

extern {
    fn str_c() -> *const c_char;
}

#[no_mangle]
pub unsafe extern "C" fn u_shaped_processor(camps_list_c: *const c_char) ->  *const c_char {
    let mut camps_map: HashMap<String, f64> = HashMap::new();
    //convert c string to rust string
    let c_str = CStr::from_ptr(camps_list_c);
    let rust_str = c_str.to_str().expect("Bad encoding");
    let camps_list_str: String = rust_str.to_owned();

    let value: InputFormat = match serde_json::from_str(&camps_list_str) {
                Ok(value) => value,
                Err(why) => panic!("{:?}", why)
        };
    let camps = value.ts_list.len();
    if camps == 1 {
        let record = camps_map
            .entry((*value.ts_list[0].0).to_string())
            .or_insert(0.0);
        *record += 1.0;
    } else if camps == 2 {
        let record = camps_map
            .entry((*value.ts_list[0].0).to_string())
            .or_insert(0.0);
        *record += 0.5;
        let record = camps_map
            .entry((*value.ts_list[1].0).to_string())
            .or_insert(0.0);
        *record += 0.5;
    } else {
        let mid_weight = 0.2 / (camps - 2) as f64;
        for i in 0..camps {
            let entry = camps_map
                .entry((*value.ts_list[i].0).to_string())
                .or_insert(0.0);
            if i == 0 || i == camps - 1 {
                *entry += 0.4;
            } else {
                *entry += mid_weight;
            }
        }
    }
   let mut result = HashMap::new();
   result.insert(
        "result".to_string(), 
        serde_json::to_string(&camps_map).unwrap()
    );
   let mut final_map = HashMap::new();
   final_map.insert(
        "result".to_string(),
        serde_json::to_string(&result).unwrap()
    );
    let c_string = CString::new(&*final_map["result"]).expect("CString::new failed");
    return c_string.into_raw(); // Move ownership to C
}

/// # Safety
/// The ptr should be a valid pointer to the string allocated by rust
#[no_mangle]
pub unsafe extern fn free_string(ptr: *const c_char) {
    // Take the ownership back to rust and drop the owner
    let _ = CString::from_raw(ptr as *mut _);
}

function.xml

<function>
    <type>executable_pool</type>
    <name>att_c</name>
    <execute_direct>1</execute_direct>
    <argument>
        <type>Array(Tuple(Nullable(String), Nullable(DateTime)))</type>
        <name>ts_list</name>
    </argument>
    <return_type>String</return_type>
    <return_name>result</return_name>
    <format>JSONEachRow</format>
    <send_chunk_header>1</send_chunk_header>
    <command>attribution_c</command>
</function>

查询方式:

SELECT att_c([tuple(CAST(number AS Nullable(String)), CAST(now() AS Nullable(DateTime))), tuple(CAST(number+1 AS Nullable(String)), CAST(now() AS Nullable(DateTime))), tuple(CAST(number+2 AS Nullable(String)), CAST(now() AS Nullable(DateTime))), tuple(CAST(number+5 AS Nullable(String)), CAST(now() AS Nullable(DateTime)))]) as att
FROM system.numbers LIMIT 10

本地运行结果:

1
{"ts_list":[["c1","2023-04-21 00:00:00"],["c2","2023-04-22 00:00:00"],["c3","2023-04-23 00:00:00"],["c4","2023-04-24 00:00:00"],["c5","2023-04-25 00:00:00"]]}
{"result": "{\"c4\":\"0.06666666666666667\",\"c2\":\"0.06666666666666667\",\"c5\":\"0.4\",\"c3\":\"0.06666666666666667\",\"c1\":\"0.4\"}"}

以上是预期的结果,但我在CH上得到了错误:

Code: 1. DB::Exception: Function 'att_c': wrong result, expected 10 row(s), actual 0: while executing 'FUNCTION att_c(array(tuple(CAST(number, 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 1), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 2), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 5), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)'))) :: 2) -> att_c(array(tuple(CAST(number, 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 1), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 2), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 5), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')))) String : 7'. (UNSUPPORTED_METHOD) (version 22.7.1.2484 (official build))

问题在于外部功能。带有外部fn注解的代码工作正常,并返回所需的所有内容。有没有一种方法,我可以通过打印线在clickhouse调试。我在哪里可以找到这些日志?

kknvjkwl

kknvjkwl1#

我的UDF在C中删除str replace并使用gcc编译时适用于上述代码。
这些文章是很好的起点

整合rust和C

https://medium.com/@AlexanderObregon/integrating-rust-into-existing-c-c-projects-e0810dbddded

在两个lang之间发送数据

https://dev.to/kgrech/7-ways-to-pass-a-string-between-rust-and-c-4ieb

相关问题