UpdateAttributeREST.java 文件源码

java
阅读 22 收藏 0 点赞 0 评论 0

项目:nifi-dataminded-bundle 作者:
@Override
public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {

    // Get flowfile
    FlowFile flowFile = processSession.get();
    if (flowFile == null) {
        return;
    }

    try {

        // Get filename of flowFile
        String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());

        // Invoke REST service with filename as parameter (For now parameter is just '1')
        String restEndpoint = processContext.getProperty(REST_ENDPOINT).getValue();
        JSONObject jsonResult = Unirest.get(restEndpoint)
                .header("accept", "application/json")
                .asJson()
                .getBody()
                .getObject();

        // Add attributes to flowfile based on REST call
        Map<String, String> newAttributes = new HashMap<>();
        newAttributes.put(ATT_ACCOUNT_NAME, jsonResult.getString("name"));
        newAttributes.put(ATT_ACCOUNT_USERNAME, jsonResult.getString("username"));
        FlowFile updatedFlowFile = processSession.putAllAttributes(flowFile, newAttributes);

        // Transfer flowfile to success state
        processSession.transfer(updatedFlowFile, REL_SUCCESS);

    } catch (UnirestException e) {
        processSession.transfer(flowFile, REL_FAILURE);
        throw new ProcessException(e);
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号