PeakeCoin Matic - pek_monitor

I have been attempting to make a bot to monitor @peakecoin.matic transactions to facilitate transactions to the Polygon blockchain.

I can not figure out a way to make this work @thecrazygm, @eoinstant, @powerpaul, @txracer, @aggroed, @cryptomancer I could use some of that brain power, lol. I am stumped, or exhausted.

To operate the peakecoin monitor app you have to input

 python pek_monitor.py monitor
or
 python pek_monitor.py all

#!/usr/bin/env python3
import time
import json
import argparse
from typing import Dict, List, Tuple
import requests

ACCOUNT = "peakecoin.matic"
TOKEN_SYMBOL = "PEK"
POLL_SECONDS = 10
REQUEST_TIMEOUT = 12
HE_CONTRACTS_RPC = "https://api.hive-engine.com/rpc/contracts"  # for find/aggregate
HE_BLOCKCHAIN_RPC = "https://api.hive-engine.com/rpc"           # for blockchain.getBlock, etc.

# ---------- Low-level RPC helpers ----------
def he_contracts_find(table: str, query: Dict, limit: int = 100, offset: int = 0) -> List[Dict]:
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "tokens",
            "table": table,
            "query": query,
            "limit": limit,
            "offset": offset,
            "indexes": []
        }
    }
    for attempt in range(3):
        try:
            r = requests.post(HE_CONTRACTS_RPC, json=payload, timeout=REQUEST_TIMEOUT)
            r.raise_for_status()
            data = r.json()
            return data.get("result", [])
        except Exception as e:
            if attempt == 2:
                raise
            time.sleep(1.5 * (attempt + 1))

def he_blockchain_rpc(method: str, params) -> Dict:
    payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
    for attempt in range(3):
        try:
            r = requests.post(HE_BLOCKCHAIN_RPC, json=payload, timeout=REQUEST_TIMEOUT)
            r.raise_for_status()
            return r.json()
        except Exception:
            if attempt == 2:
                raise
            time.sleep(1.5 * (attempt + 1))

# ---------- History (tokensHistory) ----------
def fetch_transfers_involving(account: str, symbol: str, limit_each: int = 500) -> List[Dict]:
    """
    Pull PEK transfers involving account from tokensHistory by querying both directions:
    - to == account
    - from == account
    Filter by operation='transfer' and symbol match.
    """
    q_common = {"operation": "transfer", "symbol": symbol}

    # Pull "to" transfers
    to_rows = he_contracts_find(
        table="tokensHistory",
        query={**q_common, "to": account},
        limit=limit_each,
        offset=0
    ) or []

    # Pull "from" transfers
    from_rows = he_contracts_find(
        table="tokensHistory",
        query={**q_common, "from": account},
        limit=limit_each,
        offset=0
    ) or []

    rows = (to_rows + from_rows)
    # Defensive: ensure only our symbol & op
    rows = [tx for tx in rows if tx.get("operation") == "transfer" and tx.get("symbol") == symbol]
    # Sort newest first by timestamp
    rows.sort(key=lambda x: int(x.get("timestamp", 0)), reverse=True)
    return rows

def print_recent_7d(account: str, symbol: str, limit_each: int = 500):
    now = int(time.time())
    seven_days_ago = now - 7 * 24 * 60 * 60
    rows = fetch_transfers_involving(account, symbol, limit_each=limit_each)
    recent = [tx for tx in rows if int(tx.get("timestamp", 0)) >= seven_days_ago]

    print(f"{symbol} transfers involving {account} in the last 7 days (count={len(recent)}):")
    for tx in recent:
        ts = int(tx.get("timestamp", 0))
        direction = "IN " if tx.get("to") == account else "OUT"
        print(
            f"[{ts}] {direction} qty={tx.get('quantity')} from={tx.get('from')} to={tx.get('to')} "
            f"memo={tx.get('memo','')} txid={tx.get('transactionId') or tx.get('txId')}"
        )

def print_all_history(account: str, symbol: str, limit_each: int = 200):
    rows = fetch_transfers_involving(account, symbol, limit_each=limit_each)
    print(f"All {symbol} transfers involving {account} (up to ~{2*limit_each} rows):")
    for tx in rows:
        ts = int(tx.get("timestamp", 0))
        direction = "IN " if tx.get("to") == account else "OUT"
        print(
            f"[{ts}] {direction} qty={tx.get('quantity')} from={tx.get('from')} to={tx.get('to')} "
            f"memo={tx.get('memo','')} txid={tx.get('transactionId') or tx.get('txId')}"
        )

# ---------- Blockchain debug ----------
def get_latest_block_number() -> int:
    """
    Try multiple shapes:
    - {"result":{"blockNumber": N}}
    - {"blockNumber": N}
    - {"result": N}
    """
    resp = he_blockchain_rpc("blockchain.getLatestBlock", [])
    # Shape #1
    if isinstance(resp, dict):
        if "result" in resp:
            r = resp["result"]
            if isinstance(r, dict) and "blockNumber" in r:
                return int(r["blockNumber"])
            if isinstance(r, int):
                return int(r)
        # Some nodes respond with flat field (rare)
        if "blockNumber" in resp:
            return int(resp["blockNumber"])
    raise RuntimeError(f"Unexpected latest block response shape: {json.dumps(resp, ensure_ascii=False)[:4000]}")

