Scheduled SMS notifications service
by Afanasy Barbarov
The problem: Send an SMS on a regular basis
This might be straightforward at first glance - just create an SNS topic, subscribe and send notifications. You'll end with something like this:
import random
import boto3
all_texts = {
1: u'Message 1',
2: u'Message 2',
3: u'Message 3',
4: u'Message 4',
5: u'Message 5',
}
random.seed()
text_of_the_day = random.randint(1, 5)
session = boto3.Session(region_name="eu-west-1")
sns_client = session.client('sns')
def lambda_handler(event, context):
topic_arn = '<YOUR_ARN>'
sns_client.subscribe(TopicArn=topic_arn, Protocol='sms',
Endpoint=event["phone_number"])
sns_client.publish(Message=all_texts[text_of_the_day],
TopicArn=topic_arn)
return 'OK'But for some reason AWS cannot send SMS to my phone operator - despite the lambda worked perfectly, the logs said that "Phone is currently unreachable/unavailable" - and, of course, that wasn't true. I didn't want to investigate what's wrong with AWS or my phone, or make that once more for a new phone number - just wanted to send an SMS. I ended up with a third-party service, called twilio, which worked out the box. I'll skip registering for an account and obtaining a token since there was no problem at all. So let me stop talking and just show the code:
import base64
import json
import os
import random
import urllib
from urllib import request, parse
TWILIO_SMS_URL = "https://api.twilio.com/2010-04-01/
Accounts/{}/Messages.json"
TWILIO_ACCOUNT_SID = os.environ.get("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.environ.get("TWILIO_AUTH_TOKEN")
all_texts = {
1: u'Message 1',
2: u'Message 2',
3: u'Message 3',
4: u'Message 4',
5: u'Message 5',
}
random.seed()
def lambda_handler(event, context):
text_of_the_day = random.randint(1, 5)
to_number = '<YOUR_NUMBER>'
from_number = '<YOUR_TWILIO_NUMBER>'
body = all_texts[text_of_the_day]
if not TWILIO_ACCOUNT_SID:
return "Unable to access Twilio Account SID."
elif not TWILIO_AUTH_TOKEN:
return "Unable to access Twilio Auth Token."
populated_url = TWILIO_SMS_URL.format(TWILIO_ACCOUNT_SID)
post_params = {"To": to_number, "From": from_number, "Body": body}
data = parse.urlencode(post_params).encode()
req = request.Request(populated_url)
authentication = "{}:{}".
format(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
base64string = base64.b64encode(authentication.encode('utf-8'))
req.add_header("Authorization",
"Basic %s" % base64string.decode('ascii'))
try:
with request.urlopen(req, data) as f:
print("Twilio returned {}".
format(str(f.read().decode('utf-8'))))
except Exception as e:
return e
return "SMS sent successfully!"Besides creating a new lambda function, I needed to set the environment variables for the lambda for security reasons.
And another easy part was creating a recurring event. I registered a new CloudWatch event with cron expression 0 6 ? * * * more about scheduled events in AWS - which gives me everyday notification at 6 AM UTC. Note, that there is one more field in the pattern, comparing the traditional cron in Unix.
That's all, folks!