This week I made a program to alert the @steembasicincome development team of any new memo transactions.
I'll start with the logic flow of the program. First, we get the datetime object from the last time we ran the program (saved to a file). Then, we get all memos sent to our account since that time. If there are any new memos, we send an email alert containing them. Finally, we update the file with the current datetime, so the next run only picks up newer memos.
# Load the datetime stored by the previous memo check
datetime_last_memo_check = get_datetime_last_memo_check()

# Fetch all memos since the last memo check
memos = get_account_memos(account, from_date=datetime_last_memo_check)

# Only send the alert email when there is at least one memo to report
if memos:
    send_memo_alert_email(memos, to_email=recipient)

# Record the current datetime, because we just checked the memos
save_datetime_of_memo_check(now)
We use the `beem` Python package to find the transfer transactions from the @steembasicincome account. We use the `only_ops` parameter to filter for transfer transactions and the `start` parameter so we only get transfer memos that were sent since the last run.
def get_account_memos(account, from_date):
    """Return formatted memo lines for transfers in an account's history.

    Args:
        account: Name of the account whose history is read.
        from_date: Passed to ``Account.history`` as ``start`` so only
            operations from that point on are considered.

    Returns:
        A list of strings, one per transfer whose sender is not ignored and
        whose raw ``amount`` value is below 1000.
    """
    # Senders whose transfers should never show up in the report
    skipped_senders = ['reward.app']
    transfers = Account(account).history(only_ops=['transfer'], start=from_date)

    # NOTE(review): the "(HIVE)" label is hard-coded and the asset of the
    # transfer is not inspected -- confirm only HIVE transfers are expected.
    # NOTE(review): there is no check on the receiving side of the transfer,
    # so transfers sent *by* the account may be listed too -- confirm intent.
    return [
        "- (HIVE) From '{}': {}".format(op['from'], op['memo'])
        for op in transfers
        if op['from'] not in skipped_senders
        and int(op['amount']['amount']) < 1000
    ]
The collected memos are then formatted and sent to the recipient using Gmail as the SMTP server.
def send_memo_alert_email(memos, to_email):
    """Email the collected memo lines to ``to_email`` via Gmail's SMTP server.

    Args:
        memos: Iterable of already-formatted memo strings; they are joined
            with blank lines to form the plain-text message body.
        to_email: Address that receives the report. It is used both for the
            ``To`` header and as the SMTP envelope recipient.

    Raises:
        smtplib.SMTPException: If connecting, authenticating or sending fails.
    """
    sender = 'from@example.com'  # Not real

    msg = MIMEText("\n\n".join(memos))
    msg['Subject'] = 'Daily steembasicincome Transaction Report'
    msg['From'] = sender
    msg['To'] = to_email

    # The context manager issues QUIT and closes the socket even when login
    # or sending raises, so a failed run does not leak the connection.
    with smtplib.SMTP('smtp.gmail.com', port=587) as s:
        s.starttls()
        s.login('email', 'password')  # Not real
        s.sendmail(sender, [to_email], msg.as_string())
Full Code
import datetime
import pickle
from beem.account import Account
import smtplib
from email.mime.text import MIMEText
def get_datetime_last_memo_check():
    """Load and return the object pickled in ``sbi/last_transaction_date.pkl``.

    The path is relative to the current working directory. The file is
    written by ``save_datetime_of_memo_check``; it must already exist,
    otherwise ``open`` raises ``FileNotFoundError``.
    """
    # pickle is acceptable here only because the file is produced by this
    # same script; never point this at a file from an untrusted source.
    with open('sbi/last_transaction_date.pkl', 'rb') as pickle_file:
        last_check = pickle.load(pickle_file)
    return last_check
def get_account_memos(account, from_date):
    """Return formatted memo lines for transfers in an account's history.

    Args:
        account: Name of the account whose history is read.
        from_date: Passed to ``Account.history`` as ``start`` so only
            operations from that point on are considered.

    Returns:
        A list of strings, one per transfer whose sender is not ignored and
        whose raw ``amount`` value is below 1000.
    """
    # Senders whose transfers should never show up in the report
    skipped_senders = ['reward.app']
    transfers = Account(account).history(only_ops=['transfer'], start=from_date)

    # NOTE(review): the "(HIVE)" label is hard-coded and the asset of the
    # transfer is not inspected -- confirm only HIVE transfers are expected.
    # NOTE(review): there is no check on the receiving side of the transfer,
    # so transfers sent *by* the account may be listed too -- confirm intent.
    return [
        "- (HIVE) From '{}': {}".format(op['from'], op['memo'])
        for op in transfers
        if op['from'] not in skipped_senders
        and int(op['amount']['amount']) < 1000
    ]
def send_memo_alert_email(memos, to_email):
    """Email the collected memo lines to ``to_email`` via Gmail's SMTP server.

    Args:
        memos: Iterable of already-formatted memo strings; they are joined
            with blank lines to form the plain-text message body.
        to_email: Address that receives the report. It is used both for the
            ``To`` header and as the SMTP envelope recipient.

    Raises:
        smtplib.SMTPException: If connecting, authenticating or sending fails.
    """
    sender = 'from@example.com'  # Not real

    msg = MIMEText("\n\n".join(memos))
    msg['Subject'] = 'Daily steembasicincome Transaction Report'
    msg['From'] = sender
    msg['To'] = to_email

    # The context manager issues QUIT and closes the socket even when login
    # or sending raises, so a failed run does not leak the connection.
    with smtplib.SMTP('smtp.gmail.com', port=587) as s:
        s.starttls()
        s.login('email', 'password')  # Not real
        s.sendmail(sender, [to_email], msg.as_string())
def save_datetime_of_memo_check(datetime):
    """Pickle the given check time to ``sbi/last_transaction_date.pkl``.

    Args:
        datetime: The datetime object to store as the time of this memo
            check. The name shadows the ``datetime`` module inside this
            function; it is kept so existing keyword callers still work.

    The path is relative to the current working directory and the ``sbi``
    directory must already exist.
    """
    with open('sbi/last_transaction_date.pkl', 'wb') as f:
        # Store exactly the timestamp the caller captured. Calling ``.now()``
        # on it would record the time *after* the memo query instead, and any
        # transfer arriving in between would be skipped by the next run.
        pickle.dump(datetime, f)
account = 'steembasicincome'
recipient = 'example@example.com'

# Capture the check time before querying, as a timezone-aware UTC datetime.
# A naive local-time value is ambiguous once it is compared with on-chain
# timestamps.
# NOTE(review): beem appears to interpret naive datetimes as UTC, which would
# shift the window by the machine's UTC offset -- confirm against beem docs.
now = datetime.datetime.now(datetime.timezone.utc)

# Get datetime of last memo check
datetime_last_memo_check = get_datetime_last_memo_check()

# Get all memos since last memo check
memos = get_account_memos(account, from_date=datetime_last_memo_check)

# Send email containing memos, if any
if memos:
    send_memo_alert_email(memos, to_email=recipient)

# Record the datetime of this check, because we just checked the memos
save_datetime_of_memo_check(now)