def get_block(block_num: int) -> Dict:
    resp = he_blockchain_rpc("blockchain.getBlock", [block_num])
    return resp.get("result", resp)

def debug_blocks(start: int, end: int):
    """
    Print raw block JSON for an inclusive range [start, end].
    """
    print(f"DEBUG: dumping raw blocks {start}..{end}")
    for b in range(start, end + 1):
        blk = get_block(b)
        print(f"\n=== BLOCK {b} ===")
        print(json.dumps(blk, indent=2, ensure_ascii=False)[:200000])  # cap very large outputs

# ---------- Monitor (optional streaming) ----------
def monitor_incoming(account: str, symbol: str):
    """
    Poll tokensHistory for new INCOMING transfers only (to == account).
    Uses a timestamp watermark + (txid, logIndex) dedupe.
    """
    print(f"Monitoring {account} for received {symbol} transfers...")
    seen: set[Tuple[str, int]] = set()
    last_seen_ts = 0

    while True:
        try:
            rows = he_contracts_find(
                table="tokensHistory",
                query={"operation": "transfer", "symbol": symbol, "to": account},
                limit=100,
                offset=0
            ) or []
            rows.sort(key=lambda x: int(x.get("timestamp", 0)), reverse=True)

            for tx in rows:
                ts = int(tx.get("timestamp", 0))
                if ts <= last_seen_ts:
                    continue
                txid = tx.get("transactionId") or tx.get("txId")
                idx = int(tx.get("logIndex", 0)) if "logIndex" in tx else 0
                key = (txid or "", idx)
                if not txid or key in seen:
                    continue

                print(
                    f"[{ts}] IN qty={tx.get('quantity')} from={tx.get('from')} "
                    f"memo={tx.get('memo','')} txid={txid}"
                )
                seen.add(key)

            if rows:
                newest_ts = int(rows[0].get("timestamp", last_seen_ts))
                if newest_ts > last_seen_ts:
                    last_seen_ts = newest_ts

            # prune the dedupe set occasionally
            if len(seen) > 5000:
                seen = set(list(seen)[-2000:])

        except Exception as e:
            print(f"ERROR: {e}")

        time.sleep(POLL_SECONDS)

# ---------- CLI ----------
def main():
    ap = argparse.ArgumentParser(description="Hive-Engine PEK history + block debug (single file).")
    ap.add_argument("--account", default=ACCOUNT, help="account to inspect (default: peakecoin.matic)")
    ap.add_argument("--symbol", default=TOKEN_SYMBOL, help="token symbol (default: PEK)")

    sub = ap.add_subparsers(dest="cmd", required=True)

    sub.add_parser("recent7d", help="print PEK transfers involving account from the last 7 days") \
        .add_argument("--limit-each", type=int, default=500)

    sub.add_parser("all", help="print all recent PEK transfers involving account (bounded by API limits)") \
        .add_argument("--limit-each", type=int, default=200)

    mon = sub.add_parser("monitor", help="stream INCOMING transfers to the account")
    mon.add_argument("--poll", type=int, default=10)

    blk = sub.add_parser("blocks", help="dump raw blocks in a range (inclusive)")
    blk.add_argument("--start", type=int, required=True)
    blk.add_argument("--end", type=int, required=True)

    latest = sub.add_parser("latest", help="print latest block number")

    args = ap.parse_args()

    if args.cmd == "recent7d":
        print_recent_7d(args.account, args.symbol, limit_each=args.limit_each)
    elif args.cmd == "all":
        print_all_history(args.account, args.symbol, limit_each=args.limit_each)
    elif args.cmd == "monitor":
        global POLL_SECONDS
        POLL_SECONDS = args.poll
        monitor_incoming(args.account, args.symbol)
    elif args.cmd == "blocks":
        debug_blocks(args.start, args.end)
    elif args.cmd == "latest":
        n = get_latest_block_number()
        print(f"Latest block number: {n}")

if __name__ == "__main__":
    main()


0
0
0.000
8 comments
avatar

It's not working because there is no contract rpc calls for what you are trying to do. If you are wanting to parse the account history, you have to call a history api, as far as I know there is only one: https://accounts.hive-engine.com/accountHistory you can read more about it on the github for the history api

It should look something like this:

response = requests.get(f"https://accounts.hive-engine.com/accountHistory?account={account}&limit={limit}&offset={offset}&symbol={symbol}")

And it should give you a response that looks something like:

image.png

Unfortunately this means you are going to have to start this script from scratch as the only call that looks valid in it is the one to fetch the latest block (but you are not parsing the output correctly so you will always get the exception in that.)

0
0
0.000