// Import / applications / HighwayDash / ports / Framework / ResourceLoader.cpp
/*************************** MetaData ****************************
#  BlockyFroggy
#  Copyright © 2017 John Ryland.
#  All rights reserved.

# See: meta-tool.py
# to test, compile with:
UNIT_TEST_CMD="g++ -D UNIT_TESTS --std=c++14 -I. -pthread -o unittest ResourceLoader.cpp Utilities.cpp Common.cpp"
CODE_GEN_CMD="g++ -D CODE_GEN --std=c++14 -I. -pthread -o codegen ResourceLoader.cpp Utilities.cpp Common.cpp"
**************************** MetaData ***************************/


#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
#include <map>
#include <atomic>
#include <algorithm>
#include "Log.h"
#include "HttpClient.h"
#include "ResourceLoader.h"
#include "Utilities.h"


DECLARE_LOG_CONTEXT(RES)


#ifdef CODE_GEN
// Code-generation entry point (CODE_GEN builds only): hands "gen.h", "Font6.png" and "Font6" to Utilities::xxd.
// NOTE(review): presumably this dumps Font6.png into gen.h as an array named Font6 - confirm against Utilities::xxd.
void code_gen() { Utilities::xxd("gen.h", "Font6.png", "Font6"); }
#else
#include "gen.h"
#endif


//#define USE_SERVER
//#define SERVER_IP           "127.0.0.1"
#define SERVER_IP           "192.168.0.18"
//#define SERVER_IP           "192.168.0.20"


//! @todo get rid of all the raw pointers, use more shared_ptrs everywhere

// Out-of-line destructor; nothing to release beyond what the members clean up themselves.
Resource::~Resource() = default;


// Base-class equality: never reports two resources as equal.
// NOTE(review): the resource types defined later in this file declare operator== taking
// their own concrete type, which is a different signature from this one. A comparison made
// through Resource& (as ResourceLoader::findResourceJobInternal does) therefore appears to
// always land here and return false - confirm against the declaration in ResourceLoader.h.
bool Resource::operator==(Resource&)
{
  return false;
}


// Reports whether this resource's state has advanced beyond Loading.
bool Resource::isLoaded()
{
    const bool notYetLoaded = (state <= Loading);
    return !notYetLoaded;
}


void Resource::wait()
{
    Log(LL_Warn, "WAIT CALLED\nWARNING: THIS COULD CAUSE HITCHING");
    std::unique_lock<std::mutex> lck(mutex);
    if (state > Loading)
        cond.wait(lck);
}


struct ResourceLoader
{
    enum ResourcePool {
        FilePool,
        NetworkPool,
        OtherPool,
        MaxPool
    };

    std::deque<Resource*>             jobs; // start state, job created
    std::deque<Resource*>             pendingCallbackList; // job loaded, waiting for update to call callback
    std::vector<Resource*>            cache; // finally added to completed jobs
    std::mutex                        mutex;
    std::condition_variable           cond;
    std::atomic<bool>                 running;
    std::atomic<bool>                 pendingCallback;
    std::vector<std::thread>          threads;
    ResourcePool                      type;
    int                               threadCount;

    static const char* TypeName(ResourcePool t) {
        const char* typeNames[] = { "File", "Network", "Other", "Max" };
        return typeNames[t];
    }

    ResourceLoader(ResourcePool t, int a_threadCount = 1) : type(t), threadCount(a_threadCount) {
        Log(LL_Debug, "starting %s resource pool loader", TypeName(type));
        start();
    }

    ~ResourceLoader() {
        Log(LL_Debug, "stopping %s resource pool loader", TypeName(type));
        stop();
    }

    void start() {
        pendingCallback = false;
        running = true;
        for (int i = 0; i < threadCount; i++) {
          Log(LL_Debug, "creating thread");
          threads.push_back(std::thread([this](){
            AllocationTracking::g_enableAllocTracing = true;
            std::unique_lock<std::mutex> lck(mutex);
            while (running) {
                Log(LL_Debug, "thread waiting for work");
                // Wait for jobs to be added to the queue
                //Log(LL_Debug, "waiting for job");
                cond.wait(lck);
                if (running) {
                    // Drain the queue
                    //Log(LL_Debug, "processing loading jobs");
                    while (jobs.size()) {
                        // Take from the queue a waiting job
                        Resource *r = jobs.front();
                        jobs.pop_front();
                        r->state = Resource::Loading;

                        // do loading
                        mutex.unlock();
                        //Log(LL_Debug, "loading jobs");
                        r->load();
                        mutex.lock();

                        // mark it as loaded and notify anyone waiting for it
                        r->state = Resource::CallbackPending;
                        pendingCallbackList.push_back(r);
                        r->cond.notify_all();
                        pendingCallback = true;
                    }
                } else {
                    Log(LL_Debug, "stopping thread");
                }
            }
          }));
          Log(LL_Debug, "started thread");
        }
    }

