在CSV中获取奇怪的字符,使其无法在Spark中读取

7y4bm7vi  于 5个月前  发布在  Spark
关注(0)|答案(2)|浏览(63)

我正在使用Salesforce Bulk 2.0 API来获取Salesforce Objects的数据。我已经创建了相应的函数,以便-

/**
 * Fetches results from a Job ID of Batch query.
 * NOTE : Provided Job ID must be in "JobComplete" state to fetch results
 *
 * @param jobId      Job ID
 * @param maxRecords Number of records to fetch in one chunk of API hit
 * @return {@link String} CSV-formatted String/results (text/csv)
 * @throws IOException Exception in mapping object
 * @author ujjawal pandey
 */
public String getJobResults(String jobId, String maxRecords) throws IOException {
    String getJobInfoUrl = String.format(JobResourcePath.getJobResultPath(), this.apiUrl,
            this.apiVersion, jobId, maxRecords);
    String sforceLocator = null;
    String filePath = jobId + ".csv";
    boolean isFirstDatasetFetch = true;
    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
            Files.newOutputStream(Paths.get(filePath)),
            StandardCharsets.UTF_8));
    do {
        String getJobInfoUrlLocator = getJobInfoUrl;
        HttpClient httpClient = HttpClientBuilder.create().build();
        if (sforceLocator != null) {
            getJobInfoUrlLocator = getJobInfoUrl + "&locator=" + sforceLocator;
        }

        LOGGER.info(getJobInfoUrlLocator);
        HttpGet httpGet = new HttpGet(getJobInfoUrlLocator);
        httpGet.setHeader("Authorization", "Bearer " + accessToken);

        HttpResponse response = httpClient.execute(httpGet);
        LOGGER.info(response.toString());
        int responseCode = response.getStatusLine().getStatusCode();
        String responseBody = "";

        if (responseCode == Constants.HTTP_STATUS_OK) {
            org.apache.http.HttpEntity entity = response.getEntity();
            sforceLocator = response.getFirstHeader("Sforce-Locator").getValue();
            LOGGER.info("Locator is: " + sforceLocator);

            responseBody = EntityUtils.toString(entity);

            LOGGER.debug(String.valueOf(responseBody));
            if (isFirstDatasetFetch) {
                writer.write(responseBody);
                isFirstDatasetFetch = false;
            } else {
                writer.write(responseBody.substring(
                        responseBody.indexOf('\n') + 1));
            }
        } else {
            LOGGER.error(responseBody);
            throw new RuntimeException(responseBody);
        }
    } while (sforceLocator != null && !sforceLocator.equals("null"));
    writer.close();
    return filePath;
}

字符串
问题是创建的CSV格式是正确的,但在某些列中出现了一些奇怪的字符。例如,â¢(欧元在它们之间,粘贴使其消失)
现在,当我在Spark中阅读时,使用以下配置-

spark.read()
     .option("header", "true")
     .option("delimiter", ",")
     .option("lineSep", "\n")
     .option("multiLine", "true")
     .option("encoding", "UTF-8")
     .csv(hdfsTempCsvStoragePath + "/" + csvPath);


我得到了额外的行(4),因为下面的字符“可能”。PFA。
x1c 0d1x的数据
我知道这个问题与编码有关,但与良好的理解无关。
1.我在这里错过了什么(基本概念),因为这个问题正在发生?
1.解决这个问题的最佳方法是什么?
因为我认为我错过了一些东西,所以我没有尝试太多。我认为最后的选择是我必须清理CSV,以便Spark可以读取它。

bxfogqkk

bxfogqkk1#

我没有代表评论。这不是一个真正的答案。但我有几个意见,你应该会发现有帮助。我不知道Spark。
1.当执行创建CSV的Java时,将此属性添加到命令行:-Dfile.encoding=UTF-8
这将使UTF-8字符不会被篡改为两个非ASPLESS字符,并且它将保留可处理的非ASPLESS UTF-8字符。
1.如果你要清理CSV文件,这个正则表达式可以用来识别特殊字符:/[^\x00-\x7f]/
如果你可以访问从时间开始的数据,你可以使用这个正则表达式来识别所有之前需要注意的字符。
没有人可以建议你最好的方法,因为我们不知道数据,什么进程正在监视你的批量API作业(例如,如果你抛出一个异常,数据是否可以被处理以供人类审查),有多频繁地出现特殊字符(以及哪些字符),或者数据100%正确的紧迫性(或者即使这是可能的)。

zbq4xfa0

zbq4xfa02#

实际上,这些额外的行不是因为下面的字符,而是因为上面的一行,它有双引号,那些没有转义,因此spark无法正确解析CSV,将下面的行分成多行。

spark.read()
     .option("header", "true")
     .option("delimiter", ",")
     .option("multiLine", "true")
     .option("encoding", "UTF-8")
     .option("escape", "\"")

字符串
另外,那些奇怪的字符实际上是“子弹”,但由于我的函数中的编码不匹配,它们就这样出现了。我用UTF-8编写CSV,但EntityUtils.toString(entity);正在将其转换为其他编码,所以我通过EntityUtils.toString(entity, "UTF-8")强制将toString转换为UTF-8。

相关问题