Mautic api to create batch contact not working

Version - v3.2.4

I am trying to send data in batches from my database to mautic website to create contacts. Sometimes, the contact is being created on the website, but the other times, its not being created. Also, when I am trying to send the data in batches, only the one contact is being created. I was checking in the mautic logs, there is some error regarding the campaing-

[2023-11-22 14:25:02] mautic.ERROR: CAMPAIGN: Campaign Limit exceeded
[2023-11-22 14:30:02] mautic.ERROR: CAMPAIGN: Campaign Limit exceeded
[2023-11-22 14:41:02] mautic.ERROR: CAMPAIGN: Campaign Limit exceeded
[2023-11-22 14:52:22] mautic.ERROR: CAMPAIGN: Campaign Limit exceeded

could this error be the reason, the contacts are not being created ?

Hi there,

Could you please share what you’re using to create the contacts (eg the actual commands/code)? Without that we can’t really help you to troubleshoot.

@periodic_task(run_every=datetime.timedelta(minutes=settings.UPDATE_TO_MAUTIC_INTERVAL))
def update_data_to_mautic() → str:
current_time = datetime.datetime.now()
one_hour_before_current_time = current_time - datetime.timedelta(minutes=settings.UPDATE_TO_MAUTIC_INTERVAL)
user_data = MauticBigTable.objects.filter(modified_at__gt=one_hour_before_current_time)
if user_data:
create_payload, edit_payload = data_for_creation_and_edit(user_data)
if create_payload:
print(create_payload)
response = requests.post(
url=MAUTIC_BASE_URL + “/contacts/batch/new”,
headers=MAUTIC_HEADER,
data=json.dumps(create_payload)
)
if “contacts” in response:
mautic_ids = [contact[“id”] for contact in response[“contacts”]]
update_mautic_ids_in_mauticbigtable(create_payload, mautic_ids)
else:
return response.status_code()

    if edit_payload:
        batch_edit_contacts(edit_payload)

def data_for_creation_and_edit(user_data: Iterable[MauticBigTable]) → (List[Dict[str, None]], List[Dict[str, None]]):
create_payload =
edit_payload =
for data in user_data:
payload = prepare_payload_to_update_to_mautic(data)
if data.mautic_id:
payload[“id”] = data.mautic_id
edit_payload.append(payload)
else:
create_payload.append(payload)

print(create_payload)
return create_payload, edit_payload

def batch_edit_contacts(edit_payload: List[Dict[str, None]]) → dict:
response = requests.patch(
url=MAUTIC_BASE_URL + “/contacts/batch/edit”,
headers=MAUTIC_HEADER,
data=json.dumps(edit_payload)
)
return response.json()

def prepare_payload_to_update_to_mautic(instance: MauticBigTable) → Dict[str, None]:
payload = {
“email”: instance.email,
}
backend_fields_to_update_in_mautic = BackendFieldMapping.objects.all()
for field_to_update in backend_fields_to_update_in_mautic:
mautic_field = field_to_update.mautic_field_id
mautic_field_datatype = field_to_update.mautic_field_data_type
backend_field = field_to_update.backend_field
if mautic_field and mautic_field_datatype:
value_to_store = getattr(instance, backend_field)
if value_to_store:
if mautic_field_datatype == “multiselect”:
payload[mautic_field] = value_to_store
else:
if isinstance(value_to_store, list):
payload[mautic_field] = value_to_store
if mautic_field_datatype == “date”:
payload[mautic_field] = str(value_to_store)
if mautic_field_datatype == ‘datetime’:
payload[mautic_field] = str(value_to_store)
# payload[mautic_field] = value_to_store
print(payload)
return payload

def update_mautic_ids_in_mauticbigtable(create_payload: List[Dict[str, None]], mautic_ids: List[int]):
for data, mautic_id in zip(create_payload, mautic_ids):
email = data[“email”]
try:
contact = MauticBigTable.objects.get(email=email)
contact.mautic_id = mautic_id
contact.save()
except MauticBigTable.DoesNotExist:
pass

the above code is in the staging env, in my local it is working but its not working properly in the staging

can you please tell me what could be the issue?