    void stop() {
        running = false;
        cond.notify_all();
        for (int i = 0; i < threadCount; i++)
          threads[i].join();
    }

    void update() {
        bool expected = true;
        // if pendingCallback is true, here we atomically get it and reset it back to false and if this succeeds we continue
        if (pendingCallback) {
        if (pendingCallback.compare_exchange_strong(expected, false)) {
        if (expected) {
            std::unique_lock<std::mutex> lck(mutex);
            while (pendingCallbackList.size()) {
                Resource *r = pendingCallbackList.front();
                pendingCallbackList.pop_front();
                lck.unlock();
                r->callback(r);
                r->state = Resource::Complete;
                lck.lock();
            }
        }}}
    }

    void addJobToQueueInternal(Resource* r) {
        {
          std::unique_lock<std::mutex> lck(mutex);
          jobs.push_back(r);
          cache.push_back(r);
        }
        cond.notify_one();
    }

    Resource* findResourceJobInternal(Resource& res) {
        //Log(LL_Debug, "looking for job");
        std::unique_lock<std::mutex> lck(mutex);
        for (size_t i = 0; i < cache.size(); i++)
          if (*cache[i] == res)
            return cache[i];
        //auto cached = cache.find(filename);
        //if (cached != cache.end())
        //  return cached->second;
        return nullptr;
    }
};


class ResourceLoaders
{
public:
  static void init() {
    if (!s_resourceLoaderFilePool)
      s_resourceLoaderFilePool = new ResourceLoader(ResourceLoader::FilePool);
    if (!s_resourceLoaderNetworkPool)
      s_resourceLoaderNetworkPool = new ResourceLoader(ResourceLoader::NetworkPool, 5);
  }

  static void addJobToQueue(Resource* r, ResourceLoader::ResourcePool pool) {
    init();
    if (pool == ResourceLoader::FilePool)
      s_resourceLoaderFilePool->addJobToQueueInternal(r);
    else
      s_resourceLoaderNetworkPool->addJobToQueueInternal(r);
  }

  static Resource* FindResourceJob(Resource& res, ResourceLoader::ResourcePool pool) {
    init();
    if (pool == ResourceLoader::FilePool)
      return s_resourceLoaderFilePool->findResourceJobInternal(res);
    return s_resourceLoaderNetworkPool->findResourceJobInternal(res);
  }

  static void update() {
    init();
    s_resourceLoaderFilePool->update();
    s_resourceLoaderNetworkPool->update();
  }

  static ResourceLoader *s_resourceLoaderFilePool;
  static ResourceLoader *s_resourceLoaderNetworkPool;
};

ResourceLoader *ResourceLoaders::s_resourceLoaderFilePool = nullptr;
ResourceLoader *ResourceLoaders::s_resourceLoaderNetworkPool = nullptr;


// Call periodically to allow dispatch of pending callbacks back to the thread calling this.
// Forwards to ResourceLoaders::update(); the time-step argument is ignored.
void resourceLoaderUpdate(float /*dt*/)
{
    ResourceLoaders::update();
}


struct TextFileResource : public Resource
{
    TextFileResource(const char* a_filename) {
        filename = a_filename;
        state = Pending;
    }

    bool operator==(TextFileResource& other) {
        return other.filename == filename;
    }

    void load() {
	// See: loadPlatformFile
/*
        std::ifstream f(filename.c_str(), std::ifstream::ate | std::ifstream::binary);
        if (!f.fail()) {
            data.resize(f.tellg());
            f.seekg(0, std::ios::beg);
            data.assign((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
            Log(LL_Debug, "loaded file %s contents[%zu]: %s", filename.c_str(), data.size(), data.data());
        }
*/
        FILE* f = fopen(filename.c_str(), "rb");
        if (f) {
            fseek(f, 0, SEEK_END);
            size_t siz = ftell(f);
            fseek(f, 0, SEEK_SET);
            data.resize(siz);
            size_t r = fread(data.data(), siz, 1, f);
            Log(LL_Debug, "loaded file %s contents[%zu]: %s", filename.c_str(), r, data.data());
            fclose(f);
        }
    }

