My Problem
While there’s a slew of automated trading solutions for both stocks and crypto, I wanted to try building my own as a learning exercise and to give myself more control. A few of the solutions boast some form of ML-driven logic but most use classic standards (RSI, MACD, etc). This is fine but it really makes me question their value. Most of these solutions aren’t very customizable or are priced beyond their worth for lower-volume traders who aren’t moving hundreds of thousands of dollars per month.
My (experimental) Solution: A Simple CryptoBot
I took a stab at building a bot which uses the Coinbase Advanced SDK & API to gather price information, perform some common calculations, apply some weights and confidence to the calculated score, and then place a trade if conditions are right. If a trade is triggered, I receive an instant message to my Synology Chat client and all details are logged to a few MySQL tables. All of this runs on AWS (Lambda, EventBridge, and a MySQL database) and, minus the database, likely falls in standard AWS Free Tier. At a high level, the bot takes four actions:
Action 1: Fetch Pricing and Volume Data
The bot fetches candle data (open, high, low, close, volume) from the Coinbase API. It also fetches the current spot price for the pair of my choosing (I’m using BTC-USD for this example but I suggest picking something that has a bit more volatility if experimenting). Next, it queries recent trade decisions and counts trades it has already made within the last hour to enforce limits I set in the configuration of the script as part of risk mitigation.
Action 2: Perform Simple Technical Analysis
Using the data collected in Action 1, I compute the following:
- Simple Moving Averages (SMA): Short and long periods.
- Relative Strength Index (RSI): Identifies overbought or oversold conditions.
- MACD (Moving Average Convergence Divergence): Trend strength and momentum.
- Bollinger Bands: Volatility-based upper and lower price levels.
- Volume Analysis: Compares current and average trading volume.
Not all of these are high value signals for all scenarios. As such, each can be weighted or configured independently within the config section of the bot.
Action 3: Make a Trading Decision
Next, I analyze the indicators above to calculate a “confidence score” for BUY or SELL actions. Candle data has an exponential decay applied to it based on the age of the data. Each of the indicators are awarded a +/- impact to the overall trade recommendation (via a “confidence score”). In addition to dynamically adjusting the dollar value of the trade, this allows me the freedom to adjust the importance applied to each indicator based on the window of time I want to evaluate. In less volatile markets, I can adjust the time window to a week; in high volatility markets, I can adjust the window to only the last hour.
The output of this is a BUY/SELL/HOLD recommendation with a dynamic dollar value based on the calculated confidence.
Action 4: Execute the Trade
If the conditions are satisfied based on the thresholds defined in the script, we execute the trade and log everything to our database. A notification is sent to my phone and, hopefully, I made a few bucks.
Disclaimer
I make no claims this will work. If you copy this and something goes wrong resulting in you losing everything, that’s not my fault and I take no responsibility.
How I Implemented
Step 1: Create a dedicated Coinbase Advanced portfolio
This technically isn’t necessary but allows me to cleanly isolate all of my bot transactions from any other transactions as well as track P&L separately. Navigate to Coinbase Advanced on Desktop (not supported in app) and click “New Portfolio”:

