Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
2ec1914
refactor(log): replace utility/log with imp/log in shm.cpp
mutouyun Dec 15, 2025
bf62216
refactor(log): replace utility/log with imp/log in prod_cons.h and qu…
mutouyun Dec 15, 2025
309baf7
refactor(log): change log level from warning to debug in prod_cons.h
mutouyun Dec 15, 2025
6143c23
refactor(log): replace utility/log with imp/log in ipc.cpp
mutouyun Dec 15, 2025
e9a7dba
refactor(log): replace utility/log with imp/log in all platform and s…
mutouyun Dec 15, 2025
0c4421d
refactor(log): fix remaining complex log format calls
mutouyun Dec 15, 2025
2983549
fix(log): add missing LIBIPC_LOG() in posix get_wait_time.h
mutouyun Dec 15, 2025
2ff5c94
fix(log): add missing LIBIPC_LOG() to all functions using log interface
mutouyun Dec 15, 2025
1664526
fix(log): fix malformed log calls and add missing LIBIPC_LOG() in shm…
mutouyun Dec 15, 2025
2b1ed4b
fix(log): remove remaining format specifiers and fix malformed log calls
mutouyun Dec 15, 2025
73d59ba
fix(log): add missing LIBIPC_LOG() and fix lambda log capture
mutouyun Dec 15, 2025
0f8bd34
fix(log): add missing LIBIPC_LOG() in get_info member function
mutouyun Dec 15, 2025
66a66f1
fix(log): fix Windows platform compilation errors
mutouyun Dec 15, 2025
afd1467
fix(log): use template to pass logger to initiator and fix format error
mutouyun Dec 15, 2025
72eedeb
fix(log): use constructor template instead of class template for init…
mutouyun Dec 15, 2025
ab8e6c7
fix(log): move sa_initiator struct outside get_sa() function
mutouyun Dec 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 32 additions & 22 deletions src/libipc/ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "libipc/rw_lock.h"
#include "libipc/waiter.h"

#include "libipc/utility/log.h"
#include "libipc/imp/log.h"
#include "libipc/utility/id_pool.h"
#include "libipc/utility/scope_guard.h"
#include "libipc/utility/utility.h"
Expand Down Expand Up @@ -76,6 +76,7 @@ ipc::buff_t make_cache(T &data, std::size_t size) {
}

