/**
 * Enriches the incoming flowfile with account attributes fetched from a REST service.
 *
 * <p>Issues a GET request to the URL configured in {@code REST_ENDPOINT}, reads the
 * {@code name} and {@code username} fields of the returned JSON object, stores them in
 * the {@code ATT_ACCOUNT_NAME} and {@code ATT_ACCOUNT_USERNAME} attributes, and routes
 * the flowfile to {@code REL_SUCCESS}. The flowfile's filename is not yet sent to the
 * service; the endpoint is called exactly as configured.
 *
 * <p>If the request fails, or the response is not a JSON object carrying both fields,
 * the error is logged and the unmodified flowfile is routed to {@code REL_FAILURE}.
 *
 * @param processContext supplies the configured {@code REST_ENDPOINT} property
 * @param processSession session used to fetch, update and transfer the flowfile
 * @throws ProcessException declared by the overridden method; not thrown for REST or
 *         response-format failures, which are routed to {@code REL_FAILURE} instead
 */
@Override
public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
    FlowFile flowFile = processSession.get();
    if (flowFile == null) {
        // Nothing queued for this trigger.
        return;
    }

    String restEndpoint = processContext.getProperty(REST_ENDPOINT).getValue();
    try {
        JSONObject jsonResult = Unirest.get(restEndpoint)
                .header("accept", "application/json")
                .asJson()
                .getBody()
                .getObject();

        // getObject() yields null when the body is a JSON array; isNull() is true for
        // both absent keys and explicit JSON nulls. Either way there is nothing usable.
        if (jsonResult == null || jsonResult.isNull("name") || jsonResult.isNull("username")) {
            getLogger().error("Response from " + restEndpoint
                    + " is not a JSON object with 'name' and 'username' for " + flowFile
                    + "; routing to failure");
            processSession.transfer(flowFile, REL_FAILURE);
            return;
        }

        // Add attributes to flowfile based on REST call
        Map<String, String> newAttributes = new HashMap<>();
        newAttributes.put(ATT_ACCOUNT_NAME, jsonResult.optString("name"));
        newAttributes.put(ATT_ACCOUNT_USERNAME, jsonResult.optString("username"));
        FlowFile updatedFlowFile = processSession.putAllAttributes(flowFile, newAttributes);

        processSession.transfer(updatedFlowFile, REL_SUCCESS);
    } catch (UnirestException e) {
        // Do not rethrow: an exception escaping onTrigger makes the framework roll the
        // session back, which would undo this transfer and re-queue the flowfile.
        getLogger().error("REST call to " + restEndpoint + " failed for " + flowFile
                + "; routing to failure", e);
        processSession.transfer(flowFile, REL_FAILURE);
    }
}
UpdateAttributeREST.java 文件源码
java
阅读 22
收藏 0
点赞 0
评论 0
项目:nifi-dataminded-bundle
作者:
评论列表
文章目录