/*
  PipelineManager

  A Thread subclass that keeps two lists of frame formats (sources and
  destinations) together with the modules the module mapper returned for
  them, and that drives the registered source modules from execute().
*/
class PipelineManager : public Thread {
public:
    PipelineManager();

    // --- target registration -------------------------------------------
    // Each add* call looks the format up through moduleMapper() and records
    // both the format and the module found; clearTargets() empties all lists.
    void addSource( Format frameType );
    void addDestination( Format frameType );
    void clearTargets();

    // --- module wiring -------------------------------------------------
    // connectTogether() and makeConnections() currently have empty bodies
    // (their implementations are commented out in the definitions).
    void connectTogether( Module *m1, Module *m2, const Frame &f );
    void makeConnections( Module *start );
    // Finds a module accepting f's format, links m to it and has it process f.
    void unconnectedRoute( Module *m, const Frame &f );

    // --- thread interface ----------------------------------------------
    // Hands the frame pointer to Thread::start() as an untyped pointer.
    void start( Frame *frame ) { Thread::start( static_cast<void *>( frame ) ); }
    // Dispatches Init and then Process (with p) to every source module.
    // NOTE(review): presumably called by Thread with the pointer given to
    // start() -- confirm against the Thread base class.
    void execute( void *p );

private:
    // Modules found for the registered source / destination formats.
    std::list<Module*> sourceModules;
    std::list<Module*> destinationModules;
    // The registered source / destination formats themselves.
    std::list<Format> source;
    std::list<Format> destination;
};
// File-scope PipelineManager pointer, initialised to null. No code visible
// in this section assigns it.
PipelineManager *pipelineMgr = 0;
// Default constructor. Performs no explicit initialisation, so the four
// list members are simply default-constructed (empty).
PipelineManager::PipelineManager() {}
/*
void PipelineManager::newModule( Module *m )
{
printf("adding module: %s\n", m->name() );
allModules.push_front( m );
// update source modules list
for ( list<Format>::iterator it = source.begin(); it != source.end(); ++it ) {
if ( (*it) == m->inputFormat() ) {
sourceModules.push_front( m );
// Just add it once
break;
}
}
// update destination modules list
for ( list<Format>::iterator it = destination.begin(); it != destination.end(); ++it ) {
if ( (*it) == m->outputFormat() ) {
destinationModules.push_front( m );
// Just add it once
break;
}
}
}
*/
/*
  Registers frameType as a source format.

  The module mapper is asked for a module whose input format is frameType.
  If one is returned, the format is pushed onto the front of `source` and
  the module onto the front of `sourceModules`. If none is returned, only a
  diagnostic is printed and both lists are left as they were.
*/
void PipelineManager::addSource( Format frameType )
{
    Module *found = moduleMapper()->findModuleWithInputFormat( frameType );

    // Nothing can consume this format: report it and bail out.
    if ( !found ) {
        printf("No source for %s found!!!\n", (const char *)frameType );
        return;
    }

    printf("adding source: %s\n", (const char *)frameType );
    source.push_front( frameType );
    sourceModules.push_front( found );
}
/*
  Registers frameType as a destination format.

  The module mapper is asked for a module whose output format is frameType.
  If one is returned, the format is pushed onto the front of `destination`
  and the module onto the front of `destinationModules`. If none is
  returned, only a diagnostic is printed and both lists are left as they
  were.
*/
void PipelineManager::addDestination( Format frameType )
{
    Module *found = moduleMapper()->findModuleWithOutputFormat( frameType );

    // Nothing produces this format: report it and bail out.
    if ( !found ) {
        printf("No destination for %s found!!!\n", (const char *)frameType );
        return;
    }

    printf("adding destination: %s\n", (const char *)frameType );
    destination.push_front( frameType );
    destinationModules.push_front( found );
}
/*
  Forgets every registered source and destination.

  All four lists are emptied. The module lists hold raw pointers, so
  clearing them only drops the pointers; no Module is deleted here.
*/
void PipelineManager::clearTargets()
{
    // source side
    sourceModules.clear();
    source.clear();

    // destination side
    destinationModules.clear();
    destination.clear();
}
/*
  Currently a no-op: the whole body is commented out, so m1, m2 and f are
  ignored.

  The disabled code dispatched Init to m2 and connected m1 to m2, placing a
  ThreadBoundryModule between the two when either module reported itself as
  blocking.
*/
void PipelineManager::connectTogether( Module *m1, Module *m2, const Frame &f )
{
/*
//printf(" [%s] -> [%s] %s", m1->outputFormat(), m2->inputFormat(), m2->name() );
printf(" -> %s", m2->name() );
staticDispatch( m2, Init, 0 );
if ( m2->isBlocking() || m1->isBlocking() ) {
ThreadBoundryModule *threadModule = new ThreadBoundryModule( 32, m2->inputFormat() );
threadModule->init();
m1->connectTo( threadModule, f );
threadModule->connectTo( m2, f );
} else {
m1->connectTo( m2, f );
}
*/
}
/*
  Routes a frame coming out of module m that has no consumer yet.

  The module mapper is asked for a module whose input format matches
  f.id(). If one is found it is sent Init, linked in both directions
  (m->connectTo and connectedFrom on the new module), and then sent Process
  with the address of f so that this first frame is handled. If no module
  is found, a diagnostic naming m is printed and nothing else happens.

  connectTogether() is not used; the wiring is done inline here.
*/
void PipelineManager::unconnectedRoute( Module *m, const Frame &f )
{
    Module *consumer = moduleMapper()->findModuleWithInputFormat( f.id() );

    if ( !consumer ) {
        printf("Didn't find route for %s\n", m->name());
        return;
    }

    printf("Connecting together: %s -> %s\n", m->name(), consumer->name() );

    // Initialise the consumer before it is wired into the pipeline.
    staticDispatch( consumer, Init, 0 );

    m->connectTo( consumer, f );
    consumer->connectedFrom( m, f );

    // Hand over the frame that triggered the routing.
    staticDispatch( consumer, Process, &f );
}
/*
  Currently a no-op: the whole body is commented out, so `start` is ignored.

  The disabled code initialised `start` and then repeatedly looked up a
  module accepting the current module's output format, joining them with
  connectTogether(), until the front destination format was reached or no
  further module was found.
*/
void PipelineManager::makeConnections( Module *start )
{
/*
printf("making connections:\n");
Frame frame( "UNKNOWN", 0 );
Module *currentModule = start;
Format dstFmt = destination.front();
dispatch( currentModule, Init, 0 );
printf(" %s (pid: %i)", currentModule->name(), getpid() );
while ( currentModule->outputFormat() != dstFmt ) {
Module *m = moduleMapper()->findModuleWithInputFormat( currentModule->outputFormat() );
if ( m ) {
connectTogether( currentModule, m, frame );
currentModule = m;
} else {
break;
}
}
printf("\n");
*/
}
void PipelineManager::execute( void *d )
{
printf("starting...\n");
for ( list<Module *>::iterator it = sourceModules.begin(); it != sourceModules.end(); ++it ) {
//makeConnections( (*it) );
staticDispatch( (*it), Init, 0 );
staticDispatch( (*it), Process, d );
}
}