mirror of https://github.com/qTox/qTox.git
Browse Source
qTox will automatically import the old history on startup. This new database code is much more robust. It is very resilient and will not corrupt or disappear after a crash or power failure, unlike the old code. The on-disk database format is also much more compact now. The database sync option in the advanced settings has been removed, we know run many database operations asynchronously so performance should not be a problem anymore, but we always ensure resiliency in case of abrupt termination, so there is no tradeoff anymore.pull/2701/head
31 changed files with 1045 additions and 455 deletions
@ -0,0 +1,466 @@
@@ -0,0 +1,466 @@
|
||||
#include "rawdatabase.h" |
||||
#include <QDebug> |
||||
#include <QMetaObject> |
||||
#include <QMutexLocker> |
||||
#include <QCoreApplication> |
||||
#include <QFile> |
||||
#include <cassert> |
||||
#include <tox/toxencryptsave.h> |
||||
|
||||
/// The two following defines are required to use SQLCipher
|
||||
/// They are used by the sqlite3.h header
|
||||
#define SQLITE_HAS_CODEC |
||||
#define SQLITE_TEMP_STORE 2 |
||||
|
||||
#include <sqlcipher/sqlite3.h> |
||||
|
||||
RawDatabase::RawDatabase(const QString &path, const QString& password) |
||||
: workerThread{new QThread}, path{path}, currentHexKey{deriveKey(password)} |
||||
{ |
||||
workerThread->setObjectName("qTox Database"); |
||||
moveToThread(workerThread.get()); |
||||
workerThread->start(); |
||||
|
||||
if (!open(path, currentHexKey)) |
||||
return; |
||||
} |
||||
|
||||
RawDatabase::~RawDatabase() |
||||
{ |
||||
close(); |
||||
workerThread->exit(0); |
||||
while (workerThread->isRunning()) |
||||
workerThread->wait(50); |
||||
} |
||||
|
||||
bool RawDatabase::open(const QString& path, const QString &hexKey) |
||||
{ |
||||
if (QThread::currentThread() != workerThread.get()) |
||||
{ |
||||
bool ret; |
||||
QMetaObject::invokeMethod(this, "open", Qt::BlockingQueuedConnection, Q_RETURN_ARG(bool, ret), |
||||
Q_ARG(const QString&, path), Q_ARG(const QString&, hexKey)); |
||||
return ret; |
||||
} |
||||
|
||||
if (!QFile::exists(path) && QFile::exists(path+".tmp")) |
||||
{ |
||||
qWarning() << "Restoring database from temporary export file! Did we crash while changing the password?"; |
||||
QFile::rename(path+".tmp", path); |
||||
} |
||||
|
||||
if (sqlite3_open_v2(path.toUtf8().data(), &sqlite, |
||||
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX, nullptr) != SQLITE_OK) |
||||
{ |
||||
qWarning() << "Failed to open database"<<path<<"with error:"<<sqlite3_errmsg(sqlite); |
||||
return false; |
||||
} |
||||
|
||||
if (!hexKey.isEmpty()) |
||||
{ |
||||
if (!execNow("PRAGMA key = \"x'"+hexKey+"'\"")) |
||||
{ |
||||
qWarning() << "Failed to set encryption key"; |
||||
close(); |
||||
return false; |
||||
} |
||||
|
||||
if (!execNow("SELECT count(*) FROM sqlite_master")) |
||||
{ |
||||
qWarning() << "Database is unusable, check that the password is correct"; |
||||
close(); |
||||
return false; |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
void RawDatabase::close() |
||||
{ |
||||
if (QThread::currentThread() != workerThread.get()) |
||||
return (void)QMetaObject::invokeMethod(this, "close", Qt::BlockingQueuedConnection); |
||||
|
||||
// We assume we're in the ctor or dtor, so we just need to finish processing our transactions
|
||||
process(); |
||||
|
||||
if (sqlite3_close(sqlite) == SQLITE_OK) |
||||
sqlite = nullptr; |
||||
else |
||||
qWarning() << "Error closing database:"<<sqlite3_errmsg(sqlite); |
||||
} |
||||
|
||||
bool RawDatabase::isOpen() |
||||
{ |
||||
// We don't need thread safety since only the ctor/dtor can write this pointer
|
||||
return sqlite != nullptr; |
||||
} |
||||
|
||||
bool RawDatabase::execNow(const QString& statement) |
||||
{ |
||||
return execNow(Query{statement}); |
||||
} |
||||
|
||||
bool RawDatabase::execNow(const RawDatabase::Query &statement) |
||||
{ |
||||
return execNow(QVector<Query>{statement}); |
||||
} |
||||
|
||||
bool RawDatabase::execNow(const QVector<RawDatabase::Query> &statements) |
||||
{ |
||||
if (!sqlite) |
||||
{ |
||||
qWarning() << "Trying to exec, but the database is not open"; |
||||
return false; |
||||
} |
||||
|
||||
std::atomic_bool done{false}; |
||||
std::atomic_bool success{false}; |
||||
|
||||
Transaction trans; |
||||
trans.queries = statements; |
||||
trans.done = &done; |
||||
trans.success = &success; |
||||
{ |
||||
QMutexLocker locker{&transactionsMutex}; |
||||
pendingTransactions.enqueue(trans); |
||||
} |
||||
|
||||
// We can't use blocking queued here, otherwise we might process future transactions
|
||||
// before returning, but we only want to wait until this transaction is done.
|
||||
QMetaObject::invokeMethod(this, "process"); |
||||
while (!done.load(std::memory_order_acquire)) |
||||
QThread::msleep(10); |
||||
|
||||
return success.load(std::memory_order_acquire); |
||||
} |
||||
|
||||
void RawDatabase::execLater(const QString &statement) |
||||
{ |
||||
execLater(Query{statement}); |
||||
} |
||||
|
||||
void RawDatabase::execLater(const RawDatabase::Query &statement) |
||||
{ |
||||
execLater(QVector<Query>{statement}); |
||||
} |
||||
|
||||
void RawDatabase::execLater(const QVector<RawDatabase::Query> &statements) |
||||
{ |
||||
if (!sqlite) |
||||
{ |
||||
qWarning() << "Trying to exec, but the database is not open"; |
||||
return; |
||||
} |
||||
|
||||
Transaction trans; |
||||
trans.queries = statements; |
||||
{ |
||||
QMutexLocker locker{&transactionsMutex}; |
||||
pendingTransactions.enqueue(trans); |
||||
} |
||||
|
||||
QMetaObject::invokeMethod(this, "process"); |
||||
} |
||||
|
||||
void RawDatabase::sync() |
||||
{ |
||||
QMetaObject::invokeMethod(this, "process", Qt::BlockingQueuedConnection); |
||||
} |
||||
|
||||
bool RawDatabase::setPassword(const QString& password) |
||||
{ |
||||
if (!sqlite) |
||||
{ |
||||
qWarning() << "Trying to change the password, but the database is not open"; |
||||
return false; |
||||
} |
||||
|
||||
if (QThread::currentThread() != workerThread.get()) |
||||
{ |
||||
bool ret; |
||||
QMetaObject::invokeMethod(this, "setPassword", Qt::BlockingQueuedConnection, |
||||
Q_RETURN_ARG(bool, ret), Q_ARG(const QString&, password)); |
||||
return ret; |
||||
} |
||||
|
||||
// If we need to decrypt or encrypt, we'll need to sync and close,
|
||||
// so we always process the pending queue before rekeying for consistency
|
||||
process(); |
||||
|
||||
if (QFile::exists(path+".tmp")) |
||||
{ |
||||
qWarning() << "Found old temporary export file while rekeying, deleting it"; |
||||
QFile::remove(path+".tmp"); |
||||
} |
||||
|
||||
if (!password.isEmpty()) |
||||
{ |
||||
QString newHexKey = deriveKey(password); |
||||
if (!currentHexKey.isEmpty()) |
||||
{ |
||||
if (!execNow("PRAGMA rekey = \"x'"+newHexKey+"'\"")) |
||||
{ |
||||
qWarning() << "Failed to change encryption key"; |
||||
close(); |
||||
return false; |
||||
} |
||||
} |
||||
else |
||||
{ |
||||
// Need to encrypt the database
|
||||
if (!execNow("ATTACH DATABASE '"+path+".tmp' AS encrypted KEY \"x'"+newHexKey+"'\";" |
||||
"SELECT sqlcipher_export('encrypted');" |
||||
"DETACH DATABASE encrypted;")) |
||||
{ |
||||
qWarning() << "Failed to export encrypted database"; |
||||
close(); |
||||
return false; |
||||
} |
||||
|
||||
// This is racy as hell, but nobody will race with us since we hold the profile lock
|
||||
// If we crash or die here, the rename should be atomic, so we can recover no matter what
|
||||
close(); |
||||
QFile::remove(path); |
||||
QFile::rename(path+".tmp", path); |
||||
currentHexKey = newHexKey; |
||||
if (!open(path, currentHexKey)) |
||||
{ |
||||
qWarning() << "Failed to open encrypted database"; |
||||
return false; |
||||
} |
||||
} |
||||
} |
||||
else |
||||
{ |
||||
if (currentHexKey.isEmpty()) |
||||
return true; |
||||
|
||||
// Need to decrypt the database
|
||||
if (!execNow("ATTACH DATABASE '"+path+".tmp' AS plaintext KEY '';" |
||||
"SELECT sqlcipher_export('plaintext');" |
||||
"DETACH DATABASE plaintext;")) |
||||
{ |
||||
qWarning() << "Failed to export decrypted database"; |
||||
close(); |
||||
return false; |
||||
} |
||||
|
||||
// This is racy as hell, but nobody will race with us since we hold the profile lock
|
||||
// If we crash or die here, the rename should be atomic, so we can recover no matter what
|
||||
close(); |
||||
QFile::remove(path); |
||||
QFile::rename(path+".tmp", path); |
||||
currentHexKey.clear(); |
||||
if (!open(path)) |
||||
{ |
||||
qCritical() << "Failed to open decrypted database"; |
||||
return false; |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
bool RawDatabase::rename(const QString &newPath) |
||||
{ |
||||
if (!sqlite) |
||||
{ |
||||
qWarning() << "Trying to change the password, but the database is not open"; |
||||
return false; |
||||
} |
||||
|
||||
if (QThread::currentThread() != workerThread.get()) |
||||
{ |
||||
bool ret; |
||||
QMetaObject::invokeMethod(this, "rename", Qt::BlockingQueuedConnection, |
||||
Q_RETURN_ARG(bool, ret), Q_ARG(const QString&, newPath)); |
||||
return ret; |
||||
} |
||||
|
||||
process(); |
||||
|
||||
if (path == newPath) |
||||
return true; |
||||
|
||||
if (QFile::exists(newPath)) |
||||
return false; |
||||
|
||||
close(); |
||||
if (!QFile::rename(path, newPath)) |
||||
return false; |
||||
path = newPath; |
||||
return open(path, currentHexKey); |
||||
} |
||||
|
||||
|
||||
QString RawDatabase::deriveKey(QString password) |
||||
{ |
||||
if (password.isEmpty()) |
||||
return {}; |
||||
|
||||
QByteArray passData = password.toUtf8(); |
||||
|
||||
static_assert(TOX_PASS_KEY_LENGTH >= 32, "toxcore must provide 256bit or longer keys"); |
||||
|
||||
static const uint8_t expandConstant[TOX_PASS_SALT_LENGTH+1] = "L'ignorance est le pire des maux"; |
||||
TOX_PASS_KEY key; |
||||
tox_derive_key_with_salt((uint8_t*)passData.data(), passData.size(), expandConstant, &key, nullptr); |
||||
return QByteArray((char*)key.key, 32).toHex(); |
||||
} |
||||
|
||||
void RawDatabase::process() |
||||
{ |
||||
assert(QThread::currentThread() == workerThread.get()); |
||||
|
||||
if (!sqlite) |
||||
return; |
||||
|
||||
forever |
||||
{ |
||||
// Fetch the next transaction
|
||||
Transaction trans; |
||||
{ |
||||
QMutexLocker locker{&transactionsMutex}; |
||||
if (pendingTransactions.isEmpty()) |
||||
return; |
||||
trans = pendingTransactions.dequeue(); |
||||
} |
||||
|
||||
// In case we exit early, prepare to signal errors
|
||||
if (trans.success != nullptr) |
||||
trans.success->store(false, std::memory_order_release); |
||||
|
||||
// Add transaction commands if necessary
|
||||
if (trans.queries.size() > 1) |
||||
{ |
||||
trans.queries.prepend(Query{"BEGIN;"}); |
||||
trans.queries.append({"COMMIT;"}); |
||||
} |
||||
|
||||
// Compile queries
|
||||
for (Query& query : trans.queries) |
||||
{ |
||||
assert(query.statements.isEmpty()); |
||||
// sqlite3_prepare_v2 only compiles one statement at a time in the query, we need to loop over them all
|
||||
int curParam=0; |
||||
const char* compileTail = query.query.data(); |
||||
do { |
||||
// Compile the next statement
|
||||
sqlite3_stmt* stmt; |
||||
int r; |
||||
if ((r = sqlite3_prepare_v2(sqlite, compileTail, |
||||
query.query.size() - static_cast<int>(compileTail - query.query.data()), |
||||
&stmt, &compileTail)) != SQLITE_OK) |
||||
{ |
||||
qWarning() << "Failed to prepare statement"<<query.query<<"with error"<<r; |
||||
goto cleanupStatements; |
||||
} |
||||
query.statements += stmt; |
||||
|
||||
// Now we can bind our params to this statement
|
||||
int nParams = sqlite3_bind_parameter_count(stmt); |
||||
if (query.blobs.size() < curParam+nParams) |
||||
{ |
||||
qWarning() << "Not enough parameters to bind to query "<<query.query; |
||||
goto cleanupStatements; |
||||
} |
||||
for (int i=0; i<nParams; ++i) |
||||
{ |
||||
const QByteArray& blob = query.blobs[curParam+i]; |
||||
if (sqlite3_bind_blob(stmt, i+1, blob.data(), blob.size(), SQLITE_STATIC) != SQLITE_OK) |
||||
{ |
||||
qWarning() << "Failed to bind param"<<curParam+i<<"to query "<<query.query; |
||||
goto cleanupStatements; |
||||
} |
||||
} |
||||
curParam += nParams; |
||||
} while (compileTail != query.query.data()+query.query.size()); |
||||
} |
||||
|
||||
// Execute each statement of each query of our transaction
|
||||
for (Query& query : trans.queries) |
||||
{ |
||||
for (sqlite3_stmt* stmt : query.statements) |
||||
{ |
||||
int column_count = sqlite3_column_count(stmt); |
||||
int result; |
||||
do { |
||||
result = sqlite3_step(stmt); |
||||
|
||||
// Execute our row callback
|
||||
if (result == SQLITE_ROW && query.rowCallback) |
||||
{ |
||||
QVector<QVariant> row; |
||||
for (int i=0; i<column_count; ++i) |
||||
row += extractData(stmt, i); |
||||
|
||||
query.rowCallback(row); |
||||
} |
||||
} while (result == SQLITE_ROW); |
||||
if (result == SQLITE_ERROR) |
||||
{ |
||||
qWarning() << "Error executing query "<<query.query; |
||||
goto cleanupStatements; |
||||
} |
||||
else if (result == SQLITE_MISUSE) |
||||
{ |
||||
qWarning() << "Misuse executing query "<<query.query; |
||||
goto cleanupStatements; |
||||
} |
||||
else if (result == SQLITE_CONSTRAINT) |
||||
{ |
||||
qWarning() << "Constraint error executing query "<<query.query; |
||||
goto cleanupStatements; |
||||
} |
||||
else if (result != SQLITE_DONE) |
||||
{ |
||||
qWarning() << "Unknown error"<<result<<"executing query "<<query.query; |
||||
goto cleanupStatements; |
||||
} |
||||
} |
||||
|
||||
if (query.insertCallback) |
||||
query.insertCallback(sqlite3_last_insert_rowid(sqlite)); |
||||
} |
||||
|
||||
if (trans.success != nullptr) |
||||
trans.success->store(true, std::memory_order_release); |
||||
|
||||
// Free our statements
|
||||
cleanupStatements: |
||||
for (Query& query : trans.queries) |
||||
{ |
||||
for (sqlite3_stmt* stmt : query.statements) |
||||
sqlite3_finalize(stmt); |
||||
query.statements.clear(); |
||||
} |
||||
|
||||
// Signal transaction results
|
||||
if (trans.done != nullptr) |
||||
trans.done->store(true, std::memory_order_release); |
||||
} |
||||
} |
||||
|
||||
QVariant RawDatabase::extractData(sqlite3_stmt *stmt, int col) |
||||
{ |
||||
int type = sqlite3_column_type(stmt, col); |
||||
if (type == SQLITE_INTEGER) |
||||
{ |
||||
return sqlite3_column_int64(stmt, col); |
||||
} |
||||
else if (type == SQLITE_TEXT) |
||||
{ |
||||
const char* str = reinterpret_cast<const char*>(sqlite3_column_text(stmt, col)); |
||||
int len = sqlite3_column_bytes(stmt, col); |
||||
return QString::fromUtf8(str, len); |
||||
} |
||||
else if (type == SQLITE_NULL) |
||||
{ |
||||
return QVariant{}; |
||||
} |
||||
else |
||||
{ |
||||
const char* data = reinterpret_cast<const char*>(sqlite3_column_blob(stmt, col)); |
||||
int len = sqlite3_column_bytes(stmt, col); |
||||
return QByteArray::fromRawData(data, len); |
||||
} |
||||
} |
||||
@ -0,0 +1,115 @@
@@ -0,0 +1,115 @@
|
||||
#ifndef RAWDATABASE_H |
||||
#define RAWDATABASE_H |
||||
|
||||
#include <QString> |
||||
#include <QByteArray> |
||||
#include <QThread> |
||||
#include <QQueue> |
||||
#include <QVector> |
||||
#include <QPair> |
||||
#include <QMutex> |
||||
#include <QVariant> |
||||
#include <memory> |
||||
#include <atomic> |
||||
|
||||
struct sqlite3; |
||||
struct sqlite3_stmt; |
||||
|
||||
/// Implements a low level RAII interface to a SQLCipher (SQlite3) database
|
||||
/// Thread-safe, does all database operations on a worker thread
|
||||
/// The queries must not contain transaction commands (BEGIN/COMMIT/...) or the behavior is undefined
|
||||
class RawDatabase : QObject |
||||
{ |
||||
Q_OBJECT |
||||
|
||||
public: |
||||
/// A query to be executed by the database. Can be composed of one or more SQL statements in the query,
|
||||
/// optional BLOB parameters to be bound, and callbacks fired when the query is executed
|
||||
/// Calling any database method from a query callback is undefined behavior
|
||||
class Query |
||||
{ |
||||
public: |
||||
Query(QString query, QVector<QByteArray> blobs = {}, std::function<void(int64_t)> insertCallback={}) |
||||
: query{query.toUtf8()}, blobs{blobs}, insertCallback{insertCallback} {} |
||||
Query(QString query, std::function<void(int64_t)> insertCallback) |
||||
: query{query.toUtf8()}, insertCallback{insertCallback} {} |
||||
Query(QString query, std::function<void(const QVector<QVariant>&)> rowCallback) |
||||
: query{query.toUtf8()}, rowCallback{rowCallback} {} |
||||
Query() = default; |
||||
private: |
||||
QByteArray query; ///< UTF-8 query string
|
||||
QVector<QByteArray> blobs; ///< Bound data blobs
|
||||
std::function<void(int64_t)> insertCallback; ///< Called after execution with the last insert rowid
|
||||
std::function<void(const QVector<QVariant>&)> rowCallback; ///< Called during execution for each row
|
||||
QVector<sqlite3_stmt*> statements; ///< Statements to be compiled from the query
|
||||
|
||||
friend class RawDatabase; |
||||
}; |
||||
|
||||
public: |
||||
/// Tries to open a database
|
||||
/// If password is empty, the database will be opened unencrypted
|
||||
/// Otherwise we will use toxencryptsave to derive a key and encrypt the database
|
||||
RawDatabase(const QString& path, const QString& password); |
||||
~RawDatabase(); |
||||
bool isOpen(); ///< Returns true if the database was opened successfully
|
||||
/// Executes a SQL transaction synchronously.
|
||||
/// Returns whether the transaction was successful.
|
||||
bool execNow(const QString& statement); |
||||
bool execNow(const Query& statement); |
||||
bool execNow(const QVector<Query>& statements); |
||||
/// Executes a SQL transaction asynchronously.
|
||||
void execLater(const QString& statement); |
||||
void execLater(const Query& statement); |
||||
void execLater(const QVector<Query>& statements); |
||||
/// Waits until all the pending transactions are executed
|
||||
void sync(); |
||||
|
||||
public slots: |
||||
/// Changes the database password, encrypting or decrypting if necessary
|
||||
/// If password is empty, the database will be decrypted
|
||||
/// Will process all transactions before changing the password
|
||||
bool setPassword(const QString& password); |
||||
/// Moves the database file on disk to match the new path
|
||||
/// /// Will process all transactions before renaming
|
||||
bool rename(const QString& newPath); |
||||
|
||||
protected slots: |
||||
/// Should only be called from the constructor, runs on the caller's thread
|
||||
bool open(const QString& path, const QString& hexKey = {}); |
||||
/// Should only be called from the destructor, runs on the caller's thread
|
||||
void close(); |
||||
/// Implements the actual processing of pending transactions
|
||||
/// Unqueues, compiles, binds and executes queries, then notifies of results
|
||||
/// MUST only be called from the worker thread
|
||||
void process(); |
||||
/// Extracts a variant from one column of a result row depending on the column type
|
||||
QVariant extractData(sqlite3_stmt* stmt, int col); |
||||
|
||||
protected: |
||||
/// Derives a 256bit key from the password and returns it hex-encoded
|
||||
static QString deriveKey(QString password); |
||||
|
||||
private: |
||||
/// SQL transactions to be processed
|
||||
/// A transaction is made of queries, which can have bound BLOBs
|
||||
struct Transaction |
||||
{ |
||||
QVector<Query> queries; |
||||
/// If not a nullptr, the result of the transaction will be set
|
||||
std::atomic_bool* success = nullptr; |
||||
/// If not a nullptr, will be set to true when the transaction has been executed
|
||||
std::atomic_bool* done = nullptr; |
||||
}; |
||||
|
||||
private: |
||||
sqlite3* sqlite; |
||||
std::unique_ptr<QThread> workerThread; |
||||
QQueue<Transaction> pendingTransactions; |
||||
/// Protects pendingTransactions
|
||||
QMutex transactionsMutex; |
||||
QString path; |
||||
QString currentHexKey; |
||||
}; |
||||
|
||||
#endif // RAWDATABASE_H
|
||||
@ -0,0 +1,231 @@
@@ -0,0 +1,231 @@
|
||||
#include "history.h" |
||||
#include "src/persistence/profile.h" |
||||
#include "src/persistence/settings.h" |
||||
#include "src/persistence/db/rawdatabase.h" |
||||
#include "src/persistence/historykeeper.h" |
||||
#include <QDebug> |
||||
#include <cassert> |
||||
|
||||
using namespace std; |
||||
|
||||
History::History(const QString &profileName, const QString &password) |
||||
: db{getDbPath(profileName), password} |
||||
{ |
||||
init(); |
||||
} |
||||
|
||||
History::History(const QString &profileName, const QString &password, const HistoryKeeper &oldHistory) |
||||
: History{profileName, password} |
||||
{ |
||||
import(oldHistory); |
||||
} |
||||
|
||||
History::~History() |
||||
{ |
||||
// We could have execLater requests pending with a lambda attached,
|
||||
// so clear the pending transactions first
|
||||
db.sync(); |
||||
} |
||||
|
||||
bool History::isValid() |
||||
{ |
||||
return db.isOpen(); |
||||
} |
||||
|
||||
void History::setPassword(const QString& password) |
||||
{ |
||||
db.setPassword(password); |
||||
} |
||||
|
||||
void History::rename(const QString &newName) |
||||
{ |
||||
db.rename(getDbPath(newName)); |
||||
} |
||||
|
||||
void History::eraseHistory() |
||||
{ |
||||
db.execNow("DELETE FROM faux_offline_pending;" |
||||
"DELETE FROM history;" |
||||
"DELETE FROM aliases;" |
||||
"DELETE FROM peers;" |
||||
"VACUUM;"); |
||||
} |
||||
|
||||
void History::removeFriendHistory(const QString &friendPk) |
||||
{ |
||||
if (!peers.contains(friendPk)) |
||||
return; |
||||
int64_t id = peers[friendPk]; |
||||
|
||||
if (db.execNow(QString("DELETE FROM faux_offline_pending " |
||||
"WHERE faux_offline_pending.id IN ( " |
||||
"SELECT faux_offline_pending.id FROM faux_offline_pending " |
||||
"LEFT JOIN history ON faux_offline_pending.id = history.id " |
||||
"WHERE chat_id=%1 " |
||||
"); " |
||||
"DELETE FROM history WHERE chat_id=%1; " |
||||
"DELETE FROM aliases WHERE owner=%1; " |
||||
"DELETE FROM peers WHERE id=%1; " |
||||
"VACUUM;").arg(id))) |
||||
{ |
||||
peers.remove(friendPk); |
||||
} |
||||
else |
||||
{ |
||||
qWarning() << "Failed to remove friend's history"; |
||||
} |
||||
} |
||||
|
||||
QVector<RawDatabase::Query> History::generateNewMessageQueries(const QString &friendPk, const QString &message, |
||||
const QString &sender, const QDateTime &time, bool isSent, QString dispName, |
||||
std::function<void(int64_t)> insertIdCallback) |
||||
{ |
||||
QVector<RawDatabase::Query> queries; |
||||
|
||||
// Get the db id of the peer we're chatting with
|
||||
int64_t peerId; |
||||
if (peers.contains(friendPk)) |
||||
{ |
||||
peerId = peers[friendPk]; |
||||
} |
||||
else |
||||
{ |
||||
if (peers.isEmpty()) |
||||
peerId = 0; |
||||
else |
||||
peerId = *max_element(begin(peers), end(peers))+1; |
||||
peers[friendPk] = peerId; |
||||
queries += RawDatabase::Query{("INSERT INTO peers (id, public_key) VALUES (%1, '"+friendPk+"');").arg(peerId)}; |
||||
} |
||||
|
||||
// Get the db id of the sender of the message
|
||||
int64_t senderId; |
||||
if (peers.contains(sender)) |
||||
{ |
||||
senderId = peers[sender]; |
||||
} |
||||
else |
||||
{ |
||||
if (peers.isEmpty()) |
||||
senderId = 0; |
||||
else |
||||
senderId = *max_element(begin(peers), end(peers))+1; |
||||
peers[sender] = senderId; |
||||
queries += RawDatabase::Query{("INSERT INTO peers (id, public_key) VALUES (%1, '"+sender+"');").arg(senderId)}; |
||||
} |
||||
|
||||
queries += RawDatabase::Query(QString("INSERT OR IGNORE INTO aliases (owner, display_name) VALUES (%1, ?);") |
||||
.arg(senderId), {dispName.toUtf8()}); |
||||
|
||||
// If the alias already existed, the insert will ignore the conflict and last_insert_rowid() will return garbage,
|
||||
// so we have to check changes() and manually fetch the row ID in this case
|
||||
queries += RawDatabase::Query(QString("INSERT INTO history (timestamp, chat_id, message, sender_alias) " |
||||
"VALUES (%1, %2, ?, (" |
||||
" CASE WHEN changes() IS 0 THEN (" |
||||
" SELECT id FROM aliases WHERE owner=%3 AND display_name=?)" |
||||
" ELSE last_insert_rowid() END" |
||||
"));") |
||||
.arg(time.toMSecsSinceEpoch()).arg(peerId).arg(senderId), |
||||
{message.toUtf8(), dispName.toUtf8()}, insertIdCallback); |
||||
|
||||
if (!isSent) |
||||
queries += RawDatabase::Query{"INSERT INTO faux_offline_pending (id) VALUES (last_insert_rowid());"}; |
||||
|
||||
return queries; |
||||
} |
||||
|
||||
void History::addNewMessage(const QString &friendPk, const QString &message, const QString &sender, |
||||
const QDateTime &time, bool isSent, QString dispName, std::function<void(int64_t)> insertIdCallback) |
||||
{ |
||||
db.execLater(generateNewMessageQueries(friendPk, message, sender, time, isSent, dispName, insertIdCallback)); |
||||
} |
||||
|
||||
QList<History::HistMessage> History::getChatHistory(const QString &friendPk, const QDateTime &from, const QDateTime &to) |
||||
{ |
||||
QList<HistMessage> messages; |
||||
|
||||
auto rowCallback = [&messages](const QVector<QVariant>& row) |
||||
{ |
||||
messages += {row[0].toLongLong(), |
||||
row[1].isNull(), |
||||
QDateTime::fromMSecsSinceEpoch(row[2].toLongLong()), |
||||
row[3].toString(), |
||||
row[4].toString(), |
||||
row[5].toString(), |
||||
row[6].toString()}; |
||||
}; |
||||
|
||||
// Don't forget to update the rowCallback if you change the selected columns!
|
||||
db.execNow({QString("SELECT history.id, faux_offline_pending.id, timestamp, chat.public_key, " |
||||
"aliases.display_name, sender.public_key, message FROM history " |
||||
"LEFT JOIN faux_offline_pending ON history.id = faux_offline_pending.id " |
||||
"JOIN peers chat ON chat_id = chat.id " |
||||
"JOIN aliases ON sender_alias = aliases.id " |
||||
"JOIN peers sender ON aliases.owner = sender.id " |
||||
"WHERE timestamp BETWEEN %1 AND %2 AND chat.public_key='%3';") |
||||
.arg(from.toMSecsSinceEpoch()).arg(to.toMSecsSinceEpoch()).arg(friendPk), rowCallback}); |
||||
|
||||
return messages; |
||||
} |
||||
|
||||
void History::markAsSent(qint64 id) |
||||
{ |
||||
db.execLater(QString("DELETE FROM faux_offline_pending WHERE id=%1;").arg(id)); |
||||
} |
||||
|
||||
QString History::getDbPath(const QString &profileName) |
||||
{ |
||||
return Settings::getInstance().getSettingsDirPath() + profileName + ".db"; |
||||
} |
||||
|
||||
void History::init() |
||||
{ |
||||
if (!isValid()) |
||||
{ |
||||
qWarning() << "Database not open, init failed"; |
||||
return; |
||||
} |
||||
|
||||
db.execLater("CREATE TABLE IF NOT EXISTS peers (id INTEGER PRIMARY KEY, public_key TEXT NOT NULL UNIQUE);" |
||||
"CREATE TABLE IF NOT EXISTS aliases (id INTEGER PRIMARY KEY, owner INTEGER," |
||||
"display_name BLOB NOT NULL, UNIQUE(owner, display_name));" |
||||
"CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, timestamp INTEGER NOT NULL, " |
||||
"chat_id INTEGER NOT NULL, sender_alias INTEGER NOT NULL, " |
||||
"message BLOB NOT NULL);" |
||||
"CREATE TABLE IF NOT EXISTS faux_offline_pending (id INTEGER PRIMARY KEY);"); |
||||
|
||||
// Cache our current peers
|
||||
db.execLater(RawDatabase::Query{"SELECT id, public_key FROM peers;", [this](const QVector<QVariant>& row) |
||||
{ |
||||
peers[row[1].toString()] = row[0].toInt(); |
||||
}}); |
||||
} |
||||
|
||||
void History::import(const HistoryKeeper &oldHistory) |
||||
{ |
||||
if (!isValid()) |
||||
{ |
||||
qWarning() << "New database not open, import failed"; |
||||
return; |
||||
} |
||||
|
||||
qDebug() << "Importing old database..."; |
||||
QTime t=QTime::currentTime(); |
||||
t.start(); |
||||
QVector<RawDatabase::Query> queries; |
||||
constexpr int batchSize = 1000; |
||||
queries.reserve(batchSize); |
||||
QList<HistoryKeeper::HistMessage> oldMessages = oldHistory.exportMessagesDeleteFile(); |
||||
for (const HistoryKeeper::HistMessage& msg : oldMessages) |
||||
{ |
||||
queries += generateNewMessageQueries(msg.chat, msg.message, msg.sender, msg.timestamp, true, msg.dispName); |
||||
if (queries.size() == batchSize) |
||||
{ |
||||
db.execLater(queries); |
||||
queries.clear(); |
||||
} |
||||
} |
||||
db.execLater(queries); |
||||
db.sync(); |
||||
qDebug() << "Imported old database in"<<t.elapsed()<<"ms"; |
||||
} |
||||
@ -0,0 +1,77 @@
@@ -0,0 +1,77 @@
|
||||
#ifndef HISTORY_H |
||||
#define HISTORY_H |
||||
|
||||
#include <tox/toxencryptsave.h> |
||||
#include <QDateTime> |
||||
#include <QVector> |
||||
#include <QHash> |
||||
#include <cstdint> |
||||
#include "src/persistence/db/rawdatabase.h" |
||||
|
||||
class Profile; |
||||
class HistoryKeeper; |
||||
class RawDatabase; |
||||
|
||||
/// Interacts with the profile database to save the chat history
|
||||
class History |
||||
{ |
||||
public: |
||||
struct HistMessage |
||||
{ |
||||
HistMessage(qint64 id, bool isSent, QDateTime timestamp, QString chat, QString dispName, QString sender, QString message) : |
||||
chat{chat}, sender{sender}, message{message}, dispName{dispName}, timestamp{timestamp}, id{id}, isSent{isSent} {} |
||||
|
||||
QString chat; |
||||
QString sender; |
||||
QString message; |
||||
QString dispName; |
||||
QDateTime timestamp; |
||||
qint64 id; |
||||
bool isSent; |
||||
}; |
||||
|
||||
public: |
||||
/// Opens the profile database and prepares to work with the history
|
||||
/// If password is empty, the database will be opened unencrypted
|
||||
History(const QString& profileName, const QString& password); |
||||
/// Opens the profile database, and import from the old database
|
||||
/// If password is empty, the database will be opened unencrypted
|
||||
History(const QString& profileName, const QString& password, const HistoryKeeper& oldHistory); |
||||
~History(); |
||||
/// Checks if the database was opened successfully
|
||||
bool isValid(); |
||||
/// Imports messages from the old history file
|
||||
void import(const HistoryKeeper& oldHistory); |
||||
/// Changes the database password, will encrypt or decrypt if necessary
|
||||
void setPassword(const QString& password); |
||||
/// Moves the database file on disk to match the new name
|
||||
void rename(const QString& newName); |
||||
|
||||
/// Erases all the chat history from the database
|
||||
void eraseHistory(); |
||||
/// Erases the chat history with one friend
|
||||
void removeFriendHistory(const QString& friendPk); |
||||
/// Saves a chat message in the database
|
||||
void addNewMessage(const QString& friendPk, const QString& message, const QString& sender, |
||||
const QDateTime &time, bool isSent, QString dispName, |
||||
std::function<void(int64_t)> insertIdCallback={}); |
||||
/// Fetches chat messages from the database
|
||||
QList<HistMessage> getChatHistory(const QString& friendPk, const QDateTime &from, const QDateTime &to); |
||||
/// Marks a message as sent, removing it from the faux-offline pending messages list
|
||||
void markAsSent(qint64 id); |
||||
|
||||
protected: |
||||
/// Makes sure the history tables are created
|
||||
void init(); |
||||
static QString getDbPath(const QString& profileName); |
||||
QVector<RawDatabase::Query> generateNewMessageQueries(const QString& friendPk, const QString& message, |
||||
const QString& sender, const QDateTime &time, bool isSent, QString dispName, |
||||
std::function<void(int64_t)> insertIdCallback={}); |
||||
|
||||
private: |
||||
RawDatabase db; |
||||
// Cached mappings to speed up message saving
|
||||
QHash<QString, int64_t> peers; ///< Maps friend public keys to unique IDs by index
|
||||
}; |
||||
|
||||
#endif // HISTORY_H
|
||||
Loading…
Reference in new issue