/*************************** MetaData ****************************
# BlockyFroggy
# Copyright © 2017 John Ryland.
# All rights reserved.
# See: meta-tool.py
# to test, compile with:
UNIT_TEST_CMD="g++ -D UNIT_TESTS --std=c++14 -I. -pthread -o unittest ResourceLoader.cpp Utilities.cpp Common.cpp"
CODE_GEN_CMD="g++ -D CODE_GEN --std=c++14 -I. -pthread -o codegen ResourceLoader.cpp Utilities.cpp Common.cpp"
**************************** MetaData ***************************/
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
#include <map>
#include <atomic>
#include <algorithm>
#include "Log.h"
#include "HttpClient.h"
#include "ResourceLoader.h"
#include "Utilities.h"
DECLARE_LOG_CONTEXT(RES)
#ifdef CODE_GEN
void code_gen() { Utilities::xxd("gen.h", "Font6.png", "Font6"); }
#else
#include "gen.h"
#endif
//#define USE_SERVER
//#define SERVER_IP "127.0.0.1"
#define SERVER_IP "192.168.0.18"
//#define SERVER_IP "192.168.0.20"
//! @todo get rid of all the raw pointers, use more shared_ptrs everywhere
Resource::~Resource()
{
}
bool Resource::operator==(Resource&)
{
return false;
}
bool Resource::isLoaded()
{
return state > Loading;
}
void Resource::wait()
{
Log(LL_Warn, "WAIT CALLED\nWARNING: THIS COULD CAUSE HITCHING");
std::unique_lock<std::mutex> lck(mutex);
if (state > Loading)
cond.wait(lck);
}
struct ResourceLoader
{
enum ResourcePool {
FilePool,
NetworkPool,
OtherPool,
MaxPool
};
std::deque<Resource*> jobs; // start state, job created
std::deque<Resource*> pendingCallbackList; // job loaded, waiting for update to call callback
std::vector<Resource*> cache; // finally added to completed jobs
std::mutex mutex;
std::condition_variable cond;
std::atomic<bool> running;
std::atomic<bool> pendingCallback;
std::vector<std::thread> threads;
ResourcePool type;
int threadCount;
static const char* TypeName(ResourcePool t) {
const char* typeNames[] = { "File", "Network", "Other", "Max" };
return typeNames[t];
}
ResourceLoader(ResourcePool t, int a_threadCount = 1) : type(t), threadCount(a_threadCount) {
Log(LL_Debug, "starting %s resource pool loader", TypeName(type));
start();
}
~ResourceLoader() {
Log(LL_Debug, "stopping %s resource pool loader", TypeName(type));
stop();
}
void start() {
pendingCallback = false;
running = true;
for (int i = 0; i < threadCount; i++) {
Log(LL_Debug, "creating thread");
threads.push_back(std::thread([this](){
AllocationTracking::g_enableAllocTracing = true;
std::unique_lock<std::mutex> lck(mutex);
while (running) {
Log(LL_Debug, "thread waiting for work");
// Wait for jobs to be added to the queue
//Log(LL_Debug, "waiting for job");
cond.wait(lck);
if (running) {
// Drain the queue
//Log(LL_Debug, "processing loading jobs");
while (jobs.size()) {
// Take from the queue a waiting job
Resource *r = jobs.front();
jobs.pop_front();
r->state = Resource::Loading;
// do loading
mutex.unlock();
//Log(LL_Debug, "loading jobs");
r->load();
mutex.lock();
// mark it as loaded and notify anyone waiting for it
r->state = Resource::CallbackPending;
pendingCallbackList.push_back(r);
r->cond.notify_all();
pendingCallback = true;
}
} else {
Log(LL_Debug, "stopping thread");
}
}
}));
Log(LL_Debug, "started thread");
}
}
void stop() {
running = false;
cond.notify_all();
for (int i = 0; i < threadCount; i++)
threads[i].join();
}
void update() {
bool expected = true;
// if pendingCallback is true, here we atomically get it and reset it back to false and if this succeeds we continue
if (pendingCallback) {
if (pendingCallback.compare_exchange_strong(expected, false)) {
if (expected) {
std::unique_lock<std::mutex> lck(mutex);
while (pendingCallbackList.size()) {
Resource *r = pendingCallbackList.front();
pendingCallbackList.pop_front();
lck.unlock();
r->callback(r);
r->state = Resource::Complete;
lck.lock();
}
}}}
}
void addJobToQueueInternal(Resource* r) {
{
std::unique_lock<std::mutex> lck(mutex);
jobs.push_back(r);
cache.push_back(r);
}
cond.notify_one();
}
Resource* findResourceJobInternal(Resource& res) {
//Log(LL_Debug, "looking for job");
std::unique_lock<std::mutex> lck(mutex);
for (size_t i = 0; i < cache.size(); i++)
if (*cache[i] == res)
return cache[i];
//auto cached = cache.find(filename);
//if (cached != cache.end())
// return cached->second;
return nullptr;
}
};
class ResourceLoaders
{
public:
static void init() {
if (!s_resourceLoaderFilePool)
s_resourceLoaderFilePool = new ResourceLoader(ResourceLoader::FilePool);
if (!s_resourceLoaderNetworkPool)
s_resourceLoaderNetworkPool = new ResourceLoader(ResourceLoader::NetworkPool, 5);
}
static void addJobToQueue(Resource* r, ResourceLoader::ResourcePool pool) {
init();
if (pool == ResourceLoader::FilePool)
s_resourceLoaderFilePool->addJobToQueueInternal(r);
else
s_resourceLoaderNetworkPool->addJobToQueueInternal(r);
}
static Resource* FindResourceJob(Resource& res, ResourceLoader::ResourcePool pool) {
init();
if (pool == ResourceLoader::FilePool)
return s_resourceLoaderFilePool->findResourceJobInternal(res);
return s_resourceLoaderNetworkPool->findResourceJobInternal(res);
}
static void update() {
init();
s_resourceLoaderFilePool->update();
s_resourceLoaderNetworkPool->update();
}
static ResourceLoader *s_resourceLoaderFilePool;
static ResourceLoader *s_resourceLoaderNetworkPool;
};
ResourceLoader *ResourceLoaders::s_resourceLoaderFilePool = nullptr;
ResourceLoader *ResourceLoaders::s_resourceLoaderNetworkPool = nullptr;
// Call periodically to allow dispatch of pending callbacks back to the thread calling this
void resourceLoaderUpdate(float /*dt*/)
{
ResourceLoaders::update();
}
struct TextFileResource : public Resource
{
TextFileResource(const char* a_filename) {
filename = a_filename;
state = Pending;
}
bool operator==(TextFileResource& other) {
return other.filename == filename;
}
void load() {
// See: loadPlatformFile
/*
std::ifstream f(filename.c_str(), std::ifstream::ate | std::ifstream::binary);
if (!f.fail()) {
data.resize(f.tellg());
f.seekg(0, std::ios::beg);
data.assign((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
Log(LL_Debug, "loaded file %s contents[%zu]: %s", filename.c_str(), data.size(), data.data());
}
*/
FILE* f = fopen(filename.c_str(), "rb");
if (f) {
fseek(f, 0, SEEK_END);
size_t siz = ftell(f);
fseek(f, 0, SEEK_SET);
data.resize(siz);
size_t r = fread(data.data(), siz, 1, f);
Log(LL_Debug, "loaded file %s contents[%zu]: %s", filename.c_str(), r, data.data());
fclose(f);
}
}
std::string filename;
};
struct PngFileResource : public Resource
{
PngFileResource(const char* a_filename) {
filename = a_filename;
state = Pending;
}
bool operator==(PngFileResource& other) {
return other.filename == filename;
}
void load() {
//! @todo make load a PNG
FILE* f = fopen(filename.c_str(), "rb");
if (f) {
fseek(f, 0, SEEK_END);
size_t siz = ftell(f);
fseek(f, 0, SEEK_SET);
data.resize(siz);
size_t r = fread(data.data(), siz, 1, f);
Log(LL_Debug, "loaded file %s contents[%zu]: %s", filename.c_str(), r, data.data());
fclose(f);
}
}
std::string filename;
};
extern void loadPlatformFile(const char* filename, std::vector<uint8_t>& data);
struct AssetResource : public Resource
{
AssetResource(const char* a_filename) {
filename = a_filename;
state = Pending;
}
bool operator==(AssetResource& other) {
return other.filename == filename;
}
void load() {
#ifndef USE_SERVER
loadPlatformFile(filename.c_str(), data);
#else
std::string asset_url = SERVER_IP ":8005";
HttpRequest request = { "GET", "http://" + asset_url + "/asset/" + filename, {} };
HttpResponse result;
makeHttpRequestWrapper(request, result, data);
Log(LL_Debug, "status: %s", result.status.c_str());
//Log(LL_Debug, "loaded file %s contents: %s", filename.c_str(), data.data());
#endif
}
std::string filename;
};
/*
Document d;
if ((ParseResult)d.Parse((const char*)result.body.data())) {
std::string access_token = d["token"].GetString();
Log(LL_Debug, "token = %s", access_token.c_str());
}
*/
// Async API
Resource* loadFileAsync(const char* filename, bool onlineAsset, std::function<void(Resource*)> callback)
{
Resource* tmpRes = nullptr;
ResourceLoader::ResourcePool pool = ResourceLoader::FilePool;
if (onlineAsset) {
tmpRes = new AssetResource(filename);
pool = ResourceLoader::NetworkPool;
} else {
std::string fn(filename);
std::string ext = fn.substr(fn.find_last_of(".") + 1);
if (ext == "png")
tmpRes = new PngFileResource(filename);
else
tmpRes = new TextFileResource(filename);
}
Resource* res = ResourceLoaders::FindResourceJob(*tmpRes, pool);
if (!res) {
// Create a new job
res = tmpRes;
res->callback = callback;
ResourceLoaders::addJobToQueue(res, pool);
} else {
delete tmpRes;
Log(LL_Debug, "Found existing job, chaining callbacks");
if (res->isLoaded()) {
// Found in the cache and loaded already
callback(res);
} else {
// Found in the cache, but not loaded yet, so we need to chain the callbacks
std::function<void(Resource*)> originalCallback = res->callback;
// Call the existing callback followed by the callback we have just been passed
res->callback = [originalCallback, callback](Resource* r) { originalCallback(r); callback(r); };;
}
}
return res;
}
// Non-async API
Resource* loadFile(const char* filename, bool onlineAsset)
{
Resource* res = loadFileAsync(filename, onlineAsset, [](Resource* /*r*/){} );
res->wait();
return res;
}
#ifdef UNIT_TESTS
void unit_test()
{
auto callback = [](Resource*){};
Log(LL_Debug, "entered main");
//g_resourceLoader.Start();
Log(LL_Debug, "adding job 1");
Resource* r1 = loadFileAsync("test1.txt", false, callback);
Log(LL_Debug, "adding job 2");
Resource* r2 = loadFileAsync("test2.txt", false, callback);
Log(LL_Debug, "adding job 1");
Resource* r3 = loadFileAsync("test1.txt", false, callback);
Log(LL_Debug, "adding job 3");
Resource* r4 = loadFileAsync("test3.txt", false, callback);
Log(LL_Debug, "waiting for job to finish");
r1->wait();
Log(LL_Debug, "adding job 4");
Resource* r5 = loadFile("test4.txt", false);
Log(LL_Debug, "adding job 2");
Resource* r6 = loadFile("test2.txt", false);
Log(LL_Debug, "adding job 5");
Resource* r7 = loadFileAsync("test5.txt", false, callback);
Log(LL_Debug, "waiting for job to finish");
r7->wait();
//g_resourceLoader.Stop();
Log(LL_Debug, "leaving main");
}
#endif
#if defined(CODE_GEN) || defined(UNIT_TESTS)
#include <fstream>
void loadPlatformFile(const char* filename, std::vector<uint8_t>& data)
{
#if 0
std::ifstream file(filename, std::ifstream::ate | std::ifstream::binary);
#else
data.clear();
std::ifstream file(filename, std::ios::binary);
#endif
if (!file.fail()) {
#if 0
data.resize(f.tellg());
file.seekg(0, std::ios::beg);
data.assign((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
#else
file.seekg(0, std::ios::end);
data.resize(file.tellg());
file.seekg(0, std::ios::beg);
file.read((char*)data.data(), data.size());
#endif
LogTag(LL_Error, "LDR", "lens: %lu", data.size());
} else {
LogTag(LL_Error, "LDR", "Couldn't load %s file", filename);
}
}
void filterLogger(enum LogLevel a_level, const char* a_tag,
const char* a_file, const char* a_function, int a_line, const char* a_fmt, ...)
{
if (a_level == LL_Trace)
return;
va_list args;
va_start(args, a_fmt);
static std::mutex logMutex;
std::lock_guard<std::mutex> lock(logMutex);
fprintf(stderr, "[%i] [%s] %s (%i): ", pthread_mach_thread_np(pthread_self()), a_tag, a_file, a_line);
fprintf(stderr, a_fmt, args);
fprintf(stderr, "\n");
va_end(args);
}
int main(int argc, char* argv[])
{
#ifdef CODE_GEN
code_gen();
#endif
#ifdef UNIT_TESTS
unit_test();
#endif
}
#endif