Overview

There may be situations in which a node cannot be executed normally.

Scenarios that come to mind include:

Spark fails to read the metadata, but it is still possible to run a Hive script that populates the target table outside the pipeline, i.e. without using the framework.

A network-copy node is unable to create the copy, but the copy can be made outside the framework.

The framework fails to create a tarball from HDFS using its configured parameters, but the tarball can be created outside the framework.


In these scenarios, once an external tool has achieved the node's goal, the node execution map in shared memory needs to be updated.

For this purpose there is a set of applications that access the shared memory directly in order to:

  • list the nodes in the execution map together with their status, so that failed nodes can be identified
  • update the node execution status to completed or skipped.


shmview app

This application lists the contents of the node execution map, printing one line per node: the node ID followed by its numeric status.

CODE:

#include <cbaWFSharedMemory.h>
#include <cbaUtils.h>
#include<iostream>
using namespace std;


// shmview: print every entry of the node execution map held in shared memory,
// one "<nodeId> <status>" pair per line.
//
// The configuration file is $CBAWFINI when that variable is set, otherwise
// $CBAWF_HOME/etc/cbaWorkflow.ini; CBAWF_HOME must be set in either case.
// Its "shared_memory" and "map_name" entries select the segment and the map
// inside it.
//
// Exit status: 1 when CBAWF_HOME is missing, 0 otherwise.
int main()
{
    const char* const home = getenv("CBAWF_HOME");
    if (home == nullptr)
    {
        std::cout << "CBAWF_HOME needs to be set in your environment" << std::endl;
        return 1;
    }

    // CBAWFINI, when present, overrides the default location under CBAWF_HOME.
    const char* const iniOverride = getenv("CBAWFINI");
    std::string configFile = (iniOverride != nullptr)
        ? std::string(iniOverride)
        : std::string(home) + "/etc/cbaWorkflow.ini";

    using namespace cbaWorkflowUtils;
    utils util;
    std::map<std::string, std::string> settings;
    util.populateVariables(configFile, settings);

    std::string segment = settings["shared_memory"];
    std::string mapTitle = settings["map_name"];

    std::map<std::string, int> statusByNode;
    cbaWFSharedMemory cba(segment);
    cba.setMapName(mapTitle);
    cba.getNodesExecutionStatusMap(statusByNode);

    for (const auto& entry : statusByNode)
        std::cout << entry.first << " " << entry.second << std::endl;

    return 0;
}

shmupdate app

This application takes two parameters, the node ID and the numeric status, and updates the execution map accordingly. For example:

shmupdate 13058 4


CODE:


#include <cbaWFSharedMemory.h>
#include <cbaUtils.h>
#include<iostream>
using namespace std;


int main(int argc,char**argv)
{

if(argc != 3)
{
cout<<"nodeID and status are needed"<<endl;
return -1;
}

int status =boost::lexical_cast<int>(argv[2]);
string nodeId=argv[1];

char* workflowHome=getenv("CBAWF_HOME");

if (0==workflowHome)
{
cout<<"CBAWF_HOME needs to be set in your environment"<<endl;

return 1;
}

string configFile;
char* workflowIni=getenv("CBAWFINI");
if (0 == workflowIni)
{
configFile=workflowHome;
configFile +="/etc/cbaWorkflow.ini";
}
else
{
configFile=workflowIni;

}
using namespace cbaWorkflowUtils;
utils util;
map<string,string> m_variables;
util.populateVariables(configFile,m_variables);
string segmentName=m_variables["shared_memory"];
string mapName=m_variables["map_name"];
map<string,int> executionMap;
cbaWFSharedMemory cba(segmentName);
cba.setMapName(mapName);

cout<<"setting node "<<nodeId<<" status to "<<status<<endl;

cba.updateNodeExecutionStatus(nodeId,status);
/*
cba.getNodesExecutionStatusMap(executionMap);

for(auto iter=executionMap.begin();iter != executionMap.end();iter++)
{
cout<<iter->first<<" "<<iter->second<<endl;

}

*/
return 0;

}

smcreate app

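This application prompts for the name of a file and creates a shared memory map from it. Each line of the file holds a node_id and a status separated by a space; if the status is omitted it defaults to 0.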

It is provided for testing purposes and for getting comfortable with shmview and shmupdate. Note that it recreates the shared memory segment, so any applications currently using it will fail.

CODE:

#include <cbaWFSharedMemory.h>
#include <cbaUtils.h>
#include<iostream>
using namespace std;


int main()
{

char* workflowHome=getenv("CBAWF_HOME");

if (0==workflowHome)
{
cout<<"CBAWF_HOME needs to be set in your environment"<<endl;

return 1;
}

string configFile;
char* workflowIni=getenv("CBAWFINI");
if (0 == workflowIni)
{
configFile=workflowHome;
configFile +="/etc/cbaWorkflow.ini";
}
else
{
configFile=workflowIni;

}
using namespace cbaWorkflowUtils;
utils util;
map<string,string> m_variables;
util.populateVariables(configFile,m_variables);
string segmentName=m_variables["shared_memory"];
string mapName=m_variables["map_name"];

cout<<"******************************************"<<endl;
cout<<"* NOTE this will recreate the memory map *"<<endl;
cout<<"* The applications using it will FAIL *"<<endl;
cout<<"******************************************"<<endl;

cout<<"Enter file containing map, or CTRL+C to exit"<<endl;
string mapFileName;
cin>>mapFileName;
if(mapFileName.length() ==0)
return 1;
map<string,int> executionMap;

ifstream input(mapFileName.c_str());
if(! input.is_open())
{
cout<<"Failed to open "<<mapFileName<<endl;
return 1;
}

string line;
while(getline(input,line))
{
vector<string>pair;
boost::split(pair,line,boost::is_any_of(" "));
if(pair.size()==1)
pair.push_back("0");
executionMap[pair[0]]=boost::lexical_cast<int>(pair[1]);
}

cbaWFSharedMemory cba(segmentName);
cba.createSegment();
cba.createNodesExecutionStatusMap(mapName,executionMap);

return 0;

}