How to Build an AI Sales Development Representative (SDR) in Slack - Part 2: Email Enrichment

Introduction #
In this second part of our AI SDR series, we'll enhance our AI sales assistant with email enrichment capabilities. This powerful feature transforms our free AI tool for lead generation into a comprehensive AI-powered lead qualification system. By the end of this guide, your AI SDR will not only find potential leads but also verify and enrich their email addresses.
This guide is Part 2 of a six-part series for how to build your own AI SDR (Sales Development Representative) in Slack. This template includes the code from Part 1 and the subsequent guides are linked at the bottom of this post.
Getting Started #
To begin enhancing your AI sales assistant, fork this template by clicking "Use Template" below:
Deploy a Clay Table as an API
For this enhanced version of our AI for lead generation, if you want to use Clay for your enrichment flow, you'll need to follow this Guide to learn how to turn a Clay table into a hosted API.
Once you adapt the table to return email addresses from LinkedIn profiles, add this key to your Replit Secrets as INTERNAL_CLAY_API_KEY. This should be the Authorization key used in your Clay API.
Also, your deployment URL to your Replit Secrets as CLAY_LINKEDIN_TO_EMAIL_API_URL. This is the URL that you get after deploying your Clay API.
Breaking Down the AI Sales Assistant Code #
Let's explore the key components that make our AI SDR a more powerful tool for lead generation and qualification:
Email Enrichment Process
The enrich_leads function in utils.py is the core of our email enrichment process:
python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import csv
import io
import os
import asyncio
import aiohttp
import json
import requests
# Common headers for Apollo API requests
def common_headers():
api_key = os.environ['APOLLO_API_KEY']
return {
'Cache-Control': 'no-cache',
'Content-Type': 'application/json',
'X-Api-Key': api_key
}
# Search for organizations using Apollo API
def search_orgs(**kwargs):
url = "https://api.apollo.io/api/v1/mixed_companies/search"
data = {k: v for k, v in kwargs.items() if v is not None}
print("\nFINAL ORGANIZATION QUERY", data)
response = requests.post(url, headers=common_headers(), json=data)
if response.status_code == 200:
return [org.get("id") for org in response.json().get('organizations', [])]
else:
print(f"Error searching organizations: {response.status_code}")
return []
# Search for people using Apollo API
def search_people(**kwargs):
url = "https://api.apollo.io/v1/mixed_people/search"
data = {k: v for k, v in kwargs.items() if v is not None and v != 'None' and v != []}
print("\nFINAL PEOPLE QUERY", data)
response = requests.post(url, headers=common_headers(), json=data)
if response.status_code == 200:
return response.json().get('people', [])
else:
print(f"Error searching people: {response.status_code}")
return []
# Clean and structure leads data
def clean_leads_data(leads_data):
clean_data_objects = []
for lead in leads_data:
clean_data_object = {
"first_name": lead.get('first_name'),
"last_name": lead.get('last_name'),
"title": lead.get('title'),
"email": lead.get('email'),
"linkedin_url": lead.get('linkedin_url'),
"company_name": lead.get('organization', {}).get('name'),
"company_website": lead.get('organization', {}).get('website_url'),
"company_industry": lead.get('organization', {}).get('industry'),
"company_size": lead.get('organization', {}).get('size'),
"city": lead.get('city'),
"state": lead.get('state'),
"country": lead.get('country'),
"seniority": lead.get('seniority'),
"departments": ', '.join(lead.get('departments', [])),
"functions": ', '.join(lead.get('functions', []))
}
clean_data_objects.append(clean_data_object)
return clean_data_objects
# Convert JSON data to CSV format
def json_to_csv(json_data):
if not json_data:
return ""
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=json_data[0].keys())
writer.writeheader()
for row in json_data:
writer.writerow(row)
return output.getvalue()
# Enrich a single Apollo lead with email address using external Clay API
async def enrich_lead(session, lead, clay_api_url):
clay_api_url = f"{clay_api_url}/clay/linkedin-to-email"
headers = {
'Authorization': os.environ['INTERNAL_CLAY_API_KEY'],
'Content-Type': 'application/json'
}
# Convert all values to strings to ensure JSON serializability
payload = {k: str(v) if v is not None else '' for k, v in lead.items()}
print(f"\nFinding email for: {json.dumps(payload, indent=2)}")
try:
async with session.post(clay_api_url, json=payload, headers=headers, allow_redirects=False) as response:
if response.status == 200:
result = await response.json()
print(f"Email enrichment response: {json.dumps(result, indent=2)}")
lead['email_found'] = result.get('email_found', 'false')
if lead['email_found'] == 'true':
lead['email'] = result.get('email')
else:
print(f"Error enriching lead: {response.status}")
print(f"Email enrichment error: {await response.text()}")
raise Exception(f"API error: {response.status}")
except Exception as e:
print(f"Exception while enriching lead: {str(e)}")
raise
return lead
# Enrich multiple leads with email information
async def enrich_leads(csv_content):
reader = csv.DictReader(io.StringIO(csv_content))
leads = list(reader)
clay_api_base_url = os.environ.get('CLAY_LINKEDIN_TO_EMAIL_API_URL')
if not clay_api_base_url:
print("Warning: CLAY_LINKEDIN_TO_EMAIL_API_URL environment variable is not set. Skipping enrichment.")
return json_to_csv(leads)
async with aiohttp.ClientSession() as session:
enriched_leads = []
tasks = []
for lead in leads:
task = asyncio.create_task(enrich_lead_with_retry(session, lead, clay_api_base_url))
tasks.append(task)
await asyncio.sleep(0.1) # Rate limiting: 10 requests per second
# Wait for all tasks to complete
results = await asyncio.gather(*tasks)
enriched_leads = [lead for lead in results if lead.get('email_found') == 'true']
return json_to_csv(enriched_leads)
# Retry mechanism for lead enrichment
async def enrich_lead_with_retry(session, lead, clay_api_url, max_retries=3):
for attempt in range(max_retries):
try:
enriched_lead = await enrich_lead(session, lead, clay_api_url)
if enriched_lead['email_found'] == 'true':
return enriched_lead
except Exception as e:
print(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt < max_retries - 1:
await asyncio.sleep(0.5) # Wait 0.5 seconds before retrying
return lead # Return original lead if all attempts fail
This function takes the initial leads data and enriches it with verified email addresses, a crucial step in qualifying leads for your sales team.
Handling Enrichment Requests
In main.py, we've added a new function to handle lead enrichment requests:
python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import asyncio
import io
import os
import time
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from query_analyzer import analyze_query
from utils import (
clean_leads_data,
enrich_leads,
json_to_csv,
search_orgs,
search_people,
)
# Initialize Slack app
app = App(token=os.environ["SLACK_BOT_TOKEN"])
# Store CSV contents in memory for later use
app.csv_contents = {}
def run_leads_search(query, slack_ts):
try:
# Analyze the user's query to extract Apollo API parameters
analysis = analyze_query(query)
if not analysis:
raise ValueError("Failed to analyze the query")
# Perform organization search if needed
org_ids = []
if analysis['needs_org_search']:
org_ids = search_orgs(**analysis['organization_search'])
# Perform people search with organization IDs if available
people_search_params = analysis['people_search']
if org_ids:
people_search_params['org_ids'] = org_ids
leads_data = search_people(**people_search_params)
# Clean and format the leads data
cleaned_leads = clean_leads_data(leads_data)
# Generate CSV from cleaned leads data
csv_content = json_to_csv(cleaned_leads)
if csv_content:
return csv_content
else:
raise ValueError("Failed to generate CSV content")
except Exception as e:
print(f"Error in run_leads_search: {str(e)}")
return None
@app.event("message")
def handle_message_events(body, logger):
logger.info(body)
@app.event("app_mention")
def handle_mention(event, say):
user = event["user"]
ts = event["ts"]
channel = event["channel"]
ack = "On it! Processing..."
say(text=ack, channel=channel, thread_ts=ts)
try:
user_query = event['text']
csv_content = run_leads_search(user_query, ts)
if csv_content:
try:
# Upload the inital leads CSV file to Slack
result = app.client.files_upload_v2(
channel=channel,
content=csv_content,
filename=f"Leads-{ts}.csv",
title=f"Leads-{ts}.csv",
initial_comment=f"Hey <@{user}>, here are the leads you requested:",
thread_ts=ts
)
if not result["ok"]:
raise Exception(f"Error uploading file: {result['error']}")
# Store CSV content in memory for potential enrichment
app.csv_contents[ts] = csv_content
# Ensure file upload is complete before sending follow-up message
time.sleep(1)
# Ask user if they want to enrich the leads with email addresses
app.client.chat_postMessage(
channel=channel,
thread_ts=ts,
text="Would you like to proceed with an enrichment process to find the valid email addresses?",
blocks=[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "Would you like to proceed with an enrichment process to find the valid email addresses?"
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "Enrich Leads 🔎"},
"action_id": "enrich_leads",
"value": ts,
"style": "primary"
},
{
"type": "button",
"text": {"type": "plain_text", "text": "Cancel ❌"},
"action_id": "cancel_enrichment",
"value": ts,
}
]
}
]
)
except Exception as e:
error_message = f"Hey <@{user}>, I encountered an error while uploading the file: {str(e)}"
say(text=error_message, channel=channel, thread_ts=ts)
else:
error_message = f"Hey <@{user}>, I'm sorry, but I encountered an error while processing your request. Please try again later or contact support if the issue persists."
say(text=error_message, channel=channel, thread_ts=ts)
except Exception as e:
print(e)
error_message = f"Hey <@{user}>, I'm sorry, but I encountered an error while processing your request. Please try again later or contact support if the issue persists."
say(text=error_message, channel=channel, thread_ts=ts)
@app.action("enrich_leads")
def handle_enrich_leads(ack, body, say):
ack()
user = body["user"]["id"]
channel = body["channel"]["id"]
ts = body["message"]["ts"]
csv_ts = body["actions"][0]["value"]
say(text="Starting the lead enrichment process. This may take a while...", channel=channel, thread_ts=ts)
try:
# Retrieve stored CSV content
csv_content = app.csv_contents.get(csv_ts)
if not csv_content:
raise Exception("CSV content not found")
# Run the enrichment process
enriched_csv_content = asyncio.run(enrich_leads(csv_content))
if enriched_csv_content:
# Upload the enriched CSV file to Slack
result = app.client.files_upload_v2(
channel=channel,
content=enriched_csv_content,
filename=f"Enriched-Leads-{ts}.csv",
title=f"Enriched-Leads-{ts}.csv",
initial_comment=f"Hey <@{user}>, here are your enriched leads (only including leads with found emails):",
thread_ts=ts
)
if not result["ok"]:
raise Exception(f"Error uploading enriched file: {result['error']}")
else:
say(text=f"Hey <@{user}>, the enrichment process completed, but no leads with valid emails were found.", channel=channel, thread_ts=ts)
except Exception as e:
error_message = f"Hey <@{user}>, I encountered an error while enriching the leads: {str(e)}"
say(text=error_message, channel=channel, thread_ts=ts)
@app.action("cancel_enrichment")
def handle_cancel_enrichment(ack, body, say):
ack()
user = body["user"]["id"]
channel = body["channel"]["id"]
ts = body["message"]["ts"]
csv_ts = body["actions"][0]["value"]
# Clear the CSV from memory to free up resources
if csv_ts in app.csv_contents:
del app.csv_contents[csv_ts]
say(text="Got it! We'll conclude the session here. Let me know if you need anything else.", channel=channel, thread_ts=ts)
if __name__ == "__main__":
# Start the Socket Mode handler
handler = SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
handler.start()
This function allows users to initiate the email enrichment process directly from Slack, making our AI sales assistant more interactive and user-friendly.
Improved Lead Generation Flow
We've updated the main lead generation function to include an option for email enrichment:
python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@app.event("app_mention")
def handle_mention(event, say):
user = event["user"]
ts = event["ts"]
channel = event["channel"]
ack = "On it! Processing..."
say(text=ack, channel=channel, thread_ts=ts)
try:
user_query = event['text']
csv_content = run_leads_search(user_query, ts)
if csv_content:
try:
# Upload the initial leads CSV file to Slack
result = app.client.files_upload_v2(
channel=channel,
content=csv_content,
filename=f"Leads-{ts}.csv",
title=f"Leads-{ts}.csv",
initial_comment=f"Hey <@{user}>, here are the leads you requested:",
thread_ts=ts
)
if not result["ok"]:
raise Exception(f"Error uploading file: {result['error']}")
# Store CSV content in memory for potential enrichment
app.csv_contents[ts] = csv_content
# Ask user if they want to enrich the leads with email addresses
app.client.chat_postMessage(
channel=channel,
thread_ts=ts,
text="Would you like to proceed with an enrichment process to find the valid email addresses?",
blocks=[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "Would you like to proceed with an enrichment process to find the valid email addresses?"
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "Enrich Leads 🔎"},
"action_id": "enrich_leads",
"value": ts,
"style": "primary"
},
{
"type": "button",
"text": {"type": "plain_text", "text": "Cancel ❌"},
"action_id": "cancel_enrichment",
"value": ts,
}
]
}
]
)
except Exception as e:
error_message = f"Hey <@{user}>, I encountered an error while uploading the file: {str(e)}"
say(text=error_message, channel=channel, thread_ts=ts)
else:
error_message = f"Hey <@{user}>, I'm sorry, but I encountered an error while processing your request. Please try again later or contact support if the issue persists."
say(text=error_message, channel=channel, thread_ts=ts)
except Exception as e:
print(e)
error_message = f"Hey <@{user}>, I'm sorry, but I encountered an error while processing your request. Please try again later or contact support if the issue persists."
say(text=error_message, channel=channel, thread_ts=ts)
This updated flow allows the AI sales assistant to not only generate leads but also offer immediate email enrichment, enhancing the lead qualification process.
Deploying Your AI SDR #
In order to keep your AI SDR running 24/7 and receive requests whenever someone mentions it in Slack, you'll need to deploy it on a hosted server.
Open a new tab in the Workspace and search for “Deployments” or open the control console by typing ⌘ + K (or Ctrl + K) and type "deploy". You should find a screen like this.

For bots like this that need always need to be up listening to requests, we recommend using a Reserved VM. On the next screen, click Approve and configure build settings most internal bots work fine with the default machine settings but if you need more power later, you can always come back and change these settings later. You can monitor your usage and billing at any time at: replit.com/usage.
On the next screen, you’ll be able to set your primary domain and edit the Secrets that will be in your production deployment. Usually, we keep these settings as they are.
Finally, click Deploy and watch your bot go live!
What's Next for Your AI Sales Assistant #
In the next part of this series, we'll be adding the ability for the assistant to create and draft an email campaign for your generated lead list using OpenAI. The other parts of the series include:
- AI SDR - Part 1 - Lead List Creation
- AI SDR - Part 2 (this guide) - Lead Enrichment
- AI SDR - Part 3 - Draft Email Copy
- AI SDR - Part 4 - Create Smartlead campaign
- AI SDR - Part 5 - Add Leads to HubSpot
- AI SDR - Part 6 (Final) - Agent Mode
If you'd like to discuss how to enable your team to build and implement tools like these, feel free to schedule some time with the Replit team for a quick demo of our product.
Happy coding and selling!