overview
There may be situations when the normal execution of a node is not possible.
The scenarios that come to mind are:
Failure to read the metadata with Spark, although it is still possible to run a script in Hive to populate the target table outside of the pipeline, i.e. without using the framework
Inability of a network copy node to create the copy, although there are ways to do it outside of the framework
Failure to create a tarball from HDFS using the parameters in the framework, although there is a way to do it outside of the framework
For these scenarios, when the node's goal has been attained by an external tool, there is a need to update the node execution map in the shared memory.
For this purpose there is a set of applications which access the shared memory directly and
- list the failed nodes
- update the node execution status to completed or skipped.
shmview app
The application lists the contents of the nodes execution map
CODE:
#include <cbaWFSharedMemory.h>
#include <cbaUtils.h>
#include<iostream>
using namespace std;
int main()
{
char* workflowHome=getenv("CBAWF_HOME");
if (0==workflowHome)
{
cout<<"CBAWF_HOME needs to be set in your environment"<<endl;
return 1;
}
string configFile;
char* workflowIni=getenv("CBAWFINI");
if (0 == workflowIni)
{
configFile=workflowHome;
configFile +="/etc/cbaWorkflow.ini";
}
else
{
configFile=workflowIni;
}
using namespace cbaWorkflowUtils;
utils util;
map<string,string> m_variables;
util.populateVariables(configFile,m_variables);
string segmentName=m_variables["shared_memory"];
string mapName=m_variables["map_name"];
map<string,int> executionMap;
cbaWFSharedMemory cba(segmentName);
cba.setMapName(mapName);
cba.getNodesExecutionStatusMap(executionMap);
for(auto iter=executionMap.begin();iter != executionMap.end();iter++)
{
cout<<iter->first<<" "<<iter->second<<endl;
}
return 0;
}
shmupdate app
The application takes two parameters, the node id and the status, and updates the execution map accordingly. For example:
shmupdate 13058 4
CODE:
#include <cbaWFSharedMemory.h>
#include <cbaUtils.h>
#include<iostream>
using namespace std;
int main(int argc,char**argv)
{
if(argc != 3)
{
cout<<"nodeID and status are needed"<<endl;
return -1;
}
int status =boost::lexical_cast<int>(argv[2]);
string nodeId=argv[1];
char* workflowHome=getenv("CBAWF_HOME");
if (0==workflowHome)
{
cout<<"CBAWF_HOME needs to be set in your environment"<<endl;
return 1;
}
string configFile;
char* workflowIni=getenv("CBAWFINI");
if (0 == workflowIni)
{
configFile=workflowHome;
configFile +="/etc/cbaWorkflow.ini";
}
else
{
configFile=workflowIni;
}
using namespace cbaWorkflowUtils;
utils util;
map<string,string> m_variables;
util.populateVariables(configFile,m_variables);
string segmentName=m_variables["shared_memory"];
string mapName=m_variables["map_name"];
map<string,int> executionMap;
cbaWFSharedMemory cba(segmentName);
cba.setMapName(mapName);
cout<<"setting node "<<nodeId<<" status to "<<status<<endl;
cba.updateNodeExecutionStatus(nodeId,status);
/*
cba.getNodesExecutionStatusMap(executionMap);
for(auto iter=executionMap.begin();iter != executionMap.end();iter++)
{
cout<<iter->first<<" "<<iter->second<<endl;
}
*/
return 0;
}
smcreate app
The application creates a shared memory map from a file in which each line holds the space-separated fields node_id and status.
The application is provided for testing purposes and for getting comfortable with using shmview and shmupdate.
CODE:
#include <cbaWFSharedMemory.h>
#include <cbaUtils.h>
#include<iostream>
using namespace std;
int main()
{
char* workflowHome=getenv("CBAWF_HOME");
if (0==workflowHome)
{
cout<<"CBAWF_HOME needs to be set in your environment"<<endl;
return 1;
}
string configFile;
char* workflowIni=getenv("CBAWFINI");
if (0 == workflowIni)
{
configFile=workflowHome;
configFile +="/etc/cbaWorkflow.ini";
}
else
{
configFile=workflowIni;
}
using namespace cbaWorkflowUtils;
utils util;
map<string,string> m_variables;
util.populateVariables(configFile,m_variables);
string segmentName=m_variables["shared_memory"];
string mapName=m_variables["map_name"];
cout<<"******************************************"<<endl;
cout<<"* NOTE this will recreate the memory map *"<<endl;
cout<<"* The applications using it will FAIL *"<<endl;
cout<<"******************************************"<<endl;
cout<<"Enter file containing map, or CTRL+C to exit"<<endl;
string mapFileName;
cin>>mapFileName;
if(mapFileName.length() ==0)
return 1;
map<string,int> executionMap;
ifstream input(mapFileName.c_str());
if(! input.is_open())
{
cout<<"Failed to open "<<mapFileName<<endl;
return 1;
}
string line;
while(getline(input,line))
{
vector<string>pair;
boost::split(pair,line,boost::is_any_of(" "));
if(pair.size()==1)
pair.push_back("0");
executionMap[pair[0]]=boost::lexical_cast<int>(pair[1]);
}
cbaWFSharedMemory cba(segmentName);
cba.createSegment();
cba.createNodesExecutionStatusMap(mapName,executionMap);
return 0;
}