Skip to content

Commit 6d48e0e

Browse files
authored
Merge pull request #31 from reu/replication-messages
Proper decoding logical replication messages
2 parents f0b4df2 + 94154ba commit 6d48e0e

File tree

1 file changed

+54
-1
lines changed

1 file changed

+54
-1
lines changed

postgres-replication/src/protocol.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';
1212

1313
// logical replication message tags
1414
const BEGIN_TAG: u8 = b'B';
15+
const MESSAGE_TAG: u8 = b'M';
1516
const COMMIT_TAG: u8 = b'C';
1617
const ORIGIN_TAG: u8 = b'O';
1718
const RELATION_TAG: u8 = b'R';
@@ -165,7 +166,9 @@ impl PrimaryKeepAliveBody {
165166
pub enum LogicalReplicationMessage {
166167
/// A BEGIN statement
167168
Begin(BeginBody),
168-
/// A BEGIN statement
169+
/// A logical decoding message
170+
Message(MessageBody),
171+
/// A COMMIT statement
169172
Commit(CommitBody),
170173
/// An Origin replication message
171174
/// Note that there can be multiple Origin messages inside a single transaction.
@@ -199,6 +202,21 @@ impl LogicalReplicationMessage {
199202
timestamp: buf.read_i64::<BigEndian>()?,
200203
xid: buf.read_u32::<BigEndian>()?,
201204
}),
205+
MESSAGE_TAG => Self::Message(MessageBody {
206+
flags: buf.read_i8()?,
207+
message_lsn: buf.read_u64::<BigEndian>()?,
208+
prefix: buf.read_cstr()?,
209+
content: match buf.read_i32::<BigEndian>()? {
210+
len if len > 0 => buf.read_buf(len as usize)?,
211+
0 => Bytes::new(),
212+
len => {
213+
return Err(io::Error::new(
214+
io::ErrorKind::InvalidInput,
215+
format!("unexpected message content length `{len}`"),
216+
))
217+
}
218+
},
219+
}),
202220
COMMIT_TAG => Self::Commit(CommitBody {
203221
flags: buf.read_i8()?,
204222
commit_lsn: buf.read_u64::<BigEndian>()?,
@@ -491,6 +509,41 @@ impl BeginBody {
491509
}
492510
}
493511

512+
/// A logical decoding message
513+
#[derive(Debug)]
514+
pub struct MessageBody {
515+
message_lsn: u64,
516+
flags: i8,
517+
prefix: Bytes,
518+
content: Bytes,
519+
}
520+
521+
impl MessageBody {
522+
#[inline]
523+
/// The LSN of the logical decoding message.
524+
pub fn message_lsn(&self) -> Lsn {
525+
self.message_lsn
526+
}
527+
528+
#[inline]
529+
/// Flags. Currently can be either 0 for no flags or 1 if the logical decoding message is transactional.
530+
pub fn flags(&self) -> i8 {
531+
self.flags
532+
}
533+
534+
#[inline]
535+
/// The prefix of the logical decoding message.
536+
pub fn prefix(&self) -> io::Result<&str> {
537+
get_str(&self.prefix)
538+
}
539+
540+
#[inline]
541+
/// The content of the logical decoding message.
542+
pub fn content(&self) -> &Bytes {
543+
&self.content
544+
}
545+
}
546+
494547
/// A COMMIT statement
495548
#[derive(Debug)]
496549
pub struct CommitBody {

0 commit comments

Comments
 (0)