acc_t *cc_acc(std::string const &pref) {
LIBIPC_LOG();
static auto *phs = new ipc::unordered_map<std::string, ipc::shm::handle>; // no delete
static std::mutex lock;
std::lock_guard<std::mutex> guard {lock};
Expand All @@ -84,7 +85,7 @@ acc_t *cc_acc(std::string const &pref) {
std::string shm_name {ipc::make_prefix(pref, "CA_CONN__")};
ipc::shm::handle h;
if (!h.acquire(shm_name.c_str(), sizeof(acc_t))) {
ipc::error("[cc_acc] acquire failed: %s\n", shm_name.c_str());
log.error("[cc_acc] acquire failed: ", shm_name);
return nullptr;
}
it = phs->emplace(pref, std::move(h)).first;
Expand Down Expand Up @@ -217,17 +218,19 @@ auto& chunk_storages() {
std::mutex lock_;

static bool make_handle(ipc::shm::handle &h, std::string const &shm_name, std::size_t chunk_size) {
LIBIPC_LOG();
if (!h.valid() &&
!h.acquire( shm_name.c_str(),
sizeof(chunk_info_t) + chunk_info_t::chunks_mem_size(chunk_size) )) {
ipc::error("[chunk_storages] chunk_shm.id_info_.acquire failed: chunk_size = %zd\n", chunk_size);
log.error("[chunk_storages] chunk_shm.id_info_.acquire failed: chunk_size = ", chunk_size);
return false;
}
return true;
}

public:
chunk_info_t *get_info(conn_info_head *inf, std::size_t chunk_size) {
LIBIPC_LOG();
std::string pref {(inf == nullptr) ? std::string{} : inf->prefix_};
std::string shm_name {ipc::make_prefix(pref, "CHUNK_INFO__", chunk_size)};
ipc::shm::handle *h;
Expand All @@ -240,7 +243,7 @@ auto& chunk_storages() {
}
auto *info = static_cast<chunk_info_t*>(h->get());
if (info == nullptr) {
ipc::error("[chunk_storages] chunk_shm.id_info_.get failed: chunk_size = %zd\n", chunk_size);
log.error("[chunk_storages] chunk_shm.id_info_.get failed: chunk_size = ", chunk_size);
return nullptr;
}
return info;
Expand Down Expand Up @@ -290,8 +293,9 @@ std::pair<ipc::storage_id_t, void*> acquire_storage(conn_info_head *inf, std::si
}

void *find_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size) {
LIBIPC_LOG();
if (id < 0) {
ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
log.error("[find_storage] id is invalid: id = ", (long)id, ", size = ", size);
return nullptr;
}
std::size_t chunk_size = calc_chunk_size(size);
Expand All @@ -301,8 +305,9 @@ void *find_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size)
}

void release_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size) {
LIBIPC_LOG();
if (id < 0) {
ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
log.error("[release_storage] id is invalid: id = ", (long)id, ", size = ", size);
return;
}
std::size_t chunk_size = calc_chunk_size(size);
Expand Down Expand Up @@ -334,8 +339,9 @@ bool sub_rc(ipc::wr<Rp, Rc, ipc::trans::broadcast>,

template <typename Flag>
void recycle_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) {
LIBIPC_LOG();
if (id < 0) {
ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
log.error("[recycle_storage] id is invalid: id = ", (long)id, ", size = ", size);
return;
}
std::size_t chunk_size = calc_chunk_size(size);
Expand All @@ -355,11 +361,12 @@ void recycle_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size

template <typename MsgT>
bool clear_message(conn_info_head *inf, void* p) {
LIBIPC_LOG();
auto msg = static_cast<MsgT*>(p);
if (msg->storage_) {
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg->remain_;
if (r_size <= 0) {
ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size);
log.error("[clear_message] invalid msg size: ", (int)r_size);
return true;
}
release_storage(*reinterpret_cast<ipc::storage_id_t*>(&msg->data_),
Expand Down Expand Up @@ -518,33 +525,34 @@ static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::uint64_t tm

template <typename F>
static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t size) {
LIBIPC_LOG();
if (data == nullptr || size == 0) {
ipc::error("fail: send(%p, %zd)\n", data, size);
log.error("fail: send(", data, ", ", size, ")");
return false;
}
auto que = queue_of(h);
if (que == nullptr) {
ipc::error("fail: send, queue_of(h) == nullptr\n");
log.error("fail: send, queue_of(h) == nullptr");
return false;
}
if (que->elems() == nullptr) {
ipc::error("fail: send, queue_of(h)->elems() == nullptr\n");
log.error("fail: send, queue_of(h)->elems() == nullptr");
return false;
}
if (!que->ready_sending()) {
ipc::error("fail: send, que->ready_sending() == false\n");
log.error("fail: send, que->ready_sending() == false");
return false;
}
ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed);
if (conns == 0) {
ipc::error("fail: send, there is no receiver on this connection.\n");
log.error("fail: send, there is no receiver on this connection.");
return false;
}
// calc a new message id
conn_info_t *inf = info_of(h);
auto acc = inf->acc();
if (acc == nullptr) {
ipc::error("fail: send, info_of(h)->acc() == nullptr\n");
log.error("fail: send, info_of(h)->acc() == nullptr");
return false;
}
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
Expand All @@ -558,7 +566,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
static_cast<std::int32_t>(ipc::data_length), &(dat.first), 0);
}
// try using message fragment
//ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
//log.debug("fail: shm::handle for big message. msg_id: ", msg_id, ", size: ", size);
}
// push message fragment
std::int32_t offset = 0;
Expand All @@ -581,14 +589,15 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
}

