Newer
Older
Import / applications / HighwayDash / ports / Framework / ResourceLoader.cpp
/*
  to test, compile with:
    g++ -D RESOURCE_TESTS --std=c++11 Log.cpp ResourceLoader.cpp -o test2 -pthread
*/
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
#include <map>
#include <atomic>
#include <algorithm>
#include "Log.h"
#include "HttpClient.h"
#include "ResourceLoader.h"


//#define USE_SERVER
//#define SERVER_IP           "127.0.0.1"
#define SERVER_IP           "192.168.0.18"
//#define SERVER_IP           "192.168.0.20"


// TODO : get rid of all the raw pointers, use more shared_ptrs everywhere

Resource::~Resource()
{
}


bool Resource::operator==(Resource&)
{
  return false;
}


bool Resource::IsLoaded()
{
    return state > Loading;
}


void Resource::Wait()
{
    Log(LL_Warn, "RES", "WAIT CALLED\nWARNING: THIS COULD CAUSE HITCHING");
    std::unique_lock<std::mutex> lck(mutex);
    if (state > Loading)
        cond.wait(lck);
}


struct ResourceLoader
{
    enum ResourcePool {
        FilePool,
        NetworkPool,
        OtherPool,
        MaxPool
    };

    std::deque<Resource*>             jobs; // start state, job created
    std::deque<Resource*>             pendingCallbackList; // job loaded, waiting for update to call callback
    std::vector<Resource*>            cache; // finally added to completed jobs
    std::mutex                        mutex;
    std::condition_variable           cond;
    std::atomic<bool>                 running;
    std::atomic<bool>                 pendingCallback;
    std::vector<std::thread>          threads;
    ResourcePool                      type;
    int                               threadCount;

    static const char* TypeName(ResourcePool t) {
        const char* typeNames[] = { "File", "Network", "Other", "Max" };
        return typeNames[t];
    }

    ResourceLoader(ResourcePool t, int a_threadCount = 1) : type(t), threadCount(a_threadCount) {
        Log(LL_Debug, "RES", "starting %s resource pool loader", TypeName(type));
        Start();
    }

    ~ResourceLoader() {
        Log(LL_Debug, "RES", "stopping %s resource pool loader", TypeName(type));
        Stop();
    }

    void Start() {
        pendingCallback = false;
        running = true;
        for (int i = 0; i < threadCount; i++) {
          Log(LL_Debug, "RES", "creating thread");
          threads.push_back(std::thread([this](){
            std::unique_lock<std::mutex> lck(mutex);
            while (running) {
                // Wait for jobs to be added to the queue
                //Log(LL_Debug, "RES", "waiting for job");
                cond.wait(lck);
                if (running) {
                    // Drain the queue
                    //Log(LL_Debug, "RES", "processing loading jobs");
                    while (jobs.size()) {
                        // Take from the queue a waiting job
                        Resource *r = jobs.front();
                        jobs.pop_front();
                        r->state = Resource::Loading;

                        // do loading
                        mutex.unlock();
                        //Log(LL_Debug, "RES", "loading jobs");
                        r->Load();
                        mutex.lock();

                        // mark it as loaded and notify anyone waiting for it
                        r->state = Resource::CallbackPending;
                        pendingCallbackList.push_back(r);
                        r->cond.notify_all();
                        pendingCallback = true;
                    }
                } else {
                    Log(LL_Debug, "RES", "stopping thread");
                }
            }
          }));
          Log(LL_Debug, "RES", "started thread");
        }
    }

    void Stop() {
        running = false;
        cond.notify_all();
        for (int i = 0; i < threadCount; i++)
          threads[i].join();
    }

    void Update() {
        bool expected = true;
        // if pendingCallback is true, here we atomically get it and reset it back to false and if this succeeds we continue
        if (pendingCallback) {
        if (pendingCallback.compare_exchange_strong(expected, false)) {
        if (expected) {
            std::unique_lock<std::mutex> lck(mutex);
            while (pendingCallbackList.size()) {
                Resource *r = pendingCallbackList.front();
                pendingCallbackList.pop_front();
                lck.unlock();
                r->callback(r);
                r->state = Resource::Complete;
                lck.lock();
            }
        }}}
    }

    void AddJobToQueueInternal(Resource* r) {
        std::unique_lock<std::mutex> lck(mutex);
        jobs.push_back(r);
        cache.push_back(r);
        cond.notify_one();
    }

    Resource* FindResourceJobInternal(Resource& res) {
        //Log(LL_Debug, "RES", "looking for job");
        std::unique_lock<std::mutex> lck(mutex);
        for (size_t i = 0; i < cache.size(); i++)
          if (*cache[i] == res)
            return cache[i];
        //auto cached = cache.find(filename);
        //if (cached != cache.end())
        //  return cached->second;
        return nullptr;
    }

    static void AddJobToQueue(Resource* r, ResourcePool pool) {
        if (pool == FilePool)
            s_resourceLoaderFilePool.AddJobToQueueInternal(r);
        else
            s_resourceLoaderNetworkPool.AddJobToQueueInternal(r);
    }

    static Resource* FindResourceJob(Resource& res, ResourcePool pool) {
        if (pool == FilePool)
            return s_resourceLoaderFilePool.FindResourceJobInternal(res);
        return s_resourceLoaderNetworkPool.FindResourceJobInternal(res);
    }

    static ResourceLoader s_resourceLoaderFilePool;
    static ResourceLoader s_resourceLoaderNetworkPool;
};


ResourceLoader ResourceLoader::s_resourceLoaderFilePool(FilePool);
ResourceLoader ResourceLoader::s_resourceLoaderNetworkPool(NetworkPool, 5);