    std::string             filename;
};


struct PngFileResource : public Resource
{
    PngFileResource(const char* a_filename) {
        filename = a_filename;
        state = Pending;
    }

    bool operator==(PngFileResource& other) {
      return other.filename == filename;
    }

    void load() {

       //! @todo make load a PNG

        FILE* f = fopen(filename.c_str(), "rb");
        if (f) {
            fseek(f, 0, SEEK_END);
            size_t siz = ftell(f);
            fseek(f, 0, SEEK_SET);
            data.resize(siz);
            size_t r = fread(data.data(), siz, 1, f);
            Log(LL_Debug, "loaded file %s contents[%zu]: %s", filename.c_str(), r, data.data());
            fclose(f);
        }
    }

    std::string             filename;
};


extern void loadPlatformFile(const char* filename, std::vector<uint8_t>& data);


// Resource identified by an asset name. Depending on USE_SERVER its bytes come either
// from loadPlatformFile() or from an HTTP GET against the asset server.
struct AssetResource : public Resource
{
    AssetResource(const char* a_filename)
        : filename(a_filename)
    {
        state = Pending;
    }

    // Two asset resources match when their asset names match.
    bool operator==(AssetResource& other) {
      return filename == other.filename;
    }

    // Fills `data` with the asset's contents.
    void load() {
#ifdef USE_SERVER
      // Fetch http://<SERVER_IP>:8005/asset/<filename>
      const std::string hostAndPort = SERVER_IP ":8005";
      const std::string url = "http://" + hostAndPort + "/asset/" + filename;
      HttpRequest  request = { "GET", url, {} };
      HttpResponse response;
      makeHttpRequestWrapper(request, response, data);
      Log(LL_Debug, "status: %s", response.status.c_str());
#else
      // Local build: let the platform layer locate and read the file
      loadPlatformFile(filename.c_str(), data);
#endif
    }

    std::string             filename;
};


/*
  Document d;
  if ((ParseResult)d.Parse((const char*)result.body.data())) {
    std::string access_token = d["token"].GetString();
    Log(LL_Debug, "token = %s", access_token.c_str());
  }
*/


// Async API
//
// Requests `filename` and returns the Resource tracking it. Online assets go to the
// network pool as AssetResource; local files go to the file pool as PngFileResource
// when the text after the last '.' is "png", otherwise as TextFileResource.
// If an equal resource is already known to the pool, that one is returned instead:
// `callback` is invoked immediately when it is already loaded, or chained after the
// callback already registered on it when it is not.
Resource* loadFileAsync(const char* filename, bool onlineAsset, std::function<void(Resource*)> callback)
{
    const ResourceLoader::ResourcePool pool =
        onlineAsset ? ResourceLoader::NetworkPool : ResourceLoader::FilePool;

    // Build a candidate of the right concrete type; it doubles as the lookup key
    Resource* candidate = nullptr;
    if (onlineAsset) {
        candidate = new AssetResource(filename);
    } else {
        const std::string name(filename);
        const std::string extension = name.substr(name.find_last_of(".") + 1);
        if (extension == "png")
            candidate = new PngFileResource(filename);
        else
            candidate = new TextFileResource(filename);
    }

    Resource* existing = ResourceLoaders::FindResourceJob(*candidate, pool);
    if (existing == nullptr) {
        // Nothing matching yet: the candidate becomes the new job
        candidate->callback = callback;
        ResourceLoaders::addJobToQueue(candidate, pool);
        return candidate;
    }

    // An equal resource already exists, so the candidate is no longer needed
    delete candidate;
    Log(LL_Debug, "Found existing job, chaining callbacks");

    if (existing->isLoaded()) {
        // Already loaded: report back straight away
        callback(existing);
        return existing;
    }

    // Still in flight: run the previously registered callback first, then the new one
    std::function<void(Resource*)> earlier = existing->callback;
    existing->callback = [earlier, callback](Resource* r) {
        earlier(r);
        callback(r);
    };
    return existing;
}


// Non-async API
// Requests the file through loadFileAsync() with a do-nothing callback, then calls
// Resource::wait() on the result before returning it.
Resource* loadFile(const char* filename, bool onlineAsset)
{
    Resource* res = loadFileAsync(filename, onlineAsset, [](Resource* /*r*/){} );
    res->wait();
    return res;
}




#ifdef UNIT_TESTS