static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) {
return send([tm](auto *info, auto *que, auto msg_id) {
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
LIBIPC_LOG();
return send([tm, &log](auto *info, auto *que, auto msg_id) {
return [tm, &log, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
if (!wait_for(info->wt_waiter_, [&] {
return !que->push(
[](void*) { return true; },
info->cc_id_, msg_id, remain, data, size);
}, tm)) {
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
log.debug("force_push: msg_id = ", msg_id, ", remain = ", remain, ", size = ", size);
if (!que->force_push(
[info](void* p) { return clear_message<typename queue_t::value_t>(info, p); },
info->cc_id_, msg_id, remain, data, size)) {
Expand Down Expand Up @@ -618,9 +627,10 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::
}

static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
LIBIPC_LOG();
auto que = queue_of(h);
if (que == nullptr) {
ipc::error("fail: recv, queue_of(h) == nullptr\n");
log.error("fail: recv, queue_of(h) == nullptr");
return {};
}
if (!que->connected()) {
Expand Down Expand Up @@ -648,7 +658,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
// msg.remain_ may minus & abs(msg.remain_) < data_length
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg.remain_;
if (r_size <= 0) {
ipc::error("fail: recv, r_size = %d\n", (int)r_size);
log.error("fail: recv, r_size = ", (int)r_size);
return {};
}
std::size_t msg_size = static_cast<std::size_t>(r_size);
Expand All @@ -669,7 +679,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
que->connected_id()
});
if (r_info == nullptr) {
ipc::log("fail: ipc::mem::$new<recycle_t>.\n");
log.error("fail: ipc::mem::$new<recycle_t>.");
return ipc::buff_t{buf, msg_size}; // no recycle
} else {
return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) {
Expand All @@ -685,7 +695,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
}, r_info};
}
} else {
ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size);
log.error("fail: shm::handle for large message. msg_id: ", msg.id_, ", buf_id: ", buf_id, ", size: ", msg_size);
continue;
}
}
Expand Down
14 changes: 8 additions & 6 deletions src/libipc/platform/linux/condition.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "libipc/utility/log.h"
#include "libipc/imp/log.h"
#include "libipc/mutex.h"

#include "get_wait_time.h"
Expand All @@ -19,20 +19,20 @@ class condition : public sync::obj_impl<a0_cnd_t> {
~condition() = default;

bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept {
LIBIPC_LOG();
if (!valid()) return false;
if (tm == invalid_value) {
int eno = A0_SYSERR(a0_cnd_wait(native(), static_cast<a0_mtx_t *>(mtx.native())));
if (eno != 0) {
ipc::error("fail condition wait[%d]\n", eno);
log.error("fail condition wait[", eno, "]");
return false;
}
} else {
auto ts = linux_::detail::make_timespec(tm);
int eno = A0_SYSERR(a0_cnd_timedwait(native(), static_cast<a0_mtx_t *>(mtx.native()), {ts}));
if (eno != 0) {
if (eno != ETIMEDOUT) {
ipc::error("fail condition timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
eno, tm, ts.tv_sec, ts.tv_nsec);
log.error("fail condition timedwait[", eno, "]: tm = ", tm, ", tv_sec = ", ts.tv_sec, ", tv_nsec = ", ts.tv_nsec);
}
return false;
}
Expand All @@ -41,20 +41,22 @@ class condition : public sync::obj_impl<a0_cnd_t> {
}

bool notify(ipc::sync::mutex &mtx) noexcept {
LIBIPC_LOG();
if (!valid()) return false;
int eno = A0_SYSERR(a0_cnd_signal(native(), static_cast<a0_mtx_t *>(mtx.native())));
if (eno != 0) {
ipc::error("fail condition notify[%d]\n", eno);
log.error("fail condition notify[", eno, "]");
return false;
}
return true;
}

bool broadcast(ipc::sync::mutex &mtx) noexcept {
LIBIPC_LOG();
if (!valid()) return false;
int eno = A0_SYSERR(a0_cnd_broadcast(native(), static_cast<a0_mtx_t *>(mtx.native())));
if (eno != 0) {
ipc::error("fail condition broadcast[%d]\n", eno);
log.error("fail condition broadcast[", eno, "]");
return false;
}
return true;
Expand Down
13 changes: 7 additions & 6 deletions src/libipc/platform/linux/get_wait_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <cinttypes>
#include <system_error>

#include "libipc/utility/log.h"
#include "libipc/imp/log.h"

#include "a0/time.h"
#include "a0/err_macro.h"
Expand All @@ -14,30 +14,31 @@ namespace linux_ {
namespace detail {

inline bool calc_wait_time(timespec &ts, std::uint64_t tm /*ms*/) noexcept {
LIBIPC_LOG();
std::int64_t add_ns = static_cast<std::int64_t>(tm * 1000000ull);
if (add_ns < 0) {
ipc::error("invalid time = " PRIu64 "\n", tm);
log.error("invalid time = ", tm);
return false;
}
a0_time_mono_t now;
int eno = A0_SYSERR(a0_time_mono_now(&now));
if (eno != 0) {
ipc::error("fail get time[%d]\n", eno);
log.error("fail get time[", eno, "]");
return false;
}
a0_time_mono_t *target = reinterpret_cast<a0_time_mono_t *>(&ts);
if ((eno = A0_SYSERR(a0_time_mono_add(now, add_ns, target))) != 0) {
ipc::error("fail get time[%d]\n", eno);
log.error("fail get time[", eno, "]");
return false;
}
return true;
}

inline timespec make_timespec(std::uint64_t tm /*ms*/) noexcept(false) {
LIBIPC_LOG();
timespec ts {};
if (!calc_wait_time(ts, tm)) {
ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
tm, ts.tv_sec, ts.tv_nsec);
log.error("fail calc_wait_time: tm = ", tm, ", tv_sec = ", ts.tv_sec, ", tv_nsec = ", ts.tv_nsec);
throw std::system_error{static_cast<int>(errno), std::system_category()};
}
return ts;
Expand Down
19 changes: 11 additions & 8 deletions src/libipc/platform/linux/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <atomic>

#include "libipc/platform/detail.h"
#include "libipc/utility/log.h"
#include "libipc/imp/log.h"
#include "libipc/mem/resource.h"
#include "libipc/shm.h"

Expand All @@ -23,6 +23,7 @@ namespace sync {
class robust_mutex : public sync::obj_impl<a0_mtx_t> {
public:
bool lock(std::uint64_t tm) noexcept {
LIBIPC_LOG();
if (!valid()) return false;
for (;;) {
auto ts = linux_::detail::make_timespec(tm);
Expand All @@ -37,24 +38,25 @@ class robust_mutex : public sync::obj_impl<a0_mtx_t> {
case EOWNERDEAD: {
int eno2 = A0_SYSERR(a0_mtx_consistent(native()));
if (eno2 != 0) {
ipc::error("fail mutex lock[%d] -> consistent[%d]\n", eno, eno2);
log.error("fail mutex lock[", eno, "] -> consistent[", eno2, "]");
return false;
}
int eno3 = A0_SYSERR(a0_mtx_unlock(native()));
if (eno3 != 0) {
ipc::error("fail mutex lock[%d] -> unlock[%d]\n", eno, eno3);
log.error("fail mutex lock[", eno, "] -> unlock[", eno3, "]");
return false;
}
}
break; // loop again
default:
ipc::error("fail mutex lock[%d]\n", eno);
log.error("fail mutex lock[", eno, "]");
return false;
}
}
}

bool try_lock() noexcept(false) {
LIBIPC_LOG();
if (!valid()) return false;
int eno = A0_SYSERR(a0_mtx_timedlock(native(), {linux_::detail::make_timespec(0)}));
switch (eno) {
Expand All @@ -65,28 +67,29 @@ class robust_mutex : public sync::obj_impl<a0_mtx_t> {
case EOWNERDEAD: {
int eno2 = A0_SYSERR(a0_mtx_consistent(native()));
if (eno2 != 0) {
ipc::error("fail mutex try_lock[%d] -> consistent[%d]\n", eno, eno2);
log.error("fail mutex try_lock[", eno, "] -> consistent[", eno2, "]");
break;
}
int eno3 = A0_SYSERR(a0_mtx_unlock(native()));
if (eno3 != 0) {
ipc::error("fail mutex try_lock[%d] -> unlock[%d]\n", eno, eno3);
log.error("fail mutex try_lock[", eno, "] -> unlock[", eno3, "]");
break;
}
}
break;
default:
ipc::error("fail mutex try_lock[%d]\n", eno);
log.error("fail mutex try_lock[", eno, "]");
break;
}
throw std::system_error{eno, std::system_category()};
}

bool unlock() noexcept {
LIBIPC_LOG();
if (!valid()) return false;
int eno = A0_SYSERR(a0_mtx_unlock(native()));
if (eno != 0) {
ipc::error("fail mutex unlock[%d]\n", eno);
log.error("fail mutex unlock[", eno, "]");
return false;
}
return true;
Expand Down
Loading