// Call periodically to allow dispatch of pending callbacks back to the thread calling this
void ResourceLoaderUpdate(float /*dt*/)
{
    ResourceLoader::s_resourceLoaderFilePool.Update();
    ResourceLoader::s_resourceLoaderNetworkPool.Update();
}


struct TextFileResource : public Resource
{
    TextFileResource(const char* a_filename) {
        filename = a_filename;
        state = Pending;
    }

    bool operator==(TextFileResource& other) {
      return other.filename == filename;
    }

    void Load() {
        FILE* f = fopen(filename.c_str(), "rb");
        if (f) {
            fseek(f, 0, SEEK_END);
            size_t siz = ftell(f);
            fseek(f, 0, SEEK_SET);
            data.resize(siz);
            size_t r = fread(data.data(), siz, 1, f);
            Log(LL_Debug, "RES", "loaded file %s contents[%i]: %s", filename.c_str(), r, data.data());
            fclose(f);
        }
    }

    std::string             filename;
};


struct PngFileResource : public Resource
{
    PngFileResource(const char* a_filename) {
        filename = a_filename;
        state = Pending;
    }

    bool operator==(PngFileResource& other) {
      return other.filename == filename;
    }

    void Load() {

       // TODO: make load a PNG

        FILE* f = fopen(filename.c_str(), "rb");
        if (f) {
            fseek(f, 0, SEEK_END);
            size_t siz = ftell(f);
            fseek(f, 0, SEEK_SET);
            data.resize(siz);
            size_t r = fread(data.data(), siz, 1, f);
            Log(LL_Debug, "RES", "loaded file %s contents[%i]: %s", filename.c_str(), r, data.data());
            fclose(f);
        }
    }

    std::string             filename;
};


extern void LoadPlatformFile(const char* filename, std::vector<uint8_t>& data);


struct AssetResource : public Resource
{
    AssetResource(const char* a_filename) {
        filename = a_filename;
        state = Pending;
    }

    bool operator==(AssetResource& other) {
      return other.filename == filename;
    }

    void Load() {
#ifndef USE_SERVER
      LoadPlatformFile(filename.c_str(), data);
#else      
        std::string asset_url = SERVER_IP ":8005";
        HttpRequest  request = { "GET", "http://" + asset_url + "/asset/" + filename, {} };
        HttpResponse result;
        MakeHttpRequestWrapper(request, result, data);
        Log(LL_Debug, "RES", "status: %s", result.status.c_str());
        //Log(LL_Debug, "RES", "loaded file %s contents: %s", filename.c_str(), data.data());
#endif
    }

    std::string             filename;
};


/*
  Document d;
  if ((ParseResult)d.Parse((const char*)result.body.data())) {
    std::string access_token = d["token"].GetString();
    printf("token = %s\n", access_token.c_str());
  }
*/


// Async API
Resource* LoadFileAsync(const char* filename, bool onlineAsset, std::function<void(Resource*)> callback) {
    Resource* tmpRes = nullptr;
    ResourceLoader::ResourcePool pool = ResourceLoader::FilePool;
    if (onlineAsset) {
      tmpRes = new AssetResource(filename);
      pool = ResourceLoader::NetworkPool;
    } else {
      std::string fn(filename);
      std::string ext = fn.substr(fn.find_last_of(".") + 1);
      if (ext == "png")
          tmpRes = new PngFileResource(filename);
      else
          tmpRes = new TextFileResource(filename);
    }
    Resource* res = ResourceLoader::FindResourceJob(*tmpRes, pool);
    if (!res) {
        // Create a new job
        res = tmpRes;
        res->callback = callback;
        ResourceLoader::AddJobToQueue(res, pool);
    } else {
        delete tmpRes;
        Log(LL_Debug, "RES", "Found existing job, chaining callbacks");
        if (res->IsLoaded()) {
            // Found in the cache and loaded already
            callback(res);
        } else {
            // Found in the cache, but not loaded yet, so we need to chain the callbacks
            std::function<void(Resource*)> originalCallback = res->callback;
            std::function<void(Resource*)> latestCallback = callback;
            std::function<void(Resource*)> chainedCallback = [originalCallback, latestCallback](Resource* r) { originalCallback(r); latestCallback(r); };
            res->callback = chainedCallback;
        }
    }
    return res;
}


// Non-async API
Resource* LoadFile(const char* filename, bool onlineAsset) {
    Resource* res = LoadFileAsync(filename, onlineAsset, [](Resource* /*r*/){} );
    res->Wait();
    return res;
}


#ifdef RESOURCE_TESTS


void SystemLogger(const char* a_tag, const char* a_fmt, va_list a_args)
{
    printf("[%s] ", a_tag);
    vprintf(a_fmt, a_args);
    printf("\n");
}


int main(int argc, char* argv[])
{
    printf("entered main\n");
    //g_resourceLoader.Start();
    
    printf("adding job 1\n");
    Resource* r1 = LoadFileAsync("test1.txt");
    printf("adding job 2\n");
    Resource* r2 = LoadFileAsync("test2.txt");
    printf("adding job 1\n");
    Resource* r3 = LoadFileAsync("test1.txt");
    printf("adding job 3\n");
    Resource* r4 = LoadFileAsync("test3.txt");

    printf("waiting for job to finish\n");
    r1->Wait();

    printf("adding job 4\n");
    Resource* r5 = LoadFile("test4.txt");

    printf("adding job 2\n");
    Resource* r6 = LoadFile("test2.txt");

    printf("adding job 5\n");
    Resource* r7 = LoadFileAsync("test5.txt");

    printf("waiting for job to finish\n");
    r7->Wait();

    //g_resourceLoader.Stop();
    printf("leaving main\n");
}


#endif