/*
to test, compile with:
g++ -D RESOURCE_TESTS --std=c++11 Log.cpp ResourceLoader.cpp -o test2 -pthread
*/
#include <algorithm>
#include <atomic>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <cstdarg>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "Log.h"
#include "HttpClient.h"
#include "ResourceLoader.h"
//#define USE_SERVER
//#define SERVER_IP "127.0.0.1"
#define SERVER_IP "192.168.0.18"
//#define SERVER_IP "192.168.0.20"
// TODO : get rid of all the raw pointers, use more shared_ptrs everywhere
// Out-of-line destructor definition; nothing needs releasing by hand here.
Resource::~Resource() = default;
// Base comparison: a plain Resource never reports itself equal to another one.
bool Resource::operator==(Resource& /*other*/)
{
    const bool sameResource = false;
    return sameResource;
}
// Reports whether the resource has moved past the Loading state.
bool Resource::IsLoaded()
{
    const bool stillPendingOrLoading = (state <= Loading);
    return !stillPendingOrLoading;
}
void Resource::Wait()
{
Log(LL_Warn, "RES", "WAIT CALLED\nWARNING: THIS COULD CAUSE HITCHING");
std::unique_lock<std::mutex> lck(mutex);
if (state > Loading)
cond.wait(lck);
}
struct ResourceLoader
{
enum ResourcePool {
FilePool,
NetworkPool,
OtherPool,
MaxPool
};
std::deque<Resource*> jobs; // start state, job created
std::deque<Resource*> pendingCallbackList; // job loaded, waiting for update to call callback
std::vector<Resource*> cache; // finally added to completed jobs
std::mutex mutex;
std::condition_variable cond;
std::atomic<bool> running;
std::atomic<bool> pendingCallback;
std::vector<std::thread> threads;
ResourcePool type;
int threadCount;
static const char* TypeName(ResourcePool t) {
const char* typeNames[] = { "File", "Network", "Other", "Max" };
return typeNames[t];
}
ResourceLoader(ResourcePool t, int a_threadCount = 1) : type(t), threadCount(a_threadCount) {
Log(LL_Debug, "RES", "starting %s resource pool loader", TypeName(type));
Start();
}
~ResourceLoader() {
Log(LL_Debug, "RES", "stopping %s resource pool loader", TypeName(type));
Stop();
}
void Start() {
pendingCallback = false;
running = true;
for (int i = 0; i < threadCount; i++) {
Log(LL_Debug, "RES", "creating thread");
threads.push_back(std::thread([this](){
std::unique_lock<std::mutex> lck(mutex);
while (running) {
// Wait for jobs to be added to the queue
//Log(LL_Debug, "RES", "waiting for job");
cond.wait(lck);
if (running) {
// Drain the queue
//Log(LL_Debug, "RES", "processing loading jobs");
while (jobs.size()) {
// Take from the queue a waiting job
Resource *r = jobs.front();
jobs.pop_front();
r->state = Resource::Loading;
// do loading
mutex.unlock();
//Log(LL_Debug, "RES", "loading jobs");
r->Load();
mutex.lock();
// mark it as loaded and notify anyone waiting for it
r->state = Resource::CallbackPending;
pendingCallbackList.push_back(r);
r->cond.notify_all();
pendingCallback = true;
}
} else {
Log(LL_Debug, "RES", "stopping thread");
}
}
}));
Log(LL_Debug, "RES", "started thread");
}
}
void Stop() {
running = false;
cond.notify_all();
for (int i = 0; i < threadCount; i++)
threads[i].join();
}
void Update() {
bool expected = true;
// if pendingCallback is true, here we atomically get it and reset it back to false and if this succeeds we continue
if (pendingCallback) {
if (pendingCallback.compare_exchange_strong(expected, false)) {
if (expected) {
std::unique_lock<std::mutex> lck(mutex);
while (pendingCallbackList.size()) {
Resource *r = pendingCallbackList.front();
pendingCallbackList.pop_front();
lck.unlock();
r->callback(r);
r->state = Resource::Complete;
lck.lock();
}
}}}
}
void AddJobToQueueInternal(Resource* r) {
std::unique_lock<std::mutex> lck(mutex);
jobs.push_back(r);
cache.push_back(r);
cond.notify_one();
}
Resource* FindResourceJobInternal(Resource& res) {
//Log(LL_Debug, "RES", "looking for job");
std::unique_lock<std::mutex> lck(mutex);
for (size_t i = 0; i < cache.size(); i++)
if (*cache[i] == res)
return cache[i];
//auto cached = cache.find(filename);
//if (cached != cache.end())
// return cached->second;
return nullptr;
}
static void AddJobToQueue(Resource* r, ResourcePool pool) {
if (pool == FilePool)
s_resourceLoaderFilePool.AddJobToQueueInternal(r);
else
s_resourceLoaderNetworkPool.AddJobToQueueInternal(r);
}
static Resource* FindResourceJob(Resource& res, ResourcePool pool) {
if (pool == FilePool)
return s_resourceLoaderFilePool.FindResourceJobInternal(res);
return s_resourceLoaderNetworkPool.FindResourceJobInternal(res);
}
static ResourceLoader s_resourceLoaderFilePool;
static ResourceLoader s_resourceLoaderNetworkPool;
};
// The two global pools. Constructing them starts their worker threads:
// the file pool with the default thread count, the network pool with five.
ResourceLoader ResourceLoader::s_resourceLoaderFilePool{ ResourceLoader::FilePool };
ResourceLoader ResourceLoader::s_resourceLoaderNetworkPool{ ResourceLoader::NetworkPool, 5 };
// Call periodically to allow dispatch of pending callbacks back to the thread calling this
void ResourceLoaderUpdate(float /*dt*/)
{
ResourceLoader::s_resourceLoaderFilePool.Update();
ResourceLoader::s_resourceLoaderNetworkPool.Update();
}
struct TextFileResource : public Resource
{
TextFileResource(const char* a_filename) {
filename = a_filename;
state = Pending;
}
bool operator==(TextFileResource& other) {
return other.filename == filename;
}
void Load() {
FILE* f = fopen(filename.c_str(), "rb");
if (f) {
fseek(f, 0, SEEK_END);
size_t siz = ftell(f);
fseek(f, 0, SEEK_SET);
data.resize(siz);
size_t r = fread(data.data(), siz, 1, f);
Log(LL_Debug, "RES", "loaded file %s contents[%i]: %s", filename.c_str(), r, data.data());
fclose(f);
}
}
std::string filename;
};
struct PngFileResource : public Resource
{
PngFileResource(const char* a_filename) {
filename = a_filename;
state = Pending;
}
bool operator==(PngFileResource& other) {
return other.filename == filename;
}
void Load() {
// TODO: make load a PNG
FILE* f = fopen(filename.c_str(), "rb");
if (f) {
fseek(f, 0, SEEK_END);
size_t siz = ftell(f);
fseek(f, 0, SEEK_SET);
data.resize(siz);
size_t r = fread(data.data(), siz, 1, f);
Log(LL_Debug, "RES", "loaded file %s contents[%i]: %s", filename.c_str(), r, data.data());
fclose(f);
}
}
std::string filename;
};
extern void LoadPlatformFile(const char* filename, std::vector<uint8_t>& data);
// Resource loaded through LoadPlatformFile(), or over HTTP from SERVER_IP
// when USE_SERVER is defined.
struct AssetResource : public Resource
{
    AssetResource(const char* a_filename) : filename(a_filename) {
        state = Pending;
    }
    // Assets are identified by their file name.
    bool operator==(AssetResource& other) {
        return filename == other.filename;
    }
    // Fills `data` with the asset's bytes.
    void Load() {
#ifdef USE_SERVER
        // Fetch http://<server>:8005/asset/<filename>
        const std::string host = SERVER_IP ":8005";
        const std::string url = "http://" + host + "/asset/" + filename;
        HttpRequest request = { "GET", url, {} };
        HttpResponse response;
        MakeHttpRequestWrapper(request, response, data);
        Log(LL_Debug, "RES", "status: %s", response.status.c_str());
        //Log(LL_Debug, "RES", "loaded file %s contents: %s", filename.c_str(), data.data());
#else
        LoadPlatformFile(filename.c_str(), data);
#endif
    }
    std::string filename;
};
/*
Document d;
if ((ParseResult)d.Parse((const char*)result.body.data())) {
std::string access_token = d["token"].GetString();
printf("token = %s\n", access_token.c_str());
}
*/
// Async API
// Queues `filename` for background loading, or joins a job that is already
// known for it.
//   filename    - file or asset name to load
//   onlineAsset - true: load as an AssetResource on the network pool;
//                 false: load from disk on the file pool (".png" files get a
//                 PngFileResource, everything else a TextFileResource)
//   callback    - invoked with the resource; immediately if an existing job
//                 has already loaded, otherwise when the job's callback runs.
//                 May be empty.
// Returns the resource job (newly queued or the existing one).
Resource* LoadFileAsync(const char* filename, bool onlineAsset, std::function<void(Resource*)> callback) {
    Resource* tmpRes = nullptr;
    ResourceLoader::ResourcePool pool = ResourceLoader::FilePool;
    if (onlineAsset) {
        tmpRes = new AssetResource(filename);
        pool = ResourceLoader::NetworkPool;
    } else {
        const std::string fn(filename);
        // A name without any '.' has no extension. (npos + 1 would wrap to 0
        // and turn the whole name into the "extension".)
        const std::string::size_type dot = fn.find_last_of('.');
        const std::string ext = (dot == std::string::npos) ? std::string() : fn.substr(dot + 1);
        if (ext == "png")
            tmpRes = new PngFileResource(filename);
        else
            tmpRes = new TextFileResource(filename);
    }
    Resource* res = ResourceLoader::FindResourceJob(*tmpRes, pool);
    if (!res) {
        // Create a new job; the loader keeps the pointer from here on
        res = tmpRes;
        res->callback = callback;
        ResourceLoader::AddJobToQueue(res, pool);
        return res;
    }
    // An equivalent job already exists; the probe object is no longer needed
    delete tmpRes;
    Log(LL_Debug, "RES", "Found existing job, chaining callbacks");
    if (res->IsLoaded()) {
        // Found in the cache and loaded already
        if (callback) // calling an empty std::function would throw
            callback(res);
    } else {
        // Found in the cache, but not loaded yet, so we need to chain the callbacks
        std::function<void(Resource*)> originalCallback = res->callback;
        res->callback = [originalCallback, callback](Resource* r) {
            if (originalCallback)
                originalCallback(r);
            if (callback)
                callback(r);
        };
    }
    return res;
}
// Non-async API
// Starts the load through LoadFileAsync() with a do-nothing callback, calls
// Wait() on the resulting resource and then returns it.
Resource* LoadFile(const char* filename, bool onlineAsset) {
    const auto ignoreCompletion = [](Resource* /*r*/) {};
    Resource* const loaded = LoadFileAsync(filename, onlineAsset, ignoreCompletion);
    loaded->Wait();
    return loaded;
}
#ifdef RESOURCE_TESTS
// Log sink for the test build: writes "[tag] <formatted message>" followed by
// a newline to stdout.
void SystemLogger(const char* a_tag, const char* a_fmt, va_list a_args)
{
    // tag prefix
    printf("[%s] ", a_tag);
    // message body, formatted from the caller's argument list
    vprintf(a_fmt, a_args);
    // line terminator
    fputs("\n", stdout);
}
int main(int argc, char* argv[])
{
printf("entered main\n");
//g_resourceLoader.Start();
printf("adding job 1\n");
Resource* r1 = LoadFileAsync("test1.txt");
printf("adding job 2\n");
Resource* r2 = LoadFileAsync("test2.txt");
printf("adding job 1\n");
Resource* r3 = LoadFileAsync("test1.txt");
printf("adding job 3\n");
Resource* r4 = LoadFileAsync("test3.txt");
printf("waiting for job to finish\n");
r1->Wait();
printf("adding job 4\n");
Resource* r5 = LoadFile("test4.txt");
printf("adding job 2\n");
Resource* r6 = LoadFile("test2.txt");
printf("adding job 5\n");
Resource* r7 = LoadFileAsync("test5.txt");
printf("waiting for job to finish\n");
r7->Wait();
//g_resourceLoader.Stop();
printf("leaving main\n");
}
#endif