Once created, you can fund this portfolio and have a dedicated “partition” to track trades made by the bot.
Step 2: Create Coinbase Advanced API keys
Navigate to the Coinbase Developer Platform and select “Create API Key”. We’ll be using the “Secret API Keys” for this.
Step 3: Create CryptoBot AWS Lambda
I chose to split the CryptoBot Lambda from the actual Lambda function which authenticates with the Coinbase API. This isn’t necessary; it was merely a personal choice as I intend to use the second Lambda for other projects. I added Requests and MySQL modules as layers to your Lambda but everything else should be available out of the box with Lambda. Here’s my full CryptoBot Lambda with comments throughout to explain the logic in more detail:
import requests import mysql.connector import logging import json import os from datetime import datetime import time import math from decimal import Decimal import boto3 # ===================== # CONFIGURATION # ===================== CONFIG = { "product_id": "BTC-USDC", # The Coinbase pair to trade "function_name": "", # The ARN of the Coinbase transacting Lambda function (described in step 4 below) "coinbase_candles_url": "https://api.exchange.coinbase.com/products/BTC-USD/candles", "coinbase_price_url": "https://api.coinbase.com/api/v3/brokerage/market/products/BTC-USD", # -------------------------------------------------------------------------- # Trading Bot Settings # -------------------------------------------------------------------------- "granularity": 60, # Candle granularity in seconds (300 = 5 min) "sma_short_period": 5, # Short-period for SMA "sma_long_period": 15, # Long-period for SMA "rsi_period": 9, # RSI period "macd_short_period": 12, # MACD short EMA period "macd_long_period": 26, # MACD long EMA period "macd_signal_period": 9, # MACD signal line period "bollinger_period": 20, # Bollinger band period "bollinger_std_dev": 2, # Bollinger band standard deviation "volume_check_period": 20, # # of candles for average volume calculation "volume_multiplier": 1.0, # Volume threshold multiplier (current vs. average) "buy_threshold": 30, # RSI below indicates oversold "sell_threshold": 70, # RSI above indicates overbought "max_trade_amount": 1000, # Max trade amount in USD "trade_limit_per_hour": 5, # Max trades allowed per hour "recent_trade_window": 60, # Minutes to check for recent similar trades "stop_loss_percent": -2.0, # e.g. -2% from current price "take_profit_percent": 3.0, # e.g. +3% from current price "buy_confidence_threshold": 20, "sell_confidence_threshold": -20 } logger = logging.getLogger() logger.setLevel(logging.INFO) def get_db_connection(): db_host = os.environ.get("DB_HOST", "") db_user = os.environ.get("DB_USER", "") db_password = os.environ.get("DB_PASSWORD", "") db_name = os.environ.get("DB_NAME", "") return mysql.connector.connect( host=db_host, user=db_user, password=db_password, database=db_name ) def log_trade_decision(timestamp, action, amount, confidence, reason, additional_info): """ Inserts a trade decision record into the 'cryptobot_trade_decisions' table. :param timestamp: UTC datetime of the trade decision :param action: 'BUY', 'SELL', or 'HOLD' :param amount: The USD value allocated for the trade :param confidence: Final confidence score :param reason: Explanation or reason for the trade :param additional_info: JSON-serialized extra info (indicators, spot price, etc.) """ conn = get_db_connection() cursor = conn.cursor() query = """ INSERT INTO cryptobot_trade_decisions (timestamp, action, amount, confidence_score, reason, additional_info) VALUES (%s, %s, %s, %s, %s, %s) """ try: cursor.execute(query, (timestamp, action, amount, confidence, reason, additional_info)) conn.commit() logger.info("Trade decision logged successfully.") except Exception as e: logger.error(f"Error logging trade decision: {str(e)}", exc_info=True) finally: cursor.close() conn.close() def log_candle_data(candles): """ Logs each candle into the 'cryptobot_candle_data' table. Candle format from Coinbase: [time, low, high, open, close, volume] """ if not candles: logger.warning("No candles provided to log_candle_data; skipping insert.") return conn = get_db_connection() cursor = conn.cursor() query = """ INSERT INTO cryptobot_candle_data (candle_time, low, high, open_price, close_price, volume) VALUES (%s, %s, %s, %s, %s, %s) """ insert_values = [] for candle in candles: candle_ts = datetime.utcfromtimestamp(candle[0]) low = float(candle[1]) high = float(candle[2]) open_p = float(candle[3]) close_p = float(candle[4]) vol = float(candle[5]) insert_values.append((candle_ts, low, high, open_p, close_p, vol)) try: cursor.executemany(query, insert_values) conn.commit() logger.info(f"Logged {len(candles)} candles to the database.") except Exception as e: logger.error(f"Error logging candle data: {str(e)}", exc_info=True) finally: cursor.close() conn.close() def fetch_candle_data(): """ Retrieves recent candle data from Coinbase using the configured granularity. """ now = int(time.time()) start = now - (CONFIG["granularity"] * 300) # 300 candles url = f"{CONFIG['coinbase_candles_url']}?granularity={CONFIG['granularity']}&start={start}&end={now}" logger.info(f"Fetching candle data from URL: {url}") try: response = requests.get(url) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logger.error(f"Error fetching candle data: {str(e)}", exc_info=True) raise def fetch_spot_price(): """ Retrieves the current spot price from Coinbase. """ logger.info(f"Fetching spot price from URL: {CONFIG['coinbase_price_url']}") try: response = requests.get(CONFIG["coinbase_price_url"]) response.raise_for_status() data = response.json() return float(data["price"]) except requests.exceptions.RequestException as e: logger.error(f"Error fetching spot price: {str(e)}", exc_info=True) raise def fetch_recent_trades(): """ Retrieves trades from 'cryptobot_trade_decisions' table that occurred within the last CONFIG['recent_trade_window'] minutes. """ conn = get_db_connection() cursor = conn.cursor(dictionary=True) query = """ SELECT action, amount, timestamp, confidence_score, reason FROM cryptobot_trade_decisions WHERE timestamp >= NOW() - INTERVAL %s MINUTE """ try: cursor.execute(query, (CONFIG["recent_trade_window"],)) trades = cursor.fetchall() return trades except Exception as e: logger.error(f"Error fetching recent trades: {str(e)}", exc_info=True) return [] finally: cursor.close() conn.close() def count_hourly_trades(): """ Counts how many BUY or SELL trades occurred within the last hour. """ conn = get_db_connection() cursor = conn.cursor() query = """ SELECT COUNT(*) FROM cryptobot_trade_decisions WHERE action IN ('BUY', 'SELL') AND timestamp >= NOW() - INTERVAL 1 HOUR """ try: cursor.execute(query) count = cursor.fetchone()[0] return count except Exception as e: logger.error(f"Error counting hourly trades: {str(e)}", exc_info=True) return 0 finally: cursor.close() conn.close() def calculate_sma(candles, period): """ Calculates the Simple Moving Average (SMA) for a given period. :param candles: A list of candle arrays :param period: The lookback period for SMA :return: SMA value as float """ closes = [float(candle[4]) for candle in candles[:period]] return sum(closes) / len(closes) if closes else 0 def calculate_rsi(candles, period): """ Calculates the RSI. :param candles: A list of candle arrays :param period: The standard RSI period (e.g. 14) :return: RSI value as float """ closes = [float(candle[4]) for candle in candles] gains = [max(0, closes[i] - closes[i - 1]) for i in range(1, len(closes))] losses = [max(0, closes[i - 1] - closes[i]) for i in range(1, len(closes))] avg_gain = sum(gains[:period]) / period if period <= len(gains) else 0 avg_loss = sum(losses[:period]) / period if period <= len(losses) else 0 for i in range(period, len(closes) - 1): avg_gain = (avg_gain * (period - 1) + gains[i]) / period avg_loss = (avg_loss * (period - 1) + losses[i]) / period if avg_loss == 0: return 100 rs = avg_gain / avg_loss return 100 - (100 / (1 + rs)) def calculate_macd(candles, short_period, long_period, signal_period): """ Calculates MACD (line, signal, histogram). :param candles: A list of candle arrays :param short_period: Short EMA period :param long_period: Long EMA period :param signal_period: Signal line EMA period :return: (macd_line_value, signal_line_value, histogram_value) """ closes = [float(candle[4]) for candle in reversed(candles)] if len(closes) < long_period + signal_period: return 0, 0, 0 def ema(values, p): k = 2 / (p + 1) ema_list = [] ema_current = sum(values[:p]) / p ema_list.append(ema_current) for price in values[p:]: ema_current = (price - ema_current) * k + ema_current ema_list.append(ema_current) return ema_list ema_short = ema(closes, short_period) ema_long = ema(closes, long_period) # Align lengths ema_short = ema_short[-len(ema_long):] if len(ema_short) > len(ema_long) else ema_short macd_line = [a - b for a, b in zip(ema_short, ema_long)] if len(macd_line) < signal_period: return 0, 0, 0 signal_list = ema(macd_line, signal_period) signal_list = signal_list[-len(macd_line):] histogram = [m - s for m, s in zip(macd_line[-len(signal_list):], signal_list)] return macd_line[-1], signal_list[-1], histogram[-1] def calculate_bollinger_bands(candles, period, num_std_dev): """ Calculates Bollinger Bands: upper, mid (SMA), and lower. :param candles: A list of candle arrays :param period: Bollinger lookback period :param num_std_dev: # of standard deviations from mid :return: (upper_band, middle_band, lower_band) """ closes = [float(candle[4]) for candle in candles[:period]] if not closes: return (0, 0, 0) sma = sum(closes) / len(closes) variance = sum((c - sma) ** 2 for c in closes) / len(closes) stddev = math.sqrt(variance) upper_band = sma + (num_std_dev * stddev) lower_band = sma - (num_std_dev * stddev) return (upper_band, sma, lower_band) def average_volume(candles, period): """ Calculates the average trading volume. :param candles: A list of candle arrays :param period: # of candles to include :return: Average volume as float """ vols = [float(candle[5]) for candle in candles[:period]] return sum(vols) / len(vols) if vols else 0 def apply_confidence_decay(confidence, time_delta_minutes, half_life=15): """ Applies exponential decay to the confidence score. :param confidence: Raw confidence score :param time_delta_minutes: Age of the most recent candle in minutes :param half_life: Decay half-life in minutes :return: Decayed confidence score """ if half_life <= 0: return confidence decay_factor = math.exp(-time_delta_minutes / half_life) return confidence * decay_factor def parse_reason_prefix(reason): """ Returns the substring of reason before ' | ', useful for deduping trades. """ return reason.split(" | ")[0].strip() def fetch_recent_similar_trades(action, decayed_confidence, reason_prefix, confidence_threshold=5): """ Checks if a similar trade was executed recently to avoid duplicates. :param action: 'BUY' or 'SELL' :param decayed_confidence: Current confidence score (float) :param reason_prefix: Intro portion of reason text :param confidence_threshold: Allowed +/- difference in confidence :return: List of matching trades """ conn = get_db_connection() cursor = conn.cursor(dictionary=True) query = """ SELECT action, amount, timestamp, confidence_score, reason FROM cryptobot_trade_decisions WHERE action = %s AND timestamp >= NOW() - INTERVAL %s MINUTE AND confidence_score BETWEEN %s AND %s AND reason LIKE %s LIMIT 1 """ c_low = decayed_confidence - confidence_threshold c_high = decayed_confidence + confidence_threshold reason_like = reason_prefix + "%" try: cursor.execute(query, (action, CONFIG["recent_trade_window"], c_low, c_high, reason_like)) return cursor.fetchall() except Exception as e: logger.error(f"Error in fetch_recent_similar_trades: {str(e)}", exc_info=True) return [] finally: cursor.close() conn.close() def trading_decision(candles, spot_price, recent_trades, hourly_trade_count): """ Core logic to determine the trading action (BUY, SELL, or HOLD), the trade amount, and the reason for the decision. :param candles: List of candle arrays :param spot_price: Current BTC-USD price :param recent_trades: Recent trades within the configured window :param hourly_trade_count: # of trades in the last hour :return: (action, amount, confidence, reason, signal_info) """ # 1. Check hourly trade limits if hourly_trade_count >= CONFIG["trade_limit_per_hour"]: reason = f"Trade limit reached: {hourly_trade_count} trades in the last hour." logger.info(reason) return ("HOLD", 0, 0, reason, {}) # 2. Calculate technical indicators sma_short = calculate_sma(candles, CONFIG["sma_short_period"]) sma_long = calculate_sma(candles, CONFIG["sma_long_period"]) rsi_value = calculate_rsi(candles, CONFIG["rsi_period"]) macd_line, signal_line, macd_hist = calculate_macd( candles, CONFIG["macd_short_period"], CONFIG["macd_long_period"], CONFIG["macd_signal_period"] ) bb_upper, bb_middle, bb_lower = calculate_bollinger_bands( candles, CONFIG["bollinger_period"], CONFIG["bollinger_std_dev"] ) avg_vol = average_volume(candles, CONFIG["volume_check_period"]) curr_vol = float(candles[0][5]) if candles else 0 # Collate indicators for logging & messaging signal_info = { "RSI": round(rsi_value, 2), "SMA_Short": round(sma_short, 2), "SMA_Long": round(sma_long, 2), "MACD_Line": round(macd_line, 2), "MACD_Signal": round(signal_line, 2), "MACD_Hist": round(macd_hist, 2), "Boll_Upper": round(bb_upper, 2), "Boll_Middle": round(bb_middle, 2), "Boll_Lower": round(bb_lower, 2), "Avg_Vol": round(avg_vol, 2), "Curr_Vol": round(curr_vol, 2) } logger.info( f"Indicators -> SMA Short: {sma_short}, SMA Long: {sma_long}, " f"RSI: {rsi_value}, MACD: {macd_line} (Signal: {signal_line}, Hist: {macd_hist}), " f"Bollinger: (U:{bb_upper}, M:{bb_middle}, L:{bb_lower}), " f"AvgVol: {avg_vol}, CurrVol: {curr_vol}" ) logger.info(f"Recent Trades in last {CONFIG['recent_trade_window']} mins: {recent_trades}") logger.info(f"Hourly Trade Count: {hourly_trade_count}") # 3. Avoid redundant trades in extreme RSI conditions for rt in recent_trades: if rt["action"] == "BUY" and rsi_value < CONFIG["buy_threshold"]: reason = "Skipping: Similar BUY trade recently in oversold RSI." logger.info(reason) return ("HOLD", 0, 0, reason, signal_info) if rt["action"] == "SELL" and rsi_value > CONFIG["sell_threshold"]: reason = "Skipping: Similar SELL trade recently in overbought RSI." logger.info(reason) return ("HOLD", 0, 0, reason, signal_info) # 4. Base Weighted Confidence base_confidence = 0 # RSI weighting if rsi_value < CONFIG["buy_threshold"]: base_confidence += 15 elif rsi_value > CONFIG["sell_threshold"]: base_confidence -= 15 # SMA weighting if sma_short > sma_long: base_confidence += 15 else: base_confidence -= 15 # MACD weighting if macd_line > signal_line: base_confidence += 10 else: base_confidence -= 10 # Bollinger weighting latest_close = float(candles[0][4]) if candles else 0 if latest_close < bb_lower: base_confidence += 15 elif latest_close > bb_upper: base_confidence -= 15 # Volume weighting if avg_vol > 0 and curr_vol > (avg_vol * CONFIG["volume_multiplier"]): base_confidence += 10 else: base_confidence -= 5 logger.info(f"Base confidence (pre-decay) = {base_confidence}") # 4.1 Apply Confidence Decay latest_candle_time = datetime.utcfromtimestamp(candles[0][0]) if candles else datetime.utcnow() time_delta_minutes = (datetime.utcnow() - latest_candle_time).total_seconds() / 60.0 decayed_confidence = apply_confidence_decay(base_confidence, time_delta_minutes, half_life=15) logger.info(f"Decayed confidence = {decayed_confidence:.2f}") # 5. Decide action action = "HOLD" reason = "No strong signal." if decayed_confidence >= CONFIG["buy_confidence_threshold"]: action = "BUY" reason = f"Positive signals align (confidence={decayed_confidence:.2f})." elif decayed_confidence <= CONFIG["sell_confidence_threshold"]: action = "SELL" reason = f"Negative signals align (confidence={decayed_confidence:.2f})." # 6. Determine trade amount based on confidence tiers max_amt = CONFIG["max_trade_amount"] amount = 0.0 if action == "BUY": if decayed_confidence >= 40: amount = max_amt elif decayed_confidence >= 35: amount = max_amt * 0.75 elif decayed_confidence >= 30: amount = max_amt * 0.50 elif decayed_confidence >= 25: amount = max_amt * 0.25 else: amount = max_amt * 0.1 elif action == "SELL": # never 0 if thresholds met if decayed_confidence <= -40: amount = max_amt elif decayed_confidence <= -35: amount = max_amt * 0.75 elif decayed_confidence <= -30: amount = max_amt * 0.50 elif decayed_confidence <= -25: amount = max_amt * 0.25 else: amount = max_amt * 0.1 # 7. Stop-Loss & Take-Profit stop_loss_price = 0 take_profit_price = 0 if action == "BUY": stop_loss_price = spot_price * (1.0 + (CONFIG["stop_loss_percent"] / 100.0)) take_profit_price = spot_price * (1.0 + (CONFIG["take_profit_percent"] / 100.0)) elif action == "SELL": stop_loss_price = spot_price * (1.0 - (CONFIG["stop_loss_percent"] / 100.0)) take_profit_price = spot_price * (1.0 - (CONFIG["take_profit_percent"] / 100.0)) reason += f" | SL={stop_loss_price:.2f}, TP={take_profit_price:.2f}" # 8. Check for recent similar trades if action in ["BUY", "SELL"]: reason_prefix = parse_reason_prefix(reason) similar_recent = fetch_recent_similar_trades(action, decayed_confidence, reason_prefix) if similar_recent: skip_reason = "Skipping trade due to recent similar trade with same reason prefix & confidence." logger.info(skip_reason) return ("HOLD", 0, 0, skip_reason, signal_info) logger.info(f"Final Decision: {action}, Amount={amount}, Confidence={decayed_confidence:.2f}, Reason={reason}") return (action, amount, decayed_confidence, reason, signal_info) def lambda_handler(event, context): """ The steps executed: 1) Fetch & log candles 2) Fetch spot price 3) Fetch recent trades & count hourly trades 4) Compute trading decision 5) Log the decision 6) [Optionally] Send alert/notification alert if BUY or SELL 7) Auto-execute the trade via another Lambda if BUY or SELL """ try: # 1. Fetch & log candle data candles = fetch_candle_data() log_candle_data(candles) # 2. Fetch current spot price spot_price = fetch_spot_price() # 3. Retrieve recent trades & hourly count recent_trades = fetch_recent_trades() hourly_trade_count = count_hourly_trades() # 4. Make the trading decision action, amount, confidence, reason, signal_info = trading_decision( candles, spot_price, recent_trades, hourly_trade_count ) # 5. Log the decision to MySQL additional_info = { "spot_price": spot_price, "hourly_trade_count": hourly_trade_count, "recent_trades": recent_trades, "decayed_confidence": confidence, "signals": signal_info } log_trade_decision( timestamp=datetime.utcnow(), action=action, amount=amount, confidence=confidence, reason=reason, additional_info=json.dumps(additional_info, default=str) ) # 6. [Optional] Send a notification/message to you if we have a BUY or SELL if action in ["BUY", "SELL"]: message_text = ( f"*{action}*!\n" f"Amount: *${amount:.2f}*\n" f"Confidence: *{confidence:.2f}*\n" f"Reason: *{reason}*\n" f"Current Price: ${spot_price:.2f}\n\n" f"--- Indicators ---\n" f"RSI: {signal_info['RSI']}\n" f"SMA (Short/Long): {signal_info['SMA_Short']} / {signal_info['SMA_Long']}\n" f"MACD (Line/Signal/Hist): {signal_info['MACD_Line']} / {signal_info['MACD_Signal']} / {signal_info['MACD_Hist']}\n" f"Bollinger (U/M/L): {signal_info['Boll_Upper']} / {signal_info['Boll_Middle']} / {signal_info['Boll_Lower']}\n" f"Avg Vol: {signal_info['Avg_Vol']}, CurrVol: {signal_info['Curr_Vol']}" ) # Insert your code to send the message_text above. I have mine sending messages to my Synology Chat instance but you can have it email or SMS you, if desired. # 7. Automatically execute the trade if BUY or SELL if action == "BUY" and amount > 0: client_order_id = str(int(time.time())) buy_payload = { "action": "execute_buy_order", "params": { "client_order_id": client_order_id, "product_id": CONFIG["product_id"], "quote_size": str(amount), # The USD value "base_size": None } } lambda_client = boto3.client("lambda") logger.info(f"Invoking {CONFIG['function_name']} with buy_payload: {buy_payload}") try: response = lambda_client.invoke( FunctionName=CONFIG["function_name"], InvocationType="RequestResponse", Payload=json.dumps(buy_payload) ) resp_payload = response["Payload"].read().decode("utf-8", errors="ignore") logger.info(f"Lambda BUY response: {resp_payload}") except Exception as e: logger.error(f"Error invoking Lambda for BUY: {str(e)}", exc_info=True) elif action == "SELL" and amount > 0: # Convert USD to BTC as sell orders must be in base currency. base_size_btc = round(amount / spot_price, 8) client_order_id = str(int(time.time())) sell_payload = { "action": "execute_sell_order", "params": { "client_order_id": client_order_id, "product_id": CONFIG["product_id"], "quote_size": None, "base_size": str(base_size_btc) } } lambda_client = boto3.client("lambda") logger.info(f"Invoking {CONFIG['function_name']} with sell_payload: {sell_payload}") try: response = lambda_client.invoke( FunctionName=CONFIG["function_name"], InvocationType="RequestResponse", Payload=json.dumps(sell_payload) ) resp_payload = response["Payload"].read().decode("utf-8", errors="ignore") logger.info(f"Lambda SELL response: {resp_payload}") except Exception as e: logger.error(f"Error invoking Lambda for SELL: {str(e)}", exc_info=True) # 8. Return final response for debugging return { "action": action, "amount": amount, "confidence": confidence, "reason": reason } except Exception as e: logger.error(f"Unhandled error in lambda_handler: {e}", exc_info=True) raise
Step 4: Create Coinbase API Lambda
Create the CoinbaseAPI Lambda which will perform the actual actions against the Coinbase API. Again, this could be merged into the prior Lambda but I chose to split it out as I will be using it for other projects. For this, you’ll need to add the Coinbase Advanced SDK as a layer as well as MySQL module. Be sure to add your Coinbase API credentials created in step 2 + your MySQL credentials as environmental variables like noted in the script:
import os import json import logging import mysql.connector from coinbase.rest import RESTClient from datetime import datetime import time logger = logging.getLogger() logger.setLevel(logging.INFO) def log_with_function_name(func_name, message): logger.info(f"[{func_name}] {message}") def get_mysql_connection(): func_name = "get_mysql_connection" try: connection = mysql.connector.connect( host=os.getenv("MYSQL_HOST"), user=os.getenv("MYSQL_USER"), password=os.getenv("MYSQL_PASSWORD"), database=os.getenv("MYSQL_DATABASE"), charset="utf8mb4" ) return connection except Exception as e: log_with_function_name(func_name, f"Error connecting to MySQL: {str(e)}") raise def log_api_call(event_body, endpoint, payload, response): """ Logs details of an API call into 'coinbase_api_log' table. """ func_name = "log_api_call" try: connection = get_mysql_connection() cursor = connection.cursor() sql = """ INSERT INTO coinbase_api_log ( event_body, timestamp, endpoint, payload, response ) VALUES (%s, %s, %s, %s, %s) """ cursor.execute(sql, ( json.dumps(event_body) if event_body else None, datetime.utcnow(), endpoint, json.dumps(payload) if payload else None, json.dumps(response) if response else None )) connection.commit() cursor.close() log_with_function_name(func_name, "API call logged successfully.") except Exception as e: log_with_function_name(func_name, f"Error logging API call: {str(e)}") finally: if 'connection' in locals() and connection.is_connected(): connection.close() def get_rest_client(): api_key = os.getenv("COINBASE_API_KEY") api_secret = os.getenv("COINBASE_API_SECRET") if not api_key or not api_secret: raise ValueError("Missing API key or secret in environment variables.") return RESTClient(api_key=api_key, api_secret=api_secret) def list_accounts(event_body, limit=250): """ Lists brokerage accounts from Coinbase via /api/v3/brokerage/accounts. :param event_body: The event data triggering this action :param limit: The maximum number of accounts to retrieve (default 250) :return: A dict with a status and a list of 'accounts' """ func_name = "list_accounts" try: log_with_function_name(func_name, f"Initializing REST client.") client = get_rest_client() log_with_function_name(func_name, f"Calling /brokerage/accounts with limit={limit}.") response = client.get("/api/v3/brokerage/accounts", params={"limit": limit}) log_with_function_name(func_name, "Logging API call details.") log_api_call(event_body, "/api/v3/brokerage/accounts", {"limit": limit}, response) accounts = response.get("accounts", []) log_with_function_name(func_name, f"Retrieved {len(accounts)} accounts.") for account in accounts: log_with_function_name(func_name, f"Account: {json.dumps(account, indent=4)}") return {"status": "success", "accounts": accounts} except Exception as e: log_with_function_name(func_name, f"Error occurred: {str(e)}") log_api_call(event_body, "/api/v3/brokerage/accounts", {"limit": limit}, {"error": str(e)}) raise def execute_order(event_body, side, client_order_id, product_id, quote_size=None, base_size=None, preview_id=None): """ Executes a BUY or SELL order via /api/v3/brokerage/orders. :param event_body: The event data :param side: "BUY" or "SELL" :param client_order_id: Unique ID for this order (string) :param product_id: The product ID, e.g. "BTC-USDC" :param quote_size: The notional (USD) amount to trade (optional) :param base_size: The BTC amount to trade (optional for buy but required for sell) :param preview_id: An optional preview ID if you do preview-based orders :return: A dict with "status" and "order_response" data :raises: ValueError if side is invalid, or if both quote_size and base_size are set """ func_name = "execute_order" try: log_with_function_name(func_name, "Initializing REST client.") client = get_rest_client() if side not in ["BUY", "SELL"]: raise ValueError("Invalid order side. Must be 'BUY' or 'SELL'.") # Ensure only one of `quote_size` or `base_size` is set if not ((quote_size and not base_size) or (base_size and not quote_size)): raise ValueError("Specify either `quote_size` or `base_size`, but not both.") market_config = {} if quote_size: market_config["quote_size"] = quote_size elif base_size: market_config["base_size"] = base_size order_payload = { "client_order_id": client_order_id, "product_id": product_id, "side": side, "order_configuration": { "market_market_ioc": market_config }, "preview_id": preview_id } log_with_function_name(func_name, f"Placing {side} order: {json.dumps(order_payload, indent=4)}") response = client.post("/api/v3/brokerage/orders", data=order_payload) log_with_function_name(func_name, "Logging API call.") log_api_call(event_body, "/api/v3/brokerage/orders", order_payload, response) log_with_function_name(func_name, f"{side} order placed. Response: {json.dumps(response, indent=4)}") return {"status": "success", "order_response": response} except Exception as e: log_with_function_name(func_name, f"Error occurred: {str(e)}") order_payload = locals().get("order_payload", {}) log_api_call(event_body, "/api/v3/brokerage/orders", order_payload, {"error": str(e)}) raise def list_historical_orders(event_body, product_ids=None, limit=100, cursor=None): """ Lists historical orders via /api/v3/brokerage/orders/historical/batch. :param event_body: The event data :param product_ids: List or string of product IDs (e.g., ["BTC-USDC"]) :param limit: # of orders to retrieve :param cursor: For paginated requests :return: A dict with "status" and "orders" """ func_name = "list_historical_orders" try: log_with_function_name(func_name, f"Initializing REST client.") client = get_rest_client() params = {"limit": limit} if cursor: params["cursor"] = cursor if product_ids: params["product_ids"] = product_ids log_with_function_name(func_name, f"Calling /api/v3/brokerage/orders/historical/batch with {params}.") response = client.get("/api/v3/brokerage/orders/historical/batch", params=params) log_with_function_name(func_name, "Logging API call.") log_api_call(event_body, "/api/v3/brokerage/orders/historical/batch", params, response) orders = response.get("orders", []) log_with_function_name(func_name, f"Response: {json.dumps(response, indent=4)}") return {"status": "success", "orders": orders} except Exception as e: log_with_function_name(func_name, f"Error occurred: {str(e)}") log_api_call(event_body, "/api/v3/brokerage/orders/historical/batch", params, {"error": str(e)}) raise def fetch_and_store_order_details(event_body, limit=500, sort_by="TRADE_TIME"): """ Retrieves order IDs from the db, calls Coinbase for detailed fills, and stores them. :param event_body: The event data :param limit: Maximum # of records to return per request :param sort_by: Sorting method for results :return: A dict describing the final status or message """ func_name = "fetch_and_store_order_details" try: log_with_function_name(func_name, "Querying MySQL for order IDs to fetch their fill details.") connection = get_mysql_connection() cursor = connection.cursor(dictionary=True) sql_query = """ SELECT DISTINCT JSON_UNQUOTE(response->>'$.success_response.side') AS side, JSON_UNQUOTE(response->>'$.success_response.order_id') AS order_id, JSON_UNQUOTE(response->>'$.success_response.product_id') AS product_id FROM coinbase_api_log WHERE JSON_UNQUOTE(event_body->>'$.action') IN ('execute_buy_order', 'execute_sell_order') AND JSON_UNQUOTE(response->>'$.success') = 'true' """ cursor.execute(sql_query) rows = cursor.fetchall() cursor.close() connection.close() order_ids = [row['order_id'] for row in rows if row['order_id']] if not order_ids: log_with_function_name(func_name, "No successful order IDs found in coinbase_api_log.") return {"status": "success", "message": "No order IDs found."} # Retrieve detailed fills for these orders return get_order_fills(event_body, order_ids, limit=limit, sort_by=sort_by) except Exception as e: log_with_function_name(func_name, f"Error occurred: {str(e)}") raise def get_order_fills(event_body, order_ids, limit=500, sort_by="TRADE_TIME"): """ Fetches fill details for each order ID using /api/v3/brokerage/orders/historical/fills, handles pagination, and then logs them to MySQL. :param event_body: The event data :param order_ids: List of order IDs to fetch fills for :param limit: # of records per page :param sort_by: Sorting method, default 'TRADE_TIME' :return: A dict with status=success and a list of "fills" """ func_name = "get_order_fills" try: log_with_function_name(func_name, f"Fetching fills for order IDs: {order_ids}") client = get_rest_client() all_fills = [] has_more = True next_cursor = None while has_more: params = {"limit": limit, "sort_by": sort_by} for oid in order_ids: params.setdefault("order_ids", []).append(oid) if next_cursor: params["cursor"] = next_cursor log_with_function_name(func_name, f"Calling /brokerage/orders/historical/fills with params: {json.dumps(params, indent=4)}.") response = client.get("/api/v3/brokerage/orders/historical/fills", params=params) fills = response.get("fills", []) all_fills.extend(fills) log_api_call(event_body, "/api/v3/brokerage/orders/historical/fills", params, response) has_more = response.get("has_next", False) next_cursor = response.get("cursor") if has_more else None if has_more: log_with_function_name(func_name, "Pagination detected. Waiting 1 second before next call.") time.sleep(1) log_with_function_name(func_name, f"Fetched {len(all_fills)} total fills. Storing in MySQL.") log_fills_to_mysql(all_fills) return {"status": "success", "fills": all_fills} except Exception as e: log_with_function_name(func_name, f"Error occurred: {str(e)}") log_api_call(event_body, "/api/v3/brokerage/orders/historical/fills", {"order_ids": order_ids, "limit": limit}, {"error": str(e)}) raise def log_fills_to_mysql(fills): """ Persists fill details into 'coinbase_orders' table, using an ON DUPLICATE KEY update clause to avoid duplicate records. :param fills: List of fill objects from Coinbase's /fills endpoint """ func_name = "log_fills_to_mysql" if not fills: log_with_function_name(func_name, "No fills to log. Skipping DB insert.") return try: connection = get_mysql_connection() cursor = connection.cursor() sql = """ INSERT INTO coinbase_orders ( order_id, trade_id, product_id, side, size, price, fee, trade_time, settled, entry_id, trade_type, commission, sequence_timestamp, liquidity_indicator, size_in_quote, user_id, retail_portfolio_id ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE product_id = VALUES(product_id), side = VALUES(side), size = VALUES(size), price = VALUES(price), fee = VALUES(fee), trade_time = VALUES(trade_time), settled = VALUES(settled), entry_id = VALUES(entry_id), trade_type = VALUES(trade_type), commission = VALUES(commission), sequence_timestamp = VALUES(sequence_timestamp), liquidity_indicator = VALUES(liquidity_indicator), size_in_quote = VALUES(size_in_quote), user_id = VALUES(user_id), retail_portfolio_id = VALUES(retail_portfolio_id) """ for fill in fills: cursor.execute(sql, ( fill.get("order_id"), fill.get("trade_id"), fill.get("product_id"), fill.get("side"), fill.get("size"), fill.get("price"), fill.get("commission"), fill.get("trade_time"), None, # 'settled' not in response, stored for backward compatibility fill.get("entry_id"), fill.get("trade_type"), fill.get("commission"), fill.get("sequence_timestamp"), fill.get("liquidity_indicator"), fill.get("size_in_quote"), fill.get("user_id"), fill.get("retail_portfolio_id") )) connection.commit() cursor.close() log_with_function_name(func_name, f"{len(fills)} fills logged/updated in coinbase_orders table.") except Exception as e: log_with_function_name(func_name, f"Error logging fills to MySQL: {str(e)}") finally: if 'connection' in locals() and connection.is_connected(): connection.close() def lambda_handler(event, context): """ Supported Actions: - list_accounts - execute_buy_order - execute_sell_order - list_historical_orders - fetch_and_store_order_details """ func_name = "lambda_handler" try: log_with_function_name(func_name, f"Received event: {json.dumps(event)}") action = event.get("action") params = event.get("params", {}) if action == "list_accounts": return list_accounts(event, limit=params.get("limit", 248)) elif action in ["execute_buy_order", "execute_sell_order"]: side = "BUY" if action == "execute_buy_order" else "SELL" return execute_order( event_body=event, side=side, client_order_id=params["client_order_id"], product_id=params["product_id"], quote_size=params.get("quote_size"), base_size=params.get("base_size"), preview_id=params.get("preview_id") ) elif action == "list_historical_orders": return list_historical_orders( event_body=event, product_ids=params.get("product_ids"), limit=params.get("limit", 100), cursor=params.get("cursor") ) elif action == "fetch_and_store_order_details": return fetch_and_store_order_details(event) else: raise ValueError(f"Unsupported action: {action}") except Exception as e: log_with_function_name(func_name, f"Error occurred: {str(e)}") return { "statusCode": 500, "body": json.dumps({"error": str(e)}) }
Step 5: Modify the IAM Role for your CryptoBot Lambda
To allow the CryptoBot Lambda to invoke the CoinbaseAPI Lambda, we must modify the IAM policy for the CrpyptoBot Lambda user (or use the same IAM user for both). I opted for two unique users and modified my IAM user policy like so:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowLambdaInvokeCoinbaseAPI", "Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": "<ARM OF THE COINBASEAPI LAMBDA HERE>" } ] }
Step 6: Create MySQL tables
Here are the four tables I utilize. This could be simplified, for sure. Storing the full historical candle data, for example, is rather wasteful. I opt to log all of the API calls just as a precaution so I can see exactly what happened if something goes wrong.
CREATE TABLE `cryptobot_trade_decisions` ( `id` int NOT NULL AUTO_INCREMENT, `timestamp` timestamp NULL DEFAULT NULL, `action` varchar(10) DEFAULT NULL, `amount` decimal(18,8) DEFAULT NULL, `confidence_score` decimal(5,2) DEFAULT NULL, `reason` text, `additional_info` json DEFAULT NULL, `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; CREATE TABLE `cryptobot_candle_data` ( `id` int NOT NULL AUTO_INCREMENT, `candle_time` timestamp NOT NULL, `low` double DEFAULT NULL, `high` double DEFAULT NULL, `open_price` double DEFAULT NULL, `close_price` double DEFAULT NULL, `volume` double DEFAULT NULL, `inserted_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; CREATE TABLE `coinbase_api_log` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT, `event_body` json DEFAULT NULL, `timestamp` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, `endpoint` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, `payload` json DEFAULT NULL, `response` json DEFAULT NULL, `error` text COLLATE utf8mb4_unicode_ci, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; CREATE TABLE `coinbase_orders` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT, `order_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, `trade_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, `product_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, `side` enum('BUY','SELL') COLLATE utf8mb4_unicode_ci DEFAULT NULL, `size` decimal(18,8) DEFAULT NULL, `price` decimal(18,8) DEFAULT NULL, `fee` decimal(18,8) DEFAULT NULL, `trade_time` datetime DEFAULT NULL, `settled` tinyint(1) DEFAULT NULL, `entry_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, `trade_type` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL, `commission` decimal(18,8) DEFAULT NULL, `sequence_timestamp` datetime DEFAULT NULL, `liquidity_indicator` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL, `size_in_quote` tinyint(1) DEFAULT NULL, `user_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, `retail_portfolio_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `unique_trade` (`trade_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Step 7: Creating the EventBridge Schedules
I have two EventBridge schedules running:
- Schedule 1 simply invokes the CryptoBot every 15 minutes with an empty payload of {}. I can increase or decrease the 15 minute schedule based on how often I want the bot to evaluate trade decisions. Given the simplicity of this logic, 15 minutes is likely as low as I’d go (and that being overly aggressive).
- Schedule 2 invokes my CoinbaseAPI Lambda every hour and loops through any new orders that have been placed and stores that order info in the coinbase_orders table. This isn’t needed but, again, I like the additional tracking and monitoring in the event something unexpected happens. The payload (to trigger the proper function) for this schedule is:
{ "action": "fetch_and_store_order_details", "params": { "limit": 500, "sort_by": "TRADE_TIME" } }
Conclusion
That’s it! You should see your trades happening and you can monitor the trades for the specific portfolio directly from the Orders page on Coinbase by filtering to the dedicated portfolio you created in Step 1:

Again, I make no claims this will work. If you copy this and something goes wrong resulting in you losing everything, that’s not my fault and I take no responsibility.
Leave a Reply