PeakeCoin CrossChain Transfer - hive_bridge

avatar

Attempting to work on and debug our backend cross-chain swapping actions. We try and keep all code accessible.

hive_bridge/main.py



from common.config import HIVE_BRIDGE_ACCOUNT, PEK_TOKEN_SYMBOL
from common.logger import logger
from common.db import init_db, get_db

from beem import Hive
from beem.account import Account
from beem.nodelist import NodeList
import json as pyjson
import re
import signal
import sys
import time
import sqlite3

HIVE_NODES_FALLBACK = [
    "https://api.hive.blog",
    "https://anyx.io",
    "https://rpc.ausbit.dev",
    "https://hive.roelandp.nl",
]

# ---- graceful shutdown -------------------------------------------------------
_shutdown = False
def _handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
signal.signal(signal.SIGINT, _handle_sigint)
signal.signal(signal.SIGTERM, _handle_sigint)

# ---- simple KV state (last processed block) ----------------------------------
def _get_state(key: str, default=None):
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("CREATE TABLE IF NOT EXISTS state (key TEXT PRIMARY KEY, value TEXT)")
        c.execute("SELECT value FROM state WHERE key = ?", (key,))
        row = c.fetchone()
        return row[0] if row else default
    finally:
        conn.close()

def _set_state(key: str, value: str):
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("INSERT INTO state(key, value) VALUES(?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value", (key, str(value)))
        conn.commit()
    finally:
        conn.close()

# ---- event insert with dedupe ------------------------------------------------
def _insert_event(source: str, tx_id: str, amount: float, to_address: str):
    conn = get_db()
    try:
        c = conn.cursor()
        # Make sure your common.db init created a UNIQUE index like:
        # CREATE UNIQUE INDEX IF NOT EXISTS ux_events_source_txid ON events(source, tx_id);
        c.execute(
            "INSERT OR IGNORE INTO events (source, tx_id, amount, to_address, status) VALUES (?, ?, ?, ?, ?)",
            (source, tx_id, amount, to_address, 'pending')
        )
        conn.commit()
        return c.rowcount > 0
    finally:
        conn.close()

# ---- validation --------------------------------------------------------------
_polygon_addr_re = re.compile(r"^0x[a-fA-F0-9]{40}$")
def _is_polygon_address(s: str) -> bool:
    return bool(s and _polygon_addr_re.match(s.strip()))

# ---- hive client w/ rotation -------------------------------------------------
def _make_hive(nodes):
    # Do not broadcast, short timeouts, auto-retries
    return Hive(node=nodes, nobroadcast=True, timeout=30, num_retries=3)