// Smoke test for the loader (UNIT_TESTS builds only): issues a mix of async and blocking
// requests, including repeated file names, and logs progress. It makes no assertions and
// never calls resourceLoaderUpdate(), so callbacks queued for update() are not dispatched.
void unit_test()
{
    // Callback that ignores its argument
    auto callback = [](Resource*){};

    Log(LL_Debug, "entered main");
    //g_resourceLoader.Start();
    
    // Four async requests; the third repeats the first file name
    Log(LL_Debug, "adding job 1");
    Resource* r1 = loadFileAsync("test1.txt", false, callback);
    Log(LL_Debug, "adding job 2");
    Resource* r2 = loadFileAsync("test2.txt", false, callback);
    Log(LL_Debug, "adding job 1");
    Resource* r3 = loadFileAsync("test1.txt", false, callback);
    Log(LL_Debug, "adding job 3");
    Resource* r4 = loadFileAsync("test3.txt", false, callback);

    Log(LL_Debug, "waiting for job to finish");
    r1->wait();

    // Blocking requests: one new file, one file name that was requested earlier
    Log(LL_Debug, "adding job 4");
    Resource* r5 = loadFile("test4.txt", false);

    Log(LL_Debug, "adding job 2");
    Resource* r6 = loadFile("test2.txt", false);

    Log(LL_Debug, "adding job 5");
    Resource* r7 = loadFileAsync("test5.txt", false, callback);

    Log(LL_Debug, "waiting for job to finish");
    r7->wait();

    //g_resourceLoader.Stop();
    Log(LL_Debug, "leaving main");
}

#endif


#if defined(CODE_GEN) || defined(UNIT_TESTS)

#include <fstream>


// Stand-alone (CODE_GEN / UNIT_TESTS) implementation of loadPlatformFile: reads the
// whole of `filename` into `data`.
//
// `data` is always cleared first. It stays empty when the file cannot be opened or its
// size cannot be determined, and is trimmed to the bytes actually read on a short read.
void loadPlatformFile(const char* filename, std::vector<uint8_t>& data)
{
  data.clear();
  std::ifstream file(filename, std::ios::binary);
  if (file.fail()) {
    LogTag(LL_Error, "LDR", "Couldn't load %s file", filename);
    return;
  }

  // tellg() reports failure as -1; check before using it as a size.
  file.seekg(0, std::ios::end);
  const std::streamoff length = file.tellg();
  if (length < 0) {
    LogTag(LL_Error, "LDR", "Couldn't load %s file", filename);
    return;
  }

  data.resize(static_cast<size_t>(length));
  file.seekg(0, std::ios::beg);
  if (!data.empty()) {
    file.read(reinterpret_cast<char*>(data.data()), static_cast<std::streamsize>(data.size()));
    // gcount() is the number of bytes the read actually delivered
    data.resize(static_cast<size_t>(file.gcount()));
  }

  // NOTE(review): this success message is logged at LL_Error - confirm that is intended.
  LogTag(LL_Error, "LDR", "lens: %zu", data.size());
}


// Log sink for the stand-alone (CODE_GEN / UNIT_TESTS) builds: writes one line per
// message to stderr as "[thread] [tag] file (line): message". LL_Trace messages are
// dropped. A function-local mutex keeps lines from different threads from interleaving.
//
// a_fmt and the variadic arguments follow printf conventions; a_function is unused.
void filterLogger(enum LogLevel a_level, const char* a_tag,
                  const char* a_file, const char* a_function, int a_line, const char* a_fmt, ...)
{
  (void)a_function;
  if (a_level == LL_Trace)
    return;
  static std::mutex logMutex;
  va_list args;
  va_start(args, a_fmt);
  {
    std::lock_guard<std::mutex> lock(logMutex);
    fprintf(stderr, "[%i] [%s] %s (%i): ", pthread_mach_thread_np(pthread_self()), a_tag, a_file, a_line);
    // A va_list has to go through the v* variant; passing it to fprintf is undefined behaviour
    vfprintf(stderr, a_fmt, args);
    fprintf(stderr, "\n");
  }
  va_end(args);
}


// Entry point of the stand-alone CODE_GEN / UNIT_TESTS builds: runs code_gen() and/or
// unit_test(), whichever the build flags enabled. The command-line arguments are not used.
int main(int argc, char* argv[])
{
#ifdef CODE_GEN
  code_gen();
#endif
#ifdef UNIT_TESTS
  unit_test();
#endif
}

#endif