def monitor_hive_transfers(poll_interval_sec: int = 2, backoff_max: int = 16):
    logger.info("Monitoring Hive Engine for PEK → bridge transfers …")

    # Build node list (prefer live list, fall back to static)
    try:
        nl = NodeList()
        nl.update_nodes()
        hive_nodes = nl.get_nodes(normal=True, appbase=True) or HIVE_NODES_FALLBACK[:]
    except Exception:
        hive_nodes = HIVE_NODES_FALLBACK[:]

    hive = _make_hive(hive_nodes)
    backoff = 1

    # Start from last irreversible block to reduce dupes; persist progress
    last_processed = _get_state("hive_last_processed_block")
    try:
        dgp = hive.get_dynamic_global_properties()
        lib = dgp["last_irreversible_block_num"]
    except Exception as e:
        logger.warning(f"Could not fetch DGP at start: {e}")
        lib = None

    if last_processed is None:
        # First run: start a bit behind LIB (or head) to be safe
        try:
            head = hive.get_dynamic_global_properties()["head_block_number"]
        except Exception:
            head = None
        start_from = (lib or head or 0) - 5
        last_processed = max(start_from, 0)
        _set_state("hive_last_processed_block", last_processed)
        logger.info(f"Initialized last_processed to {last_processed}")

    last_processed = int(last_processed)

    while not _shutdown:
        try:
            dgp = hive.get_dynamic_global_properties()
            head = dgp["head_block_number"]
            lib = dgp["last_irreversible_block_num"]

            # Only process up to LIB to avoid reorgs
            target = lib

            if target <= last_processed:
                time.sleep(poll_interval_sec)
                continue

            # Process blocks (cap each loop to avoid huge catch-ups)
            end = min(last_processed + 200, target)
            for block_num in range(last_processed + 1, end + 1):
                block = hive.rpc.get_block(block_num)
                if not block:
                    continue

                txs = block.get("transactions", [])
                for tx in txs:
                    # NOTE: HIVE "transactions" don't always include tx_id here.
                    # We derive tx_id from the operation receipt returned by nodes that provide it,
                    # but some RPCs won't. Fallback: compose a pseudo-id for dedupe.
                    tx_id = tx.get("transaction_id") or f"{block_num}:{hash(pyjson.dumps(tx, sort_keys=True))}"

                    for op in tx.get("operations", []):
                        if not isinstance(op, (list, tuple)) or len(op) != 2:
                            continue
                        op_name, op_data = op
                        if op_name != "custom_json":
                            continue
                        if op_data.get("id") != "ssc-mainnet-hive":
                            continue

                        raw_json = op_data.get("json")
                        try:
                            data = pyjson.loads(raw_json) if isinstance(raw_json, str) else (raw_json or {})
                        except Exception as e:
                            logger.debug(f"custom_json parse error at block {block_num}: {e}")
                            continue

                        # Looking for: tokens.transfer to bridge account with PEK symbol
                        if data.get("contractName") == "tokens" and data.get("contractAction") == "transfer":
                            payload = data.get("contractPayload") or {}
                            if (
                                payload.get("symbol") == PEK_TOKEN_SYMBOL
                                and payload.get("to") == HIVE_BRIDGE_ACCOUNT
                            ):
                                # quantity can be string; memo should be polygon address
                                try:
                                    amount = float(payload.get("quantity", 0))
                                except Exception:
                                    amount = 0.0
                                memo = (payload.get("memo") or "").strip()

                                if not _is_polygon_address(memo):
                                    logger.warning(
                                        f"Ignoring transfer (invalid Polygon memo): memo='{memo}' tx={tx_id} blk={block_num}"
                                    )
                                    continue

                                if _insert_event("hive", tx_id, amount, memo):
                                    logger.info(
                                        f"Bridge event recorded: {amount:g} {PEK_TOKEN_SYMBOL} → {memo} (tx={tx_id}, blk={block_num})"
                                    )
                                else:
                                    logger.debug(f"Duplicate event ignored (tx={tx_id})")

                last_processed = block_num
                if block_num % 20 == 0:
                    _set_state("hive_last_processed_block", last_processed)

            # Persist after each batch
            _set_state("hive_last_processed_block", last_processed)

            # Healthy pass → reset backoff
            backoff = 1
            time.sleep(poll_interval_sec)

        except Exception as e:
            logger.error(f"Hive monitor error: {e}")
            # Rotate nodes
            try:
                hive_nodes.append(hive_nodes.pop(0))
                hive = _make_hive(hive_nodes)
            except Exception as e2:
                logger.error(f"Node rotation failed: {e2}")

            # Backoff (capped)
            time.sleep(backoff)
            backoff = min(backoff * 2, backoff_max)

    logger.info("Shutdown requested; saving state …")
    _set_state("hive_last_processed_block", last_processed)

if __name__ == "__main__":
    # Ensure DB has the structures we need. Your common.db should create:
    #  - events(id PK, source TEXT, tx_id TEXT, amount REAL, to_address TEXT, status TEXT, created_at default now)
    #  - UNIQUE index on (source, tx_id)
    #  - state(key PRIMARY KEY, value TEXT)
    init_db()
    monitor_hive_transfers()


0
0
0.000
0 comments